In [485]:
import pandas as pd
import numpy as np
import json
import os
import urllib3
import csv
import logging
import re
import uuid
import shortuuid
from pathlib import Path
from idmc_utils import generate_taskflow, package_import
from urllib.parse import quote

# Never truncate the column list when a DataFrame is displayed
pd.options.display.max_columns = None

# Initialise the job

In [486]:
# Initialise the log file.
# logging.basicConfig() opens the target file immediately and raises
# FileNotFoundError when its parent directory is missing, so make sure the
# logs/ directory exists before configuring the root logger.
Path('logs').mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    filename='logs/console.log',
    level=logging.DEBUG,
    format='%(asctime)s:%(levelname)s:%(message)s',
)

In [487]:
# Load the job settings from the JSON config file into `config`
logging.info('Reading the config file')
config = json.loads(Path('config/config.json').read_text())


# Read the Inputs

In [488]:
# Load the execution plans. Every column is read as text with blanks kept as
# empty strings; only the step order is converted to an integer.
logging.info('Reading the execution plans')

#TODO remove the below filter once done with testing
test_plans = [
    'Echo Employee Snapshot Oracle R12.1.3',
    'Echo GL Balance Refresh',
    'Echo Account Dimension',
    'Human Resources - Oracle R1213 - Flexfield',
    'Common-LoadDayDimension Universal',
    'Process_Quality_OracleR1213',
    #'Financials_Universal'
]

dfPlans = (
    pd.read_csv('in/plans.csv', dtype='str', encoding='utf-8', na_filter=False)
    .assign(plan_step_order=lambda frame: frame['plan_step_order'].astype(int))
    .loc[lambda frame: frame['plan_name'].isin(test_plans)]
)


In [489]:
# Read the parameter values file. A value can be split over several rows; the
# rows are ordered by `sequence` so they can be concatenated back together.
dfParamVals = pd.read_csv('in/param_values.csv', dtype='str', encoding='utf-8', na_filter=False)
dfParamVals['sequence'] = dfParamVals['sequence'].astype(float)
dfParamVals = dfParamVals.sort_values(['obj_wid', 'sequence'])

# Read the task parameter files and keep only the parameters whose step
# belongs to one of the selected plans (inner join on step_wid)
dfTaskParams = pd.read_csv('in/params-task.csv', dtype='str', encoding='utf-8', na_filter=False)
dfTaskParams = dfTaskParams.merge(dfPlans, how='inner', on='step_wid')
dfTaskParams = dfTaskParams[['name', 'value', 'type_cd', 'step_wid', 'datatype', 'context_type',
       'inactive_flag', 'comments', 'plan_name', 'step_name', 'param_wid']]
dfTaskParams = dfTaskParams.drop_duplicates()
# One parameter file per plan: the plan name with non-word runs replaced by '_'
dfTaskParams['param_filename'] = dfTaskParams['plan_name'].apply(lambda x: re.sub(r'\W+', '_', x) + '.param')

# String join the multi line values (one row per obj_wid / obj_type)
dfParamVals = (
    dfParamVals
    .groupby(['obj_wid', 'obj_type'])['textfield']
    .agg(''.join)
    .reset_index()
)

# Split the joined text into its "<key>=<value>" entries, one row per entry
dfParamVals['param_values'] = dfParamVals['textfield'].str.split('_dac_sep_')
dfParamVals = dfParamVals.explode('param_values')

# The key ends at the FIRST '='. A greedy `\S+` would swallow part of a value
# that itself contains '=' (e.g. "Text=a=b" would yield the key "Text=a").
# NOTE(review): `.` does not match newlines, so an entry whose value spans
# several lines keeps its key but gets a NaN value - confirm whether the
# source data can contain embedded newlines.
dfParamVals['value_type'] = dfParamVals['param_values'].str.extract(r'^([^\s=]+)=', expand=False)
dfParamVals['value'] = dfParamVals['param_values'].str.extract(r'^[^\s=]+=(.*)$', expand=False)

