In [5]:
!pip install cassandra-driver

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

cloud_config= {
  'secure_connect_bundle': '/content/secure-connect-reena-mula.zip'
}


with open("/content/Reena Mula-token.json") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

row = session.execute("select release_version from system.local").one()
if row:
  print(row[0])
  print("connected to cassandra")
else:
  print("An error occurred.")





4.0.0.6816
connected to cassandra


In [8]:
keyspace = "medallion_architecture"
session.set_keyspace(keyspace)
print("keyspace is set to medallion architecture")

keyspace is set to medallion architecture


In [9]:
def drop_table(session, keyspace, table_name):
    query = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s AND table_name = %s"
    result = session.execute(query, (keyspace, table_name))

    if result.current_rows:
        print(f"Table '{table_name}' already exists in keyspace '{keyspace}'. Dropping it...")
        drop_query = f"DROP TABLE {table_name}"
        session.execute(drop_query)
        print(f"Table '{table_name}' dropped successfully.")
    else:
        print(f"Table '{table_name}' does not exist. Ready to create a new table.")

In [13]:
drop_table(session,keyspace,"bronze_sales_data")
session.execute("""
    CREATE TABLE IF NOT EXISTS bronze_sales_data (
        region TEXT,
        country TEXT,
        item_type TEXT,
        sales_channel TEXT,
        order_priority TEXT,
        order_date TEXT,  -- Use TEXT for date; adjust as needed
        order_id TEXT PRIMARY KEY,  -- Unique order ID
        ship_date TEXT,
        units_sold INT,
        unit_price FLOAT,
        unit_cost FLOAT,
        total_revenue FLOAT,
        total_cost FLOAT,
        total_profit FLOAT
    )
""")

print("Connected to Astra DB and Bronze table created successfully!")


Table 'bronze_sales_data' already exists in keyspace 'medallion_architecture'. Dropping it...
Table 'bronze_sales_data' dropped successfully.
Connected to Astra DB and Bronze table created successfully!


In [15]:
import csv
csv_file = "/content/sales_100.csv"


with open(csv_file, 'r') as file:
    reader = csv.DictReader(file)
    for row in reader:
        session.execute("""
            INSERT INTO bronze_sales_data (
                region, country, item_type, sales_channel,
                order_priority, order_date, order_id, ship_date,
                units_sold, unit_price, unit_cost,
                total_revenue, total_cost, total_profit
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            row['Region'], row['Country'], row['Item Type'], row['Sales Channel'],
            row['Order Priority'], row['Order Date'], row['Order ID'], row['Ship Date'],
            int(row['UnitsSold']), float(row['UnitPrice']), float(row['UnitCost']),
            float(row['TotalRevenue']), float(row['TotalCost']), float(row['TotalProfit'])
        ))

print("Data inserted into Bronze table on Astra DB successfully!")

Data inserted into Bronze table on Astra DB successfully!


In [16]:
data = session.execute("SELECT * FROM bronze_sales_data LIMIT 10")
for i in data:
    print(i)

Row(order_id='940980136', country='New Zealand', item_type='Beverages', order_date='10/11/2012', order_priority='M', region='Australia and Oceania', sales_channel='Online', ship_date='11/4/2012', total_cost=184000.515625, total_profit=90640.078125, total_revenue=274640.59375, unit_cost=31.790000915527344, unit_price=47.45000076293945, units_sold=5788)
Row(order_id='363086831', country='Mali', item_type='Household', order_date='8/19/2010', order_priority='M', region='Sub-Saharan Africa', sales_channel='Offline', ship_date='9/7/2010', total_cost=2169465.25, total_profit=715456.4375, total_revenue=2884921.5, unit_cost=502.5400085449219, unit_price=668.27001953125, units_sold=4317)
Row(order_id='176461303', country='Singapore', item_type='Snacks', order_date='1/28/2013', order_priority='C', region='Asia', sales_channel='Online', ship_date='2/7/2013', total_cost=747949.4375, total_profit=423254.625, total_revenue=1171204.125, unit_cost=97.44000244140625, unit_price=152.5800018310547, units_

In [36]:
drop_table(session,keyspace,"silver_sales_data")
session.execute(f"""
    CREATE TABLE IF NOT EXISTS silver_sales_data (
        region TEXT,
        country TEXT,
        item_type TEXT,
        sales_channel TEXT,
        order_date DATE,
        order_id TEXT PRIMARY KEY,  -- Ensure uniqueness
        ship_date DATE,
        units_sold INT,
        unit_price FLOAT,
        total_profit FLOAT
    )
""")
print(f"Silver table 'silver_sales_data' created successfully!")

silver_table = "silver_sales_data"
bronze_table = "bronze_sales_data"

def parse_date(date_str):
    """Parse date from 'mm/dd/yyyy' format."""
    try:
        return datetime.strptime(date_str, "%m/%d/%Y").date()
    except Exception as e:
        print(f"Error parsing date '{date_str}': {e}")
        return None

def is_valid_row(row):
    """Validate row for critical fields and business rules."""
    if not row.order_id or not row.order_date or not row.total_profit:
        return False

    if row.units_sold is None or row.total_profit is None or row.units_sold < 0 or row.total_profit < 0:
        return False

    if parse_date(row.order_date) is None or parse_date(row.ship_date) is None:
        return False

    return True

def clean_text(text):
    """Normalize and clean text fields (e.g., capitalize)."""
    return text.strip().title() if text else None

rows = session.execute(f"SELECT * FROM {bronze_table}")

for row in rows:
    if is_valid_row(row):
        order_date = parse_date(row.order_date)
        ship_date = parse_date(row.ship_date)
        region = clean_text(row.region)
        country = clean_text(row.country)
        item_type = clean_text(row.item_type)
        sales_channel = clean_text(row.sales_channel)


        session.execute(f"""
            INSERT INTO {silver_table} (
                region, country, item_type, sales_channel,
                order_date, order_id, ship_date,
                units_sold, unit_price, total_profit
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            region, country, item_type, sales_channel,
            order_date, row.order_id, ship_date,
            row.units_sold, row.unit_price, row.total_profit
        ))

