# Import and Map Business Terms to Data Headers


Before executing this notebook on IBM Cloud , you need to:<br>
1) When you import this project on an IBM Cloud environment, a project access token should be inserted at the top of this notebook as a code cell. <br>
If you do not see the cell above, Insert a project token: Click on **More -> Insert project token** in the top-right menu section and run the cell <br>

![ws-project.mov](https://media.giphy.com/media/jSVxX2spqwWF9unYrs/giphy.gif)
2) Provide your IBM Cloud API key in the subsequent cell<br>
3) You can then step through the notebook execution cell by cell, by selecting Shift-Enter. Or you can execute the entire notebook by selecting **Cell -> Run All** from the menu.<br>




#### Insert IBM Cloud API key
Your Cloud API key can be generated by going to the <a href="https://cloud.ibm.com/iam/apikeys" target="_blank" rel="noopener noreferrer">API Keys section of the Cloud console</a>. From that page, scroll down to the API Keys section, and click Create an IBM Cloud API key. Give your key a name and click Create, then copy the created key and paste it below. 

Cloud API key will be used to authenticate Watson Knowledge Catalog services.

In [1]:
ibmcloud_api_key=''

## Introduction
This notebook imports a business glossary for the industry accelerator into Watson Knowledge Catalog (WKC) for use in governing data assets. It also connects the data assets used in the accelerator to the WKC business terms.

In the first part of the notebook category and business terms are imported and then business terms are published into Watson Knowledge Catalog. The category and business terms csvs files are included with the project.

In the second part of the notebook we programmatically publish a dataset into a catalog and map business terms to the dataset column headers. The business terms and their mappings are specified in a csv file included with the project. The user must first ensure that the catalog exists and the imported business terms have been published.

The user can also assign business terms to column headers manually or by using the Data Discovery capability within Cloud Pak for Data. 

This notebook is optional. The analytics project runs as expected even if this notebook is not used. 

**Note that as only Admin users can import terms, this notebook should be run by an Admin user only.** User must have <a href="https://dataplatform.cloud.ibm.com/docs/content/wsj/catalog/roles-wkcop.html" target="_blank" rel="noopener noreferrer">the permission to create governance artifacts.



The user should create a catalog and enter the following before running the rest of the notebook: 

 **catalog_name :** Name of the catalog that we would like to publish the csv to. This catalog is created based on the instructions above or an existing catalog.

In [3]:
catalog_name = 'Ind_Acc'

In [3]:
# imports for the rest APIs interactions with WKC
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
import json
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
from pandas.io.json import json_normalize
import pandas as pd
import os
import time 

s=requests.session()

## User Inputs
1. **wkciamURL**: The url used to authenticate the ibm cloud api key.
2. **wkcURL**: The base url used to call the apis of data platform.
2. **categories_csv**: Name of the csv file containing the categories.
3. **terms_csv**: Name of the csv file containing the business terms and their definitions.

The user does not need to change the code cell below, unless they run this notebook on a different environment or changed the name of the csv file with categories and business terms.


In [4]:
# for dev, use https://iam.test.cloud.ibm.com/
# for prod, use https://iam.cloud.ibm.com/
wkciamURL="https://iam.cloud.ibm.com/"
wkcURLauth = wkciamURL+"identity/token"

if os.environ['RUNTIME_ENV_REGION']=='us-south':
    region=""
else:
    region=os.environ['RUNTIME_ENV_REGION']+"."

# for dev, use "https://api."+region+"dataplatform.dev.cloud.ibm.com/" 
# for prod use "https://api."+region+"dataplatform.cloud.ibm.com/"
wkcURL = "https://api."+region+"dataplatform.cloud.ibm.com/"


categories_csv="utilities-payment-risk-prediction-glossary-categories.csv"
terms_csv="utilities-payment-risk-prediction-glossary-terms.csv"




We also create additional variables. The user does not need to change the code cell below, unless they change the business terms category name or the name of the csv file with mappings.

1. **category_name :** Name of the business term category corresponding to the project.
2. **terms_file :** Name of the csv file containing the list of mappings between column headers and business terms.
3. **data_asset_file_to_publish :** Name of the csv files that will be published into the catalog and for which we map business terms.

