In [None]:
import pandas as pd
import numpy as np
import os
import re
import clickhouse_connect

client = clickhouse_connect.get_client(
    host=os.getenv("CLICKHOUSE_HOST"),
    port=int(os.getenv("CLICKHOUSE_PORT")),
    username=os.getenv("CLICKHOUSE_USER"),
    password=os.getenv("CLICKHOUSE_PASSWORD"),
    database=os.getenv("CLICKHOUSE_DB")
)

# File paths
base_path = '/app/data'
files = {
    'customer_details': f'{base_path}/customer_details.csv',
    'product_details': f'{base_path}/product_details.csv',
    'sales': f'{base_path}/E-commerece sales data 2024.csv'
}

# Create table schemas
client.command("""
CREATE TABLE IF NOT EXISTS warehouse.customer_details
(
    customer_id             Int64,
    age                     Nullable(Int64),
    gender                  Nullable(String),
    item_purchased          Nullable(String),
    category                Nullable(String),
    purchase_amount_usd     Nullable(Int64),
    location                Nullable(String),
    size                    Nullable(String),
    color                   Nullable(String),
    season                  Nullable(String),
    review_rating           Nullable(Float64),
    subscription_status     Nullable(Bool),
    shipping_type           Nullable(String),
    discount_applied        Nullable(Bool),
    promo_code_used         Nullable(Bool),
    previous_purchases      Nullable(Int64),
    payment_method          Nullable(String),
    frequency_of_purchases  Nullable(String)
)
ENGINE = MergeTree
ORDER BY customer_id;
""")

# client.command("""
# DROP TABLE IF EXISTS sales
# """)

client.command("""
CREATE TABLE IF NOT EXISTS warehouse.product_details (
            unique_id               String,
            product_name            Nullable(String),
            brand_name              Nullable(String),
            asin                    Nullable(String),
            category                Nullable(String),
            upc_ean_code            Nullable(String),
            list_price              Nullable(Float64),
            selling_price           Nullable(String),
            quantity                Nullable(Float64),
            model_number            Nullable(String),
            about_product           Nullable(String),
            product_specification   Nullable(String),
            technical_details       Nullable(String),
            shipping_weight         Nullable(String),
            product_dimensions      Nullable(String),
            image                   Nullable(String),
            variants                Nullable(String),
            sku                     Nullable(String),
            product_url             Nullable(String),
            stock                   Nullable(Float64),
            product_details         Nullable(String),
            dimensions              Nullable(String),
            color                   Nullable(String),
            ingredients             Nullable(String),
            direction_to_use        Nullable(String),
            is_amazon_seller        Nullable(Bool),
            size_quantity_variant   Nullable(String),
            product_description     Nullable(String),
            category_level_1        Nullable(String),
            category_level_2        Nullable(String),
            category_level_3        Nullable(String),
            category_level_4        Nullable(String),
            category_level_5        Nullable(String),
            category_level_6        Nullable(String),
            category_level_7        Nullable(String),
            selling_price_lower     Nullable(Float64),
            selling_price_upper     Nullable(Float64)                       
        ) ENGINE = MergeTree ORDER BY unique_id;
""")

client.command("""
CREATE TABLE IF NOT EXISTS warehouse.sales
(
    user_id           Int64,
    product_id        Nullable(String),
    interaction_type  Nullable(String),
    time_stamp        Nullable(DateTime)
)
ENGINE = MergeTree
ORDER BY user_id;
""")

client.command("""TRUNCATE TABLE IF EXISTS warehouse.customer_details;""")
client.command("""TRUNCATE TABLE IF EXISTS warehouse.product_details;""")
client.command("""TRUNCATE TABLE IF EXISTS warehouse.sales;""")'
# print("All files loaded successfully.")



In [10]:
# File paths
base_path = '/data'
files = {
    'customer_details': f'{base_path}/customer_details.csv',
    'product_details': f'{base_path}/product_details.csv',
    'sales': f'{base_path}/E-commerece sales data 2024.csv'
}
for table, file_path in files.items():
    df = pd.read_csv(file_path)

In [87]:
# customer_details

df = pd.read_csv('/app/data/customer_details.csv')
df.columns = [c.strip().lower().replace(" ", "_").replace("(", "").replace(")", "") for c in df.columns]

df['subscription_status'] = df['subscription_status'].map({'Yes': True, 'No': False})
df['discount_applied'] = df['discount_applied'].map({'Yes': True, 'No': False})
df['promo_code_used'] = df['promo_code_used'].map({'Yes': True, 'No': False})

client.insert_df('customer_details', df)


<clickhouse_connect.driver.summary.QuerySummary at 0x7f0a0c086d00>

In [37]:
from clickhouse_connect import get_client

client = get_client(
    host='clickhouse',
    port=8123,
    username='default',
    password='mypassword',
    database='warehouse'
)

result = client.query("SELECT * FROM customer_details LIMIT 10")

# Convert to pandas DataFrame
df = result.result_rows
print(df)

