In [132]:
import psycopg2
from psycopg2 import extras
import pandas as pd
import json
import urllib.parse
from sqlalchemy import create_engine
import re
from tabulate import tabulate
import logging
import sqlalchemy

In [137]:
logging.basicConfig(level=logging.INFO)

params = {
    'checklist_ids': (373013004340813824, 1),
    'use_case_ids': (1660291903, 1660291904)
}

db_config = {
    'postgres': {
        'source': {
            'host': 'localhost',
            'database': 'tenshi_dwi',
            'username': 'postgres',
            'password': 'postgres',
        },
        'destination': {
            'host': 'localhost',
            'database': 'report_tenshi',
            'username': 'postgres',
            'password': 'postgres',
        },
        'jaas': {
            'host': 'localhost',
            'database': 'tenshi_jaas',
            'username': 'postgres',
            'password': 'postgres',
        },
    }
}

table_names = {
    # 'area_cleaning_processes': 'area_cleaning_processes',
    'facilities': 'facilities',
    'facility_employees': 'facility_employees',
    # 'products': 'products',
    # 'shifts': 'shifts',
    'step_attribute_data_types': 'step_attribute_data_types',
    # 'vendors': 'vendors',
    # 'area_cleaning_process_stages': 'area_cleaning_process_stages',
    # 'facility_locations': 'facility_locations',
    # 'manufacturing_processes': 'manufacturing_processes',
    'processes': 'processes',
    # 'production_batches': 'production_batches',
    # 'raw_materials': 'raw_materials',
    'stages': 'stages',
    'steps': 'steps',
    # 'batch_material_usage': 'batch_material_usage',
    'equipment': 'equipment',
    # 'equipment_cleaning_processes': 'equipment_cleaning_processes',
    'executed_steps': 'executed_steps',
    # 'manufacturing_stages': 'manufacturing_stages',
    # 'product_materials': 'product_materials',
    # 'raw_material_lot_details': 'raw_material_lot_details',
    'step_attributes': 'step_attributes',
    # 'equipment_cleaning_process_stages': 'equipment_cleaning_process_stages',
    'executed_step_exceptions': 'executed_step_exceptions',
    'executed_step_measurements': 'executed_step_measurements'
}

if_exists = 'append'
display_table = False


In [138]:
def get_postgres_connection(postgres_config):
    username = urllib.parse.quote_plus(postgres_config['username'])
    password = urllib.parse.quote_plus(postgres_config['password'])
    return psycopg2.connect(f"host={postgres_config['host']} dbname={postgres_config['database']} user={username} password={password}")

def get_postgres_engine(postgres_config):
    username = urllib.parse.quote_plus(postgres_config['username'])
    password = urllib.parse.quote_plus(postgres_config['password'])
    connection_string = (f"postgresql+psycopg2://{username}:{password}@"
                         f"{postgres_config['host']}/"
                         f"{postgres_config['database']}")
    return create_engine(connection_string, echo=False)

# source_connection = get_postgres_connection(db_config['postgres']['source'])
source_engine = get_postgres_engine(db_config['postgres']['source'])
# destination_connection = get_postgres_connection(db_config['postgres']['destination'])
destination_engine = get_postgres_engine(db_config['postgres']['destination'])
jaas_engine = get_postgres_engine(db_config['postgres']['jaas'])

In [139]:
def get_next_id(df: pd.DataFrame, column: str):
    max_df_id = df[column].max()
    if pd.isna(max_df_id):
        return 1
    return max_df_id + 1

def display(df: pd.DataFrame, override=False):
    if display_table or override:
        print(tabulate(df, headers='keys', tablefmt='pretty'))

In [146]:
FACILITY_QUERY = """
SELECT id, name FROM facilities WHERE id != -1
"""
facility_dtypes =  {'id': 'Int64'}
facility_df = pd.read_sql(FACILITY_QUERY, source_engine, params=params, dtype=facility_dtypes)
new_facility_df = facility_df.copy()
new_facility_df.rename(columns={'id': 'facility_id', 'name': 'facility_name'}, inplace=True)
display(new_facility_df)

+---+-------------+---------------+
|   | facility_id | facility_name |
+---+-------------+---------------+
| 0 | 1665458801  | Tenshi Kaizen |
| 1 | 1693389002  |     R & D     |
+---+-------------+---------------+


In [78]:
new_facility_df.to_sql(table_names['facilities'], destination_engine, if_exists=if_exists, index=False)

2

In [149]:
USER_QUERY = """
SELECT u.id, u.first_name, u.last_name, u.employee_id as internal_employee_id, r."name" as role  FROM users u JOIN user_roles_mapping urm ON urm.users_id = u.id JOIN roles r ON urm.roles_id = r.id
"""
user_dtypes =  {'id': 'Int64'}
user_df = pd.read_sql(USER_QUERY, jaas_engine, params=params, dtype=user_dtypes)
new_user_df = user_df.copy()
new_user_df.rename(columns={'id': 'employee_id'}, inplace=True)
new_user_df.rename(columns={'role': 'employee_role'}, inplace=True)
new_user_df['employee_role'] = new_user_df['employee_role'].str.replace('_', ' ').str.title()
display(new_user_df)

