### Create tables in sql, split the data following the data model design, and save the split data to sql

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import zipfile
import gzip

In [2]:
import warnings
warnings.filterwarnings("ignore")

In [None]:
import os
import sys
pth = '/Users/lky/Desktop/Data Engineering Final Project/' # Change to your local path of the "Data Engineering Final Project" folder
sys.path.append(pth)
ecommerce_behavior_path = pth + 'Data/' + 'ecommerce_behavior'

Create ecommerce_customer_behavior schema in sql before running the following codes

In [4]:
from sqlalchemy import create_engine
from datetime import datetime
import mysql.connector

# MySQL connection details
db_connection = create_engine(
    'mysql+mysqlconnector://root:20020206lky@localhost/ecommerce_customer_behavior'
)

# MySQL cursor-based connection for creating tables dynamically
connection = mysql.connector.connect(
    host="localhost",
    user="root",
    password="20020206lky",
    database="ecommerce_customer_behavior"
)

In [5]:
# Ecommerce Behavior files (very big file)
zf = zipfile.ZipFile(f'{ecommerce_behavior_path}/2019-Nov.csv.zip') 
ecommerce_behavior_df_2019_11 = pd.read_csv(zf.open('2019-Nov.csv'))
print("Ecommerce Behavior 2019-11:")
ecommerce_behavior_df_2019_11.head()

Ecommerce Behavior 2019-11:


Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-11-01 00:00:00 UTC,view,1003461,2053013555631882655,electronics.smartphone,xiaomi,489.07,520088904,4d3b30da-a5e4-49df-b1a8-ba5943f1dd33
1,2019-11-01 00:00:00 UTC,view,5000088,2053013566100866035,appliances.sewing_machine,janome,293.65,530496790,8e5f4f83-366c-4f70-860e-ca7417414283
2,2019-11-01 00:00:01 UTC,view,17302664,2053013553853497655,,creed,28.31,561587266,755422e7-9040-477b-9bd2-6a6e8fd97387
3,2019-11-01 00:00:01 UTC,view,3601530,2053013563810775923,appliances.kitchen.washer,lg,712.87,518085591,3bfb58cd-7892-48cc-8020-2f17e6de6e7f
4,2019-11-01 00:00:01 UTC,view,1004775,2053013555631882655,electronics.smartphone,xiaomi,183.27,558856683,313628f1-68b8-460d-84f6-cec7a8796ef2


In [6]:
with gzip.open(f'{ecommerce_behavior_path}/2019-Dec.csv.gz', 'rt', encoding='utf-8') as f:
    ecommerce_behavior_df_2019_12 = pd.read_csv(f)
ecommerce_behavior_df_2019_12

Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-12-01 00:00:00 UTC,view,1005105,2232732093077520756,construction.tools.light,apple,1302.48,556695836,ca5eefc5-11f9-450c-91ed-380285a0bc80
1,2019-12-01 00:00:00 UTC,view,22700068,2232732091643068746,,force,102.96,577702456,de33debe-c7bf-44e8-8a12-3bf8421f842a
2,2019-12-01 00:00:01 UTC,view,2402273,2232732100769874463,appliances.personal.massager,bosch,313.52,539453785,5ee185a7-0689-4a33-923d-ba0130929a76
3,2019-12-01 00:00:02 UTC,purchase,26400248,2053013553056579841,computers.peripherals.printer,,132.31,535135317,61792a26-672f-4e61-9832-7b63bb1714db
4,2019-12-01 00:00:02 UTC,view,20100164,2232732110089618156,apparel.trousers,nika,101.68,517987650,906c6ca8-ff5c-419a-bde9-967ba8e2233e
...,...,...,...,...,...,...,...,...,...
67542873,2019-12-31 23:59:58 UTC,view,1005118,2232732093077520756,construction.tools.light,apple,908.80,515771654,06ed2ab3-39f6-4fd4-a755-3a26c72eabc8
67542874,2019-12-31 23:59:58 UTC,view,1307229,2053013554658804075,electronics.audio.headphone,lenovo,1044.25,595160289,c3b5e44a-b97d-44ce-bdfc-d4f8ff669ac0
67542875,2019-12-31 23:59:59 UTC,view,9300104,2053013554524586339,kids.dolls,sony,411.83,526621231,7da1da5a-0d1e-4768-b481-97354e2ef28e
67542876,2019-12-31 23:59:59 UTC,view,4803759,2232732079706079299,sport.bicycle,xiaomi,17.99,574723072,2f967c6e-b6c4-4971-8599-d34f86570e29


