In [1]:
import functools
import json
from datetime import datetime

import pandas as pd
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

In [2]:
# Decorator for Cassandra Table Operations
def cassandra_table_operation(func):
    """Decorate a table-level Cassandra operation so it reports completion.

    The decorated function must accept ``(session, keyspace, table_name, ...)``.
    After it returns, a confirmation naming the operation and the table is
    printed. The wrapped function's return value is passed back to the caller,
    and its name/docstring are preserved via ``functools.wraps``.
    """
    @functools.wraps(func)
    def wrapper(session, keyspace, table_name, *args, **kwargs):
        # Run the actual table operation (e.g. CREATE TABLE + inserts).
        result = func(session, keyspace, table_name, *args, **kwargs)
        print(f"Operation {func.__name__} completed for table: {table_name}")
        return result
    return wrapper

In [3]:
# Function to establish connection with Cassandra
def cassandra_connection(bundle_path='secure-connect-aturics.zip', token_path="Aturics-token.json"):
    """Open a session against the cloud cluster described by a secure-connect bundle.

    Args:
        bundle_path: path to the secure-connect bundle passed to ``Cluster(cloud=...)``.
        token_path: path to a JSON file containing ``clientId`` and ``secret``,
            used as the username/password for ``PlainTextAuthProvider``.

    Returns:
        ``(session, cluster)``. The caller owns ``cluster`` and should call
        ``cluster.shutdown()`` when finished.
    """
    config = {
        'secure_connect_bundle': bundle_path
    }
    with open(token_path) as file:
        creds = json.load(file)

    auth_provider = PlainTextAuthProvider(creds["clientId"], creds["secret"])
    cluster = Cluster(cloud=config, auth_provider=auth_provider)
    try:
        session = cluster.connect()
    except Exception:
        # Don't leak the cluster object if the connection attempt fails.
        cluster.shutdown()
        raise
    return session, cluster

In [4]:
# Insert data into the bronze table
@cassandra_table_operation
def insert_raw_data(session, keyspace_name, table_name, dataframe):
    """Create the bronze table if needed and insert every row of the raw CSV frame.

    Args:
        session: connected Cassandra session.
        keyspace_name: keyspace that holds the bronze table.
        table_name: bronze table name.
        dataframe: raw sales frame with the CSV's original headers
            ('Region', 'Item Type', 'UnitsSold', ...).

    Missing values (NaN/None) are written as CQL nulls. ``order_id`` is the
    primary key, so a row without it is rejected by Cassandra.
    """
    session.execute(f"""
    CREATE TABLE IF NOT EXISTS {keyspace_name}.{table_name} (
        region TEXT,
        country TEXT,
        item_category TEXT,
        sales_medium TEXT,
        priority_level TEXT,
        order_date TEXT,
        order_id BIGINT PRIMARY KEY,
        shipping_date TEXT,
        items_sold INT,
        price_per_unit FLOAT,
        cost_per_unit FLOAT,
        revenue_total FLOAT,
        cost_total FLOAT,
        profit_total FLOAT
    )
    """)

    prepared_query = session.prepare(f"""
    INSERT INTO {keyspace_name}.{table_name} (
        region, country, item_category, sales_medium, priority_level,
        order_date, order_id, shipping_date, items_sold, price_per_unit,
        cost_per_unit, revenue_total, cost_total, profit_total
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """)

    # CSV header for each bound value, in the same order as the INSERT column list.
    source_columns = (
        'Region', 'Country', 'Item Type', 'Sales Channel', 'Order Priority',
        'Order Date', 'Order ID', 'Ship Date', 'UnitsSold', 'UnitPrice',
        'UnitCost', 'TotalRevenue', 'TotalCost', 'TotalProfit',
    )

    for _, record in dataframe.iterrows():
        # read_csv represents blank cells as float NaN. Bind them as None (CQL
        # null) instead: a NaN float does not fit the TEXT columns and would be
        # stored as NaN rather than "missing" in the FLOAT ones.
        values = tuple(
            None if pd.isna(record[column]) else record[column]
            for column in source_columns
        )
        session.execute(prepared_query, values)


In [5]:
# Extract data from Cassandra
def extract_data(session, keyspace, table_name):
    """Read every row of ``keyspace.table_name`` into a DataFrame.

    Each row returned by ``session.execute`` must be namedtuple-like
    (``_asdict()`` is called on it).

    Returns:
        A DataFrame with one row per table row. When the table is empty, the
        frame still carries the result's ``column_names`` (if the result object
        exposes them), so downstream column lookups do not fail with KeyError.
    """
    query = f"SELECT * FROM {keyspace}.{table_name}"
    rows = session.execute(query)
    records = [row._asdict() for row in rows]
    if records:
        return pd.DataFrame(records)
    # pd.DataFrame([]) has no columns at all; keep the schema for empty tables.
    return pd.DataFrame(columns=list(getattr(rows, "column_names", None) or []))