+-----+--------------------+--------------------+----------------------+----------------------+-------------------+
|     |    employee_id     |     first_name     |      last_name       | internal_employee_id |   employee_role   |
+-----+--------------------+--------------------+----------------------+----------------------+-------------------+
|  0  |     1665458800     |     Prashanth      |          V           |        100226        |   Account Owner   |
|  1  |     1665458801     |      Kalpesh       |       Benuskar       |        TNN005        |   System Admin    |
|  2  | 279112794432868352 |      Armisha       |        Thumar        |        TNN022        |  Facility Admin   |
|  3  | 279114944969969664 |       Sachin       |        Mourya        |       TNNI001        |  Facility Admin   |
|  4  | 279167281465921536 |     SUBRAMANYA     |        SHETTY        |         3230         |     Operator      |
|  5  | 279211279823933440 |    Veerabhadra     |          V           |

In [80]:
new_user_df.to_sql(table_names['facility_employees'], destination_engine, if_exists=if_exists, index=False)

177

In [148]:
CHECKLIST_QUERY = """
SELECT c.id as checklist_id, c.name, c.code, uc.id as use_case_id, uc.name as use_case_name, cfm.facilities_id  
FROM checklists c JOIN use_cases uc ON uc.id = c.use_cases_id JOIN checklist_facility_mapping cfm ON cfm.checklists_id = c.id WHERE c.state = 'PUBLISHED' AND c.archived = FALSE AND c.use_cases_id IN %(use_case_ids)s AND c.id IN %(checklist_ids)s
"""
checklist_dtype = {'checklist_id': 'Int64', 'use_case_id': 'Int64', 'facilities_id': 'Int64'}
checklist_df = pd.read_sql(CHECKLIST_QUERY, source_engine, params=params, dtype=checklist_dtype)
new_process_df = checklist_df.copy()
new_process_df.rename(columns={'checklist_id': 'id', 'use_case_name': 'process_type', 'name': 'process_name',
                               'facilities_id': 'facility_id'}, inplace=True)
new_process_df.drop('code', axis=1, inplace=True)
new_process_df.drop('use_case_id', axis=1, inplace=True)
display(new_process_df)

+---+--------------------+---------------------------------------------------+--------------------+-------------+
|   |         id         |                   process_name                    |    process_type    | facility_id |
+---+--------------------+---------------------------------------------------+--------------------+-------------+
| 0 | 373013004340813824 | Cleaning checklist for Sampling/ Dispensing booth | Equipment Cleaning | 1665458801  |
+---+--------------------+---------------------------------------------------+--------------------+-------------+


In [82]:
new_process_df.to_sql(table_names['processes'], destination_engine, if_exists=if_exists, index=False)

1

In [150]:
STAGE_QUERY = """
SELECT s.id as stage_id, s."name", s.checklists_id as checklist_id, s.order_tree FROM stages s JOIN checklists c ON c.id = s.checklists_id WHERE s.archived = FALSE AND c.id IN %(checklist_ids)s ORDER BY c.id, s.order_tree
"""
stage_dtype = {'stage_id': 'Int64', 'checklist_id': 'Int64'}
stage_df = pd.read_sql(STAGE_QUERY, source_engine, params=params, dtype=stage_dtype)
display(stage_df)

+---+--------------------+-------------------------------+--------------------+------------+
|   |      stage_id      |             name              |    checklist_id    | order_tree |
+---+--------------------+-------------------------------+--------------------+------------+
| 0 | 373013004378562561 | Product and Equipment details | 373013004340813824 |     1      |
| 1 | 373013142618628096 |         Prerequisite          | 373013004340813824 |     2      |
| 2 | 373013143281328128 |      Cleaning procedure       | 373013004340813824 |     3      |
| 3 | 373013143948222464 |     Verification by PR/WH     | 373013004340813824 |     4      |
| 4 | 373013144690614272 |     Verification by IPQA      | 373013004340813824 |     5      |
| 5 | 373013146125066240 | Cleaning verification result  | 373013004340813824 |     6      |
+---+--------------------+-------------------------------+--------------------+------------+


In [151]:
new_stage_df = stage_df.copy()
new_stage_df.rename(columns={'stage_id': 'id', 'name': 'stage_name', 'checklist_id': 'process_id'}, inplace=True)
new_stage_df.drop('order_tree', axis=1, inplace=True)
new_stage_df['stage_type'] = ''
display(new_stage_df)

+---+--------------------+-------------------------------+--------------------+------------+
|   |         id         |          stage_name           |     process_id     | stage_type |
+---+--------------------+-------------------------------+--------------------+------------+
| 0 | 373013004378562561 | Product and Equipment details | 373013004340813824 |            |
| 1 | 373013142618628096 |         Prerequisite          | 373013004340813824 |            |
| 2 | 373013143281328128 |      Cleaning procedure       | 373013004340813824 |            |
| 3 | 373013143948222464 |     Verification by PR/WH     | 373013004340813824 |            |
| 4 | 373013144690614272 |     Verification by IPQA      | 373013004340813824 |            |
| 5 | 373013146125066240 | Cleaning verification result  | 373013004340813824 |            |
+---+--------------------+-------------------------------+--------------------+------------+


In [85]:
new_stage_df.to_sql(table_names['stages'], destination_engine, if_exists=if_exists, index=False)

6

In [153]:
TASK_QUERY = """
SELECT t.id as task_id, t.name, t.order_tree, t.stages_id as stage_id, s.checklists_id as checklist_id FROM tasks t JOIN stages s ON s.id = t.stages_id JOIN checklists c ON c.id = s.checklists_id WHERE t.archived = FALSE AND s.archived = FALSE AND c.id IN %(checklist_ids)s ORDER BY c.id, s.order_tree, t.order_tree 
"""
task_dtype = {'task_id': 'Int64', 'stage_id': 'Int64', 'checklist_id': 'Int64'}
task_df = pd.read_sql(TASK_QUERY, source_engine, params=params, dtype=task_dtype)
display(task_df)

