In [346]:
from sqlalchemy import create_engine, text
from time import time
import pandas as pd
import os 

In [None]:
def database_connect(user, password, host, port, db):
    engine = create_engine(
        f'postgresql://{user}:{password}@{host}:{port}/{db}')
    engine.connect()
    return engine
db_engine = database_connect('root', 'root', 'postgres-db', '5432', 'RetailDB')

In [19]:
def get_file_old(url): 
    if url.endswith('.csv.gz'):
        csv_name = 'output.csv.gz'
    else:
        csv_name = 'output.csv'
    ret = os.system(f"wget {url} -O {csv_name}")
    return ret 

In [51]:
def get_file(url, csv_file): 
    ret = os.system(f"wget {url} -O {csv_file}")
    if ret != 0:
        print(f'Unable to get file {csv_file} , file not found or internet issue')
        os.system(f'rm {csv_file}')
    return ret 

In [354]:
def load_data(df): 
    #OLTP
    print('Insert on transactional table: online_retail_cleaned', end=' => ')
    sql = '''
        SELECT max("Id") FROM "online_retail_cleaned";
    '''
    max_id_df = pd.read_sql(sql, db_engine)
    last_id = max_id_df['max'][0] + 1
    
    df['Id'] = range(last_id, last_id + df.shape[0])
    df = df.set_index('Id')
    
    t_start = time()
    df.to_sql(name='online_retail_cleaned', con=db_engine, if_exists='append', index=True, index_label='Id')
    print(f'Finished insertion in {time()-t_start:.3}s')

    # Date Dim
    print('Insert on DWH table: DimDate', end=' => ')
    sql = '''
        SELECT min("FullDate") AS "MinDate", max("FullDate") AS "MaxDate" FROM "DimDate";
    '''
    max_date_df = pd.read_sql(sql, db_engine)
    max_date = max_date_df['MaxDate'][0] 
    
    max_invoice_date = df['InvoiceDate'].dt.date.max()
    
    date_df = None
    if max_invoice_date > max_date: 
        sql = f'''
            SELECT CAST(to_char(date_trunc('day', days)::date, 'YYYYMMDD') AS INT) AS "DateKey"
            	, date_trunc('day', days):: date AS "FullDate"
            	, EXTRACT(YEAR FROM days) AS "Year"
            	, EXTRACT(MONTH FROM days) AS "Month"
            	, EXTRACT(DAY FROM days) AS "Day"
            	, EXTRACT(WEEK FROM days) AS "Week"
            	, TO_CHAR(days, 'Month') AS "MonthName"
            	, TO_CHAR(days, 'Day') AS "DayName"
            FROM generate_series
                    ( '{str(max_date)}'::timestamp + interval '1 day'
                    , '{str(max_invoice_date)}'::timestamp
                    , '1 day'::interval) days;
            '''
        date_df = pd.read_sql(sql, db_engine, index_col='DateKey')
    
        date_df['Year'] = date_df['Year'].astype(int)
        date_df['Month'] = date_df['Month'].astype(int)
        date_df['Day'] = date_df['Day'].astype(int)
        date_df['Week'] = date_df['Week'].astype(int)
    
        t_start = time()
        date_df.to_sql(name='DimDate', con=db_engine, if_exists='append', index=True, index_label='DateKey')
        print(f'Finished insertion in {time()-t_start:.3}s')
    else: 
        print('No new insertion')

    # DimCustomer 
    print('Insert on DWH table: DimCustomer', end=' => ')
    sql = '''
        SELECT Distinct "CustomerID"
        FROM "DimCustomer";
    '''
    customers_df = pd.read_sql(sql, db_engine)
    existing_customers = list(customers_df['CustomerID'])
    
    customer_df = df[['CustomerID', 'Country']].drop_duplicates(subset=['CustomerID']).copy()
    
    customer_df_insert = customer_df[~customer_df['CustomerID'].isin(existing_customers)]
    customer_df_update = customer_df[customer_df['CustomerID'].isin(existing_customers)]
    
    
    sql = '''
        SELECT max("CustomerKey") AS "MaxCustomerKey" 
        FROM "DimCustomer";
    '''
    max_customer_df = pd.read_sql(sql, db_engine)
    max_customer = max_customer_df['MaxCustomerKey'][0] + 1
    customer_df_insert['CustomerKey'] = range(max_customer, max_customer + customer_df_insert.shape[0])
    
    def set_customer_name(customer_key): 
        return 'Customer ' + str(customer_key)
    customer_df_insert['CustomerName'] = customer_df_insert['CustomerKey'].apply(set_customer_name)
    customer_df_insert = customer_df_insert.set_index('CustomerKey')
    
    t_start = time()
    customer_df_insert.to_sql(name='DimCustomer', con=db_engine, if_exists='append', index=True, index_label='CustomerKey')
    print(f'Finished insertion in {time()-t_start:.3}s')
    
    def update_customer_dwh(row): 
        query = f'''
            UPDATE "DimCustomer"
            SET "Country" = '{row['Country']}'
            WHERE 
                "CustomerID" = '{row['CustomerID']}'
        '''
        with db_engine.connect() as db_conn:
            db_conn.execute(text(query))
            db_conn.commit()
    _ = customer_df_update.apply(update_customer_dwh, axis=1)

    
    # DimProduct 
    print('Insert on DWH table: DimProduct', end=' => ')
    sql = '''
        SELECT Distinct "StockCode"
        FROM "DimProduct";
    '''
    products_df = pd.read_sql(sql, db_engine)
    existing_products = list(products_df['StockCode'])
    
    product_df = df[['StockCode', 'Description', 'UnitPrice']].drop_duplicates().copy()
    
    
    product_df_insert = product_df[~product_df['StockCode'].isin(existing_products)]
    product_df_update = product_df[product_df['StockCode'].isin(existing_products)]
    
    sql = '''
        SELECT max("ProductKey") AS "MaxProductKey" 
        FROM "DimProduct";
    '''
    max_product_df = pd.read_sql(sql, db_engine)
    max_product = max_product_df['MaxProductKey'][0] + 1
    
    product_df_insert['ProductKey'] = range(max_product, max_product + product_df_insert.shape[0])
    def set_product_name(product_key): 
        return 'Product ' + str(product_key)
    product_df_insert['ProductName'] = product_df_insert['ProductKey'].apply(set_product_name)
    product_df_insert = product_df_insert.set_index('ProductKey')
    
    t_start = time()
    product_df_insert.to_sql(name='DimProduct', con=db_engine, if_exists='append', index=True, index_label='ProductKey')
    print(f'Finished insertion in {time()-t_start:.3}s')
    
    def update_product_dwh(row): 
        query = f'''
                UPDATE "DimProduct"
                SET "UnitPrice" = {row['UnitPrice']}
                WHERE "StockCode" = '{row['StockCode']}'
                AND "Description" = '{row['Description'].replace("'", " ")}'
        '''
        with db_engine.connect() as db_conn:
            db_conn.execute(text(query))
            db_conn.commit()
    _ = product_df_update.apply(update_product_dwh, axis=1)
        

