# Map Business Terms to Data Headers

## Introduction

In this notebook we programmatically publish a dataset into a catalog and map business terms to the dataset column headers. The business terms and their mappings are specified in a csv file included with the project. 

This notebook is optional. The analytics project runs as expected even if this notebook is not used. 

**Note that as only Admin users can import terms, this notebook should be run by an Admin user only.**

**This project contains Sample Materials, provided under license. <br>
Licensed Materials - Property of IBM. <br>
© Copyright IBM Corp. 2019, 2020. All Rights Reserved. <br>
US Government Users Restricted Rights - Use, duplication or disclosure restricted by GSA ADP Schedule Contract with IBM Corp.<br>**

## Prerequisites

### Create Category and Sub-category

Naviagte to Governance -> Categories -> New category -> Provide name of the category -> Under `Subcategories`, click on `Create category` -> Provide name of the subcategory -> Note both category and sub-category names

Example: If you create category as `Category_main` and subcategory as `Category_sub`, you need to make sure the following in `customer-segmentation-glossary-terms.csv` file.

- Column `Category` has path as `Category_main >> Category_sub`.
- Column `Part of Terms` has `Category_main >> Category_sub` path before the terms name.


### Upload Business Terms File

Business term mapping is provided in the file `customer-segmentation-glossary-terms.csv`. 

Navigate to Governance -> Business terms -> Add business term -> Import from file -> Click on `Add file` -> Upload the file -> Publish


### Create Catalog

Navigate to catalog -> All Catalogs -> Create Catalog -> Enter the name for the new catalog and the description if necessary and create the catalog. 

If the user has already created the catalog this step can be skipped and the existing catalog name should be specified in the code cell below.

In [1]:
# imports for the rest APIs interactions with WKC
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
import json
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
from pandas.io.json import json_normalize
import pandas as pd
import os

# use this library for reading and saving data in CP4D
from project_lib import Project
project = Project()

## User Inputs

The user must enter the following before running the rest of the notebook: 
1. **host :** host url of the cluster we are working on.
2. **catalog_name :** Name of the catalog that we would like to publish the csv to. This catalog is created based on the instructions above or an existing catalog.
3. **category_name :** Name of the business term category corresponding to the project. This catalog is created based on the instructions above or an existing category.

We also create additional variables. The user does not need to change the code cell below, unless they change the business terms category name or the name of the csv file with mappings.

1. **terms_file :** Name of the csv file containing the list of mappings between column headers and business terms.
2. **csv_file_to_publish :** Name of the csv files that will be published into the catalog and for which we map business terms.

In [2]:
import os
accessToken = os.environ['USER_ACCESS_TOKEN']
host = 'https://********.com/'
# For example, you can customize the catalog_name as 'Industry Accelerators'
catalog_name = 'Industry Accelerators'
# Customize the category_name as 'Effective Farming - Monitor Crop Growth'
category_name = 'Effective Farming - Monitor Crop Growth'

In [3]:
terms_file = "effective-farming-monitor-crop-growth-map-terms.csv" 
csv_file_to_publish = ['jfk_weather.csv']

Create a requests session and use the same session throughout the notebook. 

In [4]:
# Creates requests session and stores in `s`
s = requests.Session()

## Authentication

Generate a token and validate the token on this cluster.

In [5]:
accessToken = os.environ['USER_ACCESS_TOKEN']

## Map Business Terms to Headers

We complete the following steps to map the business terms to column headers:

1. Check if the Category, `Effective Farming - Monitor Crop Growth`, exists in the parent category `Industry Accelerators`.
2. Load the business terms from the `Effective Farming - Monitor Crop Growth` subcategory into a dataframe.
3. Publish the specified dataset into the catalog.
4. Assign business terms to the dataset column headers.

### 1. Check for the Category

Below cell fetches all the categories present in the cluster and stores category id of the category `Effective Farming - Monitor Crop Growth`.

In [6]:
search_url=host+"v3/search"
try:
    headers = {
        'Content-Type': "application/json",
        'Authorization': "Bearer "+accessToken,
        'Cache-Control': "no-cache",
        'Connection': "keep-alive"
        }
    
    search_body = {
        "size": 1000,
        "_source": ["artifact_id","metadata.name"],
       "query": {    
               "match": {"metadata.artifact_type": "category"}
       }
    }
    parent_cat = s.post(search_url, verify=False,  json=search_body, headers=headers)
    # Check if Industry accelerator category exists and load its id into a variable `parent_id`
    if parent_cat.status_code == 200:
        category=json.loads(parent_cat.text)
        for i in category['rows']:
            if i['metadata']['name']== category_name:
                print("Category ",category_name,"exists")
                exists_category=True
                category_id=i['artifact_id'] 
            
                   
