In [47]:
import json
import os
import time
import pandas as pd
from pandas.core.common import SettingWithCopyWarning
import warnings
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from requests.adapters import HTTPAdapter
from ibm_watson_studio_lib import access_project_or_space
from getpass import getpass

warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
wslib = access_project_or_space()


class WatsonDataAPI:
    def __init__(self, cpd_cluster_host):
        self.cpd_cluster_host = cpd_cluster_host

    def get_catalog_id(self, catalog_name):
        # 헤더
        headers = {
            'Content-Type': "application/json",
            'Authorization': "Bearer "+self.token
        }

        # 카탈로그 엔드포인트
        try:
            resp = self.s.get(
                f"{self.cpd_cluster_host}/v2/catalogs", 
                verify=False, 
                headers=headers
            )
        except requests.exceptions.RequestException as e:
            raise SystemExit(e)

        catalogs = json.loads(resp.text)['catalogs']
        catalog_id = ''
        for metadata in catalogs:
            if metadata['entity']['name'] == catalog_name:
                catalog_id = metadata['metadata']['guid']
                return catalog_id

        if catalog_id == '':
            print(f"The provided catalog name cannot be found. Please ensure that catalog[{catalog_name}] exists")
            return None

    def get_category_id(self, category_name):
        category_list = [each.strip() for each in category_name.split('>>')]
        headers = {
            'Content-Type': "application/json",
            'Authorization': "Bearer "+self.token,
            'Cache-Control': "no-cache",
            'Connection': "keep-alive"
        }
        search_body = {
            "size": 1000,
            "_source": ["artifact_id","metadata.name","categories"],
            "query": {
                "match": {"metadata.artifact_type": "category"}
            }
        }
        try:
            resp = self.s.post(
                f"{self.cpd_cluster_host}/v3/search",
                verify=False,
                json=search_body,
                headers=headers
            )
        except requests.exceptions.RequestException as e:
            raise SystemExit(e)

        resp_json = json.loads(resp.text)
        ret = dict()
        category_exists = False
        for each in resp_json['rows']:
            if each['metadata']['name'] == category_list[-1]:
                if 'primary_category_name' in each['categories'] and len(category_list)>1 and each['categories']['primary_category_name']==category_list[0]:
                    category_id=each['artifact_id']
                    parent_category_name = each['categories']['primary_category_name']
                    category_exists = True
                    break
                if 'primary_category_name' not in each['categories'] and len(category_list)==1:
                    category_id=each['artifact_id']
                    parent_category_name = None
                    category_exists = True
                    break

        if not category_exists:
            print(f"The provided category name cannot be found. Please ensure that category[{category_name}] exists")
        else:
            ret = {
                "category_name": category_name,
                "category_id": category_id, 
                "parent_category_name": parent_category_name
            }
        return ret


    def get_assets_name2id_in_catalog(self, catalog_name):
        catalog_id = self.get_catalog_id(catalog_name)
        search_body={"query":"*:*","limit":200}
        headers = {
            'Content-Type': "application/json",
            'Authorization': "Bearer "+self.token
        }
        resp = self.s.post(
            f"{self.cpd_cluster_host}/v2/asset_types/asset/search?catalog_id="+catalog_id,
            json=search_body,
            verify=False, 
            headers=headers
        )
        resp_json = json.loads(resp.text)
        ret = []
        for each in resp_json['results']:
            ret.append((each['metadata']['name'], each['metadata']['asset_id']))
        return dict(ret)

    def get_terms_id(self, category_name):
        category = self.get_category_id(category_name)
        if len(category)==0:
            return None
        headers = {
            'Content-Type': "application/json",
            'Authorization': "Bearer "+self.token,
            'Cache-Control': "no-cache",
            'Connection': "keep-alive"
        }

        search_body={
            "size": 300, 
            "from": 0, 
            "_source": [
                "artifact_id",
                "metadata.artifact_type",
                "metadata.name",
                "metadata.description",
                "categories",
                "entity.artifacts"],
            "query": {
                "bool": {
                    "filter": {
                        "bool": {
                            "minimum_should_match": 1,
                            "should": [
                                {
                                    "term": {"categories.primary_category_id": category["category_id"]}
                                },
                            ],
                            "must_not": {
                                "terms": {
                                    "metadata.artifact_type": ["category"]
                                }
                            }
                        }
                    }
                }
            }
        }
        try:
            resp = self.s.post(
                f"{self.cpd_cluster_host}/v3/search",
                headers=headers,
                json=search_body,
                verify=False
            )
        except requests.exceptions.RequestException as e:
            raise SystemExit(e)
        resp_json = json.loads(resp.text)['rows']
        df_terms = pd.json_normalize(resp_json)
        if not df_terms.empty: 
            df_terms = df_terms[['entity.artifacts.global_id','metadata.name','categories.primary_category_name',"metadata.artifact_type"]]
            df_terms['parent_category_name'] = category['parent_category_name']
            return df_terms
        else:
            return None
    
    def create_attribute(self, catalog_name, asset_name):
        headers = {
            'Content-Type': "application/json",
            'Authorization': "Bearer "+self.token
        }
        search_body={
            "name": "column_info",
            "entity": {
            }
        }
        catalog_id = self.get_catalog_id(catalog_name)
        asset_name2id = self.get_assets_name2id_in_catalog(catalog_name)
        asset_id = asset_name2id[asset_name]

        try:
            resp=self.s.post(
                f"{self.cpd_cluster_host}/v2/assets/{asset_id}/attributes?catalog_id={catalog_id}",
                json=search_body,
                headers=headers,
                verify=False,
                timeout=self.timeout
            )
        except requests.exceptions.RequestException as e:
            print('Fail to generate attribute')
            raise SystemExit(e)
    def get_attribute(self, catalog_name, asset_name):
        headers = {
            'Content-Type': "application/json",
            'Authorization': "Bearer "+self.token
        }
        search_body={
            "name": "column_info",
            "entity": {
            }
        }
        catalog_id = self.get_catalog_id(catalog_name)
        asset_name2id = self.get_assets_name2id_in_catalog(catalog_name)
        asset_id = asset_name2id[asset_name]
        
        try: 
            resp = self.s.get(
                f"{self.cpd_cluster_host}/v2/assets/{asset_id}?catalog_id={catalog_id}",
                json=search_body,
                headers=headers,
                verify=False,
                timeout=self.timeout
            )
        except requests.exceptions.RequestException as e:
            print('Fail to get asset info')
            raise SystemExit(e)
    def patch_attribute(self, catalog_name, asset_name, column_name, bizterm_name):
        payload=[
            {
                "op": "add",
                "path": "/"+column_name,
                "value": {
                    "column_terms":[
                        {
                            "term_display_name": df_terms['metadata.name'],
                            "term_id": df_terms["entity.artifacts.global_id"]
                        }
                    ]
                },
                "attribute":"column_info"
            }
        ]
        url = f"{self.cpd_cluster_host}/v2/assets/"+catalog_asset_id+"/attributes/column_info?catalog_id="+catalog_id
        print(payload)
        try:
            patch_attribute=self.s.patch(url,json=payload,headers=headers,verify=False,timeout=self.timeout)
            json.loads(patch_attribute.text)
            map_terms.loc[idx,'Done']='Y'
        except requests.exceptions.RequestException as e:
            print('Fail to patch attribute')
            raise SystemExit(e)

    def map_terms_to_asset(self, map_terms_csv):
        headers = {
            'Content-Type': "application/json",
            'Authorization': "Bearer "+self.token
        }
        catalog2id = dict()
        catalog2assetids = dict()
        category2termsdf = dict()

        map_terms = pd.read_csv(map_terms_csv)
        map_terms= map_terms.sort_values(by=['Catalog','DataAsset']).reset_index(drop=True)

        cnt = 0
        while not (map_terms['Done'] == 'Y').all() and cnt <= self.max_retries_map:
            cnt+=1
            print(f"try {cnt}"+'*'*120)
            for idx, rows in map_terms.iterrows():
                catalog_name = rows.Catalog
                category_name = rows.Category
                data_asset = rows.DataAsset
                start = time.time()
                if catalog_name not in catalog2id.keys():
                    catalog2id[catalog_name] = self.get_catalog_id(rows.Catalog)
                catalog_id = catalog2id[rows.Catalog]
                if catalog_id is not None and rows.Done =='N':
                    print(f"{idx}-[BizTerm:{rows.BusinessTerm}][Category:{category_name}] =>[Column:{rows.ColumnHeader}][DataAsset:{rows.DataAsset}][Catalog:{catalog_name}]")
                    if category_name not in category2termsdf.keys():
                        category2termsdf[category_name] = self.get_terms_id(category_name)
                    df_terms = category2termsdf[category_name]
                    df_terms = df_terms[(df_terms['metadata.name']==rows.BusinessTerm) & (df_terms['metadata.artifact_type']=='glossary_term')].copy()
                    df_terms['ColumnHeader']=rows.ColumnHeader

                    if catalog_name not in catalog2assetids.keys():
                        catalog2assetids[catalog_name]=self.get_asset_id_in_catalog(catalog_name)
                    catalog_asset_ids = catalog2assetids[catalog_name]
                    
                    catalog_asset_id = catalog_asset_ids[catalog_asset_ids['name']==data_asset].asset_id.values[0]
                    search_body={
                        "name": "column_info",
                        "entity":{
                        }
                    }
                    # 카탈로그의 에셋에 어트리뷰트 생성
                    try: 
                        resp=self.s.post(
                            f"{self.cpd_cluster_host}/v2/assets/{catalog_asset_id}/attributes?catalog_id={catalog_id}",
                            json=search_body,
                            headers=headers,
                            verify=False,
                            timeout=self.timeout
                        )
                    except requests.exceptions.RequestException as e:
                        print('Fail to generate attribute')
                        raise SystemExit(e)

                    # 카탈로그의 에셋에 어트리뷰트 정보 
                    try: 
                        resp = self.s.get(
                            f"{self.cpd_cluster_host}/v2/assets/{catalog_asset_id}?catalog_id={catalog_id}",
                            json=search_body,
                            headers=headers,
                            verify=False,
                            timeout=self.timeout
                        )
                    except requests.exceptions.RequestException as e:
                        print('Fail to get asset info')
                        raise SystemExit(e)

                    # 카탈로그의 에셋에 비즈텀 붙임
                    resp_json = json.loads(resp.text)
                    if 'data_asset' in resp_json['entity'].keys():
                        # print(rows)
                        print(rows["BusinessTerm"],"is mapped to",rows.ColumnHeader.strip(), 'in' ,data_asset,'of',catalog_name)
                        # print(df_terms)
                        payload=[{"op":"add",
                                  "path":"/"+rows.ColumnHeader.strip(),
                                  "value":{
                                      "column_terms":[{
                                          "term_display_name":df_terms['metadata.name'],
                                          "term_id":df_terms["entity.artifacts.global_id"]}
                                      ]},
                                  "attribute":"column_info"}]
                        url = f"{self.cpd_cluster_host}/v2/assets/"+catalog_asset_id+"/attributes/column_info?catalog_id="+catalog_id
                        print(payload)
                        try:
                            patch_attribute=self.s.patch(url,json=payload,headers=headers,verify=False,timeout=self.timeout)
                            json.loads(patch_attribute.text)
                            map_terms.loc[idx,'Done']='Y'
                        except requests.exceptions.RequestException as e:
                            print('Fail to patch attribute')
                            raise SystemExit(e)
                        print("="*120)
                end = time.time()
                elapsed_time = end - start
                print(f"mapping is done!!")
            end = time.time()
            print(f"Business term mapping is done and elapsed time is {elapsed_time}s at {cnt} try")
            print("\n")
        # wslib.save_data(map_terms_csv, map_terms.to_csv(index=False).encode(), overwrite=True)
