In [59]:
import os
import psycopg2
import pandas as pd
from mysql import connector
from pandas import DataFrame
from datetime import datetime
from dags.helpers.connections import Mysql, Postgresql

In [60]:
# Connection settings are read from environment variables so credentials never live in the
# notebook source. Non-secret fallbacks mirror the local development setup.
# NOTE(review): any password previously committed in this cell should be treated as leaked and rotated.
POSTGRE_HOST = os.environ.get('POSTGRE_HOST', 'localhost')
POSTGRE_PORT = os.environ.get('POSTGRE_PORT', '5432')
POSTGRE_DB_NAME = os.environ.get('POSTGRE_DB_NAME', 'postgres')
POSTGRE_USER = os.environ.get('POSTGRE_USER', 'postgres')
POSTGRE_PASSWORD = os.environ.get('POSTGRE_PASSWORD', '')
MYSQL_HOST = os.environ.get('MYSQL_HOST', 'localhost')
MYSQL_PORT = os.environ.get('MYSQL_PORT', '3306')
MYSQL_DB_NAME = os.environ.get('MYSQL_DB_NAME', 'mmis')
MYSQL_USER = os.environ.get('MYSQL_USER', 'root')
MYSQL_PASSWORD = os.environ.get('MYSQL_PASSWORD', '')

def get_reference_table() -> DataFrame:
    """
    Description: Downloads the ETL flow reference table from PostgreSQL.

    Returns:
        DataFrame with every row of inventory_etl_manager.database_flow_reference_table.
        Downstream cells read its source_schema, source_table, key_fields,
        destination_schema, destination_table and target_fields columns.
    """
    ETL_MANAGER_DB = 'inventory_etl_manager'
    conn_obj = Postgresql(host=POSTGRE_HOST, port=POSTGRE_PORT, db_name=POSTGRE_DB_NAME, user_name=POSTGRE_USER, password=POSTGRE_PASSWORD)
    try:
        query = f"""SELECT * FROM {ETL_MANAGER_DB}.database_flow_reference_table"""
        return conn_obj.execute_query(query=query, return_data=True)
    finally:
        # Release the PostgreSQL connection even when the query raises.
        conn_obj.close_connection()

# Fetch the ETL flow reference table once; extract() and load() below use this module-level frame.
ref_table = get_reference_table()

def extract(reference_table=None):
    """
    Description: Pulls every source table listed in the reference table from MySQL.

    Args:
        reference_table: flow reference DataFrame with source_schema, source_table and
            key_fields columns. Defaults to the module-level `ref_table`.

    Returns:
        dict mapping each source_table name to the DataFrame returned for it.
    """
    if reference_table is None:
        reference_table = ref_table

    mysql = Mysql(host=MYSQL_HOST, port=MYSQL_PORT, db_name=MYSQL_DB_NAME, user_name=MYSQL_USER, password=MYSQL_PASSWORD)
    dfs = {}
    try:
        for _, row in reference_table.iterrows():
            source_table = row['source_table']
            # Column list, schema and table are spliced verbatim into the SQL text, so the
            # reference table must be trusted configuration (identifiers cannot be bound as parameters).
            query = f"select {row['key_fields']} from {row['source_schema']}.{source_table};"
            print(source_table)
            dfs[source_table] = mysql.execute_query(query=query, return_data=True)
    finally:
        # assumes Mysql exposes close_connection() like Postgresql does — TODO confirm in dags.helpers.connections
        mysql.close_connection()

    return dfs

dfs = extract()
# dfs maps each source table name from ref_table to the DataFrame pulled from MySQL.

Successfully connected to postgres!
Query successful
Connection is successfully terminated!
Successfully connected to mmis!
customer
Query successful
promotion
Query successful
product
Query successful
product_class
Query successful
store
Query successful
warehouse
Query successful
time_by_day
Query successful
inventory_fact_1997
Query successful
inventory_fact_1998
Query successful


In [61]:
def age(birthday, reference_date=datetime(1999, 1, 1)):
    """
    Return the age in whole years of someone born on `birthday`, measured at `reference_date`.

    Args:
        birthday: any object with .year/.month/.day (date, datetime, pandas Timestamp).
        reference_date: the date the age is measured at. Defaults to 1999-01-01,
            presumably chosen because the inventory facts cover 1997-1998 — confirm.

    Returns:
        int: completed years of age on reference_date.
    """
    # Subtract one year when the birthday has not yet come round by the reference date.
    birthday_still_ahead = (reference_date.month, reference_date.day) < (birthday.month, birthday.day)
    return reference_date.year - birthday.year - birthday_still_ahead

