In [1]:
import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import SimpleStatement
import json
from datetime import datetime

In [2]:
def establish_cassandra_connection(
    bundle_path="secure-connect-rishikadwipi.zip",
    token_path="rishikadwipi-token.json",
):
    """
    Establishes a secure connection to the Cassandra database using credentials and a secure connect bundle.

    Parameters
    ----------
    bundle_path : str, optional
        Path to the secure connect bundle zip, passed to ``Cluster`` as
        ``cloud={'secure_connect_bundle': bundle_path}``.
    token_path : str, optional
        Path to a JSON file containing ``"clientId"`` and ``"secret"`` keys,
        used as the username and password for ``PlainTextAuthProvider``.

    Returns
    -------
    tuple
        ``(session, cluster)``. The caller owns ``cluster`` and is responsible
        for calling ``cluster.shutdown()`` when finished.

    Raises
    ------
    FileNotFoundError
        If the token file does not exist.
    json.JSONDecodeError
        If the token file is not valid JSON.
    KeyError
        If the token file lacks ``"clientId"`` or ``"secret"``.
    """
    # Credentials live in a separate file so they never appear in the notebook.
    with open(token_path, "r", encoding="utf-8") as secret_file:
        credentials = json.load(secret_file)

    auth_provider = PlainTextAuthProvider(credentials["clientId"], credentials["secret"])
    cluster = Cluster(cloud={'secure_connect_bundle': bundle_path}, auth_provider=auth_provider)
    session = cluster.connect()
    return session, cluster