In [352]:
def transform_data(csv_file): 
    df = pd.read_csv(csv_file)
    
    # Drop null values 
    df = df.dropna(subset=['CustomerID', 'InvoiceNo', 'StockCode', 'Description'], how='any')

    # Data Types validation 
    dtypes = {
    'InvoiceNo': 'object', 
    'StockCode': 'object', 
    'Description': 'object', 
    'Quantity': 'int64', 
    'InvoiceDate': 'datetime64[ns]',
    'UnitPrice': 'float64', 
    'CustomerID': 'int64', 
    'Country':'object'
    }

    try: 
        df_cols = list(df.columns)
        for col in df_cols: 
            col_dtype = dtypes.get(col, 'unknown')
            print(col + ' (' + col_dtype + ')', end='\t-> ')
            if col_dtype == str(df[col].dtype): 
                print('ok')
            elif col_dtype == 'unknown': 
                print('Unknown column, will be dropped')
                df = df.drop(columns=[col])
            else: 
                if 'datetime' in col_dtype:
                    df[col] = pd.to_datetime(df[col])
                else: 
                    df[col] = df[col].astype(col_dtype)
                print('ok')
    except Exception as e: 
        print(e)


    # Imputing null values & handling outliers  
    df.loc[df['Quantity'] < 0, 'Quantity'] = df.loc[df['Quantity'] < 0, 'Quantity'] * -1
    median_of_means = round(df.groupby('InvoiceNo')['Quantity'].mean().median())
    df['Quantity'] = df['Quantity'].fillna(median_of_means)
    df.loc[df['Quantity'] > 100, 'Quantity'] = 100

    df.loc[df['UnitPrice'] < 0, 'UnitPrice'] = df.loc[df['UnitPrice'] < 0, 'UnitPrice'] * -1
    median_of_means = round(df.groupby('UnitPrice')['UnitPrice'].mean().median())
    df['UnitPrice'] = df['UnitPrice'].fillna(median_of_means)
    df.loc[df['UnitPrice'] > 1000, 'UnitPrice'] = 1000

    nulls_cnt = df[['InvoiceNo', 'StockCode', 'CustomerID', 'Description', 'Quantity', 'UnitPrice']].isna().sum().sum()
    assert nulls_cnt == 0 

    # Drop Duplicates 
    df = df.drop_duplicates()
    assert df.duplicated().sum() == 0

    csv_file = 'cleaned(stg)_' + csv_file 
    df.to_csv(csv_file, index=False)

    load_data(df)