In [30]:
def create_sql_tables(connection, month_label):
    cursor = connection.cursor()

    # Event Time Table
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS event_time_{month_label} (
            event_time_id INT(9) NOT NULL PRIMARY KEY,
            event_year YEAR(4) NOT NULL,
            event_month TINYINT(2) NOT NULL,
            event_day TINYINT(2) NOT NULL,
            event_hms CHAR(8) NOT NULL
        );
    """)

    # Customer Table
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS customer_{month_label} (
            user_session VARCHAR(50) NOT NULL PRIMARY KEY,
            user_id INT(9) NOT NULL
        );
    """)

    # Category Table
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS category_{month_label} (
            category_id BIGINT(20) NOT NULL PRIMARY KEY,
            primary_category VARCHAR(30) NOT NULL,
            secondary_category VARCHAR(30),
            tertiary_category VARCHAR(30)
        );
    """)

    # Product Table
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS product_{month_label} (
            product_id INT(6) NOT NULL,
            brand VARCHAR(50) NOT NULL,
            price DECIMAL(6,2) NOT NULL,
            event_time_id INT(9) NOT NULL,
            category_id BIGINT(20) NOT NULL,
            PRIMARY KEY (product_id, event_time_id),
            FOREIGN KEY (event_time_id) REFERENCES event_time_{month_label}(event_time_id),
            FOREIGN KEY (category_id) REFERENCES category_{month_label}(category_id)
        );
    """)

    # Event Table
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS event_{month_label} (
            event_id INT(9) NOT NULL PRIMARY KEY,
            event_type VARCHAR(10) NOT NULL,
            event_time_id INT(9) NOT NULL,
            user_session VARCHAR(50) NOT NULL,
            product_id INT(6) NOT NULL,
            last_update DATETIME NOT NULL,
            FOREIGN KEY (event_time_id) REFERENCES event_time_{month_label}(event_time_id),
            FOREIGN KEY (user_session) REFERENCES customer_{month_label}(user_session),
            FOREIGN KEY (product_id) REFERENCES product_{month_label}(product_id)
        );
    """)

    connection.commit()
    print(f"Tables for {month_label} created successfully!")

In [31]:
def normalize_and_save_to_sql(df, month_label):
    # Create tables dynamically for the given month
    create_sql_tables(connection, month_label)
    print(f"Processing data for {month_label}...")

    # Ensure the event_time column is in datetime format
    df['event_time'] = pd.to_datetime(df['event_time'], errors='coerce')  # Convert to datetime, set invalid entries to NaT
    
    # Count unique user_id for each user_session
    user_session_check = df.groupby('user_session')['user_id'].nunique()
    # Identify problematic sessions where there are multiple user_ids
    problematic_sessions = user_session_check[user_session_check > 1].index
    print(f"Problematic user_sessions: {len(problematic_sessions)}")
    
    # Filter out rows with problematic user_sessions
    df_cleaned = df[~df['user_session'].isin(problematic_sessions)]
    print(f"Removed {len(df) - len(df_cleaned)} rows with inconsistent user_session-user_id pairs.")
    
    # Remove rows with missing values in all columns
    df = df_cleaned.dropna()

    # 1. Save `event_time_{month_label}` table
    df['event_year'] = df['event_time'].dt.year
    df['event_month'] = df['event_time'].dt.month
    df['event_day'] = df['event_time'].dt.day
    df['event_hms'] = df['event_time'].dt.strftime('%H:%M:%S')
    event_time = df[['event_time', 'event_year', 'event_month', 'event_day', 'event_hms']].drop_duplicates()
    event_time['event_time_id'] = range(1, len(event_time) + 1)
    
    event_time_sql = event_time.drop(columns=['event_time'])
    event_time_sql.to_sql(f'event_time_{month_label}', con=db_connection, if_exists='append', index=False, chunksize=1000)
    print(f"Inserted {len(event_time_sql)} rows into event_time_{month_label}.")

    # Map `event_time_id` to the original DataFrame
    event_time_mapping = event_time.set_index(['event_time']).to_dict()['event_time_id']
    df['event_time_id'] = df['event_time'].map(event_time_mapping)

    # 2. Save `customer_{month_label}` table
    customer = df[['user_session', 'user_id']].drop_duplicates()
    customer.to_sql(f'customer_{month_label}', con=db_connection, if_exists='append', index=False, chunksize=1000)
    print(f"Inserted {len(customer)} rows into customer_{month_label}.")

    # 3. Save `category_{month_label}` table
    category = df[['category_id', 'category_code']].drop_duplicates()
    category[['primary_category', 'secondary_category', 'tertiary_category']] = (
        category['category_code'].str.split('.', expand=True).iloc[:, :3]  # Ensure only up to 3 columns
    )
    category = category.drop(columns=['category_code'])
    category.to_sql(f'category_{month_label}', con=db_connection, if_exists='append', index=False, chunksize=1000)
    print(f"Inserted {len(category)} rows into category_{month_label}.")

    # 4. Save `product_{month_label}` table
    product = df[['product_id', 'brand', 'price', 'event_time_id', 'category_id']].drop_duplicates(subset=['product_id', 'event_time_id'])
    product.to_sql(f'product_{month_label}', con=db_connection, if_exists='append', index=False, chunksize=1000)
    print(f"Inserted {len(product)} rows into product_{month_label}.")

    # 5. Save `event_{month_label}` table
    event = df[['event_type', 'event_time_id', 'user_session', 'product_id']].drop_duplicates()
    event['last_update'] = datetime.now().strftime('%Y-%m-%d')  # Set the current time as last_update
    event['event_id'] = range(1, len(event) + 1)  # Assign unique event_id
    event.to_sql(f'event_{month_label}', con=db_connection, if_exists='append', index=False, chunksize=1000)
    print(f"Inserted {len(event)} rows into event_{month_label}.")

    print(f"Processing for {month_label} completed successfully!")

In [32]:
normalize_and_save_to_sql(ecommerce_behavior_df_2019_11, '2019_11')

Tables for 2019_11 created successfully!
Processing data for 2019_11...
Problematic user_sessions: 591
Removed 1769 rows with inconsistent user_session-user_id pairs.
Inserted 2509411 rows into event_time_2019_11.
Inserted 9540290 rows into customer_2019_11.
Inserted 270 rows into category_2019_11.
Inserted 40988556 rows into product_2019_11.
Inserted 42017748 rows into event_2019_11.
Processing for 2019_11 completed successfully!


In [33]:
normalize_and_save_to_sql(ecommerce_behavior_df_2019_12, '2019_12')

Tables for 2019_12 created successfully!
Processing data for 2019_12...
Problematic user_sessions: 1472
Removed 4263 rows with inconsistent user_session-user_id pairs.
Inserted 2645093 rows into event_time_2019_12.
Inserted 13231718 rows into customer_2019_12.
Inserted 860 rows into category_2019_12.
Inserted 52493598 rows into product_2019_12.
Inserted 53508355 rows into event_2019_12.
Processing for 2019_12 completed successfully!