In [3]:
def upload_raw_data(session, keyspace, table_name, data_frame):
    """
    Creates the bronze table in Cassandra (if needed) and inserts every row of ``data_frame``.

    Parameters
    ----------
    session : cassandra.cluster.Session
        Open session used for the DDL and the inserts.
    keyspace : str
        Keyspace that holds the table.
    table_name : str
        Bronze table name. ``keyspace`` and ``table_name`` are interpolated
        directly into CQL (identifiers cannot be bound parameters), so they must
        come from trusted configuration, never from user input.
    data_frame : pandas.DataFrame
        Raw rows whose columns use the source CSV headers (``'Region'``,
        ``'Item Type'``, ``'Order ID'``, ``'UnitsSold'``, ...). Extra columns are
        ignored; order/ship dates are stored as unparsed TEXT.
    """
    # Price/money columns are DOUBLE: Cassandra FLOAT is 32-bit (~7 significant
    # digits) and rounds multi-million totals away from the cent.
    # IF NOT EXISTS leaves an already-created table untouched.
    create_table_stmt = f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.{table_name} (
        region TEXT,
        country TEXT,
        itemtype TEXT,
        saleschannel TEXT,
        orderpriority TEXT,
        orderdate TEXT,
        orderid BIGINT PRIMARY KEY,
        shipdate TEXT,
        unitssold INT,
        unitprice DOUBLE,
        unitcost DOUBLE,
        totalrevenue DOUBLE,
        totalcost DOUBLE,
        totalprofit DOUBLE
    )
    """
    session.execute(create_table_stmt)

    # Bronze column -> source CSV header, in insert order. Defining the mapping
    # once keeps the column list and the bound values from drifting apart.
    column_sources = {
        "region": "Region",
        "country": "Country",
        "itemtype": "Item Type",
        "saleschannel": "Sales Channel",
        "orderpriority": "Order Priority",
        "orderdate": "Order Date",
        "orderid": "Order ID",
        "shipdate": "Ship Date",
        "unitssold": "UnitsSold",
        "unitprice": "UnitPrice",
        "unitcost": "UnitCost",
        "totalrevenue": "TotalRevenue",
        "totalcost": "TotalCost",
        "totalprofit": "TotalProfit",
    }
    insert_stmt = session.prepare(
        f"INSERT INTO {keyspace}.{table_name} ({', '.join(column_sources)}) "
        f"VALUES ({', '.join(['?'] * len(column_sources))})"
    )

    if not data_frame.empty:
        # itertuples(name=None) yields plain tuples in exactly the column order
        # selected here, which matches the placeholders above.
        source_rows = data_frame[list(column_sources.values())]
        for values in source_rows.itertuples(index=False, name=None):
            session.execute(insert_stmt, values)
    print("Raw data uploaded!!")

In [4]:
def clean_bronze_data(raw_data_frame):
    """
    Cleans raw data from the bronze stage and prepares it for the silver stage.

    Steps:
      1. Drop rows missing any key, categorical, date, or ``unitssold`` value.
      2. Cast numeric columns to ``int``/``float``.
      3. Parse ``orderdate``/``shipdate`` with ``%m/%d/%Y``; unparseable dates
         become NaT and those rows are dropped.
      4. Lower-case the categorical text columns.
      5. Keep only rows where ``orderdate <= shipdate``.
      6. Stamp each row with a timezone-aware UTC ``processed_at``.

    Parameters
    ----------
    raw_data_frame : pandas.DataFrame
        Bronze rows using the lower-case column names of the bronze table
        (``region``, ``itemtype``, ``unitssold``, ...). It is not modified.

    Returns
    -------
    pandas.DataFrame
        A new, independent frame with the original index labels of the kept rows.
    """
    from datetime import timezone  # only `datetime` itself is imported at file level

    # Rows missing any of these cannot be keyed, categorised or dated.
    # `unitssold` is included because it is cast to int below, and
    # astype(int) raises on nulls instead of dropping them.
    required_cols = [
        "region", "country", "itemtype", "saleschannel",
        "orderpriority", "orderdate", "orderid", "shipdate",
        "unitssold",
    ]
    # .copy() makes this frame independent of raw_data_frame, so the column
    # assignments below never go through pandas' chained-assignment path.
    cleaned_df = raw_data_frame.dropna(subset=required_cols).copy()

    # Convert data types
    type_conversions = {
        "orderid": int,
        "unitssold": int,
        "unitprice": float,
        "unitcost": float,
        "totalrevenue": float,
        "totalcost": float,
        "totalprofit": float,
    }
    cleaned_df = cleaned_df.astype(type_conversions)

    # Standardize date formats; bad values become NaT rather than raising.
    date_format = "%m/%d/%Y"
    for date_col in ("orderdate", "shipdate"):
        cleaned_df[date_col] = pd.to_datetime(cleaned_df[date_col], format=date_format, errors="coerce")

    # Lowercase for categorical fields
    categorical_fields = ["region", "country", "itemtype", "saleschannel", "orderpriority"]
    for field in categorical_fields:
        cleaned_df[field] = cleaned_df[field].str.lower()

    # Keep rows whose dates both parsed and satisfy orderdate <= shipdate.
    has_dates = cleaned_df["orderdate"].notna() & cleaned_df["shipdate"].notna()
    shipped_after_order = cleaned_df["orderdate"] <= cleaned_df["shipdate"]
    cleaned_df = cleaned_df.loc[has_dates & shipped_after_order]

    # .assign returns a new frame, so adding the column cannot write into a
    # filtered view. The timestamp is UTC-aware because the Cassandra driver
    # treats naive datetimes as UTC, which would mislabel local time.
    return cleaned_df.assign(processed_at=datetime.now(timezone.utc))

In [5]:
def save_silver_data(session, keyspace, table_name, silver_df):
    """
    Saves the cleaned data (silver stage) into Cassandra.

    Parameters
    ----------
    session : cassandra.cluster.Session
        Open session used for the DDL and the inserts.
    keyspace : str
        Keyspace that holds the table.
    table_name : str
        Silver table name. ``keyspace`` and ``table_name`` are interpolated
        directly into CQL, so they must come from trusted configuration.
    silver_df : pandas.DataFrame
        Cleaned rows with lower-case column names; ``orderdate`` and
        ``shipdate`` must be pandas datetimes (they are written as CQL DATE).
        Extra columns are ignored.
    """
    # Price/money columns are DOUBLE: Cassandra FLOAT is 32-bit and rounds
    # multi-million totals away from the cent. IF NOT EXISTS leaves an
    # already-created table untouched.
    create_stmt = f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.{table_name} (
        orderid BIGINT PRIMARY KEY,
        country TEXT,
        itemtype TEXT,
        orderdate DATE,
        orderpriority TEXT,
        region TEXT,
        saleschannel TEXT,
        shipdate DATE,
        totalcost DOUBLE,
        totalprofit DOUBLE,
        totalrevenue DOUBLE,
        unitcost DOUBLE,
        unitprice DOUBLE,
        unitssold INT,
        processed_at TIMESTAMP
    );
    """
    session.execute(create_stmt)

    # Single source of truth for both the INSERT column list and the bound values.
    columns = [
        "orderid", "country", "itemtype", "orderdate", "orderpriority", "region",
        "saleschannel", "shipdate", "totalcost", "totalprofit", "totalrevenue",
        "unitcost", "unitprice", "unitssold", "processed_at",
    ]
    insert_stmt = session.prepare(
        f"INSERT INTO {keyspace}.{table_name} ({', '.join(columns)}) "
        f"VALUES ({', '.join(['?'] * len(columns))})"
    )

    if not silver_df.empty:
        # CQL DATE columns take datetime.date values, not full timestamps.
        insert_rows = silver_df[columns].assign(
            orderdate=silver_df["orderdate"].dt.date,
            shipdate=silver_df["shipdate"].dt.date,
        )
        for values in insert_rows.itertuples(index=False, name=None):
            session.execute(insert_stmt, values)

    print("Silver data uploaded successfully!")

