In [65]:
import pandas as pd
from sqlalchemy import create_engine
import mysql.connector
from mysql.connector import Error

In [66]:
# 1. Extract: Read the data from the CSV file
def extract(file_path):
    # Load data from CSV file into pandas DataFrame
    if isinstance(file_path, str):
        df = pd.read_csv(file_path)
    else :
        return file_path
    return df

# 2. Transform: Clean and transform the data (you can add more transformation logic as needed)
def transform(df):
    if 'order_id' in df.columns :
        df.drop('order_id',axis= 1,inplace = True)
    # Example transformation: Convert 'is_active' to boolean if it's not
    df['is_active'] = 1
    
    # You can add more transformations here as needed
    return df

def load(df, connection_params, table_name):
    try:
        # Establish a connection using the connection parameters
        conn = mysql.connector.connect(**connection_params)
        cursor = conn.cursor()
        # Iterate over each row of the DataFrame
        for _, row in df.iterrows():
            # Filter out the columns with null (None) values in the row
            non_null_row = row.dropna()  # This drops columns with null values in the current row
            
            if non_null_row.empty:
                print(f"Skipping row with no non-null data: {row}")
                continue

            # Dynamically generate the columns and placeholders for the current row
            columns = ', '.join(non_null_row.index)  # Get the non-null column names
            placeholders = ', '.join(['%s'] * len(non_null_row))  # Create placeholders for non-null values
            values = tuple(non_null_row.values)  # Get the non-null values as a tuple

            # Construct the INSERT query for the current row
            insert_query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
            # Execute the insert query for the current row
            cursor.execute(insert_query, values)
        
        # Commit the transaction after inserting all rows
        conn.commit()
        print(f"Data loaded successfully into the table: {table_name}")
    except Error as e:
        print(f"Error: {e}")
    finally:
        if conn.is_connected():
            cursor.close()
            conn.close()

# Main dim_dim_dim_dim_etl Function
def dim_etl(file_path, connection_params, table_name):
    # Extract
    df = extract(file_path)
    
    # Transform
    df_transformed = transform(df)
    
    # Load
    load(df_transformed, connection_params, table_name)

In [67]:
# Connection parameters for the database
connection_params = {
    'host': 'localhost',
    'database': 'e-commerce-dwh',
    'user': 'root',
    'password': '0000'
}

## Data Ingestion

### User Data Ingestion

In [None]:
file_path = './data/user_dataset.csv'

# # Name of the table you want to load data into
table_name = 'dim_user'

# # Run the ETL process
dim_etl(file_path, connection_params, table_name)

### Product Data Ingestion

In [None]:
file_path = './data/products_dataset.csv'

# # Name of the table you want to load data into
table_name = 'dim_product'

# # Run the dim_etl process
dim_etl(file_path, connection_params, table_name)

Data loaded successfully into the table: dim_product


### Seller Data Ingestion

In [None]:
file_path = './data/seller_dataset.csv'

# # Name of the table you want to load data into
table_name = 'dim_seller'

# # Run the ETL process
dim_etl(file_path, connection_params, table_name)

Data loaded successfully into the table: dim_seller


### Payment Data Ingestion

In [13]:
file_path = './data/payment_dataset.csv'

# # Name of the table you want to load data into
table_name = 'dim_payment'

# # Run the ETL process
dim_etl(file_path, connection_params, table_name)

Data loaded successfully into the table: dim_payment


### FeedBack Data Ingestion

In [15]:
file_path = './data/feedback_dataset.csv'

# # Name of the table you want to load data into
table_name = 'dim_feedback'

# # Run the ETL process
dim_etl(file_path, connection_params, table_name)

Data loaded successfully into the table: dim_feedback


### Date Data Ingestion