[(1, 55, 'Male', 'Blouse', 'Clothing', 53, 'Kentucky', 'L', 'Gray', 'Winter', 3.1, 'Yes', 'Express', 'Yes', 'Yes', 14, 'Venmo', 'Fortnightly'), (2, 19, 'Male', 'Sweater', 'Clothing', 64, 'Maine', 'L', 'Maroon', 'Winter', 3.1, 'Yes', 'Express', 'Yes', 'Yes', 2, 'Cash', 'Fortnightly'), (3, 50, 'Male', 'Jeans', 'Clothing', 73, 'Massachusetts', 'S', 'Maroon', 'Spring', 3.1, 'Yes', 'Free Shipping', 'Yes', 'Yes', 23, 'Credit Card', 'Weekly'), (4, 21, 'Male', 'Sandals', 'Footwear', 90, 'Rhode Island', 'M', 'Maroon', 'Spring', 3.5, 'Yes', 'Next Day Air', 'Yes', 'Yes', 49, 'PayPal', 'Weekly'), (5, 45, 'Male', 'Blouse', 'Clothing', 49, 'Oregon', 'M', 'Turquoise', 'Spring', 2.7, 'Yes', 'Free Shipping', 'Yes', 'Yes', 31, 'PayPal', 'Annually'), (6, 46, 'Male', 'Sneakers', 'Footwear', 20, 'Wyoming', 'M', 'White', 'Summer', 2.9, 'Yes', 'Standard', 'Yes', 'Yes', 14, 'Venmo', 'Weekly'), (7, 63, 'Male', 'Shirt', 'Clothing', 85, 'Montana', 'M', 'Gray', 'Fall', 3.2, 'Yes', 'Free Shipping', 'Yes', 'Yes', 4

In [61]:
# product details

df = pd.read_csv('/app/data/product_details.csv')
df.columns = [c.strip().lower().replace(" ", "_").replace("(", "").replace(")", "") for c in df.columns]
df = df.rename(columns={"uniqe_id": "unique_id"})


df['is_amazon_seller'] = df['is_amazon_seller'].map({'Y': True, 'N': False})
# client.insert_df('product_details', df)

# cleaning dimension column

def extract_dimensions(spec):
    if not isinstance(spec, str):
        return None
    match = re.search(r'Product\s*Dimensions\s*:?\s*([\d.xX\s]+inches)', spec, re.IGNORECASE)
    if not match:
        # Try alternative pattern like: ProductDimensions:3.5x6.2x13inches
        match = re.search(r'ProductDimensions\s*:?\s*([\d.xX\s]+inches)', spec, re.IGNORECASE)
    return match.group(1).strip() if match else None

df['dimensions'] = df.apply(
    lambda row: extract_dimensions(row['product_specification']) if pd.isna(row['dimensions']) or row['dimensions'] == '' else row['dimensions'],
    axis=1
)

# cleaning asin column

def extract_asin(spec):
    if not isinstance(spec, str):
        return None
    match = re.search(r'ASIN\s*:?\s*([A-Z0-9]{10})', spec, re.IGNORECASE)
    return match.group(1) if match else None

df['asin'] = df.apply(
    lambda row: extract_asin(row['product_specification']) if pd.isna(row['asin']) or row['asin'] == '' else row['asin'],
    axis=1
)

# cleaning category column

# Split safely, handle NaN, and trim whitespace
category_split = df['category'].fillna('').str.split('|').apply(lambda x: [i.strip() for i in x if i])

# Expand into separate columns
category_df = category_split.apply(pd.Series)

# Rename the new category columns
category_df.columns = [f'category_level_{i+1}' for i in category_df.columns]

# Merge with original DataFrame
df = pd.concat([df, category_df], axis=1)

# cleaning selling price column

def parse_price(value):
    if pd.isna(value):
        return (np.nan, np.nan)
    
    # Remove dollar signs and spaces
    cleaned = re.sub(r'[\$ ]+', '', str(value))

    # Find all numbers with optional decimal part
    matches = re.findall(r'\d+(?:\.\d+)?', cleaned)

    if not matches:
        return (np.nan, np.nan)
    
    # Convert to float
    numbers = [float(m) for m in matches]
    
    if len(numbers) == 1:
        return (numbers[0], numbers[0])
    else:
        return (min(numbers), max(numbers))

# Apply to the column
df[['selling_price_lower', 'selling_price_upper']] = df['selling_price'].apply(parse_price).apply(pd.Series)

client.insert_df('product_details', df)


In [None]:
# sales

df = pd.read_csv('/app/data/E-commerece sales data 2024.csv')
df.columns = [c.strip().lower().replace(" ", "_").replace("(", "").replace(")", "") for c in df.columns]
df = df[df['user_id'].notna()]
df = df.loc[:, ['user_id','product_id','interaction_type','time_stamp']]
df['time_stamp'] = pd.to_datetime(df['time_stamp'], format='%d/%m/%Y %H:%M')

client.insert_df('sales', df)