In [343]:
def fetch_data(base_url): 
    cnt = 0 
    file_not_found = 0
    while file_not_found == 0: 
        url = ''
        csv_file = 'ext_online_retail_' + str(cnt) + '.csv'
        url = base_url + csv_file
        print(f'Downloading file {csv_file} .. ')
        file_not_found = get_file(url, csv_file)
        if file_not_found == 0: 
            transform_data(csv_file)
            cnt += 1
    return cnt

In [353]:
base_url = 'https://github.com/yossef-elmahdy/technical-assessment/releases/download/online-retail-data/'
fetch_data(base_url)

Downloading file online_retail_0.csv .. 


--2024-01-28 06:09:13--  https://github.com/yossef-elmahdy/technical-assessment/releases/download/online-retail-data/online_retail_0.csv
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/748174019/ad5aa8fa-827e-4a61-811f-8259eb9728f0?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240128%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240128T060913Z&X-Amz-Expires=300&X-Amz-Signature=5d92d6e579b8fbf48ace5bea620809d2467ef805ba3c1f7c803887dc10d032d2&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=748174019&response-content-disposition=attachment%3B%20filename%3Donline_retail_0.csv&response-content-type=application%2Foctet-stream [following]
--2024-01-28 06:09:13--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/748174019/ad5aa8

InvoiceNo (object)	-> ok
StockCode (object)	-> ok
Description (object)	-> ok
Quantity (int64)	-> ok
InvoiceDate (datetime64[ns])	-> ok
UnitPrice (float64)	-> ok
CustomerID (int64)	-> ok
Country (object)	-> ok
Insert on transactional table: online_retail_cleaned => Finished insertion in 0.00941s
Insert on DWH table: DimDate => Insert on DWH table: DimCustomer => Finished insertion in 0.00973s
Insert on DWH table: DimProduct => Finished insertion in 0.00393s
Downloading file online_retail_1.csv .. 


