In [None]:
!pip install cassandra-driver




In [None]:
# Import the necessary libraries
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

In [None]:
import pandas as pd


In [None]:
# This secure connect bundle is autogenerated when you download your SCB,
# if yours is different update the file name below
cloud_config= {
  'secure_connect_bundle': 'secure-connect-learner.zip'
}

# This token JSON file is autogenerated when you download your token,
# if yours is different update the file name below
with open("Learner-token.json") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

if session:
  print('Connected!')
else:
  print("An error occurred.")



Connected!


In [None]:
create_table_query = """
CREATE TABLE IF NOT EXISTS bronze_sales (
    id UUID PRIMARY KEY,
    customer_id UUID,
    product_id UUID,
    amount DECIMAL,
    purchase_date TIMESTAMP
);
"""
session.execute(create_table_query)

<cassandra.cluster.ResultSet at 0x7a7c7a4dfe80>

In [None]:
session.set_keyspace('example')

**Bronze Table**

In [None]:
# Set the keyspace
session.set_keyspace('example')

# Try to fetch data from the table
rows = session.execute("SELECT * FROM bronze_sales LIMIT 5;")
for row in rows:
    print(row)


In [None]:
# Set the keyspace
session.set_keyspace('example')

# Query system_schema.tables to check if bronze_sales exists
rows = session.execute("SELECT * FROM system_schema.tables WHERE keyspace_name = 'example';")
for row in rows:
    print(row)


