# Assignment - ETL transform to DWH

## 1. Import libraries

### 1.1 Load libraries

In [2]:
from sqlalchemy.schema import CreateSchema
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
import pandas as pd
from urllib.parse import quote_plus
from dotenv import load_dotenv
import os

load_dotenv()

True

### 1.2 Database connection

#### 1.2.a MSSQL

In [3]:
server_name = os.getenv('MSSQL_SERVER_NAME')
database_name = os.getenv('MSSQL_DATABASE_NAME')
username = os.getenv('MSSQL_USER_NAME')
password = os.getenv('MSSQL_PASSWORD')
driver_name = os.getenv('MSSQL_DRIVER_NAME')

# Add your port number here (1433 is default)
port_number = int(os.getenv('MSSQL_PORT_NUMBER'))

raw_connection_string = (
    f'DRIVER={{{driver_name}}};'
    f'SERVER={server_name},{port_number};' # Note the 'host,port' format
    f'DATABASE={database_name};'
    f'UID={username};'
    f'PWD={password};'
    # **CRITICAL: This is the trust connection setting you need**
    f'TrustServerCertificate=yes;' 
)

quoted_connection_string = quote_plus(raw_connection_string)

connection_url = f"mssql+pyodbc:///?odbc_connect={quoted_connection_string}"

mssql_engine = create_engine(connection_url)
try:
    with mssql_engine.connect() as conn:
        # optional: issue a lightweight query
        conn.execute(text("SELECT 1"))
    print("Connection successful")
except SQLAlchemyError as e:
    print("Connection failed:", e)
    # handle/log error accordingly

mssql_engine

