# 1. Preparation

- Prepare Data Source
    - Run docker compose from source
      ```
      cd source
      ```
      ```
      docker-compose up -d
      ```
  
- Prepare Data Warehouse
    - Run docker compose from dwh
      ```
      cd dwh
      ```
      ```
      docker-compose up -d
      ```

- Create Connection to Data Source & Data Warehouse

In [1]:
import os
import warnings

import pandas as pd
import psycopg2
warnings.filterwarnings('ignore')

# Connection settings can be overridden through environment variables so that
# credentials do not have to live in the notebook; every default below is the
# value that was previously hard-coded here, so an unconfigured run behaves
# exactly as before.
conn_src = psycopg2.connect(database=os.getenv("SRC_DB_NAME", "mini_order"),
                            host=os.getenv("SRC_DB_HOST", "localhost"),
                            user=os.getenv("SRC_DB_USER", "postgres"),
                            password=os.getenv("SRC_DB_PASSWORD", "mypassword"),
                            port=os.getenv("SRC_DB_PORT", "5433"))
cur_src = conn_src.cursor()

conn_dwh = psycopg2.connect(database=os.getenv("DWH_DB_NAME", "dwh"),
                            host=os.getenv("DWH_DB_HOST", "localhost"),
                            user=os.getenv("DWH_DB_USER", "postgres"),
                            password=os.getenv("DWH_DB_PASSWORD", "mypassword"),
                            port=os.getenv("DWH_DB_PORT", "5434"))
cur_dwh = conn_dwh.cursor()

- Define Schema & Relation for Staging and Final Area

In [2]:
def read_sql_file(file_path):
    """Return the full text of the SQL script at ``file_path``.

    The file is decoded as UTF-8 explicitly so the result does not depend on
    the locale's default encoding of the machine running the notebook.

    Parameters
    ----------
    file_path : str or path-like
        Location of the SQL file to read.

    Returns
    -------
    str or None
        The file content, or ``None`` when the file could not be read (the
        error is printed rather than raised).
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    except Exception as e:
        # Best-effort by design: report the problem and let the caller decide.
        print(f"Error reading SQL file: {e}")
        return None

# Path (relative to the notebook) of the SQL script executed against the
# warehouse connection in the next cell.
file_path = './helper/db_init/dwh-stg-prod.sql'
# read_sql_file returns None when the file cannot be read.
sql_string = read_sql_file(file_path)

In [3]:
# Stop early when the script could not be read: formatting None into the
# statement would send the literal text "None" to the database.
if not sql_string:
    raise ValueError("Schema SQL script is empty or could not be read; nothing to execute.")

# The script is executed as-is.
query = sql_string

try:
    # Execute the query
    cur_dwh.execute(query)

    # Commit the transaction
    conn_dwh.commit()
except Exception:
    # Leave the connection usable for the following cells instead of stuck in
    # an aborted transaction, then surface the original error.
    conn_dwh.rollback()
    raise

# 2. Extract & Load

## 2.1. Extract

- Create function to store data from data source & data warehouse into Dataframe

In [4]:
def get_table_data(table_name, engine):
    """Read every row of ``table_name`` through ``engine`` into a DataFrame.

    Any failure is printed and an empty DataFrame is returned instead of
    raising.
    """
    try:
        return pd.read_sql(f"SELECT * FROM {table_name};", engine)
    except Exception as err:
        print(f"Error: {err}")
        return pd.DataFrame()

- Extract from data sources & destination (dwh) to get updated data.
- Why do we also need to extract data from the data warehouse?
    - Because we need the latest data that is already stored in the data warehouse.
    - It is compared with the latest data from the data sources to find the rows that are new or changed.

- Extract from data sources

In [5]:
# Pull each source table into its own DataFrame, in the same order as before.
(
    src_category,
    src_customer,
    src_order_detail,
    src_orders,
    src_product,
    src_subcategory,
) = (
    get_table_data(source_table, conn_src)
    for source_table in (
        'category',
        'customer',
        'order_detail',
        'orders',
        'product',
        'subcategory',
    )
)

- Extract from data warehouse
- After the data is stored in a DataFrame, drop the `uuid` column, because `uuid` doesn't exist in the data sources.

In [6]:
# Read each staging table and remove its uuid column so the frames line up
# with the source frames.
(
    stg_category,
    stg_customer,
    stg_order_detail,
    stg_orders,
    stg_product,
    stg_subcategory,
) = (
    get_table_data(staging_table, conn_dwh).drop(columns='uuid')
    for staging_table in (
        'stg.category',
        'stg.customer',
        'stg.order_detail',
        'stg.orders',
        'stg.product',
        'stg.subcategory',
    )
)

- Create function to compare data between sources and data warehouse (staging). This function will get updated data.
- Updated data will be loaded into data warehouse (staging)

In [7]:
def compare(df1, df2):
    """Return the rows of ``df1`` that have no identical row in ``df2``.

    ``df1`` is the fresh extract and ``df2`` the data already loaded. Only
    rows coming from ``df1`` are returned: a row that exists solely in
    ``df2`` (for example the previous version of a record that has since
    changed) is not new data and must not be loaded again.

    Parameters
    ----------
    df1 : pandas.DataFrame
        Candidate rows.
    df2 : pandas.DataFrame
        Rows to compare against, with the same columns as ``df1``.

    Returns
    -------
    pandas.DataFrame
        The unmatched rows of ``df1``, keeping their original index. Rows
        repeated inside ``df1`` itself are treated as matched and left out.
    """
    combined = pd.concat([df1, df2])
    # True for rows that occur exactly once across both frames.
    is_unmatched = ~combined.duplicated(keep=False)

    # concat places df1's rows first, so slicing by position keeps exactly the
    # df1 part; the mask is applied as a plain array to avoid index alignment
    # problems when both frames share index labels.
    n_candidates = len(df1)
    df = combined.iloc[:n_candidates][is_unmatched.iloc[:n_candidates].to_numpy()]

    return df

- Get the rows that differ between the sources and staging

In [8]:
# Diff every source frame against its staging counterpart, one pair at a time.
category, customer, order_detail, orders, product, subcategory = (
    compare(source_frame, staging_frame)
    for source_frame, staging_frame in (
        (src_category, stg_category),
        (src_customer, stg_customer),
        (src_order_detail, stg_order_detail),
        (src_orders, stg_orders),
        (src_product, stg_product),
        (src_subcategory, stg_subcategory),
    )
)

## 2.2. Load

- Load updated data to data warehouse

- `category` Table

In [9]:
# Parameterised INSERT: the driver quotes every value itself, so text that
# contains quotes cannot break (or inject into) the statement.
insert_query = """
    INSERT INTO stg.category 
    (name, description, created_at, updated_at) 
    VALUES 
    (%s, %s, %s, %s);
