In [11]:
# Install necessary library
!pip install cassandra-driver

# Import required libraries
import json
import os
import uuid

import pandas as pd
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

# Connect to the cloud Cassandra cluster described by the secure connect
# bundle, authenticating with the client ID/secret stored in a token JSON
# file, then switch the session to the working keyspace.
import os

# Load secure connect bundle for Cassandra
cloud_config = {
    'secure_connect_bundle': 'secure-connect-big-data-cassandra.zip'  # Replace with your secure connect bundle
}
# The token path can be overridden with ASTRA_TOKEN_PATH so other users can
# run the notebook without editing code; the default is the original file.
token_path = os.environ.get("ASTRA_TOKEN_PATH", "grandh23@students.rowan.edu-token.json")
with open(token_path, encoding="utf-8") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)

# cluster.connect() signals failure by raising, never by returning a falsy
# session, so success/failure must be decided by exception handling.
try:
    session = cluster.connect()
except Exception as exc:
    print(f"❌ An error occurred while connecting: {exc}")
    cluster.shutdown()  # release driver threads/connections before bailing out
    raise
print("✅ Successfully connected to Cassandra!")

# Setting keyspace
keyspace_name = 'cass_keyspace'
session.set_keyspace(keyspace_name)
print(f"✅ Connected to keyspace: {session.keyspace}")

# Reset the keyspace: list every existing table, then drop each one so the
# pipeline below always starts from empty tables.
# NOTE: this drops *all* tables in the keyspace, not only the ones this
# notebook creates.
table_rows = session.execute(
    "SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s",
    (keyspace_name,),
)
# Materialize once so the listing and the drop loop share a single query.
existing_tables = [row.table_name for row in table_rows]

print("🔍 Checking existing tables in the keyspace:")
for table_name in existing_tables:
    print(f"- {table_name}")

# Dropping existing tables (best effort: report failures and keep going)
for table_name in existing_tables:
    try:
        # Double-quote the identifier so case-sensitive table names are not
        # folded to lowercase by CQL.
        session.execute(f'DROP TABLE IF EXISTS "{table_name}";')
        print(f"🗑️ Table '{table_name}' dropped.")
    except Exception as e:
        print(f"⚠️ Error dropping table '{table_name}': {e}")

# Step 1: Create Bronze Table
# Bronze holds the raw CSV rows; dates stay TEXT exactly as read. Money
# columns are DOUBLE because 32-bit FLOAT keeps only ~7 significant digits,
# so revenue/cost/profit values in the millions would be rounded on write.
print("\n🟤 Creating the Bronze table, which stores raw data...")
session.execute("""
CREATE TABLE IF NOT EXISTS sales_bronze (
    region TEXT,
    country TEXT,
    item_type TEXT,
    sales_channel TEXT,
    order_priority TEXT,
    order_date TEXT,
    order_id UUID PRIMARY KEY,
    ship_date TEXT,
    units_sold INT,
    unit_price DOUBLE,
    unit_cost DOUBLE,
    total_revenue DOUBLE,
    total_cost DOUBLE,
    total_profit DOUBLE
);
""")
print("🟤 Bronze table 'sales_bronze' created successfully!")

# Step 2: Load data into Bronze Table
print("\n🟤 Inserting raw data into the Bronze table...")
df = pd.read_csv('/content/sales_100.csv')  # Replace with the correct path to your CSV file