+----+--------------------+-------------------------------------------------------------------------------------+------------+--------------------+--------------------+
|    |      task_id       |                                        name                                         | order_tree |      stage_id      |    checklist_id    |
+----+--------------------+-------------------------------------------------------------------------------------+------------+--------------------+--------------------+
| 0  | 373013004378562562 |                            Product and Equipment details                            |     1      | 373013004378562561 | 373013004340813824 |
| 1  | 373039997950222336 |                                    Prerequisites                                    |     1      | 373013142618628096 | 373013004340813824 |
| 2  | 373042088244535296 |                            Pre-preparation for cleaning                             |     1      | 373013143281328128 | 3730130

In [155]:
PARAMETER_QUERY = """
SELECT p.id as parameter_id, p."label" AS name, p."data", p."type", p.order_tree, p.tasks_id as task_id, t.stages_id as stage_id, s.checklists_id as checklist_id FROM parameters p JOIN tasks t ON t.id = p.tasks_id JOIN stages s ON s.id = t.stages_id JOIN checklists c ON c.id = s.checklists_id WHERE p.archived = FALSE  AND t.archived = FALSE AND s.archived = FALSE AND c.id IN %(checklist_ids)s ORDER BY c.id, s.order_tree, t.order_tree, p.order_tree
"""
parameter_dtype = {'parameter_id': 'Int64', 'task_id': 'Int64', 'stage_id': 'Int64', 'checklist_id': 'Int64'}
parameter_df = pd.read_sql(PARAMETER_QUERY, source_engine, params=params, dtype=parameter_dtype)
display(parameter_df)

+----+--------------------+----------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------+--------------------+--------------------+-

In [157]:
new_step_df = task_df.copy()
display(new_step_df)

parameter_types = ('INSTRUCTION', 'MATERIAL')
# Filter to just instructions 
instruction_df = parameter_df[parameter_df['type'] == 'INSTRUCTION']

# Compile regex once 
regex = re.compile(r'<.*?>')

# Remove HTML tags in a vectorized manner and create a new 'clean_text' column
instruction_df['instruction'] = instruction_df['data'].apply(lambda x: re.sub(regex, '', x['text']))

# Group by 'task_id' and 'type', then join the texts together
grouped = instruction_df.groupby(['task_id', 'type'])['instruction'].apply('\n'.join).reset_index()

# Filter out only the 'INSTRUCTION' type
instructions = grouped[grouped['type'] == 'INSTRUCTION']

new_step_df = new_step_df.merge(instructions[['task_id', 'instruction']], on='task_id', how='left')

new_step_df.rename(columns={'task_id': 'id', 'name': 'step_name'}, inplace=True)
new_step_df.drop('order_tree', axis=1, inplace=True)
new_step_df.drop('checklist_id', axis=1, inplace=True)

display(new_step_df)

+----+--------------------+-------------------------------------------------------------------------------------+--------------------+-------------------------------------------------------------------------------------------------------------------------+
|    |         id         |                                      step_name                                      |      stage_id      |                                                       instruction                                                       |
+----+--------------------+-------------------------------------------------------------------------------------+--------------------+-------------------------------------------------------------------------------------------------------------------------+
| 0  | 373013004378562562 |                            Product and Equipment details                            | 373013004378562561 |                                                           nan                                 

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  instruction_df['instruction'] = instruction_df['data'].apply(lambda x: re.sub(regex, '', x['text']))


In [90]:
new_step_df.to_sql(table_names['steps'], destination_engine, if_exists=if_exists, index=False)

14

In [159]:
TASK_EXECUTION_QUERY = """
SELECT te.id AS id, t.id AS task_id, te.reason as reason, TO_TIMESTAMP(te.started_at) AS started_at, TO_TIMESTAMP(te.ended_at) AS ended_at, te.state AS state, te.started_by FROM task_executions te JOIN tasks t ON t.id = te.tasks_id JOIN stages s ON s.id = t.stages_id JOIN checklists c ON c.id = s.checklists_id JOIN jobs j ON j.id = te.jobs_id left join users u ON te.started_by = u.id WHERE t.archived = FALSE AND s.archived = FALSE AND c.id IN %(checklist_ids)s ORDER BY c.id, s.order_tree, t.order_tree
"""
task_execution_dtype = {'id': 'Int64', 'task_id': 'Int64', 'started_by': 'Int64'}
task_execution_df = pd.read_sql(TASK_EXECUTION_QUERY, source_engine, params=params, dtype=task_execution_dtype)
display(task_execution_df, True)

+------+--------------------+--------------------+--------------------------------+---------------------------+---------------------------+--------------------------+--------------------+
|      |         id         |      task_id       |             reason             |        started_at         |         ended_at          |          state           |     started_by     |
+------+--------------------+--------------------+--------------------------------+---------------------------+---------------------------+--------------------------+--------------------+
|  0   | 391874377808207872 | 373013004378562562 |                                | 2023-08-18 08:52:05+00:00 | 2023-08-18 08:55:12+00:00 |        COMPLETED         | 388202066123022336 |
|  1   | 404868767976742913 | 373013004378562562 |                                | 2023-09-23 05:24:03+00:00 | 2023-09-23 05:24:15+00:00 |        COMPLETED         | 388202066123022336 |
|  2   | 404877899999961089 | 373013004378562562 |          