--2024-01-28 06:09:14--  https://github.com/yossef-elmahdy/technical-assessment/releases/download/online-retail-data/online_retail_1.csv
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/748174019/428354a3-f8b4-45ff-b924-98ec0ba2c49b?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240128%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240128T060915Z&X-Amz-Expires=300&X-Amz-Signature=a485a091868db2c33ede60a141f2ccbe9f807a4fd984484ef3459117b3639f50&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=748174019&response-content-disposition=attachment%3B%20filename%3Donline_retail_1.csv&response-content-type=application%2Foctet-stream [following]
--2024-01-28 06:09:15--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/748174019/428354

InvoiceNo (object)	-> ok
StockCode (object)	-> ok
Description (object)	-> ok
Quantity (int64)	-> ok
InvoiceDate (datetime64[ns])	-> ok
UnitPrice (float64)	-> ok
CustomerID (int64)	-> ok
Country (object)	-> ok
Insert on transactional table: online_retail_cleaned => Finished insertion in 0.00817s
Insert on DWH table: DimDate => Insert on DWH table: DimCustomer => Finished insertion in 0.00546s
Insert on DWH table: DimProduct => Finished insertion in 0.00592s
Downloading file online_retail_2.csv .. 


--2024-01-28 06:09:16--  https://github.com/yossef-elmahdy/technical-assessment/releases/download/online-retail-data/online_retail_2.csv
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/748174019/1900fe41-b8a3-445c-b1c9-2fa3929b6370?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240128%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240128T060916Z&X-Amz-Expires=300&X-Amz-Signature=9339fe8bc7f8a7e33d3f5a93d58944f6083ab5c5b50ce220ce2ba7daf15372fb&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=748174019&response-content-disposition=attachment%3B%20filename%3Donline_retail_2.csv&response-content-type=application%2Foctet-stream [following]
--2024-01-28 06:09:16--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/748174019/1900fe

InvoiceNo (object)	-> ok
StockCode (object)	-> ok
Description (object)	-> ok
Quantity (int64)	-> ok
InvoiceDate (datetime64[ns])	-> ok
UnitPrice (float64)	-> ok
CustomerID (int64)	-> ok
Country (object)	-> ok
Insert on transactional table: online_retail_cleaned => Finished insertion in 0.00931s
Insert on DWH table: DimDate => Insert on DWH table: DimCustomer => Finished insertion in 0.0046s
Insert on DWH table: DimProduct => Finished insertion in 0.00507s
Downloading file online_retail_3.csv .. 


--2024-01-28 06:09:17--  https://github.com/yossef-elmahdy/technical-assessment/releases/download/online-retail-data/online_retail_3.csv
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/748174019/24d2e3fa-f565-4088-9d79-2026e76a8e5b?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240128%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240128T060917Z&X-Amz-Expires=300&X-Amz-Signature=29852eb3df004c618d1bdf1bbead8e4b61ee4d70bc33d4825f2e5af6a7e0b492&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=748174019&response-content-disposition=attachment%3B%20filename%3Donline_retail_3.csv&response-content-type=application%2Foctet-stream [following]
--2024-01-28 06:09:17--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/748174019/24d2e3

InvoiceNo (object)	-> ok
StockCode (object)	-> ok
Description (object)	-> ok
Quantity (int64)	-> ok
InvoiceDate (datetime64[ns])	-> ok
UnitPrice (float64)	-> ok
CustomerID (int64)	-> ok
Country (object)	-> ok
Insert on transactional table: online_retail_cleaned => Finished insertion in 0.00832s
Insert on DWH table: DimDate => Insert on DWH table: DimCustomer => Finished insertion in 0.0041s
Insert on DWH table: DimProduct => Finished insertion in 0.00907s
Downloading file online_retail_4.csv .. 


HTTP request sent, awaiting response... 

Unable to get file online_retail_4.csv , file not found or internet issue


404 Not Found
2024-01-28 06:09:18 ERROR 404: Not Found.



4