Connection failed: (pyodbc.OperationalError) ('HYT00', '[HYT00] [Microsoft][ODBC Driver 18 for SQL Server]Login timeout expired (0) (SQLDriverConnect)')
(Background on this error at: https://sqlalche.me/e/20/e3q8)


Engine(mssql+pyodbc:///?odbc_connect=DRIVER%3D%7BODBC+Driver+18+for+SQL+Server%7D%3BSERVER%3D127.0.0.1%2C1433%3BDATABASE%3Dmaster%3BUID%3Dsa%3BPWD%3Dtrung_password123%3BTrustServerCertificate%3Dyes%3B)

#### 1.2.b PostgreSQL

In [4]:
server_name = os.getenv('POSTGRESQL_SERVER_NAME')
database_name = os.getenv('POSTGRESQL_DATABASE_NAME')
username = os.getenv('POSTGRESQL_USER_NAME')
password = os.getenv('POSTGRESQL_PASSWORD')
driver_name = os.getenv('POSTGRESQL_DRIVER_NAME')

port_number = int(os.getenv('POSTGRESQL_PORT_NUMBER'))

# Modified connection_url with the port:
connection_url = f"postgresql+psycopg2://{username}:{password}@{server_name}:{port_number}/{database_name}"

postgre_engine = create_engine(connection_url)
postgre_engine

Engine(postgresql+psycopg2://postgres:***@localhost:5432/postgres)

## 2. Extract

Extract and process tables:

- Table `Product`: only keep those sellable items

- Tables `ProductCostHistory`, `ProductListPriceHistory`: merge them into a single table

- Tables `SalesOrderDetail`, `SalesOrderHeader`: retrieve `OrderDate` from `SalesOrderHeader` and merge to `SalesOrderDetail`.

In [6]:
# Load table product
input_product_df = pd.read_sql("""
        SELECT ProductID, Name, ProductSubcategoryID, FinishedGoodsFlag, SellStartDate, SellEndDate
        FROM CompanyX.Production.Product
    """.strip(), mssql_engine)

input_product_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 504 entries, 0 to 503
Data columns (total 6 columns):
 #   Column                Non-Null Count  Dtype         
---  ------                --------------  -----         
 0   ProductID             504 non-null    int64         
 1   Name                  504 non-null    object        
 2   ProductSubcategoryID  295 non-null    float64       
 3   FinishedGoodsFlag     504 non-null    bool          
 4   SellStartDate         504 non-null    datetime64[ns]
 5   SellEndDate           98 non-null     datetime64[ns]
dtypes: bool(1), datetime64[ns](2), float64(1), int64(1), object(1)
memory usage: 20.3+ KB


In [7]:
# Load table ProductSubCategory
input_psc_df = pd.read_sql("""
        SELECT ProductSubcategoryID, Name, ProductCategoryID
        FROM CompanyX.Production.ProductSubcategory
    """.strip(), mssql_engine)

input_psc_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 37 entries, 0 to 36
Data columns (total 3 columns):
 #   Column                Non-Null Count  Dtype 
---  ------                --------------  ----- 
 0   ProductSubcategoryID  37 non-null     int64 
 1   Name                  37 non-null     object
 2   ProductCategoryID     37 non-null     int64 
dtypes: int64(2), object(1)
memory usage: 1020.0+ bytes


In [8]:
# Load table ProductCategory
input_pc_df = pd.read_sql("""
        SELECT ProductCategoryID, Name
        FROM CompanyX.Production.ProductCategory
    """.strip(), mssql_engine)

input_pc_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4 entries, 0 to 3
Data columns (total 2 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------  ----- 
 0   ProductCategoryID  4 non-null      int64 
 1   Name               4 non-null      object
dtypes: int64(1), object(1)
memory usage: 196.0+ bytes


In [9]:
# Load table ProductCostHistory, ProductListPriceHistory
input_pch_df = pd.read_sql("""
        SELECT ProductID, StartDate, EndDate, StandardCost
        FROM CompanyX.Production.ProductCostHistory
    """.strip(), mssql_engine)

input_plph_df = pd.read_sql("""
        SELECT ProductID, StartDate, EndDate, ListPrice
        FROM CompanyX.Production.ProductListPriceHistory
    """.strip(), mssql_engine)

pricecost_df = pd.merge(input_pch_df, input_plph_df, on=['ProductID', 'StartDate', 'EndDate'], how='inner')
history_available_products = pricecost_df['ProductID'].unique().tolist()

print(f"Number of products having pricecost history: {len(history_available_products)}")
print("-" * 50)

pricecost_df.info()

Number of products having pricecost history: 293
--------------------------------------------------
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 395 entries, 0 to 394
Data columns (total 5 columns):
 #   Column        Non-Null Count  Dtype         
---  ------        --------------  -----         
 0   ProductID     395 non-null    int64         
 1   StartDate     395 non-null    datetime64[ns]
 2   EndDate       200 non-null    datetime64[ns]
 3   StandardCost  395 non-null    float64       
 4   ListPrice     395 non-null    float64       
dtypes: datetime64[ns](2), float64(2), int64(1)
memory usage: 15.6 KB


In [10]:
# Load table SalesOrderDetail
input_sod_df = pd.read_sql("""
        SELECT ProductID, OrderQty, LineTotal, SalesOrderID
        FROM CompanyX.Sales.SalesOrderDetail
    """.strip(), mssql_engine)

# Load data SalesOrderHeader
input_soh_df = pd.read_sql("""
        SELECT SalesOrderID, OrderDate, CustomerID
        FROM CompanyX.Sales.SalesOrderHeader
    """.strip(), mssql_engine)

input_sod_df = pd.merge(input_sod_df, input_soh_df[['SalesOrderID', 'OrderDate', 'CustomerID']], on='SalesOrderID', how='left')

input_sod_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 121317 entries, 0 to 121316
Data columns (total 6 columns):
 #   Column        Non-Null Count   Dtype         
---  ------        --------------   -----         
 0   ProductID     121317 non-null  int64         
 1   OrderQty      121302 non-null  float64       
 2   LineTotal     121317 non-null  float64       
 3   SalesOrderID  121317 non-null  int64         
 4   OrderDate     121317 non-null  datetime64[ns]
 5   CustomerID    121317 non-null  int64         
dtypes: datetime64[ns](1), float64(2), int64(3)
memory usage: 5.6 MB


In [11]:
# Load table Customer
input_customer_df = pd.read_sql("""
        SELECT CustomerID, PersonID, StoreID, TerritoryID
        FROM CompanyX.Sales.Customer
    """.strip(), mssql_engine)

input_customer_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19820 entries, 0 to 19819
Data columns (total 4 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   CustomerID   19820 non-null  int64  
 1   PersonID     19119 non-null  float64
 2   StoreID      1336 non-null   float64
 3   TerritoryID  19820 non-null  int64  
dtypes: float64(2), int64(2)
memory usage: 619.5 KB


In [12]:
# Load table Territory
input_territory_df = pd.read_sql("""
        SELECT TerritoryID, Name
        FROM CompanyX.Sales.SalesTerritory
    """.strip(), mssql_engine)

input_territory_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 2 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   TerritoryID  10 non-null     int64 
 1   Name         10 non-null     object
dtypes: int64(1), object(1)
memory usage: 292.0+ bytes


## 3. Transform

### 3.1 Product dimension

In [13]:
print(f"Total number of products: {len(input_product_df)}")

# drop non-salable products
dim_product_df = input_product_df[input_product_df['FinishedGoodsFlag'] == 1].reset_index(drop=True).drop(columns=['FinishedGoodsFlag']).copy()
dim_product_df = dim_product_df[dim_product_df['ProductID'].isin(history_available_products)].reset_index(drop=True)

# keep tracks of salable products
salable_products = list(dim_product_df['ProductID'].unique())
print(f"Number of salable products: {len(salable_products)}")

# Keep subcategory and category
dim_category_df = input_pc_df.copy()
dim_subcategory_df = input_psc_df.copy()

print(f"Number of product categories: {len(dim_category_df)}")
print(f"Number of product subcategories: {len(dim_subcategory_df)}")

print(f"-" * 50)

# check missing status
dim_product_df.info()

Total number of products: 504
Number of salable products: 293
Number of product categories: 4
Number of product subcategories: 37
--------------------------------------------------
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 293 entries, 0 to 292
Data columns (total 5 columns):
 #   Column                Non-Null Count  Dtype         
---  ------                --------------  -----         
 0   ProductID             293 non-null    int64         
 1   Name                  293 non-null    object        
 2   ProductSubcategoryID  293 non-null    float64       
 3   SellStartDate         293 non-null    datetime64[ns]
 4   SellEndDate           98 non-null     datetime64[ns]
dtypes: datetime64[ns](2), float64(1), int64(1), object(1)
memory usage: 11.6+ KB


### 3.2 Price Cost History mini-dimension

In [14]:
# create a surrogate for pricecost history table
dim_pricecost_df = pricecost_df[pricecost_df['ProductID'].isin(salable_products)].reset_index(drop=True).copy()
dim_pricecost_df.insert(0, 'HistoryKey', range(1, 1 + len(dim_pricecost_df)))

dim_pricecost_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 395 entries, 0 to 394
Data columns (total 6 columns):
 #   Column        Non-Null Count  Dtype         
---  ------        --------------  -----         
 0   HistoryKey    395 non-null    int64         
 1   ProductID     395 non-null    int64         
 2   StartDate     395 non-null    datetime64[ns]
 3   EndDate       200 non-null    datetime64[ns]
 4   StandardCost  395 non-null    float64       
 5   ListPrice     395 non-null    float64       
dtypes: datetime64[ns](2), float64(2), int64(2)
memory usage: 18.6 KB


### 3.3 Customer Dimension

In [15]:
dim_customer_df = input_customer_df.copy()
dim_territory_df = input_territory_df.copy()

dim_customer_df.info()
print(f"-" * 50)
dim_territory_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19820 entries, 0 to 19819
Data columns (total 4 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   CustomerID   19820 non-null  int64  
 1   PersonID     19119 non-null  float64
 2   StoreID      1336 non-null   float64
 3   TerritoryID  19820 non-null  int64  
dtypes: float64(2), int64(2)
memory usage: 619.5 KB
--------------------------------------------------
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 2 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   TerritoryID  10 non-null     int64 
 1   Name         10 non-null     object
dtypes: int64(1), object(1)
memory usage: 292.0+ bytes


### 3.4 Time dimension

In [16]:
import numpy as np

unique_time_lists = pd.to_datetime(input_sod_df['OrderDate'].unique())
datetime_arr = pd.DatetimeIndex(unique_time_lists).to_numpy()

sorted_indices = np.argsort(datetime_arr)
unique_time_lists = unique_time_lists[sorted_indices]

dim_time_df = pd.DataFrame({
    'date': unique_time_lists,
    'month': unique_time_lists.month,
    'quarter': unique_time_lists.quarter,
    'year': unique_time_lists.year,
})

dim_time_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1124 entries, 0 to 1123
Data columns (total 4 columns):
 #   Column   Non-Null Count  Dtype         
---  ------   --------------  -----         
 0   date     1124 non-null   datetime64[ns]
 1   month    1124 non-null   int32         
 2   quarter  1124 non-null   int32         
 3   year     1124 non-null   int32         
dtypes: datetime64[ns](1), int32(3)
memory usage: 22.1 KB


### 3.4 Fact table

In [17]:
fact_sales_df = input_sod_df.copy()
dim_pricecost_df["EndDate"] = dim_pricecost_df["EndDate"].fillna(pd.Timestamp.max)

def assign_interval(fact_df: pd.DataFrame, dim_df: pd.DataFrame, date_col="OrderDate"):
    dim = dim_df.sort_values(["StartDate", "ProductID"])
    fact = fact_df.sort_values([date_col, "ProductID"])

    merged = pd.merge(
        fact,
        dim,
        left_on=["ProductID"],
        right_on=["ProductID"],
        how='left',
    )

    return merged[
        (merged[date_col] >= merged["StartDate"]) &
        (merged[date_col] <= merged["EndDate"])
    ].reset_index(drop=True)

fact_sales_df = assign_interval(fact_sales_df, dim_pricecost_df[["HistoryKey", "ProductID", "StartDate", "EndDate"]], date_col="OrderDate")
fact_sales_df.drop(columns=['StartDate', 'EndDate'], inplace=True)
fact_sales_df['OrderQty'] = fact_sales_df['OrderQty'].fillna(0)

fact_sales_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 121246 entries, 0 to 121245
Data columns (total 7 columns):
 #   Column        Non-Null Count   Dtype         
---  ------        --------------   -----         
 0   ProductID     121246 non-null  int64         
 1   OrderQty      121246 non-null  float64       
 2   LineTotal     121246 non-null  float64       
 3   SalesOrderID  121246 non-null  int64         
 4   OrderDate     121246 non-null  datetime64[ns]
 5   CustomerID    121246 non-null  int64         
 6   HistoryKey    121246 non-null  int64         
dtypes: datetime64[ns](1), float64(2), int64(4)
memory usage: 6.5 MB


## 4. Load

### 4.1 Create new schema

In [18]:
schema_name = 'dwh'

with postgre_engine.connect() as connection:
    try:
        connection.execute(CreateSchema(schema_name, if_not_exists=True))
        connection.commit()
        print(f"Schema '{schema_name}' created successfully (or already exists).")
    except Exception as e:
        connection.rollback()
        print(f"Error creating schema '{schema_name}': {e}")

Schema 'dwh' created successfully (or already exists).


### 4.2 Write data into schema

In [19]:
# Product Dim
dim_product_df.to_sql('DimProduct', postgre_engine, schema=schema_name, if_exists='replace', index=False)
dim_subcategory_df.to_sql('DimSubcategory', postgre_engine, schema=schema_name, if_exists='replace', index=False)
dim_category_df.to_sql('DimCategory', postgre_engine, schema=schema_name, if_exists='replace', index=False)

# History Dim
dim_pricecost_df[['HistoryKey', 'StandardCost', 'ListPrice']].to_sql('DimPriceCostHistory', postgre_engine, schema=schema_name, if_exists='replace', index=False)

# Customer Dim
dim_customer_df.to_sql('DimCustomer', postgre_engine, schema=schema_name, if_exists='replace', index=False)
dim_territory_df.to_sql('DimTerritory', postgre_engine, schema=schema_name, if_exists='replace', index=False)

# Time Dim
dim_time_df.to_sql('DimDate', postgre_engine, schema=schema_name, if_exists='replace', index=False)

# Fact Table
fact_sales_df.to_sql('FactProductSales', postgre_engine, schema=schema_name, if_exists='replace', index=False)

print("ETL process completed and data loaded into PostgreSQL data warehouse.")

ETL process completed and data loaded into PostgreSQL data warehouse.