In [162]:
PARAMETER_EXECUTION_QUERY = """
SELECT pv.id, te.id AS task_execution_id, pv.parameters_id, pv.value, pv.choices, p."type" AS parameter_type, to_timestamp(pv.modified_at) AS modified_at, te.tasks_id as task_id FROM parameter_values pv JOIN parameters p ON p.id = pv.parameters_id LEFT JOIN tasks t ON t.id = p.tasks_id LEFT JOIN task_executions te ON te.tasks_id = t.id AND te.jobs_id = pv.jobs_id LEFT JOIN jobs j ON j.id = te.jobs_id AND j.id = pv.jobs_id WHERE p.archived = FALSE AND t.archived = FALSE AND j.checklists_id IN %(checklist_ids)s
"""
parameter_execution_dtype = {'id': 'Int64', 'task_execution_id': 'Int64', 'parameters_id': 'Int64', 'task_id': 'Int64'}
parameter_execution_df = pd.read_sql(PARAMETER_EXECUTION_QUERY, source_engine, params=params, dtype=parameter_execution_dtype)
display(parameter_execution_df)

+------+--------------------+--------------------+--------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+---------------------------+--------------------+
|      |         id         | task_execution_id  |   parameters_id    |               value                |                                                                                                                                                               choices                                                                                                                                                                | parameter_type |        modified_at        |      task_id     

In [93]:
executed_step_measurement_df = parameter_execution_df.copy()
display(parameter_execution_df)

In [94]:
new_step_attribute_df = pd.DataFrame(
    columns=['id', 'step_id', 'data_type_id', 'attribute_label', 'resource_id', 'expected_value1', 'expected_value2',
             'comparison_operator', 'resource_type', 'parameter_id', 'reference_id'])

new_step_attribute_data_types_df = pd.DataFrame(
    columns=['data_type_id', 'measurement_type', 'measurement_unit', 'measurement_description'])

def append_to_attribute_related_df(new_rows_attribute_data_types, new_rows_attribute, attribute_data_types_df, attribute_df):
    new_rows_data_types_df = pd.DataFrame(new_rows_attribute_data_types)
    new_rows_attribute_df = pd.DataFrame(new_rows_attribute)
    attribute_data_types_df = pd.concat([attribute_data_types_df, new_rows_data_types_df], ignore_index=True)
    attribute_df = pd.concat([attribute_df, new_rows_attribute_df], ignore_index=True)
    return  attribute_data_types_df, attribute_df

def create_row_attribute_data_type(data_type_id, measurement_type, measurement_unit, measurement_description):
    return {
        'data_type_id': data_type_id,
        'measurement_type': measurement_type,
        'measurement_unit': measurement_unit,
        'measurement_description': measurement_description
    }

def create_row_attribute(attribute_id, parameter_id, step_id, data_type_id, attribute_label, expected_value1, expected_value2, comparison_operator, resource_id, resource_type, reference_id):
    return {
        'id': attribute_id,
        'step_id': step_id,
        'data_type_id': data_type_id,
        'attribute_label': attribute_label,
        'expected_value1': expected_value1,
        'expected_value2': expected_value2,
        'comparison_operator': comparison_operator,
        'resource_id': resource_id,
        'resource_type': resource_type,
        'parameter_id': parameter_id,
        'reference_id': reference_id
    }

def create_rows_for_attribute_related_data(parameter, identifier):
    new_rows_attribute_data_type = []
    new_rows_attribute = []
    measurement_unit = expected_value1 = expected_value2 = comparison_operator = resource_id = resource_type = reference_id = measurement_type = None
    parameter_type, step_id, name = parameter['type'], parameter['task_id'], parameter['name']
    if parameter_type not in (['SINGLE_SELECT', 'CHECKLIST', 'MULTISELECT']):
        is_parameter_type_handled = True
        if parameter_type == 'NUMBER':
            measurement_type = 'integer'
        elif parameter_type == 'SHOULD_BE':
            measurement_type = 'float'
            operator = parameter['data']['operator']
            if operator == 'EQUAL_TO':
                expected_value1 = parameter['data']['value']
                comparison_operator = '='
            elif operator == 'LESS_THAN':
                expected_value1 = parameter['data']['value']
                comparison_operator = '<'
            elif operator == 'LESS_THAN_EQUAL_TO':
                expected_value1 = parameter['data']['value']
                comparison_operator = '<='
            elif operator == 'MORE_THAN':
                expected_value1 = parameter['data']['value']
                comparison_operator = '>'
            elif operator == 'MORE_THAN_EQUAL_TO':
                expected_value1 = parameter['data']['value']
                comparison_operator = '>='
            elif operator == 'BETWEEN':                
                expected_value1 = parameter['data']['lowerValue']
                expected_value2 = parameter['data']['upperValue']
                comparison_operator = 'between'
        elif parameter_type == 'SINGLE_LINE' or parameter_type == 'MULTI_LINE':
            measurement_type = 'text'
        elif parameter_type == 'DATE' or parameter_type == 'DATE_TIME':
            measurement_type = parameter_type.lower()
        elif parameter_type == 'YES_NO':
            measurement_type = 'boolean'
        elif parameter_type == 'RESOURCE':
            measurement_type = 'text'
            resource_type = parameter['data']['collection']
        else:
            print(f"Parameter type: {parameter_type} is not implemented")
            is_parameter_type_handled = False
        if is_parameter_type_handled:    
            new_row_attribute_data_type = create_row_attribute_data_type(identifier, measurement_type, measurement_unit, name)
            new_row_attribute = create_row_attribute(identifier, parameter['parameter_id'], step_id, new_row_attribute_data_type['data_type_id'], name, expected_value1, expected_value2, comparison_operator, resource_id, resource_type, reference_id)
            new_rows_attribute_data_type.append(new_row_attribute_data_type)
            new_rows_attribute.append(new_row_attribute)
    else:
        if parameter_type == 'SINGLE_SELECT' or parameter_type == 'CHECKLIST' or parameter_type == 'MULTISELECT':
            measurement_type = 'boolean'
            for choice in row['data']:
                name = parameter['name'] + ' - ' + choice['name']
                reference_id = choice['id']
                new_row_attribute_data_type = create_row_attribute_data_type(identifier, measurement_type, measurement_unit, name)
                new_row_attribute = create_row_attribute(identifier, parameter['parameter_id'], step_id, new_row_attribute_data_type['data_type_id'], name, expected_value1, expected_value2, comparison_operator, resource_id, resource_type, reference_id)
                new_rows_attribute_data_type.append(new_row_attribute_data_type)
                new_rows_attribute.append(new_row_attribute)
                identifier += 1
        else:
            print(f"Parameter type: {parameter_type} is not implemented.")
                
    return new_rows_attribute_data_type, new_rows_attribute