In [58]:
def transform_data(csv_file): 
    df = pd.read_csv(csv_file)
    
    # Drop null values 
    df = df.dropna(subset=['CustomerID', 'InvoiceNo', 'StockCode', 'Description'], how='any')

    # Data Types validation 
    dtypes = {
    'InvoiceNo': 'object', 
    'StockCode': 'object', 
    'Description': 'object', 
    'Quantity': 'int64', 
    'InvoiceDate': 'datetime64[ns]',
    'UnitPrice': 'float64', 
    'CustomerID': 'int64', 
    'Country':'object'
    }

    try: 
        df_cols = list(df.columns)
        for col in df_cols: 
            col_dtype = dtypes.get(col, 'unknown')
            print(col + ' (' + col_dtype + ')', end='\t-> ')
            if col_dtype == str(df[col].dtype): 
                print('ok')
            elif col_dtype == 'unknown': 
                print('Unknown column, will be dropped')
                df = df.drop(columns=[col])
            else: 
                if 'datetime' in col_dtype:
                    df[col] = pd.to_datetime(df[col])
                else: 
                    df[col] = df[col].astype(col_dtype)
                print('ok')
    except Exception as e: 
        print(e)


    # Imputing null values & handling outliers  
    df.loc[df['Quantity'] < 0, 'Quantity'] = df.loc[df['Quantity'] < 0, 'Quantity'] * -1
    median_of_means = round(df.groupby('InvoiceNo')['Quantity'].mean().median())
    df['Quantity'] = df['Quantity'].fillna(median_of_means)
    df.loc[df['Quantity'] > 100, 'Quantity'] = 100

    df.loc[df['UnitPrice'] < 0, 'UnitPrice'] = df.loc[df['UnitPrice'] < 0, 'UnitPrice'] * -1
    median_of_means = round(df.groupby('UnitPrice')['UnitPrice'].mean().median())
    df['UnitPrice'] = df['UnitPrice'].fillna(median_of_means)
    df.loc[df['UnitPrice'] > 1000, 'UnitPrice'] = 1000

    nulls_cnt = df[['InvoiceNo', 'StockCode', 'CustomerID', 'Description', 'Quantity', 'UnitPrice']].isna().sum().sum()
    assert nulls_cnt == 0 

    # Drop Duplicates 
    df = df.drop_duplicates()
    assert df.duplicated().sum() == 0

    csv_file = 'cleaned(stg)_' + csv_file 
    df.to_csv(csv_file, index=False)

In [59]:
transform_data('online_retail_0.csv')

InvoiceNo (object)	-> ok
StockCode (object)	-> ok
Description (object)	-> ok
Quantity (int64)	-> ok
InvoiceDate (datetime64[ns])	-> ok
UnitPrice (float64)	-> ok
CustomerID (int64)	-> ok
Country (object)	-> ok


  df = pd.read_csv(csv_file)


In [300]:
df = pd.read_csv('online_retail_1.csv')

In [301]:
dtypes = {
    'InvoiceNo': 'object', 
    'StockCode': 'object', 
    'Description': 'object', 
    'Quantity': 'int64', 
    'InvoiceDate': 'datetime64[ns]',
    'UnitPrice': 'float64', 
    'CustomerID': 'int64', 
    'Country':'object'
}

In [302]:
df = df.dropna(subset=['CustomerID', 'InvoiceNo', 'StockCode', 'Description'], how='any')

### 2. Data Types Validation

In [303]:
try: 
    df_cols = list(df.columns)
    for col in df_cols: 
        col_dtype = dtypes.get(col, 'unknown')
        print(col + ' (' + col_dtype + ')', end='\t-> ')
        if col_dtype == str(df[col].dtype): 
            print('ok')
        elif col_dtype == 'unknown': 
            print('Unknown column, will be dropped')
            df = df.drop(columns=[col])
        else: 
            if 'datetime' in col_dtype:
                df[col] = pd.to_datetime(df[col])
            else: 
                df[col] = df[col].astype(col_dtype)
            print('ok')
except Exception as e: 
    print(e)

