# DRAFT - IBM CPD DV Connection - DRAFT


### Where to find this sample online
You can find a copy of this notebook at https://github.com/Db2-DTE-POC/db2dmc.

### First we will import a few helper classes
We need to pull in a few standard Python libraries so that we can work with REST, JSON and a library called Pandas. Pandas lets us work with DataFrames, which are a very powerful way to work with tabular data in Python. 

In [1]:
# Import the class libraries 
import requests
import ssl
import json
from pprint import pprint
from requests import Response
import pandas as pd
import time
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
from IPython.display import IFrame
from IPython.display import display, HTML
from pandas import json_normalize  # top-level import; the pandas.io.json path is deprecated since pandas 1.0
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt

### The Db2 Class
Next we will create a Db2 helper class that encapsulates the REST API calls we can use to access the Db2 Data Management Console service directly, without going through the user interface.

To access the service we first need to authenticate and create a reusable token that is sent with each call to the service. This means we don't have to provide a user ID and password each time we run a command; every request after sign-in is authorized by the token rather than by the credentials themselves.
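
As a minimal sketch of the token idea, the snippet below builds the header dictionary once so it can be reused for every request. The function name `build_headers` and the token value are hypothetical; the header fields mirror the ones set in the `authenticate` method of the Db2 class in this notebook.

```python
def build_headers(token):
    """Return request headers that carry a bearer token."""
    return {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Authorization": "Bearer " + token,
        "Cache-Control": "no-cache",
    }

# Placeholder value for illustration only, not a real token
headers = build_headers("example-token")
print(headers["Authorization"])
```

Because the headers are built once and stored, the user ID and password only travel over the wire during sign-in.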

Each request is constructed of several parts. First, the URL and the API version identify how to connect to the service. Second, the REST service path identifies the request and its options, for example '/metrics/applications/connections/current/list'. Finally, some complex requests also include a JSON payload. For example, running SQL requires a JSON object that identifies the script, the statement delimiter, the maximum number of rows in the result set, and what to do if a statement fails.
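
The parts of a request can be sketched as follows. This is an illustration only: the function name `build_sql_job_request` and the host `https://example.com:9152` are assumptions, while the service path and the payload field names are taken from the `runSQL` method of the Db2 class in this notebook.

```python
import json

def build_sql_job_request(base_url, script, limit=10, separator=";", stop_on_error=False):
    """Assemble the URL and JSON payload for a SQL job, without sending it."""
    # Service path as used by the Db2 class in this notebook
    service_path = "/icp4data-databases/dv/icp4d-test/dbapi/v4/sql_jobs"
    payload = {
        "commands": script,                    # the SQL script to run
        "limit": limit,                        # maximum rows per result set
        "separator": separator,                # statement delimiter
        "stop_on_error": str(stop_on_error),   # sent as a string, as in the class
    }
    return base_url + service_path, payload

# Hypothetical host, used only to show the assembled request
url, payload = build_sql_job_request("https://example.com:9152",
                                     "SELECT 1 FROM SYSIBM.SYSDUMMY1;")
print(url)
print(json.dumps(payload, indent=2))
```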

The full set of APIs is documented as part of the Db2 Data Management Console user interface. In this hands-on lab you can connect to that documentation directly through this link: [Db2 Data Management Console RESTful APIs](http://localhost:11080/dbapi/api/index_enterprise.html).

In [2]:
# Run the Db2 Class library
# Used to construct and reuse an authentication token
# Used to construct REST API URLs and JSON payloads
class Db2():
    
    def __init__(self, url, verify = False, proxies=None, ):
        self.url = url
        self.proxies = proxies
        self.verify = verify

    def authenticate(self, api, userid, password):
        
        credentials = {'username':userid, 'password':password}
        r = requests.post(self.url+api+'/preauth/signin', verify=self.verify, json=credentials, proxies=self.proxies)
        if (r.status_code == 200):
            bearerToken = "Bearer " + r.cookies["ibm-private-cloud-session"]
            print(bearerToken)
            self.headers = {'Content-Type':"application/json", 'Accept':"application/json", 'Authorization': bearerToken, 'Cache-Control': "no-cache"}
        else:
            print ('Unable to authenticate, no bearer token obtained')
        
    def printResponse(self, r, code):
        if (r.status_code == code):
            pprint(r.json())
        else:
            print (r.status_code)
            print (r.content)
    
    def getRequest(self, api, json=None):
        return requests.get(self.url+api, verify = self.verify, headers=self.headers, proxies = self.proxies, json=json)

    def postRequest(self, api, json=None):
        return requests.post(self.url+api, verify = self.verify, headers=self.headers, proxies = self.proxies, json=json) 
    
    def deleteRequest(self, api, json=None):
        return requests.delete(self.url+api, verify = self.verify, headers=self.headers, proxies = self.proxies, json=json) 
        
    def getStatusCode(self, response):
        return (response.status_code)

    def getJSON(self, response):
        return (response.json())
    
    def getDataSources(self):
        return self.getRequest('/icp4data-databases/dv/icp4d-test/dvapiserver/v1/dv/datasource_nodes')
    
    def getSchemas(self):
        return self.getRequest('/icp4data-databases/dv/icp4d-test/dbapi/v4/schemas')
    
    def runSQL(self, script, limit=10, separator=';', stopOnError=False):
        sqlJob = {'commands': script, 'limit':limit, 'separator':separator, 'stop_on_error':str(stopOnError)}
        return self.postRequest('/icp4data-databases/dv/icp4d-test/dbapi/v4/sql_jobs',sqlJob)
        
    def getSQLJobResult(self, jobid):
        return self.getRequest('/icp4data-databases/dv/icp4d-test/dbapi/v4/sql_jobs/'+jobid)
       
    def getSearchViewList(self, searchtext, show_systems="false"):
        return self.getRequest('/icp4data-databases/dv/icp4d-test/dbapi/v4/admin/schemas/obj_type/view?search_name='+searchtext+'&show_systems='+str(show_systems)+'&rows_return=200');
    
    def getSearchTableList(self, searchtext):
        return self.getRequest('/icp4data-databases/dv/icp4d-test/dbapi/v4/admin/schemas/obj_type/table?search_name='+searchtext+'&show_systems=true&rows_return=100');
               
    def postSearchObjects(self, obj_type, search_text, rows_return=100, show_systems='false', is_ascend='true'):     
        json = {"search_name":search_text,"rows_return":rows_return,"show_systems":show_systems,"obj_type":obj_type,"filters_match":"ALL","filters":[]}       
        return self.postRequest('/icp4data-databases/dv/icp4d-test/dbapi/v4/admin/'+str(obj_type)+'s',json);
            
    def getTablesInSchema(self, schema):
        return self.getRequest('/icp4data-databases/dv/icp4d-test/dbapi/v4/schemas/'+str(schema)+'/tables'); 
    
    def getVirtualizedTables(self):
        return self.getRequest('/icp4data-databases/dv/icp4d-test/dvapiserver/v1/dv/mydata/tables')

    def getVirtualizedViews(self):
        return self.getRequest('/icp4data-databases/dv/icp4d-test/dvapiserver/v1/dv/mydata/views')
    
    def grantPrivledgeToRole(self, objectName, objectSchema, roleToGrant):
        json =   {"objectName":objectName,"objectSchema":objectSchema,"roleToGrant":roleToGrant}
        return self.postRequest('/icp4data-databases/dv/icp4d-test/dvapiserver/v1/privileges/roles',json);
 
    def getRole(self, role):
        return self.getRequest('/icp4data-databases/dv/icp4d-test/dvapiserver/v1/privileges/objects/role/'+str(role));
    
    def foldData(self, sourceName, sourceTableDef, sources ):
        json = {"sourceName":sourceName,"sourceTableDef":sourceTableDef,"sources":sources}
        return self.postRequest('/icp4data-databases/dv/icp4d-test/dvapiserver/v1/dv/virtualize/tables', json);

    def addUser(self, username, displayName, email, user_roles, password):
        json = {"username":username,"displayName":displayName,"email":email,"user_roles":user_roles,"password":password}
        return self.postRequest('/api/v1/usermgmt/v1/user', json);
    
    def dropUser(self, username):
        return self.deleteRequest('/api/v1/usermgmt/v1/users/'+str(username));
   
    def getUsers(self):
        return self.getRequest('/api/v1/usermgmt/v1/usermgmt/users');
    
    def getUsersDF(self):
        # Return the list of users as a DataFrame (None if the call fails)
        r = self.getUsers()
        if (self.getStatusCode(r)==200):
            json = self.getJSON(r)
            df = pd.DataFrame(json_normalize(json))
            return df
        else:
            print(self.getStatusCode(r))
    
    def addUserToDV(self, display_name, role, usersDF):
        userrow = (usersDF.loc[usersDF['displayName'] == display_name])
        uid = userrow['uid'].values[0]
        username = userrow['username'].values[0]
        
        json = {"users":[{"uid":uid,"username":username,"display_name":display_name,"role":role}],"serviceInstanceID":"1573915078292"}
        return self.postRequest('/zen-data/v2/serviceInstance/users', json);
    
    def dropUserFromDV(self, display_name, usersDF):
        userrow = (usersDF.loc[usersDF['displayName'] == display_name])
        uid = userrow['uid'].values[0]
        
        json = {"users":[uid],"serviceInstanceID":"1573915078292"}
        return self.deleteRequest('/zen-data/v2/serviceInstance/users', json);
    


In [3]:
def runSQL(sqlText):

    # Run the SQL Script and return the runID for later reference 
    runID = databaseAPI.getJSON(databaseAPI.runSQL(sqlText))['id'] 

    # See if there are any results yet for this job
    json = databaseAPI.getJSON(databaseAPI.getSQLJobResult(runID))

    # If the REST call returns an error return the json with the error to the calling routine
    if 'errors' in json :
        return json
    # Append the results from each statement in the script to the overall combined JSON result set
    fulljson = json

    while json['results'] != [] or (json['status'] != "completed" and json['status'] != "failed") :
        json = databaseAPI.getJSON(databaseAPI.getSQLJobResult(runID))

        # Get the results from each statement as they return and append the results to the full JSON 
        for results in json['results'] :
            fulljson['results'].append(results)
        # Wait 250 ms for more results
        time.sleep(0.25) 
    return fulljson

print('runSQL routine defined')

runSQL routine defined
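
The polling pattern used by `runSQL` can be sketched in isolation. This is a simplified illustration, not the routine above: `poll_job` and the canned `pages` are hypothetical stand-ins for the REST call, and the sketch assumes that each fetch returns only results that have not been returned before.

```python
import time

def poll_job(fetch, interval=0.0):
    """Call fetch() until the job has finished and no more results arrive."""
    combined = []
    while True:
        page = fetch()
        combined.extend(page["results"])
        finished = page["status"] in ("completed", "failed")
        # Stop only when the job is done and the last fetch was empty
        if finished and not page["results"]:
            return combined
        time.sleep(interval)

# Canned responses standing in for successive calls to the service
pages = iter([
    {"status": "running", "results": []},
    {"status": "running", "results": [{"index": 1}]},
    {"status": "completed", "results": [{"index": 2}]},
    {"status": "completed", "results": []},
])
collected = poll_job(lambda: next(pages))
print([r["index"] for r in collected])
```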


In [4]:
def displayResults(json):

    for results in json['results']:
        print('Statement: '+str(results['index'])+': '+results['command'])
        print('Runtime ms: '+str(results['runtime_seconds']*1000))
        if 'error' in results : 
            print(results['error'])
        elif 'rows' in results :
            df = pd.DataFrame(results['rows'],columns=results['columns'])
            print(df)
        else :
            print('No errors. Rows affected: '+str(results['rows_affected']))
        print()
print('displayResults routine defined')

displayResults routine defined


## Establishing a Connection to the Console

### Example Connections
To connect to the Db2 Data Management Console service you need to provide the URL, the service name (v1), and the console user name and password. For this lab we assume that the following values are used for the connection:
* Userid: admin
* Password: password

In [5]:
# Connect to the Db2 Data Management Console service
Console  = 'https://services-uscentral.skytap.com:9152'
user     = 'admin'
password = 'password'

# Set up the required connection
databaseAPI = Db2(Console)
api = '/v1'
databaseAPI.authenticate(api, user, password)
database = Console

Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImFkbWluIiwic3ViIjoiYWRtaW4iLCJpc3MiOiJLTk9YU1NPIiwiYXVkIjoiRFNYIiwicm9sZSI6IkFkbWluIiwicGVybWlzc2lvbnMiOlsiYWRtaW5pc3RyYXRvciIsImNhbl9wcm92aXNpb24iLCJtYW5hZ2VfY2F0YWxvZyIsInZpcnR1YWxpemVfdHJhbnNmb3JtIiwibWFuYWdlX2luZm9ybWF0aW9uX2Fzc2V0cyIsIm1hbmFnZV9xdWFsaXR5IiwibWFuYWdlX2Rpc2NvdmVyeSIsIm1hbmFnZV9tZXRhZGF0YV9pbXBvcnQiLCJtYW5hZ2VfY2F0ZWdvcmllcyIsImF1dGhvcl9nb3Zlcm5hbmNlX2FydGlmYWN0cyIsIm1hbmFnZV9nb3Zlcm5hbmNlX3dvcmtmbG93Il0sInVpZCI6IjEwMDAzMzA5OTkiLCJhdXRoZW50aWNhdG9yIjoiZGVmYXVsdCIsImRpc3BsYXlfbmFtZSI6ImFkbWluIiwiaWF0IjoxNTg0NzA3NzgzLCJleHAiOjE1ODQ3NTA5ODN9.hPkKjA-Fc7iB9CZ92n76xzM7SJmu_oFChcrObWOJ8-TTLie8fHmxleKcRc8d1amCa63TLPB_xnyxnAPzXKg0csrbwSfUexAQ-kT0Up5CYA-C7VZhFh7tOcn3XSl79nW5E2M6onlB_sI7W49_SNtjYVweHnvXq-b0JtZupX-KsR7ZUh4a-Bv8ulDM0ncotCb9LqIGBm96xZ2ZD5zte060VKCpCysfSTU1fhNZbK8RNwejX1dRHt2DnDPQkEPaz14Gsk7Lt-LE2vFebdy8sDyI4_rSuw8k8h-tXgog-U1QJP5L3oXnmeK3MX7FzJOkbpG5Cth5ZKTQF2WvUOBqSPLAaw


In [6]:
r = databaseAPI.getDataSources()
if (databaseAPI.getStatusCode(r)==200):
    json = databaseAPI.getJSON(r)
    df = pd.DataFrame(json_normalize(json))
    print(', '.join(list(df)))
    display(df)
else:
    print(databaseAPI.getStatusCode(r))  

agent_class, dataSources, dscount, hostname, is_docker, node_description, node_name, os_user, port


Unnamed: 0,agent_class,dataSources,dscount,hostname,is_docker,node_description,node_name,os_user,port
0,H,,0,dv-0,N,Not specified,AdminNode,bigsql,6414
1,H,"[{'cid': 'DVM10060', 'dbname': 'SQL92', 'srcho...",2,dv-0,Y,Not specified,qpendpoint_3:6417,bigsql,6417
2,H,"[{'cid': 'INFOR10146', 'dbname': 'STOCKS', 'sr...",1,dv-0,Y,Not specified,qpendpoint_1:6415,bigsql,6415
3,H,"[{'cid': 'DB210200', 'dbname': 'azdb', 'srchos...",2,dv-0,Y,Not specified,qpendpoint_2:6416,bigsql,6416
4,H,"[{'cid': 'MONGO10213', 'dbname': 'mongo_onprem...",2,dv-0,Y,Not specified,qpendpoint_5:6419,bigsql,6419
5,H,"[{'cid': 'DB210113', 'dbname': 'BLUDB', 'srcho...",1,dv-0,Y,Not specified,qpendpoint_4:6418,bigsql,6418


In [None]:
roles = ['DV_ENGINEER','DV_STEWARD','DV_USER']
for role in roles:
    r = databaseAPI.getRole(role)
    if (databaseAPI.getStatusCode(r)==200):
        json = databaseAPI.getJSON(r)
        df = pd.DataFrame(json_normalize(json['objects']))
        print(', '.join(list(df)))
        display(df)
    else:
        print(databaseAPI.getStatusCode(r))  

In [None]:
r = databaseAPI.getUsers()
if (databaseAPI.getStatusCode(r)==200):
    json = databaseAPI.getJSON(r)
    df = pd.DataFrame(json_normalize(json))
    print(', '.join(list(df)))
    display(df[['uid','username','displayName']])
else:
    print(databaseAPI.getStatusCode(r))

In [11]:
# Add x Users and Engineers to the DV Service
ids = 10
userList = {'UserRoot':['LABUSER','LABDATAENGINEER'],'Role':['User','Engineer']}
userListDF = pd.DataFrame(userList) 

df = databaseAPI.getUsersDF() # Get existing list of users to get the uid

for x in range(0, ids):
    for row in range(0, len(userListDF)):
        display_name = userListDF['UserRoot'].iloc[row]+str(x)
        role = userListDF['Role'].iloc[row]
        
        r = databaseAPI.addUserToDV(display_name, role, df)
        if (databaseAPI.getStatusCode(r)==200):
            print('User: '+display_name+' added to Data Virtualization Service')
        else:
            print(databaseAPI.getStatusCode(r))

User: LABUSER0 added to Data Virtualization Service
User: LABDATAENGINEER0 added to Data Virtualization Service
User: LABUSER1 added to Data Virtualization Service
User: LABDATAENGINEER1 added to Data Virtualization Service


In [10]:
display_name = 'LABDATAENGINEER2'
df = databaseAPI.getUsersDF()

r = databaseAPI.dropUserFromDV(display_name, df)
if (databaseAPI.getStatusCode(r)==200):
    print('User: '+display_name+' removed from Data Virtualization Service')
else:
    print(databaseAPI.getStatusCode(r))

User: LABDATAENGINEER2 removed from Data Virtualization Service


In [None]:
r = databaseAPI.getVirtualizedTables()
if (databaseAPI.getStatusCode(r)==200):
    json = databaseAPI.getJSON(r)
    df = pd.DataFrame(json_normalize(json['tables']))
    print(', '.join(list(df)))
    display(df)
else:
    print(databaseAPI.getStatusCode(r))

In [None]:
objectName = 'VOLUME'
objectSchema = 'TRADING'
roleToGrant = 'DV_ENGINEER'
# roleToGrant = 'DV_STEWARD'
# roleToGrant = 'DV_WORKER'
r = databaseAPI.grantPrivledgeToRole(objectName, objectSchema, roleToGrant)
if (databaseAPI.getStatusCode(r)==200):
    print('Access granted')
else:
    print(databaseAPI.getStatusCode(r))

In [None]:
# Add User to CPD
username = "LABUSER1"
displayName = "LABUSER1"
email = "kohlmann@ca.ibm.com"
user_roles = ["Data Scientist"]
password = 'password'
r = databaseAPI.addUser(username, displayName, email, user_roles, password)
if (databaseAPI.getStatusCode(r)==201):
    print('User Added')
else:
    print(databaseAPI.getStatusCode(r))

In [None]:
# Add x Data Scientists, LABUSERS

ids = 10
userList = {'UserRoot':['LABUSER','LABDATAENGINEER'],'Role':[['Data Scientist','Developer'],['Data Engineer']]}
userListDF = pd.DataFrame(userList) 
email = 'kohlmann@ca.ibm.com'
password = 'password'

for x in range(0, ids):
    for row in range(0, len(userListDF)):
        username = userListDF['UserRoot'].iloc[row]+str(x)
        user_role = userListDF['Role'].iloc[row]
        displayName = username
        r = databaseAPI.addUser(username, displayName, email, user_role, password)
        if (databaseAPI.getStatusCode(r)==201):
            print('User: '+username+' Added as a '+str(user_role))
        else:
            print(databaseAPI.getStatusCode(r))

In [None]:
# Drop 10 LAB USERS from CPD
usernames = ["LABUSER1","LABUSER2","LABUSER3","LABUSER4","LABUSER5","LABUSER6","LABUSER7","LABUSER8","LABUSER9","LABUSER10"]
for username in usernames:
    r = databaseAPI.dropUser(username)
    if (databaseAPI.getStatusCode(r)==200):
        print('User: '+username+' Dropped')
    else:
        print(databaseAPI.getStatusCode(r))

In [None]:
sourceName = "STOCK_TRANSACTIONS"
sources = ["DB210113:AWS","DB210113:DVDEMO"]
# Column definitions of the source tables
sourceTableDef = [{"columnName":"QUANTITY","columnType":"INTEGER"},{"columnName":"CUSTID","columnType":"INTEGER"},{"columnName":"TX_NO","columnType":"INTEGER"},{"columnName":"SYMBOL","columnType":"VARCHAR(255)"},{"columnName":"_ID","columnType":"VARCHAR(24)"},{"columnName":"TX_DATE","columnType":"DATE"},{"columnName":"PRICE","columnType":"DECIMAL(12,2)"}]
# Target definition; note that foldData as defined above does not send these values
virtualSchema = "FOLDING"
virtualName = "STOCK_TRANSACTIONS_TEST"
virtualTableDef = [{"columnName":"QUANTITY","columnType":"INTEGER"},{"columnName":"CUSTID","columnType":"INTEGER"},{"columnName":"TX_NO","columnType":"INTEGER"},{"columnName":"SYMBOL","columnType":"VARCHAR(255)"},{"columnName":"_ID","columnType":"VARCHAR(24)"},{"columnName":"TX_DATE","columnType":"DATE"},{"columnName":"PRICE","columnType":"DECIMAL(12,2)"}]

# The call is left disabled, as in the draft; the status check is disabled with it
# so that it does not report on a stale response from an earlier cell
# r = databaseAPI.foldData(sourceName, sourceTableDef, sources)
# if (databaseAPI.getStatusCode(r)==200):
#     print('Tables folded')
# else:
#     print(databaseAPI.getStatusCode(r))

In [None]:
r = databaseAPI.getVirtualizedViews()
if (databaseAPI.getStatusCode(r)==200):
    json = databaseAPI.getJSON(r)
    df = pd.DataFrame(json_normalize(json['views']))
    print(', '.join(list(df)))
    display(df)
else:
    print(databaseAPI.getStatusCode(r))

In [None]:
sqlText = \
'''
WITH MAX_VOLUME(AMOUNT) AS (
  SELECT MAX(VOLUME) FROM FOLDING.STOCK_HISTORY
    WHERE SYMBOL = 'DJIA'
),
HIGHDATE(TX_DATE) AS (
  SELECT TX_DATE FROM FOLDING.STOCK_HISTORY, MAX_VOLUME M
    WHERE SYMBOL = 'DJIA' AND VOLUME = M.AMOUNT
),
CUSTOMERS_IN_OHIO(CUSTID) AS (
  SELECT C.CUSTID FROM DVDEMO.CUSTOMERS C 
    WHERE C.STATE = 'OH'
),
TOTAL_BUY(CUSTID,TOTAL) AS (
  SELECT C.CUSTID, SUM(SH.QUANTITY * SH.PRICE) 
    FROM CUSTOMERS_IN_OHIO C, FOLDING.STOCK_TRANSACTIONS SH, HIGHDATE HD
  WHERE SH.CUSTID = C.CUSTID AND
        SH.TX_DATE = HD.TX_DATE AND 
        QUANTITY > 0 
  GROUP BY C.CUSTID
)
SELECT C.LASTNAME, T.TOTAL 
  FROM DVDEMO.CUSTOMERS C, TOTAL_BUY T
WHERE C.CUSTID = T.CUSTID
  ORDER BY TOTAL DESC
FETCH FIRST 5 ROWS ONLY;
'''

displayResults(runSQL(sqlText))

In [None]:
repeat = 1
sqlText = 'SELECT * FROM TRADING.OHIO'

for x in range(0, repeat):
    print('Repetition number: '+str(x))
    runSQL(sqlText)
print('done')

## Object Exploration

### List the Available Schemas in the Database

You can get the list of schemas through a REST service call. In this example the service call text was defined in the Db2 class at the start of the notebook. By default it includes both user and catalog schemas. 

If the call is successful it returns a 200 status code. The API call returns a JSON structure that we turn into a DataFrame using the `json_normalize` function. You can then list the columns available in the DataFrame and display its first 10 rows.

Many of the examples below list the columns available in the dataframe to make it easier for you to adapt the examples to your own needs. 
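
To make the normalization step concrete, here is a small standard-library sketch of what flattening nested JSON into dotted column names looks like. It is an illustration of the idea, not pandas' implementation: the `flatten` helper, the sample response and its `owner` field are all made up for this example; only the `resources` key and the `name` column come from the cell below.

```python
def flatten(record, parent_key="", sep="."):
    """Flatten nested dictionaries into a single level with dotted keys."""
    flat = {}
    for key, value in record.items():
        name = parent_key + sep + key if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat

# Hypothetical response shaped like a schema listing
response = {"resources": [
    {"name": "TRADING", "owner": {"id": "ADMIN", "type": "USER"}},
    {"name": "FOLDING", "owner": {"id": "ADMIN", "type": "USER"}},
]}

rows = [flatten(r) for r in response["resources"]]
columns = sorted({key for row in rows for key in row})
print(", ".join(columns))
```

Each flattened row corresponds to one row of the DataFrame, and each dotted key to one column.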

In [None]:
r = databaseAPI.getSchemas()
if (databaseAPI.getStatusCode(r)==200):
    json = databaseAPI.getJSON(r)
    df = pd.DataFrame(json_normalize(json['resources']))
    print(', '.join(list(df)))
    display(df[['name']].head(10))
else:
    print(databaseAPI.getStatusCode(r))   

In [None]:
# Search for tables or views across all schemas that match simple search criteria
# Request up to 10 matches from the service
# Switch between searching tables or views
obj_type = 'view'
# obj_type = 'table'
r = databaseAPI.postSearchObjects(obj_type,"TRADING",10,'false','false')
if (databaseAPI.getStatusCode(r)==200):
    json = databaseAPI.getJSON(r)
    df = pd.DataFrame(json_normalize(json))
    print('Columns:')
    print(', '.join(list(df)))
    display(df[[obj_type+'_name']].head(100))
else:
    print("RC: "+str(databaseAPI.getStatusCode(r)))

In [None]:
sqlText = \
'''
SELECT * FROM TRADING.TOPFIVE;
'''

displayResults(runSQL(sqlText))

### Next Steps
Try the [Analysing SQL Workloads](http://localhost:8888/notebooks/Db2_Data_Management_Console_SQL.ipynb) notebook. It contains extensive examples of how to run workloads that contain multiple SQL statements across multiple databases and then measure their performance.

Also try building some of your own reports based on the examples in this hands-on lab. There are additional functions in the Db2 class that we haven't explored yet in this lab. You can also use the Db2 class in your own notebook by including the [dmc_setup notebook](http://localhost:8888/notebooks/dmc_setup.ipynb).

#### Credits: IBM 2019, Peter Kohlmann [kohlmann@ca.ibm.com]