next_id = get_next_id(new_step_attribute_data_types_df, 'data_type_id')
relevant_parameter_df = parameter_df[~parameter_df['type'].isin(['INSTRUCTION', 'MATERIAL', 'MEDIA', 'SIGNATURE', 'FILE_UPLOAD'])]
# relevant_parameter_df.rename(columns={'parameter_id' : 'id'}, inplace=True)
for index, row in relevant_parameter_df.iterrows():
    attribute_data_types, attributes = create_rows_for_attribute_related_data(row, next_id)
    if len(attribute_data_types) != 0 and len(attributes) != 0:
        new_step_attribute_data_types_df, new_step_attribute_df = append_to_attribute_related_df(attribute_data_types, attributes, new_step_attribute_data_types_df, new_step_attribute_df)
    next_id += len(attribute_data_types)

# display(new_step_attribute_data_types_df)
display(new_step_attribute_df)

In [95]:
new_step_attribute_data_types_df.to_sql(table_names['step_attribute_data_types'], destination_engine, if_exists=if_exists, index=False)

80

In [96]:
new_step_attribute_df.to_sql(table_names['step_attributes'], destination_engine, if_exists=if_exists, index=False)

80

In [107]:
executed_step_df = task_execution_df.copy()
executed_step_df.rename(columns={'id': 'execution_id', 'task_id': 'step_id', 'started_at': 'execution_start_time', 'ended_at': 'execution_end_time', 'state': 'status', 'started_by': 'executed_by_employee_id'  }, inplace=True)
executed_step_df['batch_id'] = None
executed_step_df.drop('reason', axis=1, inplace=True)
display(executed_step_df,True)

+------+--------------------+--------------------+---------------------------+---------------------------+--------------------------+-------------------------+----------+
|      |    execution_id    |      step_id       |   execution_start_time    |    execution_end_time     |          status          | executed_by_employee_id | batch_id |
+------+--------------------+--------------------+---------------------------+---------------------------+--------------------------+-------------------------+----------+
|  0   | 391874377808207872 | 373013004378562562 | 2023-08-18 08:52:05+00:00 | 2023-08-18 08:55:12+00:00 |        COMPLETED         | 3.8820206612302234e+17  |          |
|  1   | 404868767976742913 | 373013004378562562 | 2023-09-23 05:24:03+00:00 | 2023-09-23 05:24:15+00:00 |        COMPLETED         | 3.8820206612302234e+17  |          |
|  2   | 404877899999961089 | 373013004378562562 | 2023-09-23 06:00:22+00:00 | 2023-09-23 06:01:06+00:00 |        COMPLETED         | 3.882020661

In [70]:
executed_step_df.to_sql(table_names['executed_steps'], destination_engine, if_exists=if_exists, index=False)

IntegrityError: (psycopg2.errors.ForeignKeyViolation) insert or update on table "executed_steps" violates foreign key constraint "fk_executed_steps_employee"
DETAIL:  Key (executed_by_employee_id)=(388202066123022340) is not present in table "facility_employees".