In [5]:
category_name = "Utilities Payment Risk Prediction"
terms_file = "utilities-payment-risk-prediction-map-terms.csv" 
data_asset_file_to_publish = 'Bill Payment View.csv'

### Authentication

IBM Cloud API key is used to authenticate and generate the bearer token in the below cell.

In [6]:
# Headers

headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json'}

# Payload with ibm cloud api key
payload={
    'apikey': ibmcloud_api_key,
    'grant_type': 'urn:ibm:params:oauth:grant-type:apikey'
}
try:
    authresponse = s.post(wkcURLauth, headers=headers,data=payload,verify=False)
    if authresponse.status_code in (200,202):
        print("Authentication Successful")
        accessToken=json.loads(authresponse.text)['access_token']
    else:
        print("Authentication unsuccessful, check your inputs")
        

except:
    print("The below error has occurred. Please ensure that api key entered is correct and user has access to watson knowledge catalog")
    raise

Authentication Successful


### Check WKC Plan

The below cell checks users WKC plan using users account id.

In [7]:
try:
    headers = {
        'Content-Type': 'application/json',
        'IAM-Apikey': ibmcloud_api_key,
        'Authorization': 'Bearer '+accessToken
    }

    response = s.get(wkciamURL+'v1/apikeys/details', headers=headers)

    account_id=json.loads(response.text)['account_id']

    headers = {
        'Authorization': 'Bearer '+accessToken,
    }

    response = s.get(wkcURL+'v2/entitlements?bss_account_id='+account_id, headers=headers)
    wkc_plan=json.loads(response.text)['entitlements']['data_catalog']['plan_name']
    print("Watson Knowldge Catalog Plan:",wkc_plan)
    if(wkc_plan=="Lite"):
        print("Lite plan detected - will attempt to publish 5 business terms only.")
    elif(wkc_plan=="Standard"):
        print("Standard plan detected - will attempt to publish 150 business terms.")
except:
    print("The below error has occurred. Please ensure that api key entered is correct and user has access to watson knowledge catalog")
    raise

Watson Knowldge Catalog Plan: Standard
Standard plan detected - will attempt to publish 150 business terms.


## 1. Import the Categories and Business terms 
If the user is successfully authenticated, then we complete the following steps 
1. Read the business categories file and import the category `Utilities Payment Risk Prediction` under `Industry Accelerators`
2. Read the business terms file and import the business terms with definitions and related/part of terms for `Utilities Payment Risk Prediction`.
3. The imported business terms will be saved as a draft, publish the business terms programmatically. 

#### Import the Category

Below cell reads catgories csv file and import the category `Utilities Payment Risk Prediction` under `Industry Accelerators`.  

In [13]:
f = open(categories_csv, 'w+b')
f.write(project.get_file(categories_csv).getbuffer())
f.close()

# Create a header
importheaders = {
    'accept': 'application/json',
    'Authorization': 'Bearer '+accessToken,  
    'Content-Type': 'multipart/form-data'
}

# Specify merge option 
params = (
    ('merge_option', 'all'),
)

# remove the index if necessary
time.sleep(10)
pd.read_csv(categories_csv).to_csv(categories_csv,index=False)

# Specify the file name to import
files = {
    'file': (categories_csv, open(categories_csv, 'rb')),
}

try:
    categoryimport_url = wkcURL+'v3/governance_artifact_types/category/import'
    cat_import = s.post(categoryimport_url, headers=importheaders, params=params, files=files)
    if cat_import.status_code in (200,202):
        print("Category import has",json.loads(cat_import.text)['status'])
    else:
        print("Category import is unsuccessful,", json.loads(cat_import.text)['errors'][0]['message'])
except:
    print("The below error has occurred. Please ensure that categories csv file exists")
    raise



Category import has SUCCEEDED


#### Import Business terms

Read the business terms file and import the business terms with definitions and related/part of terms for `Utilities Payment Risk Prediction`. `Lite` plan users can import only 5 business terms into watson knowledge catalog. `Standard` plan users can import upto 150 business terms. If the user already has business terms, delete them before running below cell. 