class MapTerm_JSON(WatsonDataAPI):
    def __init__(self, cpd_cluster_host, info_json, max_retries_api_call=30, timeout=0.1, max_retries_map=5):
        super().__init__(cpd_cluster_host)

        def get_token(info_json):
            # load_file(info_json)
            f = open(info_json)
            info = json.load(f)

            headers = {
                'cache-control': 'no-cache',
                'content-type': 'application/json'
            }

            payload = json.dumps({"username":info["username"], "password":info["password"]})
            try:
                authresponse = self.s.post(f'{self.cpd_cluster_host}/icp4d-api/v1/authorize', headers=headers, data=payload, verify=False)
                token=json.loads(authresponse.text)['token']
                return token
            except:
                print("Fail to get token")

        self.timeout = timeout
        self.max_retries_map = max_retries_map

        session = requests.Session()
        session.mount('https://',HTTPAdapter(max_retries=max_retries_api_call))

        self.s = session
        self.token = get_token(info_json) 

In [48]:
inst1 = MapTerm_JSON(
    cpd_cluster_host='https://cpd-zen.apps.infra.cp4dex.com',
    info_json="info.json", 
    max_retries_map=3,
    timeout=1
)

In [49]:
ret = inst1.get_asset_id_in_catalog('Test Catalog')
ret