InvoiceNo (object)	-> ok
StockCode (object)	-> ok
Description (object)	-> ok
Quantity (int64)	-> ok
InvoiceDate (datetime64[ns])	-> ok
UnitPrice (float64)	-> ok
CustomerID (int64)	-> ok
Country (object)	-> ok


### 3. Handling Missing Values & Outliers

In [304]:
df.loc[df['Quantity'] < 0, 'Quantity'] = df.loc[df['Quantity'] < 0, 'Quantity'] * -1
median_of_means = round(df.groupby('InvoiceNo')['Quantity'].mean().median())
df['Quantity'] = df['Quantity'].fillna(median_of_means)
df.loc[df['Quantity'] > 100, 'Quantity'] = 100

In [305]:
df.loc[df['UnitPrice'] < 0, 'UnitPrice'] = df.loc[df['UnitPrice'] < 0, 'UnitPrice'] * -1
median_of_means = round(df.groupby('UnitPrice')['UnitPrice'].mean().median())
df['UnitPrice'] = df['UnitPrice'].fillna(median_of_means)
df.loc[df['UnitPrice'] > 1000, 'UnitPrice'] = 1000

In [306]:
nulls_cnt = df[['InvoiceNo', 'StockCode', 'CustomerID', 'Description', 'Quantity', 'UnitPrice']].isna().sum().sum()
assert nulls_cnt == 0 

### 4. Handling Duplicates

In [307]:
df = df.drop_duplicates()
assert df.duplicated().sum() == 0

In [260]:
df.to_csv('file.csv', index=False)

In [241]:
from time import time
from sqlalchemy import create_engine, text
import pandas as pd

In [242]:
def database_connect(user, password, host, port, db):
    engine = create_engine(
        f'postgresql://{user}:{password}@{host}:{port}/{db}')
    engine.connect()
    return engine
db_engine = database_connect('root', 'root', 'postgres-db', '5432', 'RetailDB')

## Data Loading

### OLTP

In [173]:
sql = '''
    SELECT max("Id") FROM "online_retail_cleaned";
'''
max_id_df = pd.read_sql(sql, db_engine)
last_id = max_id_df['max'][0] + 1

In [152]:
df['Id'] = range(last_id, last_id + df.shape[0])
df = df.set_index('Id')

In [153]:
t_start = time()
df.to_sql(name='online_retail_cleaned', con=db_engine, if_exists='append', index=True, index_label='Id')
print(f'Finished insertion in {time()-t_start:.3}s')

Finished insertion in 0.0237s


### DWH

#### Date Dim

In [308]:
sql = '''
    SELECT min("FullDate") AS "MinDate", max("FullDate") AS "MaxDate" FROM "DimDate";
'''
max_date_df = pd.read_sql(sql, db_engine)
max_date = max_date_df['MaxDate'][0] 
max_date

datetime.date(2020, 12, 9)

In [309]:
max_invoice_date = df['InvoiceDate'].dt.date.max()

In [310]:
date_df = None
if max_invoice_date > max_date: 
    sql = f'''
        SELECT CAST(to_char(date_trunc('day', days)::date, 'YYYYMMDD') AS INT) AS "DateKey"
        	, date_trunc('day', days):: date AS "FullDate"
        	, EXTRACT(YEAR FROM days) AS "Year"
        	, EXTRACT(MONTH FROM days) AS "Month"
        	, EXTRACT(DAY FROM days) AS "Day"
        	, EXTRACT(WEEK FROM days) AS "Week"
        	, TO_CHAR(days, 'Month') AS "MonthName"
        	, TO_CHAR(days, 'Day') AS "DayName"
        FROM generate_series
                ( '{str(max_date)}'::timestamp + interval '1 day'
                , '{str(max_invoice_date)}'::timestamp
                , '1 day'::interval) days;
        '''
    date_df = pd.read_sql(sql, db_engine, index_col='DateKey')
date_df