In [6]:
# Transformation to Silver
def transform_to_silver(dataframe):
    """
    Cleans and preprocesses bronze data into silver data.

    Steps:
      * drop rows with any missing value;
      * parse ``order_date``/``shipping_date`` (unparseable values become NaT,
        and those rows are dropped);
      * cast the quantity/money columns to int/float;
      * lower-case the text columns.

    Args:
        dataframe: bronze frame with snake_case columns (as read back from the
            bronze table). It is not modified.

    Returns:
        A new DataFrame containing only the rows that passed cleaning.
    """
    numeric_types = {
        "items_sold": int,
        "price_per_unit": float,
        "cost_per_unit": float,
        "revenue_total": float,
        "cost_total": float,
        "profit_total": float,
    }
    text_columns = ["region", "country", "item_category", "sales_medium", "priority_level"]

    # Handle missing values. The explicit copy gives an independent frame, so
    # pandas does not treat the assignments below as writes into a possible
    # view of the input (SettingWithCopyWarning / chained assignment).
    silver = dataframe.dropna().copy()

    # Convert data types
    silver["order_date"] = pd.to_datetime(silver["order_date"], errors="coerce")
    silver["shipping_date"] = pd.to_datetime(silver["shipping_date"], errors="coerce")
    silver = silver.astype(numeric_types)

    # Standardize text fields
    for column in text_columns:
        silver[column] = silver[column].str.lower()

    # Drop rows whose dates could not be parsed (coerced to NaT above)
    return silver.dropna(subset=["order_date", "shipping_date"])