[('CDB_P_CD_USG', '02bb5086-d6bc-4454-bfdf-4529cbaf2b43'),
 ('CDB_P_LN', '17ef7b3a-6a1c-492f-b501-8411819c10df'),
 ('PostgreSQL CreDB', 'a1421610-e26d-4eeb-84cd-cb263adf29d4'),
 ('CDB_A_ID', '2eee39c6-bb84-4020-b361-64ad3c3347d6'),
 ('CDB_A_DESC', 'bcd48099-2445-4b34-80da-a45b72513842')]

In [50]:
dict(ret)

{'CDB_P_CD_USG': '02bb5086-d6bc-4454-bfdf-4529cbaf2b43',
 'CDB_P_LN': '17ef7b3a-6a1c-492f-b501-8411819c10df',
 'PostgreSQL CreDB': 'a1421610-e26d-4eeb-84cd-cb263adf29d4',
 'CDB_A_ID': '2eee39c6-bb84-4020-b361-64ad3c3347d6',
 'CDB_A_DESC': 'bcd48099-2445-4b34-80da-a45b72513842'}

In [42]:
# ret = inst1.get_terms_id('CreDB >> SubCategoryA')
# ret.head()
ret = inst1.get_asset_id_in_catalog('Test Catalog')
ret.head()