Unnamed: 0_level_0,FullDate,Year,Month,Day,Week,MonthName,DayName
DateKey,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
20201210,2020-12-10,2020.0,12.0,10.0,50.0,December,Thursday
20201211,2020-12-11,2020.0,12.0,11.0,50.0,December,Friday
20201212,2020-12-12,2020.0,12.0,12.0,50.0,December,Saturday
20201213,2020-12-13,2020.0,12.0,13.0,50.0,December,Sunday
20201214,2020-12-14,2020.0,12.0,14.0,51.0,December,Monday
...,...,...,...,...,...,...,...
20241127,2024-11-27,2024.0,11.0,27.0,48.0,November,Wednesday
20241128,2024-11-28,2024.0,11.0,28.0,48.0,November,Thursday
20241129,2024-11-29,2024.0,11.0,29.0,48.0,November,Friday
20241130,2024-11-30,2024.0,11.0,30.0,48.0,November,Saturday


In [311]:
date_df['Year'] = date_df['Year'].astype(int)
date_df['Month'] = date_df['Month'].astype(int)
date_df['Day'] = date_df['Day'].astype(int)
date_df['Week'] = date_df['Week'].astype(int)
date_df.dtypes

FullDate     object
Year          int64
Month         int64
Day           int64
Week          int64
MonthName    object
DayName      object
dtype: object

In [None]:
t_start = time()
date_df.to_sql(name='DimDate', con=db_engine, if_exists='append', index=True, index_label='DateKey')
print(f'Finished insertion in {time()-t_start:.3}s')

#### Customer Dim

In [314]:
sql = '''
    SELECT Distinct "CustomerID"
    FROM "DimCustomer";
'''
customers_df = pd.read_sql(sql, db_engine)
existing_customers = list(customers_df['CustomerID'])
len(existing_customers)

4373

In [315]:
customer_df = df[['CustomerID', 'Country']].drop_duplicates(subset=['CustomerID']).copy()
customer_df

Unnamed: 0,CustomerID,Country
0,12583111,Egypt
20,137481234,Borkena Facoo
21,17850,Morocco


In [316]:
customer_df_insert = customer_df[~customer_df['CustomerID'].isin(existing_customers)]
customer_df_update = customer_df[customer_df['CustomerID'].isin(existing_customers)]

In [317]:
customer_df_insert

Unnamed: 0,CustomerID,Country
20,137481234,Borkena Facoo


In [318]:
customer_df_update

Unnamed: 0,CustomerID,Country
0,12583111,Egypt
21,17850,Morocco


In [324]:
sql = '''
    SELECT max("CustomerKey") AS "MaxCustomerKey" 
    FROM "DimCustomer";
'''
max_customer_df = pd.read_sql(sql, db_engine)
max_customer = max_customer_df['MaxCustomerKey'][0] + 1
max_customer

100002

In [320]:
customer_df_insert['CustomerKey'] = range(max_customer, max_customer + customer_df_insert.shape[0])
def set_customer_name(customer_key): 
    return 'Customer ' + str(customer_key)
customer_df_insert['CustomerName'] = customer_df_insert['CustomerKey'].apply(set_customer_name)
customer_df_insert = customer_df_insert.set_index('CustomerKey')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  customer_df_insert['CustomerKey'] = range(max_customer, max_customer + customer_df_insert.shape[0])
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  customer_df_insert['CustomerName'] = customer_df_insert['CustomerKey'].apply(set_customer_name)


In [321]:
t_start = time()
customer_df_insert.to_sql(name='DimCustomer', con=db_engine, if_exists='append', index=True, index_label='CustomerKey')
print(f'Finished insertion in {time()-t_start:.3}s')

Finished insertion in 0.0107s


In [323]:
def update_customer_dwh(row): 
    query = f'''
        UPDATE "DimCustomer"
        SET "Country" = '{row['Country']}'
        WHERE 
            "CustomerID" = '{row['CustomerID']}'
    '''
    with db_engine.connect() as db_conn:
        db_conn.execute(text(query))
        db_conn.commit()