# CSV header -> Bronze column, in insert order. The CSV mixes spaced
# ("Item Type") and unspaced ("UnitsSold") headers, so the mapping is explicit.
bronze_columns = {
    'Region': 'region',
    'Country': 'country',
    'Item Type': 'item_type',
    'Sales Channel': 'sales_channel',
    'Order Priority': 'order_priority',
    'Order Date': 'order_date',
    'Ship Date': 'ship_date',
    'UnitsSold': 'units_sold',
    'UnitPrice': 'unit_price',
    'UnitCost': 'unit_cost',
    'TotalRevenue': 'total_revenue',
    'TotalCost': 'total_cost',
    'TotalProfit': 'total_profit',
}
# Prepare the INSERT once and bind values per row, instead of re-sending the
# CQL text with string-formatted values on every iteration.
insert_bronze = session.prepare(
    f"INSERT INTO sales_bronze (order_id, {', '.join(bronze_columns.values())}) "
    f"VALUES (?{', ?' * len(bronze_columns)})"
)
# itertuples(name=None) yields plain tuples of native Python scalars in the
# mapping's column order. Each row gets a fresh random UUID as its key, so
# loading the same CSV twice without the table reset would duplicate rows.
for values in df[list(bronze_columns)].itertuples(index=False, name=None):
    session.execute(insert_bronze, (uuid.uuid4(), *values))
print("✅ Data inserted into Bronze table 'sales_bronze'.")

# Step 3: Create Silver Table
# Silver is a per-order projection of Bronze (key, location, item type,
# profit). total_profit is DOUBLE so copied values are not rounded to
# 32-bit FLOAT precision.
# NOTE(review): despite the messages below, no cleaning/filtering rule is
# applied yet; every Bronze row is copied as-is.
print("\n🔘 Creating the Silver table, which stores cleaned and filtered data...")
session.execute("""
CREATE TABLE IF NOT EXISTS sales_silver (
    order_id UUID PRIMARY KEY,
    region TEXT,
    country TEXT,
    item_type TEXT,
    total_profit DOUBLE
);
""")
print("🔘 Silver table 'sales_silver' created successfully!")

# Populate Silver Table
print("\n🔘 Inserting cleaned and filtered data into the Silver table...")
# Prepared once, bound per row.
insert_silver = session.prepare(
    "INSERT INTO sales_silver (order_id, region, country, item_type, total_profit) "
    "VALUES (?, ?, ?, ?, ?)"
)
rows = session.execute("SELECT order_id, region, country, item_type, total_profit FROM sales_bronze;")
for row in rows:
    session.execute(
        insert_silver,
        (row.order_id, row.region, row.country, row.item_type, row.total_profit),
    )
print("✅ Data inserted into Silver table 'sales_silver'.")

# Step 4: Create and Populate Gold Tables
print("\n🟡 Creating Gold tables, which store aggregated data for analysis...")

# Table: Sales by Region
# Total profit per region. DOUBLE rather than 32-bit FLOAT because summed
# profits reach magnitudes where FLOAT can no longer hold cents.
session.execute("""
CREATE TABLE IF NOT EXISTS sales_by_region (
    region TEXT PRIMARY KEY,
    total_profit DOUBLE
);
""")
print("🟡 Gold table 'sales_by_region' created.")

rows = session.execute("SELECT region, total_profit FROM sales_bronze")
# Explicit columns keep groupby working when Bronze is empty: a DataFrame
# built from an empty result set has no 'region' column at all.
df_region = pd.DataFrame(list(rows), columns=['region', 'total_profit'])
aggregated_region = df_region.groupby('region', as_index=False).agg({'total_profit': 'sum'})
insert_region = session.prepare(
    "INSERT INTO sales_by_region (region, total_profit) VALUES (?, ?)"
)
for region, total_profit in aggregated_region.itertuples(index=False, name=None):
    session.execute(insert_region, (region, float(total_profit)))
print("✅ Data inserted into Gold table 'sales_by_region'.")

# Table: Sales by Item Type
# Total units sold per item type. BIGINT because a sum of per-order INT unit
# counts can exceed the 32-bit INT range on larger input files.
session.execute("""
CREATE TABLE IF NOT EXISTS sales_by_item_type (
    item_type TEXT PRIMARY KEY,
    total_units_sold BIGINT
);
""")
print("🟡 Gold table 'sales_by_item_type' created.")