except:
    print("The below error has occurred. " + "Please ensure that category, '" + category_name + "', exists.")
    raise ValueError(parent_cat.text)

Category  Effective Farming - Monitor Crop Growth exists


### 2. Load Subcategory Business Terms into Dataframe 

Get all of the terms in the `Effective Farming - Monitor Crop Growth` subcategory and store them in the `df_terms` dataframe.

In [7]:
# Create a payload for the post request, This payload contains information on size of the terms, source, category and subcategory ids
payload={"size":300,"from":0,"_source":["artifact_id","metadata.artifact_type","metadata.name","metadata.description","categories","entity.artifacts"],"query":{"bool":{"filter":{"bool":{"minimum_should_match":1,"should":[{"term":{"categories.primary_category_id":category_id}},{"term":{"categories.secondary_category_ids":category_id}}],"must_not":{"terms":{"metadata.artifact_type":["category"]}}}}}}}
# create a post request with above payload 
wf=s.post(host+"v3/search",headers=headers,json=payload,verify=False)
# it will return all the terms , load these terms into a dataframe
wf_json=json.loads(wf.text)['rows']
df_terms=pd.json_normalize(wf_json)
df_terms.head()

Unnamed: 0,artifact_id,_score,metadata.artifact_type,metadata.name,metadata.description,categories.secondary_category_ids,categories.last_updated_at,categories.secondary_category_global_ids,categories.primary_category_global_id,categories.primary_category_id,categories.primary_category_name,categories.secondary_category_names,entity.artifacts.synonym_global_ids,entity.artifacts.synonyms,entity.artifacts.global_id,entity.artifacts.effective_start_date,entity.artifacts.version_id,entity.artifacts.abbreviation,entity.artifacts.artifact_id
0,a6e251fc-0426-4f3e-9395-1838c1fd2b82,0.0,glossary_term,DAILYSnowfall,DAILYSnowfall,[],1627959056757,[],5d2d5419-0032-4c64-90e2-ce68c6997bb5_df55cd18-...,df55cd18-d85e-46b5-9e82-80b3031e7367,Effective Farming - Monitor Crop Growth,[],[],[],5d2d5419-0032-4c64-90e2-ce68c6997bb5_a6e251fc-...,2021-08-03T02:50:42.410Z,2c4ae536-769c-4eb6-af74-b53b644d179e_0,[],a6e251fc-0426-4f3e-9395-1838c1fd2b82
1,eb701c21-d63d-4a43-834c-36ca03c98ec2,0.0,glossary_term,MonthlyTotalSnowfall,MonthlyTotalSnowfall,[],1627959061741,[],5d2d5419-0032-4c64-90e2-ce68c6997bb5_df55cd18-...,df55cd18-d85e-46b5-9e82-80b3031e7367,Effective Farming - Monitor Crop Growth,[],[],[],5d2d5419-0032-4c64-90e2-ce68c6997bb5_eb701c21-...,2021-08-03T02:50:42.409Z,8a4fdaad-b218-4703-9620-f3cd01211ad8_0,[],eb701c21-d63d-4a43-834c-36ca03c98ec2
2,75578dbd-fbac-435b-957b-75befa6ae1a2,0.0,glossary_term,MonthlyGreatestSnowDepthDate,MonthlyGreatestSnowDepthDate,[],1627959061500,[],5d2d5419-0032-4c64-90e2-ce68c6997bb5_df55cd18-...,df55cd18-d85e-46b5-9e82-80b3031e7367,Effective Farming - Monitor Crop Growth,[],[],[],5d2d5419-0032-4c64-90e2-ce68c6997bb5_75578dbd-...,2021-08-03T02:50:42.404Z,80951194-d889-4772-94c2-5763db6260ed_0,[],75578dbd-fbac-435b-957b-75befa6ae1a2
3,eee35fbb-4f81-44b7-a760-fafb0ab831d6,0.0,glossary_term,MonthlyMinimumTemp,MonthlyMinimumTemp,[],1627959062056,[],5d2d5419-0032-4c64-90e2-ce68c6997bb5_df55cd18-...,df55cd18-d85e-46b5-9e82-80b3031e7367,Effective Farming - Monitor Crop Growth,[],[],[],5d2d5419-0032-4c64-90e2-ce68c6997bb5_eee35fbb-...,2021-08-03T02:50:42.412Z,955e24c1-9ba7-4e19-bb08-195911fbb332_0,[],eee35fbb-4f81-44b7-a760-fafb0ab831d6
4,88822638-585c-4b62-9f10-bce52be0ecf3,0.0,glossary_term,LONGITUDE,LONGITUDE,[],1627959062837,[],5d2d5419-0032-4c64-90e2-ce68c6997bb5_df55cd18-...,df55cd18-d85e-46b5-9e82-80b3031e7367,Effective Farming - Monitor Crop Growth,[],[],[],5d2d5419-0032-4c64-90e2-ce68c6997bb5_88822638-...,2021-08-03T02:50:42.410Z,a7618894-b360-43cb-b0fd-44f57f80bf6e_0,[],88822638-585c-4b62-9f10-bce52be0ecf3