"""
category_columns = ['name', 'description', 'created_at', 'updated_at']

try:
    for index, row in category.iterrows():
        # Missing values (None/NaN/NaT) are sent as SQL NULL rather than as
        # the literal text 'None'/'nan'/'NaT'.
        values = tuple(None if pd.isna(row[col]) else row[col] for col in category_columns)

        # Execute the INSERT query
        cur_dwh.execute(insert_query, values)

    # Commit the transaction
    conn_dwh.commit()
except Exception:
    # Discard the partial load and keep the connection usable, then re-raise.
    conn_dwh.rollback()
    raise

- `subcategory` Tables

In [10]:
# Parameterised INSERT: the driver quotes every value itself, so text that
# contains quotes cannot break (or inject into) the statement.
insert_query = """
    INSERT INTO stg.subcategory 
    (name, category_id, description, created_at, updated_at) 
    VALUES 
    (%s, %s, %s, %s, %s);
"""
subcategory_columns = ['name', 'category_id', 'description', 'created_at', 'updated_at']

try:
    for index, row in subcategory.iterrows():
        # Missing values (None/NaN/NaT) are sent as SQL NULL rather than as
        # the literal text 'None'/'nan'/'NaT'.
        values = tuple(None if pd.isna(row[col]) else row[col] for col in subcategory_columns)

        # Execute the INSERT query
        cur_dwh.execute(insert_query, values)

    # Commit the transaction
    conn_dwh.commit()
except Exception:
    # Discard the partial load and keep the connection usable, then re-raise.
    conn_dwh.rollback()
    raise

- `customer` Tables

In [11]:
# Parameterised INSERT: the driver quotes every value itself, so names or
# addresses that contain quotes (e.g. O'Brien) cannot break (or inject into)
# the statement.
insert_query = """
    INSERT INTO stg.customer 
    (first_name, last_name, email, phone, address, created_at, updated_at) 
    VALUES 
    (%s, %s, %s, %s, %s, %s, %s);