rows = session.execute("SELECT item_type, units_sold FROM sales_bronze")
# Explicit columns keep groupby working when Bronze is empty.
df_item_type = pd.DataFrame(list(rows), columns=['item_type', 'units_sold'])
aggregated_item_type = df_item_type.groupby('item_type', as_index=False).agg({'units_sold': 'sum'})
insert_item_type = session.prepare(
    "INSERT INTO sales_by_item_type (item_type, total_units_sold) VALUES (?, ?)"
)
for item_type, units_sold in aggregated_item_type.itertuples(index=False, name=None):
    # int(): the sum is a float whenever some units_sold values were null.
    session.execute(insert_item_type, (item_type, int(units_sold)))
print("✅ Data inserted into Gold table 'sales_by_item_type'.")

# Table: Monthly Sales Summary
# Total revenue per calendar month. The key column keeps its original name
# (order_date) for compatibility, but now holds a "YYYY-MM" month key.
session.execute("""
CREATE TABLE IF NOT EXISTS monthly_sales_summary (
    order_date TEXT PRIMARY KEY,
    total_revenue DOUBLE
);
""")
print("🟡 Gold table 'monthly_sales_summary' created.")

rows = session.execute("SELECT order_date, total_revenue FROM sales_bronze")
# Explicit columns keep the pipeline working when Bronze is empty.
df_order_date = pd.DataFrame(list(rows), columns=['order_date', 'total_revenue'])

# order_date is raw CSV text, so grouping on it directly yields one row per
# day. Parse it and bucket by month instead.
# NOTE(review): relies on pandas inferring the date format (presumably
# month-first like "5/28/2010"); pass an explicit format= once confirmed.
parsed_dates = pd.to_datetime(df_order_date['order_date'], errors='coerce')
unparsed_count = int(parsed_dates.isna().sum())
if unparsed_count:
    print(f"⚠️ Skipping {unparsed_count} row(s) with an unparseable order_date.")
df_order_date = (
    df_order_date.assign(order_month=parsed_dates.dt.strftime('%Y-%m'))
    .dropna(subset=['order_month'])
)
aggregated_order_date = df_order_date.groupby('order_month', as_index=False).agg({'total_revenue': 'sum'})

insert_monthly = session.prepare(
    "INSERT INTO monthly_sales_summary (order_date, total_revenue) VALUES (?, ?)"
)
for order_month, total_revenue in aggregated_order_date.itertuples(index=False, name=None):
    session.execute(insert_monthly, (order_month, float(total_revenue)))
print("✅ Data inserted into Gold table 'monthly_sales_summary'.")

# Step 5: Query Gold Tables
# Print every row of each Gold table so the aggregates can be inspected.
gold_tables = ["sales_by_region", "sales_by_item_type", "monthly_sales_summary"]
print("\n📊 Querying the Gold tables for aggregated data:")
for gold_table in gold_tables:
    print(f"\n📋 Data from {gold_table}:")
    result = session.execute(f"SELECT * FROM {gold_table};")
    for record in result:
        print(record)






✅ Successfully connected to Cassandra!
✅ Connected to keyspace: cass_keyspace
🔍 Checking existing tables in the keyspace:
- monthly_sales_summary
- sales_bronze
- sales_by_item_type
- sales_by_region
- sales_silver
🗑️ Table 'monthly_sales_summary' dropped.
🗑️ Table 'sales_bronze' dropped.
🗑️ Table 'sales_by_item_type' dropped.
🗑️ Table 'sales_by_region' dropped.
🗑️ Table 'sales_silver' dropped.

🟤 Creating the Bronze table, which stores raw data...
🟤 Bronze table 'sales_bronze' created successfully!

🟤 Inserting raw data into the Bronze table...
✅ Data inserted into Bronze table 'sales_bronze'.

🔘 Creating the Silver table, which stores cleaned and filtered data...
🔘 Silver table 'sales_silver' created successfully!

🔘 Inserting cleaned and filtered data into the Silver table...
✅ Data inserted into Silver table 'sales_silver'.

🟡 Creating Gold tables, which store aggregated data for analysis...
🟡 Gold table 'sales_by_region' created.
✅ Data inserted into Gold table 'sales_by_region'.