In [8]:
payload

{'size': 300,
 'from': 0,
 '_source': ['artifact_id',
  'metadata.artifact_type',
  'metadata.name',
  'metadata.description',
  'categories',
  'entity.artifacts'],
 'query': {'bool': {'filter': {'bool': {'minimum_should_match': 1,
     'should': [{'term': {'categories.primary_category_id': 'df55cd18-d85e-46b5-9e82-80b3031e7367'}},
      {'term': {'categories.secondary_category_ids': 'df55cd18-d85e-46b5-9e82-80b3031e7367'}}],
     'must_not': {'terms': {'metadata.artifact_type': ['category']}}}}}}}

### 3. Publish Dataset into Catalog

Get the ID of the catalog that was specified in the user inputs at the beginning of this notebook.

In [9]:
## Get catalog that created and its id by providing name of the catalog created, wich should be same as the one entered in the previous cells
catalog_endpoint=host+"v2/catalogs"
# Create new header for the requests
headers = {
'Content-Type': "application/json",
'Authorization': "Bearer "+accessToken

}

# endpoint to get all the catalogs 
get_catalog=s.get(catalog_endpoint,verify=False, headers=headers)


## Find the catalog created with specific name and store name and id of it into catalog_name and catalog_id respectively
try:
    get_catalog_json=json.loads(get_catalog.text)['catalogs']
except:
    print("The below error has occurred. Please ensure that catalog, '" + catalog_name + "', exists")
    raise
    
catalog_id = ''
for metadata in get_catalog_json:
    if metadata['entity']['name']==catalog_name:
        catalog_id=metadata['metadata']['guid']
        print("catalog_id for",catalog_name, catalog_id)

if catalog_id == '':
    print("The provided catalog name cannot be found. Please ensure that catalog, '" + catalog_name + "', exists")
    raise ValueError("Catalog cannot be found")

catalog_id for Industry Accelerators 1aef8835-531c-41fa-8dff-4a67f7619aa5


Get the project id. All project assets can be accessed using this project id.

In [10]:
project_id=os.environ['PROJECT_ID']

Get all existing csv files in the project folder and store the names of these files. 

In [11]:
# payload 
payload={"query":"*:*","limit":200}
# endpoint to access all the project assets in the project folder 
asset_url=host+"/v2/asset_types/asset/search?project_id="+project_id
get_asset=s.post(asset_url,json=payload,verify=False)

Next we get the asset id of the dataset to be published to the catalog.

In [12]:
# Get asset ids of all csv files to be published in to the catalog and store the asset ids in an array

project_asset_id=[]
# Payload to query all project assets
payload={"query":"*:*","limit":200}

get_asset=s.post(host+"v2/asset_types/asset/search?project_id="+project_id,json=payload,verify=False, headers=headers)
get_asset_json=json.loads(get_asset.text)
for j in get_asset_json['results']:
    if j['metadata']['name'] in csv_file_to_publish:
        print("Asset id of",j['metadata']['name'],":",j['metadata']['asset_id'])
        project_asset_id.append(j['metadata']['asset_id'])

Asset id of jfk_weather.csv : a4ced745-9f85-429d-9d2b-3070860401db


### Search if the catalog already has the asset. Delete existing similar asset before publishing

In [13]:
# Search for list of assets in the catalog
asset_list_in_catalog = s.post(host+"v2/asset_types/asset/search?catalog_id="+catalog_id,json=payload,verify=False, headers=headers)
# Store the result in json
asset_list_result = json.loads(asset_list_in_catalog.text)
for result in asset_list_result['results']:
    # Get asset name
    if result['metadata']['name'] in csv_file_to_publish:
        # Get the asset id
        retrieve_asset_id = result['metadata']['asset_id']
        # Delete the asset
        asset_to_delete_url=host+"v2/assets/"+retrieve_asset_id+"?catalog_id="+catalog_id
        delete_result=requests.delete(asset_to_delete_url,headers=headers,verify=False)

### Publish Asset to the Catalog

Using the asset ID for the dataset, upload the dataset into the catalog using the post request below. Get the new asset ID of the newly published dataset.