Unnamed: 0,name,asset_id
0,CDB_P_CD_USG,02bb5086-d6bc-4454-bfdf-4529cbaf2b43
1,CDB_P_LN,17ef7b3a-6a1c-492f-b501-8411819c10df
2,PostgreSQL CreDB,a1421610-e26d-4eeb-84cd-cb263adf29d4
3,CDB_A_ID,2eee39c6-bb84-4020-b361-64ad3c3347d6
4,CDB_A_DESC,bcd48099-2445-4b34-80da-a45b72513842


In [43]:
ret

Unnamed: 0,name,asset_id
0,CDB_P_CD_USG,02bb5086-d6bc-4454-bfdf-4529cbaf2b43
1,CDB_P_LN,17ef7b3a-6a1c-492f-b501-8411819c10df
2,PostgreSQL CreDB,a1421610-e26d-4eeb-84cd-cb263adf29d4
3,CDB_A_ID,2eee39c6-bb84-4020-b361-64ad3c3347d6
4,CDB_A_DESC,bcd48099-2445-4b34-80da-a45b72513842


In [46]:
dict(zip(ret.name,ret.asset_id))['CDB_P_CD_USG']

'02bb5086-d6bc-4454-bfdf-4529cbaf2b43'

In [38]:
ret = inst1.map_terms_to_asset('map-term-demo-copy.csv')

try 1************************************************************************************************************************
0-[BizTerm:차주일련번호][Category:CreDB >> SubCategoryA] =>[Column:join_sn][DataAsset:CDB_A_DESC][Catalog:Test Catalog]
차주일련번호 is mapped to join_sn in CDB_A_DESC of Test Catalog


Traceback (most recent call last):
  File "_pydevd_bundle/pydevd_cython.pyx", line 1078, in _pydevd_bundle.pydevd_cython.PyDBFrame.trace_dispatch
  File "_pydevd_bundle/pydevd_cython.pyx", line 297, in _pydevd_bundle.pydevd_cython.PyDBFrame.do_wait_suspend
  File "/opt/conda/envs/Python-3.10/lib/python3.10/site-packages/debugpy/_vendored/pydevd/pydevd.py", line 1976, in do_wait_suspend
    keep_suspended = self._do_wait_suspend(thread, frame, event, arg, suspend_type, from_this_thread, frames_tracker)
  File "/opt/conda/envs/Python-3.10/lib/python3.10/site-packages/debugpy/_vendored/pydevd/pydevd.py", line 2011, in _do_wait_suspend
    time.sleep(0.01)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
payload