In [241]:
def get_unique_dates(df, date_columns):
    """
    This function accepts a DataFrame and a list of date column names, 
    and returns a DataFrame with unique date values from those columns,
    with seconds set to 00.
    
    Parameters:
    - df: pandas DataFrame containing the data
    - date_columns: list of column names that contain date values
    
    Returns:
    - unique_dates_df: pandas DataFrame containing unique date values from the specified columns
    """
    # Create an empty list to store unique dates
    unique_dates = pd.Series(dtype='datetime64[ns]')

    # Iterate over the date columns and extract unique values
    for column in date_columns:
        column_dates = pd.to_datetime(df[column], errors='coerce').apply(lambda x: x.replace(second=0, microsecond=0)).dropna().unique()
        # Combine with existing unique dates
        unique_dates = pd.concat([unique_dates, pd.Series(column_dates)], ignore_index=True)

    # Remove duplicates and reset the index
    unique_dates = unique_dates.drop_duplicates().reset_index(drop=True)

    # Convert the unique dates to a DataFrame
    unique_dates_df = pd.DataFrame(unique_dates, columns=['full_timestamp'])
    
    return unique_dates_df


def add_date_attributes(unique_dates):
    """
    Adds date-related attributes to the unique dates.
    
    Parameters:
    - unique_dates: pandas DataFrame containing unique date values
    
    Returns:
    - date_dimension_df: pandas DataFrame containing the full set of date attributes
    """
    unique_dates['full_timestamp'] = pd.to_datetime(unique_dates['full_timestamp'])
    # Creating a dictionary to store date attributes
    date_attributes = {
        'full_timestamp': unique_dates['full_timestamp'],
        'year': unique_dates['full_timestamp'].dt.year,
        'quarter': unique_dates['full_timestamp'].dt.quarter,
        'month': unique_dates['full_timestamp'].dt.month,
        'month_name': unique_dates['full_timestamp'].dt.strftime('%B'),
        'day': unique_dates['full_timestamp'].dt.day,
        'day_of_week': unique_dates['full_timestamp'].dt.weekday + 1,  # Monday=1, Sunday=7
        'day_of_week_name': unique_dates['full_timestamp'].dt.strftime('%A'),
        'hour': unique_dates['full_timestamp'].dt.hour,
        'minute': unique_dates['full_timestamp'].dt.minute,
        'week_of_year': unique_dates['full_timestamp'].dt.isocalendar().week
    }
    
    # Convert the dictionary to a DataFrame
    date_dimension_df = pd.DataFrame(date_attributes)
    
    return date_dimension_df

In [242]:
df1 = pd.read_csv('./data/order_dataset.csv')
df2 = pd.read_csv('./data/order_item_dataset.csv')
df = pd.merge(df1, df2, on='order_id', how='inner')
unique_dates_df = get_unique_dates(df, ['order_date','order_approved_date','pickup_date','delivered_date','pickup_limit_date'])
# date_df = add_date_attributes(unique_dates_df)
# dim_etl(date_df, connection_params, 'dim_date')

(199926,)
(275575,)


## Fact Data Ingestion

In [270]:
df1 = pd.read_csv('./data/order_dataset.csv')
df2 = pd.read_csv('./data/order_item_dataset.csv')
df = pd.merge(df1, df2, on='order_id', how='inner')
df.head(1)

Unnamed: 0,order_id,user_name,order_status,order_date,order_approved_date,pickup_date,delivered_date,estimated_time_delivery,order_item_id,product_id,seller_id,pickup_limit_date,price,shipping_cost
0,e481f51cbdc54678b7cc49136f2d6af7,7c396fd4830fd04220f754e42b4e5bff,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00,1,87285b34884572647811a353c7ac498a,3504c0cb71d7fa48d967e0e4c94d59d9,2017-10-06 11:07:15,29990.0,8720.0