def age_category(age):
    """
    Map an age in years to its reporting bucket label.

    Buckets are inclusive on both ends: 17-25, 26-35, 36-50, 51-64, 65-75, then ">75".
    Ages below 17 get "<17" instead of being mislabelled as ">75".
    Non-comparable values (e.g. NaN) fall through to ">75".
    """
    if age < 17:
        return "<17"
    if age <= 25:
        return "17 - 25"
    if age <= 35:
        return "26 - 35"
    if age <= 50:
        return "36 - 50"
    if age <= 64:
        return "51 - 64"
    if age <= 75:
        return "65 - 75"
    return ">75"

def transform_customer(data_df):
    """
    Build the customer dimension: drop incomplete rows and replace birthdate with age columns.

    Args:
        data_df: raw customer frame; must contain a `birthdate` column.

    Returns:
        A new DataFrame (the input is not modified).
        - Rows containing any missing value are removed.
        - `birthdate` is replaced, at its original position, by `age` and then `age_category`.
    """
    customers = data_df.dropna()
    birthdate_pos = customers.columns.get_loc('birthdate')

    ages = customers['birthdate'].apply(age)
    customers = customers.drop(columns=['birthdate'])

    # Put the derived columns where birthdate was, so the column order matches the target table.
    customers.insert(birthdate_pos, 'age', ages)
    customers.insert(birthdate_pos + 1, 'age_category', ages.apply(age_category))
    return customers


def transform_product(product_df: DataFrame, product_class_df: DataFrame):
    """
    Attach product-class attributes to each product.

    Rows with any missing value are removed from both inputs. Products are then inner-joined
    to their class on `product_class_id`, and the join key is dropped from the result.
    Neither input frame is modified.
    """
    products = product_df.dropna()
    product_classes = product_class_df.dropna()
    merged = products.merge(product_classes, how="inner", on=['product_class_id'])
    return merged.drop(columns=['product_class_id'])

def transform_promotion(promotion_df: DataFrame, district_ids=(110, 0)):
    """
    Keep promotions from the selected districts and drop the district column.

    Args:
        promotion_df: raw promotion frame; must contain `promotion_district_id`.
        district_ids: district ids to retain. Defaults to (110, 0).
            NOTE(review): the business meaning of 110 and 0 is not documented here — confirm.

    Returns:
        Filtered DataFrame without the `promotion_district_id` column.
    """
    in_selected_districts = promotion_df['promotion_district_id'].isin(list(district_ids))
    return promotion_df.loc[in_selected_districts].drop(columns=['promotion_district_id'])

def transform_store(store_df: DataFrame):
    """Return a new store frame without the `region_id` column (input left untouched)."""
    return store_df.drop('region_id', axis=1)

def transform_warehouse(warehouse_df: DataFrame):
    """Return a new warehouse frame without the `warehouse_class_id` column (input left untouched)."""
    return warehouse_df.drop('warehouse_class_id', axis='columns')

def transform_time(time_df: DataFrame):
    """Pass-through: the time dimension needs no transformation, so the same object is returned."""
    unchanged = time_df
    return unchanged

def transform_inventory(inventory_df: DataFrame, time_df: DataFrame, promotion_df: DataFrame):
    """
    Clean one inventory fact table and tag each row with the promotion active on its date.

    Args:
        inventory_df: raw inventory facts; must contain `time_id` and `is_deleted`.
        time_df: time dimension; must contain `time_id` and `the_date`.
        promotion_df: promotions with `promotion_id`, `start_date` and `end_date`.

    Returns:
        The rows with is_deleted == '0', without the `is_deleted` column and with their
        original index, plus a `promotion_id` column.
        - promotion_id is the first promotion, in promotion_df row order, whose inclusive
          [start_date, end_date] window contains the row's date.
        - It is 0 when no window matches or the row's time_id is absent from time_df.
    """
    active = inventory_df[inventory_df['is_deleted'] == '0'].drop(columns=['is_deleted'])

    # Left join on a key-only frame yields exactly one date per inventory row, in row order.
    # Duplicate time_ids in time_df are collapsed (first wins) so the row count cannot grow.
    date_lookup = time_df[['time_id', 'the_date']].drop_duplicates('time_id')
    the_dates = active[['time_id']].merge(date_lookup, how='left', on='time_id')['the_date']

    promotion_ids = pd.Series(0, index=the_dates.index)
    windows = list(zip(promotion_df['start_date'], promotion_df['end_date'], promotion_df['promotion_id']))
    # Walk promotions last-to-first: where windows overlap, the earliest-listed promotion is
    # written last and therefore wins, matching a first-match scan. NaT dates never match.
    for start, end, promo_id in reversed(windows):
        in_window = (the_dates >= start) & (the_dates <= end)
        promotion_ids.loc[in_window] = promo_id

    # Positional assignment keeps active's original index labels.
    return active.assign(promotion_id=promotion_ids.to_numpy())


