In [64]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import pandas as pd
import json

In [65]:
cloud_config = {
    'secure_connect_bundle' : 'secure-connect-cassandra-assignment.zip'
}

with open("token.json") as f:
    secrets = json.load(f)
    
CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID,CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

if session:
    print("Connected")
else:
    print("An error occured")

Connected


Set keyspace

In [66]:
session.set_keyspace("assignment")

# Bronze Level

Insert raw data

In [67]:
df = pd.read_csv("sales_100.csv")

In [68]:
# Drop the existing table if already created
# session.execute("DROP TABLE IF EXISTS sales_data")

In [69]:
session.execute("""
CREATE TABLE IF NOT EXISTS sales_data (
    region TEXT,
    country TEXT,
    item_type TEXT,
    sales_channel TEXT,
    order_priority TEXT,
    order_date DATE,
    order_id BIGINT PRIMARY KEY,
    ship_date DATE,
    units_sold INT,
    unit_price FLOAT,
    unit_cost FLOAT,
    total_revenue FLOAT,
    total_cost FLOAT,
    total_profit FLOAT
)
""")

<cassandra.cluster.ResultSet at 0x21d048439d0>

In [70]:
from datetime import datetime

# Convert dates to the correct format if they are not already
df['Order Date'] = pd.to_datetime(df['Order Date']).dt.date
df['Ship Date'] = pd.to_datetime(df['Ship Date']).dt.date