_ = customer_df_update.apply(update_customer_dwh, axis=1)

#### Product 

In [325]:
sql = '''
    SELECT Distinct "StockCode"
    FROM "DimProduct";
'''
products_df = pd.read_sql(sql, db_engine)
existing_products = list(products_df['StockCode'])
len(existing_products)

3684

In [326]:
product_df = df[['StockCode', 'Description', 'UnitPrice']].drop_duplicates().copy()
product_df

Unnamed: 0,StockCode,Description,UnitPrice
0,22728,Youssef,3.75
1,22727,ALARM CLOCK BAKELIKE RED,3.75
2,22726,ALARM CLOCK BAKELIKE GREEN,3.75
3,21724,PANDA AND BUNNIES STICKER SHEET,0.85
4,21883,Youssef,0.65
5,10002,INFLATABLE POLITICAL GLOBE,0.85
6,21791,VINTAGE HEADS AND TAILS CARD GAME,1.25
7,21035,SET/2 RED RETROSPOT TEA TOWELS,2.95
8,22326,ROUND SNACK BOXES SET OF4 WOODLAND,2.95
9,22629,SPACEBOY LUNCH BOX,1.95


In [334]:
product_df_insert = product_df[~product_df['StockCode'].isin(existing_products)]
product_df_update = product_df[product_df['StockCode'].isin(existing_products)]

In [335]:
product_df_insert

Unnamed: 0,StockCode,Description,UnitPrice
37,22752111,SET 7 BABUSHKA NESTING BOXES,7.65
38,21730111,GLASS STAR FROSTED T-LIGHT HOLDER,4.25


In [329]:
product_df_update

Unnamed: 0,StockCode,Description,UnitPrice
0,22728,Youssef,3.75
1,22727,ALARM CLOCK BAKELIKE RED,3.75
2,22726,ALARM CLOCK BAKELIKE GREEN,3.75
3,21724,PANDA AND BUNNIES STICKER SHEET,0.85
4,21883,Youssef,0.65
5,10002,INFLATABLE POLITICAL GLOBE,0.85
6,21791,VINTAGE HEADS AND TAILS CARD GAME,1.25
7,21035,SET/2 RED RETROSPOT TEA TOWELS,2.95
8,22326,ROUND SNACK BOXES SET OF4 WOODLAND,2.95
9,22629,SPACEBOY LUNCH BOX,1.95


In [336]:
sql = '''
    SELECT max("ProductKey") AS "MaxProductKey" 
    FROM "DimProduct";
'''
max_product_df = pd.read_sql(sql, db_engine)
max_product = max_product_df['MaxProductKey'][0] + 1
max_product

9181

In [337]:
product_df_insert['ProductKeymax_product, max_product + product_df_insert.shape[0])
def set_product_name(product_key): 
    return 'Product ' + str(product_key)
product_df_insert['ProductName'] = product_df_insert['ProductKey'].apply(set_product_name)'] = range(
product_df_insert = product_df_insert.set_index('ProductKey')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  product_df_insert['ProductKey'] = range(max_product, max_product + product_df_insert.shape[0])
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  product_df_insert['ProductName'] = product_df_insert['ProductKey'].apply(set_product_name)


In [338]:
t_start = time()
product_df_insert.to_sql(name='DimProduct', con=db_engine, if_exists='append', index=True, index_label='ProductKey')
print(f'Finished insertion in {time()-t_start:.3}s')

Finished insertion in 0.015s


In [342]:
def update_product_dwh(row): 
    query = f'''
            UPDATE "DimProduct"
            SET "UnitPrice" = {row['UnitPrice']}
            WHERE "StockCode" = '{row['StockCode']}'
            AND "Description" = '{row['Description'].replace("'", " ")}'
    '''
    with db_engine.connect() as db_conn:
        db_conn.execute(text(query))
        db_conn.commit()
_ = product_df_update.apply(update_product_dwh, axis=1)