# Drop fragments that are not "<key>=<value>" entries
dfParamVals = dfParamVals[dfParamVals['value_type'].notna()]
dfParamVals = dfParamVals.reset_index(drop=True)

# One row per parameter, one "value_<key>" column per key
dfParamVals = dfParamVals.pivot(index=['obj_wid', 'obj_type', 'textfield'], columns=['value_type'], values=['value'])
dfParamVals = dfParamVals.reset_index()
dfParamVals.columns = ['_'.join(col).strip() for col in dfParamVals.columns.values]

# The pivot only creates columns for keys that occur in the data. Guarantee the
# columns read below and by the parameter file generation so that a data set
# without e.g. any "Runtime" entry does not fail with a KeyError.
for _expected_col in ('value_Runtime', 'value_Text', 'value_Function',
                      'value_Formatter', 'value_Value', 'value_SQL'):
    if _expected_col not in dfParamVals.columns:
        dfParamVals[_expected_col] = np.nan

# A parameter is static unless it is explicitly flagged Runtime=true
dfParamVals['is_static'] = dfParamVals['value_Runtime'].ne('true')
dfParamVals = dfParamVals.rename(columns={'obj_wid_': 'param_wid', 'obj_type_': 'param_type', 'textfield_': 'raw_text'})

# Join the values onto the params (parameters without values keep NaN)
dfTaskParams = dfTaskParams.merge(dfParamVals, how='left', on='param_wid')

In [490]:
# Load the execution-level parameters, keep those of the selected plans
# (inner join on plan_wid) and collapse the per-step duplicates the join creates
dfExecParams = (
    pd.read_csv('in/params-exec.csv', dtype='str', encoding='utf-8', na_filter=False)
    .merge(dfPlans, how='inner', on='plan_wid')
    .loc[:, ['param_wid', 'name', 'type_cd', 'plan_wid', 'datatype', 'context_type',
             'inactive_flag', 'comments', 'plan_name']]
    .drop_duplicates()
)

# Derive the parameter file name from the plan name, and make sure every
# non-empty parameter name carries the '$$' prefix
dfExecParams = dfExecParams.assign(
    param_filename=lambda frame: frame['plan_name'].str.replace(r'\W+', '_', regex=True) + '.param',
    name=lambda frame: frame['name'].where(
        frame['name'].str.startswith('$$') | frame['name'].str.len().eq(0),
        '$$' + frame['name'],
    ),
)

# Attach the parsed parameter values (left join keeps value-less parameters)
dfExecParams = dfExecParams.merge(dfParamVals, how='left', on='param_wid')

# Lookup the Converted Mapping Task IDs

In [491]:
# Login to IDMC
logging.info('Logging into IDMC')
http = urllib3.PoolManager()

# Serialise the credentials with json.dumps so that quotes, backslashes and
# non-ASCII characters in the user name or password are escaped correctly
# (hand-built JSON breaks on such characters).
data = json.dumps({
    'username': config['idmc']['user'],
    'password': config['idmc']['password'],
})

url = 'https://' + config['idmc']['host'] + '/saas/public/core/v3/login'
# NOTE(review): urllib3 timeouts are expressed in seconds, so 3000 is 50
# minutes - confirm this is intended and not meant as milliseconds.
r = http.request(
    'POST',
    url,
    timeout=3000,
    body=data,
    headers={
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }
    )

# Fail with a clear message instead of a KeyError on the error payload
if not 200 <= r.status < 300:
    logging.error(f'IDMC login failed with HTTP status { r.status }')
    raise RuntimeError(f'IDMC login failed with HTTP status { r.status }')

# Parse the JSON response and keep the session id used by the later requests
result = json.loads(r.data.decode('utf-8'))
sessionID = result['userInfo']['sessionId']


In [492]:
# Look up the runtime environment (secure agent group) by its configured name
logging.info('Getting the secure agent ID')
agentGroupName = config['idmc']['agentGroupName']
url = (
    f"https://{ config['idmc']['pod'] }.{ config['idmc']['host'] }"
    f"/saas/api/v2/runtimeEnvironment/name/{ quote(agentGroupName) }"
)
r = http.request(
    'GET',
    url,
    timeout=3000,
    headers={'Accept': 'application/json', 'icSessionId': sessionID},
)

