In [16]:
import pandas as pd
import sqlite3
from datetime import datetime
import os

In [17]:
# Data ingestion: read the raw interaction data with pandas. Parquet is the
# preferred storage format (CSV is less efficient); a CSV copy of the data is
# also accepted for reference.
def ingest_data(parquet_file):
    """Load the input file into a DataFrame.

    Parameters
    ----------
    parquet_file : str or path-like
        Path of the file to load. Paths ending in ``.parquet`` or ``.pq`` are
        read with ``pd.read_parquet``; every other path is read with
        ``pd.read_csv``.

    Returns
    -------
    pandas.DataFrame
        The loaded, non-empty frame.

    Raises
    ------
    FileNotFoundError
        If ``parquet_file`` does not exist.
    ValueError
        If the file was read successfully but contains no rows.
    """
    # Use the argument itself rather than a module-level name, so the function
    # works no matter what globals the notebook kernel currently holds.
    if not os.path.exists(parquet_file):
        raise FileNotFoundError(f"The file {parquet_file} does not exist.")

    # Choose the reader from the file extension (case-insensitive).
    extension = os.path.splitext(str(parquet_file))[1].lower()
    if extension in ('.parquet', '.pq'):
        df = pd.read_parquet(parquet_file)
    else:
        df = pd.read_csv(parquet_file)

    # Refuse to continue the pipeline with nothing to process.
    if df.empty:
        raise ValueError(f"The file {parquet_file} contains no data.")

    print("Printing the first 5 rows", df.head(5))
    return df

# Data cleaning
def clean_data(df):
    """Return a cleaned copy of ``df``; the input frame is left untouched.

    Cleaning steps:
    1. Drop every row that contains at least one missing value.
    2. Convert the ``timestamp`` column to pandas datetimes.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to clean. Must contain a ``timestamp`` column.

    Returns
    -------
    pandas.DataFrame
        A new frame holding only the complete rows, with ``timestamp`` parsed.
        The surviving rows keep their original index labels.

    Raises
    ------
    KeyError
        If ``df`` has no ``timestamp`` column.
    """
    # dropna() and assign() both return new objects, so the caller's frame is
    # never modified and re-running the cell always gives the same result.
    return df.dropna().assign(
        timestamp=lambda frame: pd.to_datetime(frame['timestamp'])
    )

In [43]:
# Data transformation
def transform_data(df):
    """Return a copy of ``df`` with per-user interaction counts added.

    Two columns are appended:

    * ``interaction_count_user`` - number of non-null ``interaction_id``
      values sharing the row's ``user_id``.
    * ``interaction_count_user_product`` - number of non-null
      ``interaction_id`` values sharing the row's ``user_id`` and
      ``product_id``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with ``interaction_id``, ``user_id`` and ``product_id`` columns.

    Returns
    -------
    pandas.DataFrame
        A new frame; the input frame is not modified.

    Raises
    ------
    KeyError
        If one of the required columns is missing.
    """
    interaction_ids_by_user = df.groupby('user_id')['interaction_id']
    interaction_ids_by_user_product = df.groupby(['user_id', 'product_id'])['interaction_id']

    # transform('count') broadcasts each group's count back onto its rows, so
    # the result lines up with df's index. assign() returns a new frame rather
    # than adding columns to the caller's object.
    return df.assign(
        interaction_count_user=interaction_ids_by_user.transform('count'),
        interaction_count_user_product=interaction_ids_by_user_product.transform('count'),
    )

In [45]:
# Function to load data into a SQLite database
def load_data(df, db_file):
    """Replace the ``interactions`` table in a SQLite database with ``df``.

    The table is dropped and recreated with the explicit schema below on every
    call, then the rows of ``df`` are appended. Recreating the table here
    (instead of using ``to_sql(if_exists='replace')``) keeps the declared
    column types and the ``interaction_id`` primary key; pandas' ``replace``
    mode drops the table and rebuilds it from inferred types, which discards
    any schema created beforehand.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to store. Its columns must be a subset of the table's columns.
    db_file : str or path-like
        SQLite database file; created if it does not exist.

    Returns
    -------
    None
        SQLite errors are reported on stdout and not re-raised.
    """
    create_table_sql = '''
        CREATE TABLE interactions (
            interaction_id INTEGER PRIMARY KEY,
            user_id INTEGER,
            product_id INTEGER,
            action TEXT,
            timestamp TIMESTAMP,
            interaction_count_user INTEGER,
            interaction_count_user_product INTEGER
        )
    '''

    # Bind the name before the try block: if connect() itself fails, the
    # finally clause must still be able to test it without raising.
    conn = None
    try:
        # connect to the SQLite database (or create it if it doesn't exist)
        conn = sqlite3.connect(db_file)

        # start from an empty table that carries the declared schema
        conn.execute('DROP TABLE IF EXISTS interactions')
        conn.execute(create_table_sql)

        # 'append' inserts into the table created above instead of letting
        # pandas recreate it with inferred column types
        df.to_sql('interactions', conn, if_exists='append', index=False)

        conn.commit()
    except sqlite3.Error as e:
        print(f"An error occurred: {e}")
    finally:
        # closing without a commit discards any uncommitted changes
        if conn is not None:
            conn.close()

In [46]:
if __name__ == "__main__":
    # Pipeline configuration: source CSV and target SQLite database file
    csv_file = 'interaction_data.csv'
    db_file = 'iCustomerDB.db'

    # Steps 1-3: ingest the raw data, clean it, then derive the count columns
    df = (
        ingest_data(csv_file)
        .pipe(clean_data)
        .pipe(transform_data)
    )

    # Step 4: store the transformed frame in SQLite
    load_data(df, db_file)

    print("ETL process completed.")


Printing the first 5 rows    interaction_id  user_id  product_id    action            timestamp
0           10001      101         201     click  2024-07-01 12:00:00
1           10002      102         202      view  2024-07-01 12:05:00
2           10003      101         201  purchase  2024-07-01 12:10:00
3           10004      103         203     click  2024-07-01 12:15:00
4           10005      102         202      view  2024-07-01 12:20:00
ETL process completed.