print("Data transformed and loaded into Silver table successfully!")





Table 'silver_sales_data' already exists in keyspace 'medallion_architecture'. Dropping it...
Table 'silver_sales_data' dropped successfully.
Silver table 'silver_sales_data' created successfully!
Data transformed and loaded into Silver table successfully!


In [38]:
data = session.execute("SELECT * FROM silver_sales_data LIMIT 10")
for i in data:
    print(i)

Row(order_id='940980136', country='New Zealand', item_type='Beverages', order_date=Date(15624), region='Australia And Oceania', sales_channel='Online', ship_date=Date(15648), total_profit=90640.078125, unit_price=47.45000076293945, units_sold=5788)
Row(order_id='363086831', country='Mali', item_type='Household', order_date=Date(14840), region='Sub-Saharan Africa', sales_channel='Offline', ship_date=Date(14859), total_profit=715456.4375, unit_price=668.27001953125, units_sold=4317)
Row(order_id='176461303', country='Singapore', item_type='Snacks', order_date=Date(15733), region='Asia', sales_channel='Online', ship_date=Date(15743), total_profit=423254.625, unit_price=152.5800018310547, units_sold=7676)
Row(order_id='438011872', country='Dominica', item_type='Beverages', order_date=Date(15503), region='Central America And The Caribbean', sales_channel='Online', ship_date=Date(15539), total_profit=98673.65625, unit_price=47.45000076293945, units_sold=6301)
Row(order_id='135178029', countr

In [32]:

create_regional_sales = """
CREATE TABLE IF NOT EXISTS regional_sales (
            region TEXT,
            country TEXT,
            total_units_sold INT,
            total_revenue FLOAT,
            PRIMARY KEY (region, country)
        )
"""

create_channel_analysis = """
CREATE TABLE IF NOT EXISTS channel_analysis (
            sales_channel TEXT,
            total_units_sold INT,
            total_profit FLOAT,
            PRIMARY KEY (sales_channel)
        )
"""

create_profitability_insights = """
CREATE TABLE IF NOT EXISTS profitability_insights (
            order_id TEXT PRIMARY KEY,
            total_profit FLOAT
        )
"""

drop_table(session,keyspace,"regional_sales")
session.execute(create_regional_sales)
print("Created gold table: regional_sales")

drop_table(session,keyspace,"channel_analysis")
session.execute(create_channel_analysis)
print("Created gold table: channel_analysis")


drop_table(session,keyspace,"profitability_insights")
session.execute(create_profitability_insights)
print("Created gold table: profitability_insights")

Table 'regional_sales' already exists in keyspace 'medallion_architecture'. Dropping it...
Table 'regional_sales' dropped successfully.
Created gold table: regional_sales
Table 'channel_analysis' already exists in keyspace 'medallion_architecture'. Dropping it...
Table 'channel_analysis' dropped successfully.
Created gold table: channel_analysis
Table 'profitability_insights' already exists in keyspace 'medallion_architecture'. Dropping it...
Table 'profitability_insights' dropped successfully.
Created gold table: profitability_insights


In [50]:
import pandas as pd
rows = session.execute("SELECT region, country, units_sold, unit_price FROM silver_sales_data")



regional_sales = {}

for row in rows:
    key = (row.region, row.country)
    if key not in regional_sales:
        regional_sales[key] = {"total_units_sold": 0, "total_revenue": 0.0}

    regional_sales[key]["total_units_sold"] += row.units_sold
    regional_sales[key]["total_revenue"] += row.units_sold * row.unit_price


for (region, country), metrics in regional_sales.items():
    session.execute("""
        INSERT INTO regional_sales (region, country, total_units_sold, total_revenue)
        VALUES (%s, %s, %s, %s)
    """, (region, country, metrics["total_units_sold"], metrics["total_revenue"]))

print("Data loaded into regional_sales table successfully!")
data = session.execute("SELECT * FROM regional_sales LIMIT 10")
for i in data:
    print(i)


Data loaded into regional_sales table successfully!
Row(region='Europe', country='Albania', total_revenue=2140268.5, total_units_sold=9433)
Row(region='Europe', country='Belgium', total_revenue=231050.71875, total_units_sold=2827)
Row(region='Europe', country='Czech Republic', total_revenue=4003440.5, total_units_sold=9157)
Row(region='Europe', country='Estonia', total_revenue=6666661.5, total_units_sold=9976)
Row(region='Europe', country='Finland', total_revenue=5552598.0, total_units_sold=8846)
Row(region='Europe', country='France', total_revenue=2517397.75, total_units_sold=5758)
Row(region='Europe', country='Greece', total_revenue=1784241.75, total_units_sold=8674)
Row(region='Europe', country='Iceland', total_revenue=628499.375, total_units_sold=2462)
Row(region='Europe', country='Italy', total_revenue=3996349.0, total_units_sold=13119)
Row(region='Europe', country='Moldova', total_revenue=34856.87890625, total_units_sold=3736)


In [47]:
query = "SELECT order_id, sales_channel, units_sold, total_profit FROM silver_sales_data"
rows = session.execute(query)

df = pd.DataFrame(rows, columns=['order_id', 'sales_channel', 'units_sold', 'total_profit'])

channel_analysis_df = df.groupby('sales_channel').agg(
    total_units_sold=('units_sold', 'sum'),
    total_profit=('total_profit', 'sum')
).reset_index()

for _, row in channel_analysis_df.iterrows():
    session.execute("""
        INSERT INTO channel_analysis (sales_channel, total_units_sold, total_profit)
        VALUES (%s, %s, %s)
    """, (row['sales_channel'], row['total_units_sold'], row['total_profit']))

print("Data loaded into channel_analysis table successfully!")
data = session.execute("SELECT * FROM channel_analysis LIMIT 10")
for i in data:
    print(i)

Data loaded into channel_analysis table successfully!
Row(sales_channel='Online', total_profit=24963806.0, total_units_sold=308320)
Row(sales_channel='Offline', total_profit=16363386.0, total_units_sold=186490)


In [49]:
query = "SELECT order_id, sales_channel, units_sold, total_profit FROM silver_sales_data"
rows = session.execute(query)

df = pd.DataFrame(rows, columns=['order_id', 'sales_channel', 'units_sold', 'total_profit'])
profitability_insights_df = df.sort_values(by='total_profit', ascending=False).head(100)

for _, row in profitability_insights_df.iterrows():
    session.execute("""
        INSERT INTO profitability_insights (order_id, total_profit)
        VALUES (%s, %s)
    """, (row['order_id'], row['total_profit']))

print("Data loaded into profitability_insights table successfully!")
data = session.execute("SELECT * FROM profitability_insights LIMIT 10")
for i in data:
    print(i)

Data loaded into profitability_insights table successfully!
Row(order_id='940980136', total_profit=90640.078125)
Row(order_id='363086831', total_profit=715456.4375)
Row(order_id='176461303', total_profit=423254.625)
Row(order_id='438011872', total_profit=98673.65625)
Row(order_id='135178029', total_profit=180477.0625)
Row(order_id='488121116', total_profit=11423.400390625)
Row(order_id='845056617', total_profit=23827.669921875)
Row(order_id='667593514', total_profit=338631.84375)
Row(order_id='425793445', total_profit=9349.01953125)
Row(order_id='467399013', total_profit=40856.94140625)