In [14]:
f = open(terms_csv, 'w+b')
f.write(project.get_file(terms_csv).getbuffer())
if wkc_plan=="Lite":
    

    filter_rows_by_account_type=pd.read_csv(terms_csv)
    map_terms_file = project.get_file(terms_file)
    map_terms_file.seek(0)
    map_terms = pd.read_csv(map_terms_file)
    filter_rows_by_account_type=filter_rows_by_account_type[filter_rows_by_account_type['Name'].isin(map_terms['Business Terms'].tolist())].head()
    filter_rows_by_account_type.to_csv(terms_csv,index=False)
print("Watson Knowldge Catalog Plan:",wkc_plan)
print("Maximum number of business terms allowed to import and publish:",json.loads(response.text)['entitlements']['data_catalog']['properties']['business_glossary']['max_terms'])

f.close()

pd.read_csv(terms_csv).to_csv(terms_csv,index=False)

files = {
    'file': (terms_csv, open(terms_csv, 'rb')),
}

try:
    termsimport_url =wkcURL+'v3/governance_artifact_types/glossary_term/import'
    term_import = s.post(termsimport_url, headers=importheaders, params=params, files=files)
    if term_import.status_code in (200,202):
        wf_json=json.loads(term_import.text)
        print("Terms import has",wf_json['status'])
        print("Number of Business terms imported:",max(json.loads(term_import.text)['operations_count']['glossary_term']['IMPORT_MODIFY'],json.loads(term_import.text)['operations_count']['glossary_term']['IMPORT_CREATE']))
    else:
        print("Import of Business Terms failed.")
        raise Exception(json.loads(term_import.text)['errors'][0]['message'])
except:
    print("The below error has occurred. Please check your WKC plan and Business Terms csvs")
    print(json.loads(term_import.text)['messages']['resources'])
    raise



Watson Knowldge Catalog Plan: Standard
Maximum number of business terms allowed to import and publish: 150
Terms import has SUCCEEDED
Number of Business terms imported: 150


#### Fetch imported terms draft and publish the terms

If the business terms are successfully imported in the previous cell, the imported business terms have been saved in draft status. The below cell publishes the business terms.

In [36]:
headers = {
'Content-Type': "application/json",
'Authorization': "Bearer "+accessToken

}
try:
    wf_url=wkcURL+"v3/workflows/"+wf_json['workflow_id']+"?includeUserTasks=true"
    wf_response=s.get(wf_url,headers=headers)
    if wf_response.status_code in (200,202):
        idtopublish=json.loads(wf_response.text)['entity']['user_tasks'][0]['metadata']['task_id']
except:
    print("The below error has occurred. Please ensure that terms are imported correctly")
    raise    
    
    
## Publish the Business terms
data = {
    "action": "complete",
    "form_properties": [
        {
            "id": "action",
            "value": "#publish"
        }
    ]
}
publish_url=wkcURL+'v3/workflow_user_tasks/'+idtopublish+'/actions'


try:
    publish = s.post(publish_url, headers=headers, json=data, verify=False)

    if publish.status_code in (200,202,204):
        print("The Business terms are published.")
except:
    print("The below error has occurred. Please ensure that terms are imported correctly")
    raise
    


The Business terms are published.


When the Business terms are published, navigate to **Governance -> Categories -> Industry Accelerators** to find the category and business terms pertaining to this accelerator.


## 2. Map Business Terms to Data Headers
In this part of the notebook, we take the published business terms and map them to the dataset column headers
### Create Catalog

The dataset must first be published into a catalog. The catalog must be manually created. Under **Catalogs** in the navigation menu, select **All Catalogs** and select **New Catalog**. Enter the name for the catalog and the description if necessary and create the catalog. If the user has already created the catalog this step can be skipped and the existing catalog name should be specified in the code cell below.


## Map Business Terms to Headers

We complete the following steps to map the business terms to column headers:

1. Check if the Category `Utilities Payment Risk Prediction` exists in the parent category `Industry Accelerators`.
2. Load the business terms from the `Utilities Payment Risk Prediction` subcategory into a dataframe.
3. Publish the specified dataset into the catalog.
4. Assign business terms to the dataset column headers.

### 1. Check for the Category

