In [None]:
# NOTE(review): load_dotenv() is called before the remaining imports, presumably
# so that settings from a .env file are already in the environment when the
# database helpers in `src.database_methods` need them - confirm.
from dotenv import load_dotenv
load_dotenv()

import kagglehub
import os
import numpy as np
import pandas as pd
import re
import shutil
from sqlalchemy import create_engine, types, text, Engine

# Project-local helpers; this notebook uses get_db_engine and psql_insert_copy.
from src import database_methods as dbm 


In [2]:
# Download the Kaggle dataset and keep the returned location, which is used
# below as the directory that holds the downloaded files.
dataset_handle: str = "gabrielramos87/an-online-shop-business"
cache_path: str = kagglehub.dataset_download(dataset_handle)

# Last expression of the cell: show which files were downloaded.
os.listdir(cache_path)



['Sales Transaction v.4a.csv']

In [3]:
# Copy the downloaded CSV from the download location into the project's
# raw-data folder so the rest of the notebook can read it from a stable path.
file_name: str = "Sales Transaction v.4a.csv"

target_dir: str = os.path.join("..", "data", "raw")
os.makedirs(target_dir, exist_ok=True)  # create the folder on first run

source_file: str = os.path.join(cache_path, file_name)
destination_file: str = os.path.join(target_dir, file_name)

try:
    # copy2 also carries over file metadata where the platform allows it.
    shutil.copy2(source_file, destination_file)
    print(f"Dataset successful copied into {target_dir} directory.")
except Exception as e:
    # Best effort: report the problem and let the notebook continue.
    print(f"Error {e} - Unsuccessful dataset copying process.")

# Drop only the temporaries created in this cell. `cache_path` is produced by
# the download cell; deleting it here would make this cell raise NameError
# when it is re-run on its own, so it is deliberately kept alive.
del target_dir, source_file, destination_file


Dataset successful copied into ..\data\raw directory.


In [None]:

# Load the raw CSV with explicit dtypes, using the transaction number as index.
raw_csv_path = "../data/raw/Sales Transaction v.4a.csv"
column_dtypes = {
    "ProductNo": "category",
    "ProductName": "category",
    "Price": "float32",
    "Quantity": "int32",
    "CustomerNo": "Int32",  # nullable integer: cast to int32 later, after NaN rows are dropped
    "Country": "category",
}

df: pd.DataFrame = pd.read_csv(
    raw_csv_path,
    index_col=["TransactionNo"],
    parse_dates=["Date"],
    date_format="%m/%d/%Y",
    dtype=column_dtypes,
)
df.index.name = "order_id"


def _to_snake_case_id(column_name: str) -> str:
    """Turn a CamelCase column name into snake_case and rename '_no' to '_id'.

    E.g. "ProductNo" -> "product_id", "ProductName" -> "product_name".
    """
    with_underscores = re.sub(r"([a-z])([A-Z])", r"\1_\2", column_name)
    return with_underscores.lower().replace("_no", "_id")


df.rename(columns=_to_snake_case_id, inplace=True)

# Quick visual check on a handful of random rows.
print(df.sample(5))

KeyboardInterrupt: 

In [None]:
# Count missing values per column to see what needs cleaning.
null_counts = df.isnull().sum()
print(f"NaN values quantity for each column:\n{null_counts}")

In [None]:
# Remove every row that has at least one missing field; records without a
# "customer_id" are of no use for the analysis.
df.dropna(axis=0, how="any", inplace=True)

# With the gaps gone, the nullable column can be stored as a plain int32.
df = df.astype({"customer_id": "int32"})

In [None]:
# Re-check after cleaning: rows with missing values were dropped, so every
# per-column count is expected to be zero here.
remaining_nulls = df.isnull().sum()
print(f"NaN values quantity for each column:\n{remaining_nulls}")

In [None]:
# Summary statistics for all columns, numeric and non-numeric alike.
summary_stats = df.describe(include="all")
print(summary_stats)

In [None]:
# Rows with a negative quantity are treated as returns and are left out.
mask: pd.Series = df["quantity"].lt(0)
df = df.loc[~mask]

# Confirm the filter worked: the smallest remaining quantity must not be negative.
min_quantity = df["quantity"].min()
print(f"Min value in 'quantity' column:\n{min_quantity}")