# Insert rows into Cassandra
for _, row in df.iterrows():
    session.execute("""
    INSERT INTO sales_data (
        region, country, item_type, sales_channel, order_priority, order_date, 
        order_id, ship_date, units_sold, unit_price, unit_cost, total_revenue, 
        total_cost, total_profit
    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, (
        row['Region'], row['Country'], row['Item Type'], row['Sales Channel'], 
        row['Order Priority'], row['Order Date'], row['Order ID'], row['Ship Date'], 
        row['UnitsSold'], row['UnitPrice'], row['UnitCost'], row['TotalRevenue'], 
        row['TotalCost'], row['TotalProfit']
    ))

In [71]:
rows = session.execute("SELECT * FROM sales_data LIMIT 5")
for row in rows:
    print(row)

Row(order_id=294530856, country='Italy', item_type='Cereal', order_date=Date(15293), order_priority='M', region='Europe', sales_channel='Online', ship_date=Date(15336), total_cost=829138.8125, total_profit=627217.1875, total_revenue=1456356.0, unit_cost=117.11000061035156, unit_price=205.6999969482422, units_sold=7080)
Row(order_id=274930989, country='Dominica', item_type='Household', order_date=Date(15297), order_priority='C', region='Central America and the Caribbean', sales_channel='Offline', ship_date=Date(15321), total_cost=3539891.75, total_profit=1167402.125, total_revenue=4707294.0, unit_cost=502.5400085449219, unit_price=668.27001953125, units_sold=7044)
Row(order_id=498071897, country='Taiwan', item_type='Cereal', order_date=Date(14710), order_priority='H', region='Asia', sales_channel='Online', ship_date=Date(14755), total_cost=1100482.625, total_profit=832480.25, total_revenue=1932962.875, unit_cost=117.11000061035156, unit_price=205.6999969482422, units_sold=9397)
Row(orde

# Silver Level

Insert cleaned and transformed data

In [72]:
df.head(3)

Unnamed: 0,Region,Country,Item Type,Sales Channel,Order Priority,Order Date,Order ID,Ship Date,UnitsSold,UnitPrice,UnitCost,TotalRevenue,TotalCost,TotalProfit
0,Sub-Saharan Africa,South Africa,Fruits,Offline,M,2012-07-27,443368995,2012-07-28,1593,9.33,6.92,14862.69,11023.56,3839.13
1,Middle East and North Africa,Morocco,Clothes,Online,M,2013-09-14,667593514,2013-10-19,4611,109.28,35.84,503890.08,165258.24,338631.84
2,Australia and Oceania,Papua New Guinea,Meat,Offline,M,2015-05-15,940995585,2015-06-04,360,421.89,364.69,151880.4,131288.4,20592.0


In [73]:
# Calculate profit margin
df['ProfitMargin_in_Percent'] = ((df['TotalProfit'] / df['TotalRevenue']) * 100).round(2)

In [74]:
df.head(3)

Unnamed: 0,Region,Country,Item Type,Sales Channel,Order Priority,Order Date,Order ID,Ship Date,UnitsSold,UnitPrice,UnitCost,TotalRevenue,TotalCost,TotalProfit,ProfitMargin_in_Percent
0,Sub-Saharan Africa,South Africa,Fruits,Offline,M,2012-07-27,443368995,2012-07-28,1593,9.33,6.92,14862.69,11023.56,3839.13,25.83
1,Middle East and North Africa,Morocco,Clothes,Online,M,2013-09-14,667593514,2013-10-19,4611,109.28,35.84,503890.08,165258.24,338631.84,67.2
2,Australia and Oceania,Papua New Guinea,Meat,Offline,M,2015-05-15,940995585,2015-06-04,360,421.89,364.69,151880.4,131288.4,20592.0,13.56


Create silver table in DB

In [75]:
# Drop the existing table if already created
# session.execute("DROP TABLE IF EXISTS silver_sales_data")

In [76]:
session.execute("""
CREATE TABLE IF NOT EXISTS silver_sales_data (
    region TEXT,
    country TEXT,
    item_type TEXT,
    sales_channel TEXT,
    order_priority TEXT,
    order_date DATE,
    order_id BIGINT PRIMARY KEY,
    ship_date DATE,
    units_sold INT,
    unit_price FLOAT,
    unit_cost FLOAT,
    total_revenue FLOAT,
    total_cost FLOAT,
    total_profit FLOAT,
    ProfitMargin_in_Percent FLOAT
)
""")

<cassandra.cluster.ResultSet at 0x21d048f7760>

In [77]:
# Insert into the Silver table
for _, row in df.iterrows():
    session.execute("""
    INSERT INTO silver_sales_data (
        region, country, item_type, sales_channel, order_priority, order_date, 
        order_id, ship_date, units_sold, unit_price, unit_cost, total_revenue, 
        total_cost, total_profit, ProfitMargin_in_Percent
    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, (
        row['Region'], row['Country'], row['Item Type'], row['Sales Channel'], 
        row['Order Priority'], row['Order Date'], row['Order ID'], row['Ship Date'], 
        row['UnitsSold'], row['UnitPrice'], row['UnitCost'], row['TotalRevenue'], 
        row['TotalCost'], row['TotalProfit'], row['ProfitMargin_in_Percent']
    ))

In [78]:
rows = session.execute("SELECT * FROM silver_sales_data LIMIT 5")
for row in rows:
    print(row)          

Row(order_id=294530856, country='Italy', item_type='Cereal', order_date=Date(15293), order_priority='M', profitmargin_in_percent=43.06999969482422, region='Europe', sales_channel='Online', ship_date=Date(15336), total_cost=829138.8125, total_profit=627217.1875, total_revenue=1456356.0, unit_cost=117.11000061035156, unit_price=205.6999969482422, units_sold=7080)
Row(order_id=274930989, country='Dominica', item_type='Household', order_date=Date(15297), order_priority='C', profitmargin_in_percent=24.799999237060547, region='Central America and the Caribbean', sales_channel='Offline', ship_date=Date(15321), total_cost=3539891.75, total_profit=1167402.125, total_revenue=4707294.0, unit_cost=502.5400085449219, unit_price=668.27001953125, units_sold=7044)
Row(order_id=498071897, country='Taiwan', item_type='Cereal', order_date=Date(14710), order_priority='H', profitmargin_in_percent=43.06999969482422, region='Asia', sales_channel='Online', ship_date=Date(14755), total_cost=1100482.625, total_

# Gold Level 

In [79]:
# Drop the existing table if already created
# session.execute("DROP TABLE IF EXISTS gold_total_profit_by_region")

Table 1

In [80]:
# Create Gold Table 1
session.execute("""
CREATE TABLE IF NOT EXISTS gold_total_profit_by_region (
    region TEXT,
    country TEXT,
    total_profit FLOAT,
    PRIMARY KEY (region, country)
)
""")

gold1_df = df.groupby(['Region', 'Country'])['TotalProfit'].sum().reset_index()
for _, row in gold1_df.iterrows():
    session.execute("""
    INSERT INTO gold_total_profit_by_region (region, country, total_profit)
    VALUES (%s, %s, %s)
    """, (row['Region'], row['Country'], row['TotalProfit']))


In [81]:
rows = session.execute("SELECT * FROM gold_total_profit_by_region LIMIT 5")
for row in rows:
    print(row)

Row(region='Australia and Oceania', country='East Timor', total_profit=22944.810546875)
Row(region='Australia and Oceania', country='New Zealand', total_profit=90640.078125)
Row(region='Australia and Oceania', country='Papua New Guinea', total_profit=688308.5)
Row(region='Australia and Oceania', country='Samoa ', total_profit=937534.625)
Row(region='Australia and Oceania', country='Solomon Islands', total_profit=700209.25)


In [82]:
# Drop the existing table if already created
# session.execute("DROP TABLE IF EXISTS gold_total_revenue_by_region")

Table 2

In [83]:
# Create Gold Table 2
session.execute("""
CREATE TABLE IF NOT EXISTS gold_total_revenue_by_region (
    region TEXT,
    total_revenue FLOAT,
    PRIMARY KEY (region)
)
""")

gold2_df = df.groupby(['Region'])['TotalRevenue'].sum().reset_index()
for _, row in gold2_df.iterrows():
    session.execute("""
    INSERT INTO gold_total_revenue_by_region (region, total_revenue)
    VALUES (%s, %s)
    """, (row['Region'], row['TotalRevenue']))

In [84]:
rows = session.execute("SELECT * FROM gold_total_revenue_by_region LIMIT 5")
for row in rows:
    print(row)

Row(region='Australia and Oceania', total_revenue=10711258.0)
Row(region='Europe', total_revenue=34964748.0)
Row(region='Middle East and North Africa', total_revenue=24765128.0)
Row(region='Central America and the Caribbean', total_revenue=17570836.0)
Row(region='Asia', total_revenue=28840812.0)


In [85]:
# Drop the existing table if already created
# session.execute("DROP TABLE IF EXISTS gold_avg_profit_margin_by_channel")

Table 3

In [86]:
# Create Gold Table 3
session.execute("""
CREATE TABLE IF NOT EXISTS gold_avg_profit_margin_by_channel (
    sales_channel TEXT,
    avg_profit_margin FLOAT,
    PRIMARY KEY (sales_channel)
)
""")

# ProfitMargin 
df['ProfitMargin'] = (df['TotalProfit'] / df['TotalRevenue'])

gold3_df = df.groupby(['Sales Channel'])['ProfitMargin'].mean().reset_index()
for _, row in gold3_df.iterrows():
    session.execute("""
    INSERT INTO gold_avg_profit_margin_by_channel (sales_channel, avg_profit_margin)
    VALUES (%s, %s)
    """, (row['Sales Channel'], row['ProfitMargin']))


In [87]:
rows = session.execute("SELECT * FROM gold_avg_profit_margin_by_channel")
for row in rows:
    print(row)

Row(sales_channel='Online', avg_profit_margin=0.3344477713108063)
Row(sales_channel='Offline', avg_profit_margin=0.3479812741279602)