# Parse the JSON response and keep both identifiers of the agent group
result = json.loads(r.data.decode('utf-8'))
agentGroupID, agentGroupGUID = result['id'], result['federatedId']

In [493]:
# Collect every mapping task (type MTT) in the configured folder, `limit`
# objects per request, into the dfTasks data frame.
logging.info('Getting a list of the mapping tasks')
limit = 200
i = 0
taskPages = []      # one data frame per response page, concatenated once at the end
fetchedCount = 0    # number of task rows collected so far

# Page through mapping task queries
while True:

    # Get a list of the mapping tasks
    url = 'https://' + config['idmc']['pod'] + '.' + config['idmc']['host'] + '/saas/public/core/v3/objects?q=type==%27MTT%27%20and%20location==%27' + config['idmc']['folderPath'] + '%27&limit=' + str(limit) + '&skip=' + str(i)
    r = http.request(
        'GET',
        url,
        timeout=3000,
        headers={
                'Accept': 'application/json',
                'INFA-SESSION-ID': sessionID
            }
        )

    # Fail with a clear message instead of a KeyError on the error payload
    if not 200 <= r.status < 300:
        logging.error(f'Listing the mapping tasks failed with HTTP status { r.status }')
        raise RuntimeError(f'Listing the mapping tasks failed with HTTP status { r.status }')

    # Flatten the objects of this page into a data frame
    result = json.loads(r.data.decode('utf-8'))
    taskCount = result['count']
    pageObjects = result.get('objects') or []
    if pageObjects:
        taskPages.append(pd.json_normalize(pageObjects))
        fetchedCount += len(pageObjects)

    # Break if all records have been returned. `>=` avoids one extra, empty
    # request when the count is an exact multiple of `limit`; the empty-page
    # check guards against looping forever if the server returns fewer objects
    # than it announced.
    i = i + limit
    if i >= taskCount or not pageObjects:
        print(f'Finished. dfCount = { fetchedCount }; i = { i }; taskCount = { taskCount }')
        break

    print(f'Looping. dfCount = { fetchedCount }; i = { i }; taskCount = { taskCount }')

# Combine the pages. With no tasks at all, keep the two columns the following
# cell reads so that every plan step is simply reported as missing.
if taskPages:
    dfTasks = pd.concat(taskPages, ignore_index=True)
else:
    dfTasks = pd.DataFrame(columns=['id', 'path'])

# Clean up the list of returned tasks ('tags' is not needed and may be absent)
dfTasks = dfTasks.drop(columns=['tags'], errors='ignore')
dfTasks = dfTasks.drop_duplicates()



Looping. dfCount = 200; i = 200; taskCount = 465
Looping. dfCount = 400; i = 400; taskCount = 465
Finished. dfCount = 465; i = 600; taskCount = 465


In [494]:
# Name each task after the last segment of its IDMC path and keep only the
# columns needed to look a task up by step name
dfTasks = (
    dfTasks
    .assign(step_name=dfTasks['path'].map(os.path.basename))
    .loc[:, ['step_name', 'id', 'path']]
    .rename(columns={'id': 'infa_id', 'path': 'infa_path'})
)

# Attach the task id/path to every plan step (steps without a task keep NaN),
# then stamp the agent group details and script settings onto every row
dfPlans = dfPlans.merge(dfTasks, how='left', on='step_name').assign(
    agent_id=agentGroupID,
    agent_guid=agentGroupGUID,
    agent_name=agentGroupName,
    script_dir=config['local']['scriptsDir'],
    script_args='',  # TODO placeholder for any args that need to be passed to the step script
)

In [495]:
# Collect the plan steps whose mapping task id is blank or absent and, when
# there are any, log an error and export them for follow-up
dfMissing = dfPlans.loc[lambda frame: frame['infa_id'].eq('') | frame['infa_id'].isna()].copy()
if dfMissing.shape[0] > 0:
    logging.error('Some plans are missing a converted mapping task. Please see "out/missing_tasks.csv" for more details')
    dfMissing.to_csv('out/missing_tasks.csv', index=False, quoting=csv.QUOTE_ALL)