In [7]:
@cassandra_table_operation
def create_and_populate_silver_table(session, keyspace, table_name, dataframe):
    """
    Ensure the silver table exists in ``keyspace``, then write one row per
    record of the cleaned ``dataframe`` through a single prepared INSERT.

    ``order_date``/``shipping_date`` are declared TIMESTAMP here (they are TEXT
    in the bronze table).
    """
    ddl = f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.{table_name} (
        region TEXT,
        country TEXT,
        item_category TEXT,
        sales_medium TEXT,
        priority_level TEXT,
        order_date TIMESTAMP,
        order_id BIGINT PRIMARY KEY,
        shipping_date TIMESTAMP,
        items_sold INT,
        price_per_unit FLOAT,
        cost_per_unit FLOAT,
        revenue_total FLOAT,
        cost_total FLOAT,
        profit_total FLOAT
    )
    """
    session.execute(ddl)

    insert_cql = f"""
    INSERT INTO {keyspace}.{table_name} (
        region, country, item_category, sales_medium, priority_level,
        order_date, order_id, shipping_date, items_sold, price_per_unit,
        cost_per_unit, revenue_total, cost_total, profit_total
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    statement = session.prepare(insert_cql)

    # Must stay in the same order as the INSERT column list above.
    bound_columns = [
        'region', 'country', 'item_category', 'sales_medium', 'priority_level',
        'order_date', 'order_id', 'shipping_date', 'items_sold', 'price_per_unit',
        'cost_per_unit', 'revenue_total', 'cost_total', 'profit_total',
    ]
    for _, row in dataframe.iterrows():
        session.execute(statement, tuple(row[name] for name in bound_columns))


In [8]:
# Transformation Decorators
def transform_to_gold(func):
    """Decorate a gold-layer transformation so it reports completion.

    The decorated function is called with the same arguments, a completion
    message naming it is printed, and its result is returned unchanged. The
    function's name and docstring are preserved via ``functools.wraps``.
    """
    @functools.wraps(func)
    def wrapper(dataframe, *args, **kwargs):
        transformed_df = func(dataframe, *args, **kwargs)
        print(f"Transformation {func.__name__} completed!")
        return transformed_df
    return wrapper

In [9]:
@transform_to_gold
def profit_analysis_by_region(dataframe):
    """
    Sum ``profit_total`` per ``region``.

    Returns a new DataFrame with columns ``region`` and ``total_profit``: one
    row per region, ordered by region, with a default integer index.
    """
    per_region = dataframe.groupby("region")["profit_total"].sum()
    return per_region.rename("total_profit").reset_index()

In [10]:
@transform_to_gold
def shipping_duration_analysis(dataframe):
    """
    Analyzes average shipping duration by country.

    The shipping duration of an order is the whole number of days between
    ``order_date`` and ``shipping_date`` (both parsed with ``pd.to_datetime``).
    The input frame is not modified.

    Returns:
        DataFrame with columns ``country`` and ``avg_shipping_duration``.
    """
    order_dates = pd.to_datetime(dataframe["order_date"])
    shipping_dates = pd.to_datetime(dataframe["shipping_date"])
    # assign() returns a new frame, so the caller's DataFrame keeps its
    # original columns and dtypes.
    with_duration = dataframe.assign(
        shipping_duration=(shipping_dates - order_dates).dt.days
    )

    shipping_duration = with_duration.groupby("country").agg(
        avg_shipping_duration=pd.NamedAgg(column="shipping_duration", aggfunc="mean")
    ).reset_index()
    return shipping_duration

In [11]:
@transform_to_gold
def monthly_revenue_trend(dataframe):
    """
    Analyzes monthly revenue trends by region.

    Orders are bucketed by the calendar month of ``order_date`` (formatted as
    ``YYYY-MM``), and ``revenue_total`` is summed per (month, region). The input
    frame is not modified.

    Returns:
        DataFrame with columns ``year_month``, ``region`` and ``total_revenue``.
    """
    year_month = pd.to_datetime(dataframe["order_date"]).dt.to_period("M").astype(str)

    # Group a derived frame so the caller's DataFrame is left untouched.
    revenue_trend = dataframe.assign(year_month=year_month).groupby(["year_month", "region"]).agg(
        total_revenue=pd.NamedAgg(column="revenue_total", aggfunc="sum")
    ).reset_index()
    return revenue_trend

In [12]:
# Gold Table Creation
@cassandra_table_operation
def create_gold_table(session, keyspace, table_name, dataframe, schema):
    """
    Create a gold table from a CQL column definition and load a DataFrame into it.

    Args:
        session: connected Cassandra session.
        keyspace: keyspace that holds the table.
        table_name: gold table name.
        dataframe: aggregate frame. Its column names are used verbatim as the
            INSERT column list, so they must match columns declared in ``schema``.
        schema: CQL column / primary-key definition placed inside
            ``CREATE TABLE ... ( ... )``.
    """
    session.execute(f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.{table_name} ({schema})
    """)

    columns = list(dataframe.columns)
    prepared_query = session.prepare(f"""
    INSERT INTO {keyspace}.{table_name} ({', '.join(columns)})
    VALUES ({', '.join('?' for _ in columns)})
    """)

    # itertuples keeps each column's own dtype (and yields Python scalars).
    # iterrows packs a whole row into one Series, which upcasts int columns to
    # float whenever the other columns are float, breaking INT columns.
    for values in dataframe.itertuples(index=False, name=None):
        session.execute(prepared_query, values)

In [13]:
# Main Function
def main():
    """Run the full bronze -> silver -> gold pipeline.

    Loads the sample sales CSV into the bronze table, reads it back, cleans it
    into the silver table, then builds and writes three gold aggregate tables
    from the cleaned data. The cluster is shut down even if a step fails.
    """
    # Establish connection
    session, cluster = cassandra_connection()
    try:
        keyspace = 'j01'
        bronze_table = "bronzedata"
        silver_table = "silverdata"

        # Load raw data from the CSV file
        df = pd.read_csv("https://raw.githubusercontent.com/gchandra10/filestorage/main/sales_100.csv")

        # Insert raw data into the bronze table
        insert_raw_data(session, keyspace, bronze_table, df)

        # Extract data from the bronze table
        bronze_df = extract_data(session, keyspace, bronze_table)

        # Transform data to Silver level
        silver_df = transform_to_silver(bronze_df)

        # Create and populate the Silver table
        create_and_populate_silver_table(session, keyspace, silver_table, silver_df)

        # Gold aggregates are computed from the cleaned silver frame, so rows that
        # transform_to_silver dropped (missing values, unparseable dates) and its
        # text normalisation are reflected in the gold tables.

        # Gold Table: Profit Analysis by Region
        profit_df = profit_analysis_by_region(silver_df)
        create_gold_table(
            session, keyspace, "profit_by_region", profit_df,
            "region TEXT PRIMARY KEY, total_profit FLOAT"
        )

        # Gold Table: Shipping Duration Analysis
        shipping_df = shipping_duration_analysis(silver_df)
        create_gold_table(
            session, keyspace, "shipping_duration_analysis", shipping_df,
            "country TEXT PRIMARY KEY, avg_shipping_duration FLOAT"
        )

        # Gold Table: Monthly Revenue Trend
        revenue_trend_df = monthly_revenue_trend(silver_df)
        create_gold_table(
            session, keyspace, "monthly_revenue_trend", revenue_trend_df,
            "year_month TEXT, region TEXT, total_revenue FLOAT, PRIMARY KEY (year_month, region)"
        )
    finally:
        # Close the cluster's connections whether or not the pipeline succeeded.
        cluster.shutdown()


if __name__ == "__main__":
    main()

Operation insert_raw_data completed for table: bronzedata
Operation create_and_populate_silver_table completed for table: silverdata
Transformation profit_analysis_by_region completed!
Operation create_gold_table completed for table: profit_by_region
Transformation shipping_duration_analysis completed!
Operation create_gold_table completed for table: shipping_duration_analysis
Transformation monthly_revenue_trend completed!
Operation create_gold_table completed for table: monthly_revenue_trend