In [39]:
search_url=wkcURL+"v3/search"
try:
    headers = {
        'Content-Type': "application/json",
        'Authorization': "Bearer "+accessToken,
        'Cache-Control': "no-cache",
        'Connection': "keep-alive"
        }
    
    search_body = {
        "size": 1000,
        "_source": ["artifact_id","metadata.name"],
       "query": {    
               "match": {"metadata.artifact_type": "category"}
       }
    }
    parent_cat = s.post(search_url, verify=False,  json=search_body, headers=headers)
    
    
    
    # Check if Industry accelerator category exists and load its id into a variable `parent_id`
    if parent_cat.status_code == 200:
        category=json.loads(parent_cat.text)
        for i in category['rows']:
            
            if i['metadata']['name']== category_name:
                print("Category ",category_name,"exists")

                exists_category=True
                cat_id=i['artifact_id'] 
                category_id=cat_id[cat_id.index('_')+1:][:cat_id.index('_')]
            
                   
except:
    print("The below error has occurred. " + "Please ensure that category, '" + category_name + "', exists.")
    raise ValueError(parent_cat.text)

Category  Utilities Payment Risk Prediction exists


### 2. Load category Business Terms into Dataframe 

Get all of the terms in the `Utilities Payment Risk Prediction` category and store them in the `df_terms` dataframe.

In [41]:

time.sleep(10)
# Create a payload for the post request, This payload contains information on size of the terms, source, category and subcategory ids
payload={"size":300,"from":0,"_source":["artifact_id","metadata.artifact_type","metadata.name","metadata.description","categories","entity.artifacts"],"query":{"bool":{"filter":{"bool":{"minimum_should_match":1,"should":[{"term":{"categories.primary_category_id":category_id}},{"term":{"categories.secondary_category_ids":category_id}}],"must_not":{"terms":{"metadata.artifact_type":["category"]}}}}}}}
# create a post request with above payload 
wf=s.post(wkcURL+"v3/search",headers=headers,json=payload,verify=False)
# it will return all the terms , load these terms into a dataframe
wf_json=json.loads(wf.text)['rows']
df_terms=pd.json_normalize(wf_json)

df_terms=df_terms[['entity.artifacts.global_id','metadata.name']]

In [42]:
# terms dataframe looks as below
df_terms.head()

Unnamed: 0,entity.artifacts.global_id,metadata.name
0,62d74dc3-fef7-4041-81da-3b0c2cd3f56e_f5b834a0-...,Education Level
1,62d74dc3-fef7-4041-81da-3b0c2cd3f56e_71368f15-...,Customer Status
2,62d74dc3-fef7-4041-81da-3b0c2cd3f56e_8e73bf2d-...,Customer Agreement Payment Cycle
3,62d74dc3-fef7-4041-81da-3b0c2cd3f56e_3f97dec8-...,Closed Customer Agreements
4,62d74dc3-fef7-4041-81da-3b0c2cd3f56e_d2a18eee-...,Communication Processing Time


### 3. Publish Dataset into Catalog

Get the ID of the catalog that was specified in the user inputs at the beginning of this notebook.

In [43]:
## Get catalog that created and its id by providing name of the catalog created, wich should be same as the one entered in the previous cells
catalog_endpoint=wkcURL+"v2/catalogs"
# Create new header for the requests
headers = {
'Content-Type': "application/json",
'Authorization': "Bearer "+accessToken

}

# endpoint to get all the catalogs 
get_catalog=s.get(catalog_endpoint,verify=False, headers=headers)


## Find the catalog created with specific name and store name and id of it into catalog_name and catalog_id respectively
try:
    get_catalog_json=json.loads(get_catalog.text)['catalogs']
except:
    print("The below error has occurred. Please ensure that catalog, '" + catalog_name + "', exists")
    raise
    
catalog_id = ''
for metadata in get_catalog_json:
    if metadata['entity']['name']==catalog_name:
        catalog_id=metadata['metadata']['guid']
        print("catalog_id for",catalog_name, catalog_id)

if catalog_id == '':
    print("The provided catalog name cannot be found. Please ensure that catalog, '" + catalog_name + "', exists")
    raise ValueError("Catalog cannot be found")

catalog_id for Ind_Acc f559304b-2f00-417e-969d-560e07787113


Get the project id. All project assets can be accessed using this project id.

In [44]:
# Enter project_id manually if referring to a different project
project_id=os.environ['PROJECT_ID']