In [14]:
print("ASSET ID's of the published assets")
# Creates a empty dictionary
catalog_asset_ids={}
for asset_id in project_asset_id:
    #for  each asset in the project , publish them into the catalog 
    # pyload to publish the asset
    payload={"mode":0,"catalog_id":catalog_id,"metadata":{}}
    # endpoint to publish asset
    asset_publish_url=host+"v2/assets/"+asset_id+"/publish?project_id="+project_id
    # Post request with endpoint, heaeder and payload
    publishasset=requests.post(asset_publish_url,json=payload,headers=headers,verify=False)
    # api endpoint returns below text
    publishasset_json=json.loads(publishasset.text)
    # extract csv file published and its asset id and append it to the dictionary
    catalog_asset_ids[publishasset_json['metadata']['name']]=publishasset_json['asset_id']
    
print(catalog_asset_ids)

ASSET ID's of the published assets
{'jfk_weather.csv': '2d6e622c-9d85-4798-993a-7a415a54a37d'}


### 4. Assign Business Terms to Column Headers

Read in the file with business terms and their associated column headers and view a sample of the data.

In [15]:
map_terms_file = project.get_file(terms_file)
map_terms_file.seek(0)
map_terms = pd.read_csv(map_terms_file)

In [16]:
print(map_terms.shape)
map_terms.head()

(90, 3)


Unnamed: 0,Business Terms,Column_header,File
0,STATION,STATION,jfk_weather
1,STATION_NAME,STATION_NAME,jfk_weather
2,ELEVATION,ELEVATION,jfk_weather
3,LATITUDE,LATITUDE,jfk_weather
4,LONGITUDE,LONGITUDE,jfk_weather


In [17]:
pd.set_option('display.max_columns', 2000)
pd.set_option('display.max_rows', 100)
pd.set_option('display.min_rows', 100)
pd.set_option('display.expand_frame_repr', True)
pd.set_option('display.max_colwidth', None)

Join the `df_terms` and `map_terms` dataframes and map each column header to a business term. The code below loops through each file in the catalog and performs the following tasks:

1. Create a dataframe with column headers in the catalog and associated business term and term ids.
2. Fetch catalog asset id for each csv in the catalog.
3. Create a column_info attribute for all the files in the catalog.
4. Map column header to the business terms. 

In [18]:

# For every file in the map terms csv do the following
# Join the csv with specified file name with the published terms to get its term id
# drop if any duplicates found to avoid multiple mappings for the same term

#map_terms=map_terms[map_terms['File']==file]
map_terms=map_terms.sort_values(by=['File','Column_header'])
Terms_Headers=pd.merge(map_terms,df_terms,left_on='Business Terms',right_on='metadata.name',how='inner')
Terms_Headers=Terms_Headers.astype(str).drop_duplicates()

for file in catalog_asset_ids:#map_terms.File.unique():
    # Catalog asset id of the particular csvs
    # for each file name in the map_terms if the csv with this file name exists, get its asset_id from the catalog and use the post request publish create column_info attribute
    # This column info attribute is necessary to map the busines terms to column to header
    
    Terms_Headers_new=Terms_Headers[Terms_Headers.File==file.upper().replace(".CSV","")].copy()
    catalog_asset_id=catalog_asset_ids[file]
    print(file,  catalog_asset_id)
    #### 
    payload={"name": "column_info",
       "entity":{
                  #"sample_size":50
               }
    }
    t=requests.post(host+"v2/assets/"+catalog_asset_id+"/attributes?catalog_id="+catalog_id,json=payload,headers=headers,verify=False)
    #print(t.text)
    # For each column header in the file map its corresponding business term retrieved from the above join in the dataframe

    i=0
    for index, rows in Terms_Headers_new.iterrows(): 
        i+=1
        print(i,rows.Column_header.strip(), "is mapped to", rows['Business Terms'])
        # Create list for the current row 
        # Below payload is used for the patch request to map the  header to business terms
        payload=[{"op":"add","path":"/"+rows.Column_header.strip(),"value":{"column_terms":[{"term_display_name":rows['Business Terms'],"term_id":rows["entity.artifacts.global_id"]}]},"attribute":"column_info"}]
    #
        # Endpoint for patch request
        url=host+"v2/assets/"+catalog_asset_id+"/attributes/column_info?catalog_id="+catalog_id
    # patch request to map busines terms to column header using term_id
        patch_attribute=s.patch(url,json=payload,headers=headers,verify=False)
    #
        json.loads(patch_attribute.text)

jfk_weather.csv 2d6e622c-9d85-4798-993a-7a415a54a37d


In [19]:
!cd /home/wsuser/work && ls

The specified dataset is now published to the catalog and its column headers are mapped to their associated business terms. 

Navigate to below path to verify the mappings created above. <br>

**All Catalogs --> New Catalog --> csv file --> any column header from the above list**.

The associated business term for the column header is displayed.