[SQL: INSERT INTO executed_steps (execution_id, step_id, execution_start_time, execution_end_time, status, executed_by_employee_id, batch_id) VALUES (%(execution_id__0)s, %(step_id__0)s, %(execution_start_time__0)s, %(execution_end_time__0)s, %(status__0)s ... 172021 characters truncated ... , %(execution_end_time__999)s, %(status__999)s, %(executed_by_employee_id__999)s, %(batch_id__999)s)]
[parameters: {'status__0': 'COMPLETED', 'execution_end_time__0': datetime.datetime(2023, 8, 18, 8, 55, 12, tzinfo=datetime.timezone.utc), 'executed_by_employee_id__0': 3.8820206612302234e+17, 'execution_start_time__0': datetime.datetime(2023, 8, 18, 8, 52, 5, tzinfo=datetime.timezone.utc), 'execution_id__0': 391874377808207872, 'batch_id__0': None, 'step_id__0': 373013004378562562, 'status__1': 'COMPLETED', 'execution_end_time__1': datetime.datetime(2023, 9, 23, 5, 24, 15, tzinfo=datetime.timezone.utc), 'executed_by_employee_id__1': 3.8820206612302234e+17, 'execution_start_time__1': datetime.datetime(2023, 9, 23, 5, 24, 3, tzinfo=datetime.timezone.utc), 'execution_id__1': 404868767976742913, 'batch_id__1': None, 'step_id__1': 373013004378562562, 'status__2': 'COMPLETED', 'execution_end_time__2': datetime.datetime(2023, 9, 23, 6, 1, 6, tzinfo=datetime.timezone.utc), 'executed_by_employee_id__2': 3.8820206612302234e+17, 'execution_start_time__2': datetime.datetime(2023, 9, 23, 6, 0, 22, tzinfo=datetime.timezone.utc), 'execution_id__2': 404877899999961089, 'batch_id__2': None, 'step_id__2': 373013004378562562, 'status__3': 'COMPLETED', 'execution_end_time__3': datetime.datetime(2023, 9, 5, 16, 23, 17, tzinfo=datetime.timezone.utc), 'executed_by_employee_id__3': 3.8820206612302234e+17, 'execution_start_time__3': datetime.datetime(2023, 9, 5, 16, 20, 39, tzinfo=datetime.timezone.utc), 'execution_id__3': 398495534738268161, 'batch_id__3': None, 'step_id__3': 373013004378562562, 'status__4': 'COMPLETED', 'execution_end_time__4': datetime.datetime(2023, 9, 5, 15, 21, 26, tzinfo=datetime.timezone.utc), 'executed_by_employee_id__4': 3.8820206612302234e+17, 'execution_start_time__4': datetime.datetime(2023, 9, 5, 15, 21, 2, tzinfo=datetime.timezone.utc), 'execution_id__4': 398495429943582721, 'batch_id__4': None, 'step_id__4': 373013004378562562, 'status__5': 'COMPLETED', 'execution_end_time__5': datetime.datetime(2023, 9, 23, 9, 3, 48, tzinfo=datetime.timezone.utc), 'executed_by_employee_id__5': 3.8820206612302234e+17, 'execution_start_time__5': datetime.datetime(2023, 9, 23, 9, 3, 4, tzinfo=datetime.timezone.utc), 'execution_id__5': 404923513861267457, 'batch_id__5': None, 'step_id__5': 373013004378562562, 'status__6': 'COMPLETED', 'execution_end_time__6': datetime.datetime(2023, 9, 6, 13, 12, 12, tzinfo=datetime.timezone.utc), 'executed_by_employee_id__6': 3.8820206612302234e+17, 'execution_start_time__6': datetime.datetime(2023, 9, 6, 13, 11, 24, tzinfo=datetime.timezone.utc), 'execution_id__6': 398824643053969409, 'batch_id__6': None, 'step_id__6': 373013004378562562, 'status__7': 'COMPLETED' ... 6900 parameters truncated ... 'step_id__992': 373021236992008192, 'status__993': 'COMPLETED', 'execution_end_time__993': datetime.datetime(2023, 9, 5, 4, 5, 7, tzinfo=datetime.timezone.utc), 'executed_by_employee_id__993': 3.5889892846350336e+17, 'execution_start_time__993': datetime.datetime(2023, 9, 5, 4, 4, 59, tzinfo=datetime.timezone.utc), 'execution_id__993': 388198568711020544, 'batch_id__993': None, 'step_id__993': 373021236992008192, 'status__994': 'COMPLETED_WITH_EXCEPTION', 'execution_end_time__994': datetime.datetime(2023, 10, 12, 5, 7, 14, tzinfo=datetime.timezone.utc), 'executed_by_employee_id__994': 2.7916670872112742e+17, 'execution_start_time__994': datetime.datetime(2023, 10, 12, 5, 7, 11, tzinfo=datetime.timezone.utc), 'execution_id__994': 403863853674749962, 'batch_id__994': None, 'step_id__994': 373021236992008192, 'status__995': 'NOT_STARTED', 'execution_end_time__995': None, 'executed_by_employee_id__995': None, 'execution_start_time__995': None, 'execution_id__995': 411747636843339786, 'batch_id__995': None, 'step_id__995': 373021236992008192, 'status__996': 'NOT_STARTED', 'execution_end_time__996': None, 'executed_by_employee_id__996': None, 'execution_start_time__996': None, 'execution_id__996': 411747578852892682, 'batch_id__996': None, 'step_id__996': 373021236992008192, 'status__997': 'COMPLETED_WITH_EXCEPTION', 'execution_end_time__997': datetime.datetime(2023, 10, 12, 5, 17, 35, tzinfo=datetime.timezone.utc), 'executed_by_employee_id__997': 2.7916670872112742e+17, 'execution_start_time__997': datetime.datetime(2023, 10, 12, 5, 17, 27, tzinfo=datetime.timezone.utc), 'execution_id__997': 407503511868629002, 'batch_id__997': None, 'step_id__997': 373021236992008192, 'status__998': 'COMPLETED_WITH_EXCEPTION', 'execution_end_time__998': datetime.datetime(2023, 10, 12, 4, 35, 10, tzinfo=datetime.timezone.utc), 'executed_by_employee_id__998': 2.7916670872112742e+17, 'execution_start_time__998': datetime.datetime(2023, 10, 12, 4, 35, 7, tzinfo=datetime.timezone.utc), 'execution_id__998': 411425350986719242, 'batch_id__998': None, 'step_id__998': 373021236992008192, 'status__999': 'COMPLETED_WITH_EXCEPTION', 'execution_end_time__999': datetime.datetime(2023, 10, 11, 7, 14, 12, tzinfo=datetime.timezone.utc), 'executed_by_employee_id__999': 2.7916670872112742e+17, 'execution_start_time__999': datetime.datetime(2023, 10, 11, 7, 14, 9, tzinfo=datetime.timezone.utc), 'execution_id__999': 411389721867411466, 'batch_id__999': None, 'step_id__999': 373021236992008192}]
(Background on this error at: https://sqlalche.me/e/20/gkpj)

In [71]:
executed_step_exception_df = task_execution_df.copy()
executed_step_exception_df = executed_step_exception_df[executed_step_exception_df['state'] == 'COMPLETED_WITH_EXCEPTION']
executed_step_exception_df['exception_id'] = executed_step_exception_df['id']
executed_step_exception_df.rename(columns={'id': 'execution_id', 'reason': 'description', 'ended_at': 'exception_time'}, inplace=True)
executed_step_exception_df.drop('task_id', axis=1, inplace=True)
executed_step_exception_df.drop('state', axis=1, inplace=True)
executed_step_exception_df.drop('started_at', axis=1, inplace=True)
executed_step_exception_df.drop('started_by', axis=1, inplace=True)
display(executed_step_exception_df)

In [72]:
executed_step_exception_df.to_sql(table_names['executed_step_exceptions'], destination_engine, if_exists=if_exists, index=False)

IntegrityError: (psycopg2.errors.ForeignKeyViolation) insert or update on table "executed_step_exceptions" violates foreign key constraint "fk_executed_step_exceptions_execution"
DETAIL:  Key (execution_id)=(404868767976742914) is not present in table "executed_steps".

[SQL: INSERT INTO executed_step_exceptions (execution_id, description, exception_time, exception_id) VALUES (%(execution_id__0)s, %(description__0)s, %(exception_time__0)s, %(exception_id__0)s), (%(execution_id__1)s, %(description__1)s, %(exception_time__1 ... 39305 characters truncated ... 19)s), (%(execution_id__420)s, %(description__420)s, %(exception_time__420)s, %(exception_id__420)s)]
[parameters: {'exception_time__0': datetime.datetime(2023, 9, 23, 5, 24, 55, tzinfo=datetime.timezone.utc), 'exception_id__0': 404868767976742914, 'description__0': 'nylon brush not applicable ', 'execution_id__0': 404868767976742914, 'exception_time__1': datetime.datetime(2023, 9, 23, 6, 2, 18, tzinfo=datetime.timezone.utc), 'exception_id__1': 404877899999961090, 'description__1': 'nylon brush not applicable ', 'execution_id__1': 404877899999961090, 'exception_time__2': datetime.datetime(2023, 9, 5, 16, 24, 44, tzinfo=datetime.timezone.utc), 'exception_id__2': 398495534738268162, 'description__2': 'Nylon brush not applicable ', 'execution_id__2': 398495534738268162, 'exception_time__3': datetime.datetime(2023, 9, 5, 15, 22, 48, tzinfo=datetime.timezone.utc), 'exception_id__3': 398495429943582722, 'description__3': 'Nylon brush not applicable ', 'execution_id__3': 398495429943582722, 'exception_time__4': datetime.datetime(2023, 9, 23, 9, 4, 32, tzinfo=datetime.timezone.utc), 'exception_id__4': 404923513861267458, 'description__4': 'nylon brush not applicable ', 'execution_id__4': 404923513861267458, 'exception_time__5': datetime.datetime(2023, 9, 6, 13, 15, 6, tzinfo=datetime.timezone.utc), 'exception_id__5': 398824643053969410, 'description__5': 'nylon brush not applicable ', 'execution_id__5': 398824643053969410, 'exception_time__6': datetime.datetime(2023, 9, 6, 13, 58, 11, tzinfo=datetime.timezone.utc), 'exception_id__6': 398824532076879874, 'description__6': 'nylon brush not applicable ', 'execution_id__6': 398824532076879874, 'exception_time__7': datetime.datetime(2023, 8, 23, 6, 39, 36, tzinfo=datetime.timezone.utc), 'exception_id__7': 393653172895703042, 'description__7': 'nylon brush not applicable ', 'execution_id__7': 393653172895703042, 'exception_time__8': datetime.datetime(2023, 8, 23, 7, 24, 28, tzinfo=datetime.timezone.utc), 'exception_id__8': 393663842412355586, 'description__8': 'nylon brush not applicable ', 'execution_id__8': 393663842412355586, 'exception_time__9': datetime.datetime(2023, 8, 24, 11, 10, 52, tzinfo=datetime.timezone.utc), 'exception_id__9': 394083277174431746, 'description__9': 'nylon brush not applicable ', 'execution_id__9': 394083277174431746, 'exception_time__10': datetime.datetime(2023, 9, 25, 5, 54, 49, tzinfo=datetime.timezone.utc), 'exception_id__10': 405600499780771842, 'description__10': 'nylon brush not applicable ', 'execution_id__10': 405600499780771842, 'exception_time__11': datetime.datetime(2023, 9, 25, 6, 32, 3, tzinfo=datetime.timezone.utc), 'exception_id__11': 405600415945023490, 'description__11': 'nylon brush not applicable ', 'execution_id__11': 405600415945023490, 'exception_time__12': datetime.datetime(2023, 9, 8, 5, 7, 29, tzinfo=datetime.timezone.utc), 'exception_id__12': 399428182168002562 ... 1584 parameters truncated ... 'description__408': 'Not applicable ', 'execution_id__408': 406373999487442958, 'exception_time__409': datetime.datetime(2023, 10, 12, 5, 3, 34, tzinfo=datetime.timezone.utc), 'exception_id__409': 406373930470170638, 'description__409': 'Not applicable ', 'execution_id__409': 406373930470170638, 'exception_time__410': datetime.datetime(2023, 9, 11, 4, 34, 28, tzinfo=datetime.timezone.utc), 'exception_id__410': 399918688169844750, 'description__410': 'Not Applicable ', 'execution_id__410': 399918688169844750, 'exception_time__411': datetime.datetime(2023, 9, 11, 3, 44, 22, tzinfo=datetime.timezone.utc), 'exception_id__411': 399918772429217806, 'description__411': 'Not Applicable ', 'execution_id__411': 399918772429217806, 'exception_time__412': datetime.datetime(2023, 9, 8, 8, 49, 43, tzinfo=datetime.timezone.utc), 'exception_id__412': 399428273268285454, 'description__412': 'Not Applicable ', 'execution_id__412': 399428273268285454, 'exception_time__413': datetime.datetime(2023, 9, 8, 8, 53, 12, tzinfo=datetime.timezone.utc), 'exception_id__413': 399428182168002574, 'description__413': 'Not Applicable ', 'execution_id__413': 399428182168002574, 'exception_time__414': datetime.datetime(2023, 8, 26, 5, 19, 31, tzinfo=datetime.timezone.utc), 'exception_id__414': 394367332482768910, 'description__414': 'Not applicable ', 'execution_id__414': 394367332482768910, 'exception_time__415': datetime.datetime(2023, 8, 26, 5, 33, 3, tzinfo=datetime.timezone.utc), 'exception_id__415': 394354230299156494, 'description__415': 'not applicable ', 'execution_id__415': 394354230299156494, 'exception_time__416': datetime.datetime(2023, 8, 24, 14, 20, 57, tzinfo=datetime.timezone.utc), 'exception_id__416': 394083277174431758, 'description__416': 'Not Applicable ', 'execution_id__416': 394083277174431758, 'exception_time__417': datetime.datetime(2023, 9, 7, 3, 50, 47, tzinfo=datetime.timezone.utc), 'exception_id__417': 398824532076879886, 'description__417': 'NA', 'execution_id__417': 398824532076879886, 'exception_time__418': datetime.datetime(2023, 9, 7, 3, 55, 50, tzinfo=datetime.timezone.utc), 'exception_id__418': 398824643053969422, 'description__418': 'NA', 'execution_id__418': 398824643053969422, 'exception_time__419': datetime.datetime(2023, 9, 5, 18, 19, 4, tzinfo=datetime.timezone.utc), 'exception_id__419': 398495429947777027, 'description__419': 'NA', 'execution_id__419': 398495429947777027, 'exception_time__420': datetime.datetime(2023, 9, 5, 18, 15, 40, tzinfo=datetime.timezone.utc), 'exception_id__420': 398495534738268174, 'description__420': 'NA', 'execution_id__420': 398495534738268174}]
(Background on this error at: https://sqlalche.me/e/20/gkpj)

In [None]:
executed_step_measurement_df = pd.DataFrame(columns=['measurement_id', 'execution_id', 'step_attribute_id', 'resource_id', 'measurement_value', 'recorded_time', 'resource_type'])

def create_row_executed_step_measurement(measurement_id, execution_id, step_attribute_id, resource_id, measurement_value, recorded_time, resource_type): 
    return {
        'measurement_id': measurement_id,
        'execution_id': execution_id,
        'step_attribute_id': step_attribute_id,
        'resource_id': resource_id,
        'measurement_value': measurement_value,
        'recorded_time': recorded_time,
        'resource_type': resource_type
    }    

new_rows_executed_step_measurement = []
for index, row in parameter_execution_df.iterrows():
    parameter_type = row['parameter_type']
    measurement_id = row['id']
    execution_id = row['task_execution_id']
    recorded_time = row['modified_at']
    step_id = row['task_id']
    step_attribute_id = resource_id = measurement_value = resource_type = None
    if parameter_type in (['SINGLE_SELECT', 'CHECKLIST', 'MULTISELECT', 'YES_NO', 'RESOURCE']):
        if parameter_type == 'RESOURCE':
            resource_type = row['choices']
            pass
        else:
            if row['choices'] is not None:
                for choice in row['choices']:
                    # Very complicated
                    pass
    else:
        step_attribute_id = new_step_attribute_df.loc[(new_step_attribute_df['step_id'] == step_id)]['id']
        measurement_value = row['value']
        # step_attribute_id = step_id
        new_row_executed_step_measurement = create_row_executed_step_measurement(measurement_id, execution_id, step_attribute_id, resource_id, measurement_value, recorded_time, resource_type)
        new_rows_executed_step_measurement.append(new_row_executed_step_measurement)

if len(new_rows_executed_step_measurement) != 0:
    new_rows_executed_step_measurement_df = pd.DataFrame(new_rows_executed_step_measurement)
    executed_step_measurement_df = pd.concat([executed_step_measurement_df, new_rows_executed_step_measurement_df], ignore_index=True)
#             
# executed_step_measurement_df
new_rows_executed_step_measurement