"""
customer_columns = ['first_name', 'last_name', 'email', 'phone', 'address',
                    'created_at', 'updated_at']

try:
    for index, row in customer.iterrows():
        # Missing values (None/NaN/NaT) are sent as SQL NULL rather than as
        # the literal text 'None'/'nan'/'NaT'.
        values = tuple(None if pd.isna(row[col]) else row[col] for col in customer_columns)

        # Execute the INSERT query
        cur_dwh.execute(insert_query, values)

    # Commit the transaction
    conn_dwh.commit()
except Exception:
    # Discard the partial load and keep the connection usable, then re-raise.
    conn_dwh.rollback()
    raise

- `orders` Tables

In [12]:
# Parameterised INSERT: the driver quotes every value itself, so text that
# contains quotes cannot break (or inject into) the statement.
insert_query = """
    INSERT INTO stg.orders 
    (order_id, customer_id, order_date, status, created_at, updated_at) 
    VALUES 
    (%s, %s, %s, %s, %s, %s);
"""
orders_columns = ['order_id', 'customer_id', 'order_date', 'status',
                  'created_at', 'updated_at']

try:
    for index, row in orders.iterrows():
        # Missing values (None/NaN/NaT) are sent as SQL NULL rather than as
        # the literal text 'None'/'nan'/'NaT'.
        values = tuple(None if pd.isna(row[col]) else row[col] for col in orders_columns)

        # Execute the INSERT query
        cur_dwh.execute(insert_query, values)

    # Commit the transaction
    conn_dwh.commit()
except Exception:
    # Discard the partial load and keep the connection usable, then re-raise.
    conn_dwh.rollback()
    raise

- `product` Tables

In [13]:
# Parameterised INSERT: the driver quotes every value itself, so product
# names that contain quotes cannot break (or inject into) the statement.
insert_query = """
    INSERT INTO stg.product 
    (product_id, name, subcategory_id, price, stock, created_at, updated_at) 
    VALUES 
    (%s, %s, %s, %s, %s, %s, %s);
"""
product_columns = ['product_id', 'name', 'subcategory_id', 'price', 'stock',
                   'created_at', 'updated_at']

try:
    for index, row in product.iterrows():
        # Missing values (None/NaN/NaT) are sent as SQL NULL; previously a
        # missing numeric value produced invalid SQL such as a bare `nan`.
        values = tuple(None if pd.isna(row[col]) else row[col] for col in product_columns)

        # Execute the INSERT query
        cur_dwh.execute(insert_query, values)

    # Commit the transaction
    conn_dwh.commit()
except Exception:
    # Discard the partial load and keep the connection usable, then re-raise.
    conn_dwh.rollback()
    raise

- `order_detail` Tables

In [14]:
# Parameterised INSERT: the driver quotes every value itself, so text that
# contains quotes cannot break (or inject into) the statement.
insert_query = """
    INSERT INTO stg.order_detail 
    (order_id, product_id, quantity, price, created_at, updated_at) 
    VALUES 
    (%s, %s, %s, %s, %s, %s);
"""
order_detail_columns = ['order_id', 'product_id', 'quantity', 'price',
                        'created_at', 'updated_at']

try:
    for index, row in order_detail.iterrows():
        # Missing values (None/NaN/NaT) are sent as SQL NULL rather than as
        # the literal text 'None'/'nan'/'NaT'.
        values = tuple(None if pd.isna(row[col]) else row[col] for col in order_detail_columns)

        # Execute the INSERT query
        cur_dwh.execute(insert_query, values)

    # Commit the transaction
    conn_dwh.commit()
except Exception:
    # Discard the partial load and keep the connection usable, then re-raise.
    conn_dwh.rollback()
    raise

# 3. Transform

## 3.1. dim_customer

In [15]:
# Copy staging customers that are not yet present in prod.dim_customer.
# The existence check uses IS NOT DISTINCT FROM instead of "=" so that two
# NULLs count as equal; with "=" a customer holding a NULL in any compared
# column never matches its existing copy and is inserted again on every run.
insert_query = """
INSERT INTO prod.dim_customer (
    customer_id,
    customer_nk,
    first_name,
    last_name,
    email,
    phone,
    address,
    created_at,
    updated_at
)

SELECT
    c.uuid AS customer_id,
    c.customer_id AS customer_nk,
    c.first_name,
    c.last_name,
    c.email,
    c.phone,
    c.address,
    c.created_at,
    c.updated_at
FROM
    stg.customer c
    