Row(keyspace_name='example', table_name='bronze_data', additional_write_policy='99PERCENTILE', bloom_filter_fp_chance=0.01, caching=OrderedMapSerializedKey([('keys', 'ALL'), ('rows_per_partition', 'NONE')]), cdc=None, comment='', compaction=OrderedMapSerializedKey([('class', 'org.apache.cassandra.db.compaction.UnifiedCompactionStrategy')]), compression=OrderedMapSerializedKey([('chunk_length_in_kb', '64'), ('class', 'org.apache.cassandra.io.compress.LZ4Compressor')]), crc_check_chance=1.0, dclocal_read_repair_chance=0.0, default_time_to_live=0, extensions=OrderedMapSerializedKey([]), flags=SortedSet(['compound']), gc_grace_seconds=864000, id=UUID('de6a4250-b1fe-11ef-b8b4-cb29c41c2c61'), max_index_interval=2048, memtable_flush_period_in_ms=0, min_index_interval=128, nodesync=None, read_repair='BLOCKING', read_repair_chance=0.0, speculative_retry='99PERCENTILE')
Row(keyspace_name='example', table_name='bronze_sales', additional_write_policy='99PERCENTILE', bloom_filter_fp_chance=0.01, ca

In [None]:
import pandas as pd

# Load the CSV file from GitHub
url = "sales_100.csv"
df = pd.read_csv(url)

# Display the first 5 rows to confirm the data is loaded
print(df.head())

                         Region           Country  Item Type Sales Channel  \
0            Sub-Saharan Africa      South Africa     Fruits       Offline   
1  Middle East and North Africa           Morocco    Clothes        Online   
2         Australia and Oceania  Papua New Guinea       Meat       Offline   
3            Sub-Saharan Africa          Djibouti    Clothes       Offline   
4                        Europe          Slovakia  Beverages       Offline   

  Order Priority  Order Date   Order ID   Ship Date  UnitsSold  UnitPrice  \
0              M   7/27/2012  443368995   7/28/2012       1593       9.33   
1              M   9/14/2013  667593514  10/19/2013       4611     109.28   
2              M   5/15/2015  940995585    6/4/2015        360     421.89   
3              H   5/17/2017  880811536    7/2/2017        562     109.28   
4              L  10/26/2016  174590194   12/4/2016       3973      47.45   

   UnitCost  TotalRevenue  TotalCost  TotalProfit  
0      6.92     

In [None]:
print(df.columns)

Index(['Region', 'Country', 'Item Type', 'Sales Channel', 'Order Priority',
       'Order Date', 'Order ID', 'Ship Date', 'UnitsSold', 'UnitPrice',
       'UnitCost', 'TotalRevenue', 'TotalCost', 'TotalProfit'],
      dtype='object')


**sliver table**

In [None]:
create_silver_table_query = """
CREATE TABLE IF NOT EXISTS example.silver_sales (
    order_id BIGINT PRIMARY KEY,
    region TEXT,
    country TEXT,
    item_type TEXT,
    sales_channel TEXT,
    order_priority TEXT,
    order_date TIMESTAMP,
    ship_date TIMESTAMP,
    units_sold INT,
    unit_price FLOAT,
    unit_cost FLOAT,
    total_revenue FLOAT,
    total_cost FLOAT,
    total_profit FLOAT
)
"""
session.execute(create_silver_table_query)
print("Silver table created successfully!")


Silver table created successfully!


In [None]:
import datetime

# Query the Bronze table for all records
bronze_rows = session.execute("SELECT * FROM example.bronze_data")

# Define an INSERT query for the Silver table
insert_silver_query = """
INSERT INTO example.silver_sales (
    order_id, region, country, item_type, sales_channel, order_priority,
    order_date, ship_date, units_sold, unit_price, unit_cost,
    total_revenue, total_cost, total_profit
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""

prepared_silver = session.prepare(insert_silver_query)

# Insert data into the Silver table
for row in bronze_rows:
    # Convert date strings to TIMESTAMP
    order_date = datetime.datetime.strptime(row.order_date, '%m/%d/%Y')
    ship_date = datetime.datetime.strptime(row.ship_date, '%m/%d/%Y')

    session.execute(prepared_silver, (
        row.order_id, row.region, row.country, row.item_type, row.sales_channel,
        row.order_priority, order_date, ship_date, row.units_sold,
        row.unit_price, row.unit_cost, row.total_revenue, row.total_cost, row.total_profit
    ))

print("Data inserted into Silver table successfully!")


Data inserted into Silver table successfully!


In [None]:
rows = session.execute("SELECT * FROM example.silver_sales LIMIT 5")
for row in rows:
    print(row)


Row(order_id=294530856, country='Italy', item_type='Cereal', order_date=datetime.datetime(2011, 11, 15, 0, 0), order_priority='M', region='Europe', sales_channel='Online', ship_date=datetime.datetime(2011, 12, 28, 0, 0), total_cost=829138.8125, total_profit=627217.1875, total_revenue=1456356.0, unit_cost=117.11000061035156, unit_price=205.6999969482422, units_sold=7080)
Row(order_id=274930989, country='Dominica', item_type='Household', order_date=datetime.datetime(2011, 11, 19, 0, 0), order_priority='C', region='Central America and the Caribbean', sales_channel='Offline', ship_date=datetime.datetime(2011, 12, 13, 0, 0), total_cost=3539891.75, total_profit=1167402.125, total_revenue=4707294.0, unit_cost=502.5400085449219, unit_price=668.27001953125, units_sold=7044)
Row(order_id=498071897, country='Taiwan', item_type='Cereal', order_date=datetime.datetime(2010, 4, 11, 0, 0), order_priority='H', region='Asia', sales_channel='Online', ship_date=datetime.datetime(2010, 5, 26, 0, 0), total_

In [None]:
query = "SELECT * FROM example.silver_sales ;"
rows = session.execute(query)

for row in rows:
    print(row)

Row(order_id=294530856, country='Italy', item_type='Cereal', order_date=datetime.datetime(2011, 11, 15, 0, 0), order_priority='M', region='Europe', sales_channel='Online', ship_date=datetime.datetime(2011, 12, 28, 0, 0), total_cost=829138.8125, total_profit=627217.1875, total_revenue=1456356.0, unit_cost=117.11000061035156, unit_price=205.6999969482422, units_sold=7080)
Row(order_id=274930989, country='Dominica', item_type='Household', order_date=datetime.datetime(2011, 11, 19, 0, 0), order_priority='C', region='Central America and the Caribbean', sales_channel='Offline', ship_date=datetime.datetime(2011, 12, 13, 0, 0), total_cost=3539891.75, total_profit=1167402.125, total_revenue=4707294.0, unit_cost=502.5400085449219, unit_price=668.27001953125, units_sold=7044)
Row(order_id=498071897, country='Taiwan', item_type='Cereal', order_date=datetime.datetime(2010, 4, 11, 0, 0), order_priority='H', region='Asia', sales_channel='Online', ship_date=datetime.datetime(2010, 5, 26, 0, 0), total_

# **Gold Table 1: Total Profit by Region**

In [145]:
# Create Gold Table 1
create_gold_table_1_query = """
CREATE TABLE IF NOT EXISTS gold_total_sales_by_product (
    item_type TEXT PRIMARY KEY,
    total_quantity INT,
    total_sales DECIMAL
);
"""
session.execute(create_gold_table_1_query)
print("Gold Table 1 created successfully!")

# Query Silver table and aggregate data by item_type
rows = session.execute("SELECT item_type, units_sold, unit_price FROM example.silver_sales;")

# Perform aggregation
product_sales = {}
for row in rows:
    item_type = row.item_type  # Using 'item_type' as the product identifier
    quantity = row.units_sold
    sales = quantity * row.unit_price

    if item_type in product_sales:
        product_sales[item_type]['total_quantity'] += quantity
        product_sales[item_type]['total_sales'] += sales
    else:
        product_sales[item_type] = {'total_quantity': quantity, 'total_sales': sales}

# Insert aggregated data into Gold Table 1
for item_type, data in product_sales.items():
    session.execute(
        """
        INSERT INTO gold_total_sales_by_product (item_type, total_quantity, total_sales)
        VALUES (%s, %s, %s)
        """,
        (item_type, data['total_quantity'], data['total_sales'])
    )

print("Gold Table 1 (Total Sales by Product) populated successfully!")

Gold Table 1 created successfully!


InvalidRequest: Error from server: code=2200 [Invalid query] message="Undefined column name item_type"

In [146]:
# Drop existing Gold tables if they exist
session.execute("DROP TABLE IF EXISTS gold_total_sales_by_product;")
session.execute("DROP TABLE IF EXISTS gold_sales_by_region;")
session.execute("DROP TABLE IF EXISTS gold_top_customers;")

print("Existing Gold tables dropped successfully!")

Existing Gold tables dropped successfully!


In [147]:
# Step 2: Create Gold Table 1: Total Sales by Product
create_gold_table_1_query = """
CREATE TABLE IF NOT EXISTS gold_total_sales_by_product (
    item_type TEXT PRIMARY KEY,
    total_quantity INT,
    total_sales DECIMAL
);
"""
session.execute(create_gold_table_1_query)
print("Gold Table 1 created successfully!")

# Step 3: Create Gold Table 2: Total Sales by Region
create_gold_table_2_query = """
CREATE TABLE IF NOT EXISTS gold_sales_by_region (
    region TEXT PRIMARY KEY,
    total_sales DECIMAL
);
"""
session.execute(create_gold_table_2_query)
print("Gold Table 2 created successfully!")

# Step 4: Create Gold Table 3: Top Customers
create_gold_table_3_query = """
CREATE TABLE IF NOT EXISTS gold_top_customers (
    customer_id UUID PRIMARY KEY,
    total_sales DECIMAL
);
"""
session.execute(create_gold_table_3_query)
print("Gold Table 3 created successfully!")

Gold Table 1 created successfully!
Gold Table 2 created successfully!
Gold Table 3 created successfully!


In [148]:
rows = session.execute("SELECT item_type, units_sold, unit_price FROM example.silver_sales;")

product_sales = {}
for row in rows:
    item_type = row.item_type
    quantity = row.units_sold
    sales = quantity * row.unit_price

    if item_type in product_sales:
        product_sales[item_type]['total_quantity'] += quantity
        product_sales[item_type]['total_sales'] += sales
    else:
        product_sales[item_type] = {'total_quantity': quantity, 'total_sales': sales}

for item_type, data in product_sales.items():
    session.execute(
        """
        INSERT INTO gold_total_sales_by_product (item_type, total_quantity, total_sales)
        VALUES (%s, %s, %s)
        """,
        (item_type, data['total_quantity'], data['total_sales'])
    )

print("Gold Table 1 (Total Sales by Product) populated successfully!")

Gold Table 1 (Total Sales by Product) populated successfully!


In [149]:
# Gold Table 2 (Total Sales by Region)
rows = session.execute("SELECT region, units_sold, unit_price FROM example.silver_sales;")

region_sales = {}
for row in rows:
    region = row.region
    sales = row.units_sold * row.unit_price

    if region in region_sales:
        region_sales[region] += sales
    else:
        region_sales[region] = sales

for region, total_sales in region_sales.items():
    session.execute(
        """
        INSERT INTO gold_sales_by_region (region, total_sales)
        VALUES (%s, %s)
        """,
        (region, total_sales)
    )

print("Gold Table 2 (Total Sales by Region) populated successfully!")

Gold Table 2 (Total Sales by Region) populated successfully!


In [152]:
from uuid import uuid4

# Gold Table 3 (Top Customers)
# We assume order_id acts as a proxy for a customer in this case.
rows = session.execute("SELECT order_id, total_profit FROM example.silver_sales;")

# Aggregating total profit by order_id (acting as customer_id)
customer_sales = {}
for row in rows:
    order_id = row.order_id  # Using order_id as a stand-in for customer_id
    sales = row.total_profit

    if order_id in customer_sales:
        customer_sales[order_id] += sales
    else:
        customer_sales[order_id] = sales

# Insert top 10 customers (based on total_profit) into Gold Table 3
for order_id, total_sales in sorted(customer_sales.items(), key=lambda x: x[1], reverse=True)[:10]:
    # Convert order_id to UUID (if it's not already UUID)
    # If `order_id` is an integer or string, convert it to UUID
    customer_id = uuid4()  # Using uuid4() to generate a new UUID (if needed)

    session.execute(
        """
        INSERT INTO gold_top_customers (customer_id, total_sales)
        VALUES (%s, %s)
        """,
        (customer_id, total_sales)
    )

print("Gold Table 3 (Top Customers) populated successfully!")

Gold Table 3 (Top Customers) populated successfully!


In [154]:
# Step 6: Verify Data
print("Gold Table 1 Data:")
rows = session.execute("SELECT * FROM gold_total_sales_by_product;")
for row in rows:
    print(row)



Gold Table 1 Data:
Row(item_type='Household', total_quantity=57640, total_sales=Decimal('38519083.92578125'))
Row(item_type='Office Supplies', total_quantity=42814, total_sales=Decimal('27880905.880737305'))
Row(item_type='Vegetables', total_quantity=7368, total_sales=Decimal('1135114.0620117188'))
Row(item_type='Snacks', total_quantity=14377, total_sales=Decimal('2193642.6863250732'))
Row(item_type='Personal Care', total_quantity=39045, total_sales=Decimal('3191147.981071472'))
Row(item_type='Meat', total_quantity=50437, total_sales=Decimal('21278866.668823242'))
Row(item_type='Fruits', total_quantity=65920, total_sales=Decimal('615033.5949707031'))
Row(item_type='Beverages', total_quantity=45206, total_sales=Decimal('2145024.734489441'))
Row(item_type='Cereal', total_quantity=45776, total_sales=Decimal('9416123.060302734'))
Row(item_type='Cosmetics', total_quantity=65707, total_sales=Decimal('28727101.202087402'))
Row(item_type='Baby Food', total_quantity=20372, total_sales=Decimal('

In [155]:
print("Gold Table 2 Data:")
rows = session.execute("SELECT * FROM gold_sales_by_region;")
for row in rows:
    print(row)


Gold Table 2 Data:
Row(region='Australia and Oceania', total_sales=Decimal('10711258.296281815'))
Row(region='Europe', total_sales=Decimal('34964750.484773636'))
Row(region='Middle East and North Africa', total_sales=Decimal('24765128.00546646'))
Row(region='Central America and the Caribbean', total_sales=Decimal('17570835.936037064'))
Row(region='Asia', total_sales=Decimal('28840812.910438538'))
Row(region='Sub-Saharan Africa', total_sales=Decimal('24225438.06429863'))
Row(region='North America', total_sales=Decimal('3611757.625427246'))


In [156]:

print("Gold Table 3 Data:")
rows = session.execute("SELECT * FROM gold_top_customers;")
for row in rows:
    print(row)

Gold Table 3 Data:
Row(customer_id=UUID('fe664d2b-3838-44bb-b4f8-176d9a248f98'), total_sales=Decimal('1291202.375'))
Row(customer_id=UUID('48059d26-9cf7-47bd-a610-eb40c7af0243'), total_sales=Decimal('1297765.625'))
Row(customer_id=UUID('635ee6c7-1ef8-46ed-b1e6-84ed3665290f'), total_sales=Decimal('1214903.75'))
Row(customer_id=UUID('ebba8b57-7e7c-4e61-9fc8-e7b69c9aaa8d'), total_sales=Decimal('1390971.875'))
Row(customer_id=UUID('b61d431d-945e-4510-8480-9ab2832cad0c'), total_sales=Decimal('1380006.25'))
Row(customer_id=UUID('d1a101f2-8a1f-40f0-b406-c633830c6951'), total_sales=Decimal('1681149.0'))
Row(customer_id=UUID('993a2efa-8b37-456b-8020-0302fbfcd8fe'), total_sales=Decimal('1653322.5'))
Row(customer_id=UUID('b2846796-b573-47ed-9312-f3770fa29492'), total_sales=Decimal('1350368.0'))
Row(customer_id=UUID('f72e970b-ab6d-4f5d-9608-369b70ccc9c3'), total_sales=Decimal('1350622.125'))
Row(customer_id=UUID('b3888ba7-5f20-4112-afe6-5d23acb4b49c'), total_sales=Decimal('1592127.625'))


In [144]:
rows = session.execute("SELECT column_name FROM system_schema.columns WHERE keyspace_name = 'example' AND table_name = 'silver_sales';")
print("Columns in silver_sales:")
for row in rows:
    print(row.column_name)

Columns in silver_sales:
country
item_type
order_date
order_id
order_priority
region
sales_channel
ship_date
total_cost
total_profit
total_revenue
unit_cost
unit_price
units_sold