In [6]:
def _write_gold_table(session, keyspace, table_name, key_column, measures, silver_df):
    """
    Create one gold table keyed by ``key_column`` and upsert per-key sums into it.

    ``measures`` maps each gold column name to ``(silver_column, cql_type)``;
    each gold value is the sum of ``silver_column`` over the silver rows that
    share the same ``key_column`` value. The table is always created; when
    ``silver_df`` is empty nothing is inserted.
    """
    column_defs = ",\n        ".join(
        f"{gold_col} {cql_type}" for gold_col, (_, cql_type) in measures.items()
    )
    session.execute(f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.{table_name} (
        {key_column} TEXT PRIMARY KEY,
        {column_defs}
    )
    """)

    gold_columns = [key_column, *measures]
    insert_stmt = session.prepare(
        f"INSERT INTO {keyspace}.{table_name} ({', '.join(gold_columns)}) "
        f"VALUES ({', '.join(['?'] * len(gold_columns))})"
    )

    # An empty SELECT produces a column-less DataFrame, which groupby rejects.
    if silver_df.empty:
        return

    aggregated = (
        silver_df.groupby(key_column)
        .agg(**{
            gold_col: pd.NamedAgg(column=src_col, aggfunc="sum")
            for gold_col, (src_col, _) in measures.items()
        })
        .reset_index()
    )
    for values in aggregated[gold_columns].itertuples(index=False, name=None):
        session.execute(insert_stmt, values)


def create_gold_tables(session, keyspace, silver_table):
    """
    Creates three gold tables in Cassandra with aggregated data.

    Reads every row of ``keyspace.silver_table`` and writes per-key sums to:
      - ``gold_sales_by_region``: revenue, cost and profit per region;
      - ``gold_top_items_by_type``: units sold per item type;
      - ``gold_sales_by_country``: revenue and profit per country.

    Sums are stored as DOUBLE/BIGINT: aggregates grow well past what 32-bit
    FLOAT (precision) and INT (range) can hold exactly. IF NOT EXISTS leaves
    already-created tables untouched.

    Parameters
    ----------
    session : cassandra.cluster.Session
        Open session used to read the silver table and write the gold tables.
    keyspace : str
        Keyspace holding both the silver and the gold tables; interpolated
        directly into CQL, so it must come from trusted configuration.
    silver_table : str
        Name of the silver table to aggregate.
    """
    # Load data from the silver table
    rows = session.execute(f"SELECT * FROM {keyspace}.{silver_table}")
    silver_df = pd.DataFrame([row._asdict() for row in rows])

    # (gold table, key column, {gold column: (silver column, CQL type)}, log label)
    gold_tables = [
        (
            "gold_sales_by_region",
            "region",
            {
                "total_revenue": ("totalrevenue", "DOUBLE"),
                "total_cost": ("totalcost", "DOUBLE"),
                "total_profit": ("totalprofit", "DOUBLE"),
            },
            "Total Sales by Region",
        ),
        (
            "gold_top_items_by_type",
            "itemtype",
            {"total_units_sold": ("unitssold", "BIGINT")},
            "Top-Selling Items by Item Type",
        ),
        (
            "gold_sales_by_country",
            "country",
            {
                "total_revenue": ("totalrevenue", "DOUBLE"),
                "total_profit": ("totalprofit", "DOUBLE"),
            },
            "Sales Performance by Country",
        ),
    ]

    for table_name, key_column, measures, label in gold_tables:
        _write_gold_table(session, keyspace, table_name, key_column, measures, silver_df)
        print(f"Gold table: {label} uploaded successfully!")

In [7]:
if __name__ == "__main__":
    # Pipeline configuration
    keyspace = 'cs'
    source_csv_url = "https://raw.githubusercontent.com/gchandra10/filestorage/main/sales_100.csv"
    bronze_table = 'bronze_sales'
    silver_table = 'silver_sales'

    # Establish connection
    session, cluster = establish_cassandra_connection()
    try:
        # Bronze: load the raw CSV and store it as-is
        raw_data = pd.read_csv(source_csv_url)
        upload_raw_data(session, keyspace, bronze_table, raw_data)

        # Read the bronze rows back; their lower-case column names
        # (region, itemtype, ...) are what clean_bronze_data expects.
        query_stmt = f"SELECT * FROM {keyspace}.{bronze_table}"
        raw_rows = session.execute(query_stmt)
        bronze_df = pd.DataFrame([row._asdict() for row in raw_rows])

        # Silver: clean and store
        silver_df = clean_bronze_data(bronze_df)
        save_silver_data(session, keyspace, silver_table, silver_df)

        # Gold: aggregate the silver table
        create_gold_tables(session, keyspace, silver_table)
    finally:
        # Release the driver's connections even if a stage above raised.
        cluster.shutdown()

Raw data uploaded!!
Silver data uploaded successfully!
Gold table: Total Sales by Region uploaded successfully!
Gold table: Top-Selling Items by Item Type uploaded successfully!
Gold table: Sales Performance by Country uploaded successfully!