WHERE NOT EXISTS (
    SELECT 1
    FROM prod.dim_customer t
    WHERE t.first_name IS NOT DISTINCT FROM c.first_name
    AND t.last_name IS NOT DISTINCT FROM c.last_name
    AND t.email IS NOT DISTINCT FROM c.email
    AND t.phone IS NOT DISTINCT FROM c.phone
    AND t.address IS NOT DISTINCT FROM c.address
    AND t.created_at IS NOT DISTINCT FROM c.created_at
    AND t.updated_at IS NOT DISTINCT FROM c.updated_at
);
"""

try:
    # Execute the INSERT query
    cur_dwh.execute(insert_query)

    # Commit the transaction
    conn_dwh.commit()
except Exception:
    # Keep the connection usable for later cells, then surface the error.
    conn_dwh.rollback()
    raise

## 3.2. dim_product

In [16]:
# Copy staging products, denormalised with their subcategory and category,
# that are not yet present in prod.dim_product.
# The existence check uses IS NOT DISTINCT FROM instead of "=" so that two
# NULLs count as equal; with "=" a product holding a NULL in any compared
# column (e.g. a description) never matches its existing copy and is inserted
# again on every run.
insert_query = """
INSERT INTO prod.dim_product (
    product_id,
    product_nk,
    "name",
    price,
    stock,
    category_name,
    category_desc,
    subcategory_name,
    subcategory_desc,
    created_at,
    updated_at
)

SELECT
    p.uuid AS product_id,
    p.product_id AS product_nk,
    p."name",
    p.price,
    p.stock,
    c."name" AS category_name,
    c.description AS category_desc,
    s."name" AS subcategory_name,
    s.description AS subcategory_desc,
    p.created_at,
    p.updated_at
FROM
    stg.product p
    
INNER JOIN
    stg.subcategory s ON p.subcategory_id = s.subcategory_id
INNER JOIN
    stg.category c ON s.category_id = c.category_id
    
WHERE NOT EXISTS (
    SELECT 1
    FROM prod.dim_product t
    WHERE t."name" IS NOT DISTINCT FROM p."name"
    AND t.price IS NOT DISTINCT FROM p.price
    AND t.stock IS NOT DISTINCT FROM p.stock
    AND t.category_name IS NOT DISTINCT FROM c."name"
    AND t.category_desc IS NOT DISTINCT FROM c.description
    AND t.subcategory_name IS NOT DISTINCT FROM s."name"
    AND t.subcategory_desc IS NOT DISTINCT FROM s.description
    AND t.created_at IS NOT DISTINCT FROM p.created_at
    AND t.updated_at IS NOT DISTINCT FROM p.updated_at
);
"""

try:
    # Execute the INSERT query
    cur_dwh.execute(insert_query)

    # Commit the transaction
    conn_dwh.commit()
except Exception:
    # Keep the connection usable for later cells, then surface the error.
    conn_dwh.rollback()
    raise

## 3.3. fact_order

In [17]:
# Build fact rows from staging order details joined to their order and to the
# customer, product and date dimensions, skipping rows already present in
# prod.fact_order.
# The existence check uses IS NOT DISTINCT FROM instead of "=" so that two
# NULLs count as equal; with "=" a row holding a NULL in any compared column
# never matches its existing copy and is inserted again on every run.
insert_query = """
INSERT INTO prod.fact_order (
    order_id,
    product_id,
    customer_id,
    order_date,
    quantity,
    status,
    created_at,
    updated_at
)

SELECT
    od.uuid as order_id,
    dp.product_id,
    dc.customer_id,
    dd.date_id as order_date,
    od.quantity,
    o.status,
    o.created_at,
    o.updated_at
FROM
    stg.order_detail od
    
INNER join 
    stg.orders o ON od.order_id = o.order_id
INNER join 
    prod.dim_customer dc ON o.customer_id = dc.customer_nk
INNER join 
    prod.dim_product dp on od.product_id = dp.product_nk 
INNER join 
    prod.dim_date dd on o.order_date = dd.date_actual
    
WHERE NOT EXISTS (
    SELECT 1
    FROM prod.fact_order t
    WHERE t."order_id" IS NOT DISTINCT FROM od.uuid
    AND t.product_id IS NOT DISTINCT FROM dp.product_id
    AND t.customer_id IS NOT DISTINCT FROM dc.customer_id
    AND t.order_date IS NOT DISTINCT FROM dd.date_id
    AND t.quantity IS NOT DISTINCT FROM od.quantity
    AND t.status IS NOT DISTINCT FROM o.status
    AND t.created_at IS NOT DISTINCT FROM o.created_at
    AND t.updated_at IS NOT DISTINCT FROM o.updated_at
);
"""

try:
    # Execute the INSERT query
    cur_dwh.execute(insert_query)

    # Commit the transaction
    conn_dwh.commit()
except Exception:
    # Keep the connection usable for later cells, then surface the error.
    conn_dwh.rollback()
    raise