def transform(dfs, inventory_tables=('inventory_fact_1997', 'inventory_fact_1998')):
    """
    Apply the per-table transformations to the frames returned by extract().

    Args:
        dfs: dict of raw DataFrames keyed by source table name.
        inventory_tables: keys in `dfs` of the inventory fact tables to transform and
            stack (in this order) into a single `inventory_fact` frame.

    Returns:
        dict with keys customers, products, promotions, stores, warehouses,
        time_by_day and inventory_fact.
    """
    transformed_dfs = {
        'customers': transform_customer(data_df=dfs['customer']),
        'products': transform_product(dfs['product'], dfs['product_class']),
        'promotions': transform_promotion(dfs['promotion']),
        'stores': transform_store(dfs['store']),
        'warehouses': transform_warehouse(dfs['warehouse']),
        'time_by_day': transform_time(dfs['time_by_day']),
    }
    # Every fact table is matched against the same time dimension and filtered promotions.
    yearly_facts = [
        transform_inventory(dfs[table], dfs['time_by_day'], transformed_dfs['promotions'])
        for table in inventory_tables
    ]
    transformed_dfs['inventory_fact'] = pd.concat(yearly_facts, axis=0)
    return transformed_dfs


# NOTE: despite its name, processed_df is a dict of DataFrames whose keys are the names
# load() looks up as destination tables.
processed_df = transform(dfs)

In [62]:
# Spot-check: all transformed inventory fact rows recorded for time_id 369.
inventory_fact = processed_df['inventory_fact']
print(inventory_fact.loc[inventory_fact['time_id'].eq(369)])

      product_id  time_id  warehouse_id  store_id  units_ordered  \
0            350      369             2         2             42   
2           1397      369             2         2             87   
3            267      369             2         2             94   
4           1270      369             2         2             44   
5            234      369             2         2             12   
6           1370      369             2         2             15   
7           1493      369             2         2             32   
8            710      369             2         2             30   
9            354      369             2         2             16   
10           277      369             2         2             49   
11            94      369             2         2             95   
2776        1147      369            17        17             12   
2777         498      369            17        17             12   
2778        1301      369            17        1

In [63]:
def load(processed_dfs, reference_table=None):
    """
    Insert each transformed frame into its PostgreSQL destination table.

    Args:
        processed_dfs: dict of transformed DataFrames keyed by destination table name.
        reference_table: flow reference DataFrame with destination_schema, destination_table
            and target_fields columns. Defaults to the module-level `ref_table`.

    Raises:
        KeyError: if a reference row names a destination missing from processed_dfs.
    """
    if reference_table is None:
        reference_table = ref_table

    psql = Postgresql(host=POSTGRE_HOST, port=POSTGRE_PORT, db_name=POSTGRE_DB_NAME, user_name=POSTGRE_USER, password=POSTGRE_PASSWORD)
    try:
        for _, row in reference_table.iterrows():
            destination_table = row['destination_table']
            # Skip rows with no destination. NaN is truthy, so it must be checked explicitly;
            # otherwise processed_dfs[nan] would raise.
            if pd.isna(destination_table) or not destination_table:
                continue
            # NOTE(review): if two reference rows share a destination, that frame is inserted twice — confirm the table is set up to avoid this.
            psql.insert_values(processed_dfs[destination_table], row['destination_schema'], destination_table, row['target_fields'])
    finally:
        # Release the PostgreSQL connection even if an insert fails.
        psql.close_connection()

# Write every transformed frame to its PostgreSQL destination.
load(processed_dfs=processed_df)

Successfully connected to postgres!
Columns to be inserted:  customer_id, city, country, age, age_category, yearly_income, gender, total_children, education, occupation
Columns in the data extracted:  ['customer_id', 'city', 'country', 'age', 'age_category', 'yearly_income', 'gender', 'total_children', 'education', 'occupation']
SQL query is generated at 2023-04-14 02:17:43.530060
START OF QUERY:
 INSERT INTO inventory_dwh.customers (customer_id, city, country, age, age_category, yearly_income, gender, total_children, education, occupation) 
 VALUES (1, 'Tlaxiaco', 'Mexico', 37, '36 - 50', '$30K - $50K', 'F', 4, 'Partial High School', 'Skilled Manual'), (2, 'Sooke', 'Canada', 83, '>75', '$70K - $90K', 'M', 1, 'Partial High School', 'Professional'), (3, 'Issaquah', 'USA', 88, '>75', '$50K - $70K', 'F', 1, 'Bachelors Degree', 'Professional'), (4, 'Burnaby', 'Canada', 29, '26 - 35', '$10K - $30K', 'M', 4, 'Partial High School', 'Skilled Manual'), (5, 'Novato', 'USA', 47, '36 - 50', '$30K 