# Generate the Parameter Files

In [496]:
logging.info('Generating the parameter files...')


def _resolve_param_value(row):
    """Return the text to write after "<name>=" for one parameter row.

    The value column is chosen from the row's `is_static` flag and `datatype`:
    static Text -> value_Text, static Timestamp using TO_CUSTOM -> value_Value,
    SQL -> value_SQL, anything else -> value.

    A chosen column that is absent from the row, or that holds NaN/None, yields
    an empty string and a logged warning. Previously such a cell was written
    literally as "nan", and an absent column (for example "value" on execution
    parameters) raised a KeyError.
    """
    # NOTE(review): rows without a matching value record have is_static = NaN,
    # which is truthy; they are therefore treated as static, exactly as before.
    # Confirm that this is the intended default.
    is_static = row['is_static']
    datatype = row['datatype']

    if is_static and datatype == 'Text':
        column = 'value_Text'
    elif is_static and datatype == 'Timestamp' and row.get('value_Function') == 'TO_CUSTOM':
        #TODO add date conversion using the format held in value_Formatter
        column = 'value_Value'
    elif datatype == 'SQL':
        column = 'value_SQL'
    else:
        column = 'value'

    cell = row.get(column)
    if pd.isna(cell):
        logging.warning(f'No "{ column }" value for parameter "{ row["name"] }"; writing it empty')
        return ''
    return str(cell)


task_files = dfTaskParams['param_filename'].to_list()
exec_files = dfExecParams['param_filename'].to_list()
combined_files = task_files + exec_files
print(combined_files)
# Sorted so the files are always generated in the same order
unique_files = sorted(set(combined_files))

# Make sure the output directory exists before the first file is written
Path('out').mkdir(parents=True, exist_ok=True)

for filename in unique_files:

    logging.info(f'Generating the parameter file "out/{ filename }"...')
    dfFileTasks = dfTaskParams[dfTaskParams['param_filename'] == filename]

    # Set the param file header
    lines = ['#USE_SECTIONS', '']

    # One "[Default].[<step name>]" section per step, in order of first appearance
    for section in dfFileTasks['step_name'].unique():

        dfSection = dfFileTasks[dfFileTasks['step_name'] == section]

        # Append the section header
        lines.append(f'[Default].[{ section }]')

        # Add the parameter values
        for idx, row in dfSection.iterrows():
            lines.append(f"{ row['name'] }={ _resolve_param_value(row) }")
        lines.append('')

    # Add the global footer holding the execution-level parameters of the plan
    lines.append('[Global]')
    dfGlobal = dfExecParams[dfExecParams['param_filename'] == filename]
    for idx, row in dfGlobal.iterrows():
        lines.append(f"{ row['name'] }={ _resolve_param_value(row) }")

    lines.append('')

    with open(f'out/{ filename }', 'w', encoding='utf-8') as file:
        file.write('\n'.join(lines))


['Echo_Employee_Snapshot_Oracle_R12_1_3.param', 'Echo_Employee_Snapshot_Oracle_R12_1_3.param', 'Echo_Employee_Snapshot_Oracle_R12_1_3.param', 'Echo_Employee_Snapshot_Oracle_R12_1_3.param']


# Generate the Taskflows

In [497]:
# Build and package one taskflow per execution plan, using only the plan steps
# for which a mapping task id was found.
logging.info('Generating the taskflows...')

# Filter out any missing tasks (blank or NaN infa_id)
dfPlansFil = dfPlans.copy()
dfPlansFil = dfPlansFil[(dfPlansFil['infa_id'] != '') & (dfPlansFil['infa_id'].notna())]

#TODO Remove the below filter after testing is finished
#dfPlansFil = dfPlansFil[dfPlansFil['step_name']== 'SDE_Universal_APTermsDimension']

planIds = dfPlansFil['plan_wid'].unique()