Get all existing csv files in the project folder and store the names of these files. 

In [45]:
# payload 
payload={"query":"*:*","limit":200}
# endpoint to access all the project assets in the project folder 
asset_url=wkcURL+"v2/asset_types/asset/search?project_id="+project_id
get_asset=s.post(asset_url,json=payload,verify=False)

Next we get the asset id of the dataset to be published to the catalog.

In [46]:
# Get asset ids of all csv files to be published in to the catalog and store the asset ids in an array

project_asset_id=[]
# Payload to query all project assets
payload={"query":"*:*","limit":200}

get_asset=s.post(wkcURL+"v2/asset_types/asset/search?project_id="+project_id,json=payload,verify=False, headers=headers)
get_asset_json=json.loads(get_asset.text)
for j in get_asset_json['results']:
    if j['metadata']['name']==data_asset_file_to_publish:
        print("Asset id of",data_asset_file_to_publish,":",j['metadata']['asset_id'])
        project_asset_id.append(j['metadata']['asset_id'])

Asset id of Bill Payment View.csv : 2200f45a-9c1d-4265-b50f-928c2f4ab213


Using the asset ID for the dataset, upload the dataset into the catalog using the post request below. Get the new asset ID of the newly published dataset.

In [47]:
print("ASSET ID's of the published assets")
# Creates a empty dictionary
catalog_asset_ids={}
for asset_id in project_asset_id:
    #for  each asset in the project , publish them into the catalog 
    # pyload to publish the asset
    payload={"mode":0,"catalog_id":catalog_id,"metadata":{}}
    # endpoint to publish asset
    asset_publish_url=wkcURL+"v2/assets/"+asset_id+"/publish?project_id="+project_id
    # Post request with endpoint, heaeder and payload
    publishasset=requests.post(asset_publish_url,json=payload,headers=headers,verify=False)
    # api endpoint returns below text
    publishasset_json=json.loads(publishasset.text)
    # extract csv file published and its asset id and append it to the dictionary
    catalog_asset_ids[publishasset_json['metadata']['name']]=publishasset_json['asset_id']
    
print(catalog_asset_ids)

ASSET ID's of the published assets
{'Bill Payment View.csv': '9ce52300-34cf-4557-8438-bfbc03c9396a'}


### 4. Assign Business Terms to Column Headers

Read in the file with business terms and their associated column headers and view a sample of the data.

In [48]:
map_terms_file = project.get_file(terms_file)
map_terms_file.seek(0)
map_terms = pd.read_csv(map_terms_file)

print(map_terms.shape)
map_terms.head()

(71, 4)


Unnamed: 0,Business Terms,Column_header,Table,File
0,Account Number,ACCOUNT_ID,ENERGY and UTILITIES ACCELERATORS,ACCOUNT
1,Address Line 1 Description,APPT_NBR,ENERGY and UTILITIES ACCELERATORS,LOCATION
2,Base Usage,BASE_USAGE,ENERGY and UTILITIES ACCELERATORS,INVOICE
3,Bill Alternate Charge,ALTERNATE_CHARGE,ENERGY and UTILITIES ACCELERATORS,INVOICE
4,Bill Alternate Usage,ALTERNATE_USAGE,ENERGY and UTILITIES ACCELERATORS,INVOICE


In [49]:
df_terms.head()



Unnamed: 0,entity.artifacts.global_id,metadata.name
0,62d74dc3-fef7-4041-81da-3b0c2cd3f56e_f5b834a0-...,Education Level
1,62d74dc3-fef7-4041-81da-3b0c2cd3f56e_71368f15-...,Customer Status
2,62d74dc3-fef7-4041-81da-3b0c2cd3f56e_8e73bf2d-...,Customer Agreement Payment Cycle
3,62d74dc3-fef7-4041-81da-3b0c2cd3f56e_3f97dec8-...,Closed Customer Agreements
4,62d74dc3-fef7-4041-81da-3b0c2cd3f56e_d2a18eee-...,Communication Processing Time


Join the `df_terms` and `map_terms` dataframes and map each column header to a business term. The code below loops through each file in the catalog (one file in our case) and performs the following tasks:

1. Create a dataframe with column headers in the catalog and associated business term and term ids.
2. Fetch catalog asset id for each csv in the catalog.
3. Create a column_info attribute for all the files in the catalog.
4. Map column header to the business terms. 

In [50]:
# For every file in the map terms csv do the following
# Join the csv with specified file name with the published terms to get its term id
# drop if any duplicates found to avoid multiple mappings for the same term

#map_terms=map_terms[map_terms['File']==file]
map_terms=map_terms.sort_values(by=['File','Column_header'])
Terms_Headers=pd.merge(map_terms,df_terms,left_on='Business Terms',right_on='metadata.name',how='inner')
Terms_Headers=Terms_Headers.drop_duplicates()

for file in catalog_asset_ids:#map_terms.File.unique():
    # Catalog asset id of the particular csvs
    # for each file name in the map_terms if the csv with this file name exists, get its asset_id from the catalog and use the post request publish create column_info attribute
    # This column info attribute is necessary to map the busines terms to column to header
    

    catalog_asset_id=catalog_asset_ids[file]
    print(file,  catalog_asset_id)
    #### 
    payload={"name": "column_info",
       "entity":{
                  #"sample_size":50
               }
    }
    t=requests.post(wkcURL+"v2/assets/"+catalog_asset_id+"/attributes?catalog_id="+catalog_id,json=payload,headers=headers,verify=False)
    #print(t.text)
    # For each column header in the file map its corresponding business term retrieved from the above join in the dataframe

    i=0
    for index, rows in Terms_Headers.iterrows(): 
        i+=1
        print(i,rows.Column_header.strip(), "is mapped to", rows['Business Terms'])
        # Create list for the current row 
        # Below payload is used for the patch request to map the  header to business terms
        payload=[{"op":"add","path":"/"+rows.Column_header.strip(),"value":{"column_terms":[{"term_display_name":rows['Business Terms'],"term_id":rows["entity.artifacts.global_id"]}]},"attribute":"column_info"}]
    #
        # Endpoint for patch request
        url=wkcURL+"v2/assets/"+catalog_asset_id+"/attributes/column_info?catalog_id="+catalog_id
    # patch request to map busines terms to column header using term_id
        patch_attribute=s.patch(url,json=payload,headers=headers,verify=False)
    #
        json.loads(patch_attribute.text) 

Bill Payment View.csv 9ce52300-34cf-4557-8438-bfbc03c9396a
1 ACCOUNT_ID is mapped to Account Number
2 CREDIT_HISTORY is mapped to Credit Risk Score
3 CREDIT_HISTORY is mapped to Credit Risk Score
4 BUILDING_TYPE is mapped to Customer Building Type
5 AGE is mapped to Date Of Birth
6 DATE_OF_BIRTH is mapped to Date Of Birth
7 CALL_CENTER_RESPONSE is mapped to Customer Communication
8 EMAIL_RESPONSE is mapped to Customer Communication
9 SMART_METER_COMMENTS is mapped to Customer Communication
10 CLTV is mapped to Customer Lifetime Value
11 CLTV_RATIO is mapped to Customer Lifetime Value
12 CLTV_RATIO is mapped to Customer Acquisition Cost
13 COMPLAINTS is mapped to Customer Complaints
14 ISSUE is mapped to Customer Complaints
15 ISSUE_CD is mapped to Customer Complaints
16 ISSUE_ID is mapped to Customer Complaints
17 CREDIT_RATING is mapped to Customer Credit Authority Level
18 CST_SEGMENT_ID is mapped to Customer Market Segment
19 CURRENT_CONTRACT_ID is mapped to Customer Agreement Numbe

The specified dataset is now published to the catalog and its column headers are mapped to their associated business terms. 

Navigate to the business terms or the catalog data asset to view the mappings created.

The associated business term for the column header is displayed.

In [51]:
s.close()

<hr>

Sample Materials, provided under <a href="https://github.com/IBM/Industry-Accelerators/blob/master/CPD%20SaaS/LICENSE" target="_blank" rel="noopener noreferrer">license.</a> <br>
Licensed Materials - Property of IBM. <br>
© Copyright IBM Corp. 2020, 2021. All Rights Reserved. <br>
US Government Users Restricted Rights - Use, duplication or disclosure restricted by GSA ADP Schedule Contract with IBM Corp. <br>