In [272]:
def map_user_key(order_df,date_columns, connection_params):
    """
    Map user_name in order_df to user_key from dim_user and update order_df.
    
    Parameters:
    - order_df: pandas DataFrame containing the order data with user_name.
    - connection_params: dictionary with MySQL connection parameters.
    
    Returns:
    - Updated order_df with user_key instead of user_name.
    """
    try:
        # Connect to the database
        conn = mysql.connector.connect(**connection_params)
        cursor = conn.cursor()

        # Fetch user_key and user_id from dim_user
        query = "SELECT user_id, user_name FROM dim_user"
        cursor.execute(query)
        user_mapping = {user_id: user_key for user_key, user_id in cursor.fetchall()}
        # Map user_name (order_df) to user_key (dim_user) using user_id
        order_df['user_id'] = order_df['user_name'].map(user_mapping).apply(lambda x: pd.NA if pd.isna(x) else int(x))  # Use -1 for unknown users

        # Fetch seller_key and seller_id from dim_seller
        query = "SELECT seller_id, seller_key FROM dim_seller"
        cursor.execute(query)
        seller_mapping = {seller_key: seller_id for seller_key, seller_id in cursor.fetchall()}
        order_df['seller_id'] = order_df['seller_id'].map(seller_mapping).apply(lambda x: pd.NA if pd.isna(x) else int(x))

        # Fetch product_key and user_id from dim_user
        query = "SELECT product_id, product_key FROM dim_product"
        cursor.execute(query)
        product_mapping = {product_key: product_id for product_key, product_id in cursor.fetchall()}
        order_df['product_id'] = order_df['product_id'].map(product_mapping).apply(lambda x: pd.NA if pd.isna(x) else int(x))

        # Fetch id and fulldate from dim_date
        query = "SELECT date_id, full_timestamp FROM dim_date"
        cursor.execute(query)
        date_mapping = {full_timestamp: date_id for date_id,full_timestamp in cursor.fetchall()}

        for column in date_columns:
            order_df[column] = pd.to_datetime(df[column]).apply(lambda x: x.replace(second=0, microsecond=0))
            order_df[f'{column}_id'] = order_df[column].map(date_mapping).apply(lambda x: pd.NA if pd.isna(x) else int(x))
        
        

        order_df.drop(columns=['user_name']+date_columns, inplace=True)  # Drop user_name column
        print("dimensions mapped successfully.")
        return order_df

    except Error as e:
        print(f"Error: {e}")
    finally:
        if conn.is_connected():
            cursor.close()
            conn.close()

In [273]:
date_columns = ['order_date', 'order_approved_date', 'pickup_date', 'delivered_date','pickup_limit_date']
df = map_user_key(df,date_columns, connection_params)

dimensions mapped successfully.


In [275]:
feedback_df = pd.read_csv('./data/feedback_dataset.csv')
payment_df = pd.read_csv('./data/payment_dataset.csv')

def add_feedback_and_payment(feedback_df,payment_df,df):

    """
    Adds feedback and payment information to the main DataFrame.
    This function takes three DataFrames: feedback_df, payment_df, and df. It adds two new columns to df:
    'feedback_id' and 'payment_id', which are mapped from the feedback_df and payment_df respectively based on 'order_id'.
    Parameters:
    feedback_df (pd.DataFrame): DataFrame containing feedback information with 'order_id'.
    payment_df (pd.DataFrame): DataFrame containing payment information with 'order_id'.
    df (pd.DataFrame): Main DataFrame to which feedback and payment information will be added.
    Returns:
    None: The function modifies the input DataFrame df in place.
    """

    feedback_df['feedback_key'] = feedback_df.index + 1
    feedback_mapper = {row['order_id']: row['feedback_key'] for _, row in feedback_df.iterrows()}
    df['feedback_id'] = df['order_id'].map(feedback_mapper).apply(lambda x: pd.NA if pd.isna(x) else int(x))
    payment_df['payment_key'] = payment_df.index + 1
    payment_mapper = {row['order_id']: row['payment_key'] for _, row in payment_df.iterrows()} 
    df['payment_id'] = df['order_id'].map(payment_mapper).apply(lambda x: pd.NA if pd.isna(x) else int(x))

add_feedback_and_payment(feedback_df,payment_df,df)

In [290]:
def transform_fact(df):
    duplicate_order_ids = df[df.duplicated(subset=['order_id'], keep=False)]
    print("Duplicate order_ids:")
    duplicate_order_ids.head()
    # Drop duplicates if all columns match except order_item_id
    df_deduped = df.drop_duplicates(subset=[col for col in df.columns if col != 'order_item_id'])

    # Show shape before and after dropping duplicates
    print("Shape before dropping duplicates:", df.shape)
    print("Shape after dropping duplicates:", df_deduped.shape)

In [291]:
transform_fact(df)
load(df, connection_params, 'fact_order')

Duplicate order_ids:
Shape before dropping duplicates: (112650, 16)
Shape after dropping duplicates: (102425, 16)
Data loaded successfully into the table: fact_order