for planId in planIds:

    # Get the plan for the current id
    dfPlan = dfPlansFil[dfPlansFil['plan_wid'] == planId].copy()

    # Generate the unique identifiers:
    #   - every row gets its own random "task…" id
    #   - every distinct plan_step_order gets one random "group…" id, so rows
    #     with the same step order share a group
    dfPlan['dac2idmc_step_id'] = dfPlan.apply(lambda x: "task" + shortuuid.uuid()[:8], axis=1)
    map_order_uuid = { step_order: "group" + shortuuid.uuid()[:8] for step_order in dfPlan['plan_step_order'].unique() }
    # Order the steps, then number them so the order can be restored after the merge below
    dfPlan = dfPlan.sort_values(['plan_wid', 'plan_step_order', 'step_name'])
    dfPlan['sequence'] = range(len(dfPlan))
    dfPlan['dac2idmc_group_id'] = dfPlan['plan_step_order'].map(map_order_uuid)
    # Id of the following row in sorted order; NaN on the last row
    dfPlan['dac2idmc_next_id'] = dfPlan['dac2idmc_step_id'].shift(-1)

    # Get the next group IDs and sort the plans: one row per group in step
    # order, each pointing at the group that follows it (NaN for the last group)
    dfGroups = dfPlan[['plan_step_order','dac2idmc_group_id']].copy()
    dfGroups = dfGroups.drop_duplicates()
    dfGroups = dfGroups.sort_values(['plan_step_order'])
    dfGroups['dac2idmc_next_group'] = dfGroups['dac2idmc_group_id'].shift(-1)
    dfGroups = dfGroups[['dac2idmc_group_id','dac2idmc_next_group']]

    # Attach the next-group id to every step and restore the step order
    dfPlan = dfPlan.merge(dfGroups, how='inner', on='dac2idmc_group_id')
    dfPlan = dfPlan.sort_values(['sequence'])

    #TODO Uncomment the below lines for troubleshooting
    #print(planId)
    #dfPlan.to_excel(f'out/{ planId }.xlsx')

    # Generate the taskflow ID (random) and derive the taskflow name from the
    # plan name with every run of non-word characters replaced by '_'
    taskflowID = shortuuid.uuid()
    taskflowName = dfPlan.iloc[0]['plan_name']
    taskflowName = re.sub(r'\W+', '_', taskflowName)

    logging.info(f'Create workspace tree "tmp/{ taskflowName }/Explore/Default"')

    # Create the workspace directories (no error if they already exist)
    treePath = Path(f'tmp/{ taskflowName }/Explore/Default')
    treePath.mkdir(parents=True, exist_ok=True)

    # NOTE(review): generate_taskflow and package_import come from idmc_utils;
    # presumably they write the taskflow into the workspace tree created above
    # and package it for import - confirm against that module.
    logging.info(f'Generating taskflow "{ taskflowName }"')
    generate_taskflow.generate_taskflow(taskflowID, taskflowName, dfPlan, config)

    logging.info(f'Packaging taskflow "{ taskflowName }"')
    package_import.package_import(taskflowID, taskflowName, dfPlan)

    logging.info(f'Done!')

logging.info('All taskflows have been generated!')

# Testing below this line

In [498]:
# Debug: show the de-duplicated parameter file names built by the generation cell
unique_files

['Echo_Employee_Snapshot_Oracle_R12_1_3.param']

In [499]:
# Debug: show the task and execution parameter file names before de-duplication
combined_files

['Echo_Employee_Snapshot_Oracle_R12_1_3.param',
 'Echo_Employee_Snapshot_Oracle_R12_1_3.param',
 'Echo_Employee_Snapshot_Oracle_R12_1_3.param',
 'Echo_Employee_Snapshot_Oracle_R12_1_3.param']

In [500]:
# Debug: distinct parameter file names referenced by the task parameters
dfTaskParams['param_filename'].unique()

array(['Echo_Employee_Snapshot_Oracle_R12_1_3.param'], dtype=object)

In [501]:
# Debug: distinct parameter file names referenced by the execution parameters
dfExecParams['param_filename'].unique()

array(['Echo_Employee_Snapshot_Oracle_R12_1_3.param'], dtype=object)