In [None]:
# Number of rows recorded for each customer, most frequent first.
# NOTE(review): this counts rows; if a single order can span several rows the
# figure is per order line rather than per distinct order - confirm.
customer_row_counts = df["customer_id"].value_counts()
print(f"Orders quantity for each customer:\n{customer_row_counts}")

In [None]:
# Show the column names after renaming.
column_names = df.columns.to_list()
print(column_names)

In [None]:
# Largest text length / numeric value per column; these figures are useful for
# choosing column sizes when the frame is written to the database.
product_id_max_len = df["product_id"].str.len().max()
print(f"'product_id values max length:\n{product_id_max_len}")

product_name_max_len = df["product_name"].str.len().max()
print(f"'product_name' values max length:\n{product_name_max_len}")

price_max = df["price"].max()
print(f"'price' values max:\n{price_max}")

quantity_max = df["quantity"].max()
print(f"'quantity' values max:\n{quantity_max}")

customer_id_max = df["customer_id"].max()
print(f"'customer_id' values max:\n{customer_id_max}")

country_max_len = df["country"].str.len().max()
print(f"'country' values max length:\n{country_max_len}")


In [None]:
# Write the cleaned frame to the database and add an index on customer_id.
#
# The three steps depend on each other (engine -> table -> index). Each step
# reports its own error and the following steps run only when the previous one
# succeeded, so a failure is not followed by misleading secondary errors and a
# stale `engine` left over from an earlier run is never used by accident.
try:
    engine: Engine = dbm.get_db_engine()
except Exception as e:
    print(f"DB Engine creation error: {e}")
else:
    print("DB Engine successfuly created")

    try:
        df.to_sql(
            name="e_commerce_order_details",
            con=engine,
            if_exists="replace",  # the table is rebuilt from scratch on every run
            index=True,
            index_label="order_id",
            method=dbm.psql_insert_copy,  # project helper used as the insert method
            chunksize=5000,
            dtype={
                "order_id": types.VARCHAR(30),
                "date": types.DATE,
                "product_id": types.VARCHAR(30),
                "product_name": types.VARCHAR(50),
                "price": types.NUMERIC(8, 2),
                "quantity": types.INT,
                "customer_id": types.VARCHAR(30),
                "country": types.VARCHAR(30),
            }
        )
    except Exception as e:
        print(f"DataFrame migration process error {e}")
    else:
        print("DataFrame successfully migrated to postreSQL DB")

        try:
            with engine.connect() as connection:
                connection.execute(text("CREATE INDEX IF NOT EXISTS idx_customer_lookup ON e_commerce_order_details (customer_id);"))
                connection.commit()
        except Exception as e:
            print(f"INDEX set error: {e}")
        else:
            # The index is built on customer_id, so the message names that column.
            print("'customer_id' successfully set as INDEX")


In [None]:
# Sanity check: the table should hold as many rows as the DataFrame.
sql_db_len: pd.DataFrame = pd.read_sql(
    sql="SELECT COUNT(order_id) AS rows_quantity FROM e_commerce_order_details;", 
    con=engine
)
print(f"DataFrame length:\n{len(df)}")
# The query returns one row with one column; .iloc[0, 0] extracts that single
# value so the bare count is printed instead of the repr of a row Series.
print(f"SQL DB length:\n{sql_db_len.iloc[0, 0]}")
del sql_db_len

In [None]:
# Fetch one arbitrary row from the table to eyeball the stored values.
random_row_query = "SELECT * FROM e_commerce_order_details ORDER BY RANDOM() LIMIT 1"
sql_db_random_row: pd.DataFrame = pd.read_sql(sql=random_row_query, con=engine)
print(f"SQL DB random row:\n{sql_db_random_row}")

# The frame was only needed for the printout above.
del sql_db_random_row

In [None]:
# Persist the cleaned frame as parquet for downstream notebooks.
parquet_path: str = "../data/processed/e_commerce_order_details.parquet"

# Create the output folder first so the write does not fail on a fresh
# checkout where "../data/processed" does not exist yet.
os.makedirs(os.path.dirname(parquet_path), exist_ok=True)

df.to_parquet(parquet_path, engine="pyarrow")