In [209]:
!pip install cassandra-driver



In [210]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

In [211]:
# This secure connect bundle is autogenerated when you download your SCB,
# if yours is different update the file name below
cloud_config= {
  'secure_connect_bundle': 'secure-connect-cassandra.zip'
}

# This token JSON file is autogenerated when you download your token,
# if yours is different update the file name below
with open("cassandra-token.json") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

if session:
  print('Connected!')
else:
  print("An error occurred.")



Connected!


In [212]:
import requests
import pandas as pd
from cassandra.cluster import Cluster

# Download the CSV file
url = "https://raw.githubusercontent.com/gchandra10/filestorage/main/sales_100.csv"
csv_file = "sales_100.csv"
response = requests.get(url)

# Save the file locally
with open(csv_file, 'wb') as file:
    file.write(response.content)

# Read the CSV file
df = pd.read_csv(csv_file)
print(df.head())  # Display the first few rows


                         Region           Country  Item Type Sales Channel  \
0            Sub-Saharan Africa      South Africa     Fruits       Offline   
1  Middle East and North Africa           Morocco    Clothes        Online   
2         Australia and Oceania  Papua New Guinea       Meat       Offline   
3            Sub-Saharan Africa          Djibouti    Clothes       Offline   
4                        Europe          Slovakia  Beverages       Offline   

  Order Priority  Order Date   Order ID   Ship Date  UnitsSold  UnitPrice  \
0              M   7/27/2012  443368995   7/28/2012       1593       9.33   
1              M   9/14/2013  667593514  10/19/2013       4611     109.28   
2              M   5/15/2015  940995585    6/4/2015        360     421.89   
3              H   5/17/2017  880811536    7/2/2017        562     109.28   
4              L  10/26/2016  174590194   12/4/2016       3973      47.45   

   UnitCost  TotalRevenue  TotalCost  TotalProfit  
0      6.92     

In [247]:

session.execute("""
CREATE TABLE IF NOT EXISTS cassandrasales.bronze_sales (
    id UUID PRIMARY KEY,
    transaction_id text,
    customer_id text,
    product_id text,
    amount decimal,
    transaction_date text
)
""")


<cassandra.cluster.ResultSet at 0x79a7a152a200>

In [214]:
from uuid import uuid4

In [215]:
df.columns = df.columns.str.strip().str.replace(' ', '_')

In [250]:
from datetime import datetime
from uuid import uuid4

# Assuming 'df' is your DataFrame containing the data
for _, row in df.iterrows():
    # Parse the Order_Date string to a datetime object
    try:
        transaction_date = datetime.strptime(row['Order_Date'], '%m/%d/%Y').strftime('%Y-%m-%d %H:%M:%S')
    except ValueError:
        print(f"Invalid date format: {row['Order_Date']}")
        continue  # Skip rows with invalid date formats

    # Insert the data into the Cassandra table
    session.execute("""
    INSERT INTO cassandrasales.bronze_sales (
        id, transaction_id, customer_id, product_id, amount, transaction_date
    ) VALUES (%s, %s, %s, %s, %s, %s)
    """, (
        uuid4(),                             # UUID for the primary key (always generates a new value)
        str(row['Order_ID']),                # Convert Order_ID to string (assuming 'Order_ID' is correct)
        str(row['Region']),                  # Convert Region to string (assuming 'Region' is correct)
        str(row['Item_Type']),               # Convert Item_Type to string (assuming 'Item_Type' is correct)
        float(row['TotalRevenue']),          # Ensure TotalRevenue is a float (assuming 'TotalRevenue' is correct)
        transaction_date                     # Converted date string (proper date format for Cassandra)
    ))


In [224]:
print(df.columns)

Index(['Region', 'Country', 'Item_Type', 'Sales_Channel', 'Order_Priority',
       'Order_Date', 'Order_ID', 'Ship_Date', 'UnitsSold', 'UnitPrice',
       'UnitCost', 'TotalRevenue', 'TotalCost', 'TotalProfit'],
      dtype='object')


In [225]:
# Create Silver Table if not exists
session.execute("""
CREATE TABLE IF NOT EXISTS cassandrasales.silver_sales (
    sale_id uuid PRIMARY KEY,
    total_revenue decimal,
    item_type text,
    order_date date
)
""")

<cassandra.cluster.ResultSet at 0x79a79f545ab0>

In [226]:
from datetime import datetime


# Transform Data for Silver Table
query = "SELECT sale_id, item_type, total_revenue, order_date FROM cassandrasales.bronze_sales"
rows = session.execute(query)

# Insert data into silver_sales
for row in rows:
    # Convert 'order_date' to 'YYYY-MM-DD' format (only the date part)
    try:
        # If the date is in 'YYYY-MM-DD HH:MM:SS' format, extract only the date part
        order_date = datetime.strptime(row.order_date, '%Y-%m-%d %H:%M:%S').date()
    except ValueError:
        # If the 'order_date' is already in 'YYYY-MM-DD' format, just parse it
        order_date = datetime.strptime(row.order_date, '%Y-%m-%d').date()

    # Insert the data into the silver_sales table
    session.execute("""
    INSERT INTO cassandrasales.silver_sales (sale_id, item_type, total_revenue, order_date)
    VALUES (%s, %s, %s, %s)
    """, (
        row.sale_id,
        row.item_type,
        row.total_revenue,
        order_date
    ))


In [227]:
from collections import defaultdict
from decimal import Decimal

In [228]:
# Create Gold Table if not exists
session.execute("""
CREATE TABLE IF NOT EXISTS cassandrasales.gold_sales (
    product_id text,
    customer_id text,
    total_amount decimal,
    total_transactions int,
    PRIMARY KEY (product_id, customer_id)
)
""")


<cassandra.cluster.ResultSet at 0x79a79f4c7370>

In [229]:
# Fetch raw data from silver_sales and aggregate by item_type (instead of product_id)
query = "SELECT item_type, total_revenue FROM cassandrasales.silver_sales"
rows = session.execute(query)

# Aggregate total sales by item_type using Python
product_sales = defaultdict(float)
for row in rows:
    product_sales[row.item_type] += float(row.total_revenue)

# Insert aggregated data into gold_sales_by_product
for item_type, total_sales in product_sales.items():
    session.execute("""
    INSERT INTO cassandrasales.gold_sales_by_product (item_type, total_sales)
    VALUES (%s, %s)
    """, (item_type, total_sales))


In [230]:
# Fetch data from gold_sales_by_product table
result_product = session.execute("SELECT * FROM cassandrasales.gold_sales_by_product")

# Iterate through the rows and print the results
for row in result_product:
    print(f"Row(item_type={row.item_type}, total_sales={row.total_sales})")


Row(item_type=Household, total_sales=38519082.8)
Row(item_type=Office Supplies, total_sales=27880904.94)
Row(item_type=Vegetables, total_sales=1135114.08)
Row(item_type=Snacks, total_sales=2193642.66)
Row(item_type=Personal Care, total_sales=3191147.8499999996)
Row(item_type=Meat, total_sales=21278865.930000003)
Row(item_type=Fruits, total_sales=615033.6)
Row(item_type=Beverages, total_sales=2145024.7)
Row(item_type=Cereal, total_sales=9416123.2)
Row(item_type=Cosmetics, total_sales=28727100.400000002)
Row(item_type=Baby Food, total_sales=5200564.159999999)
Row(item_type=Clothes, total_sales=4387373.4399999995)


In [231]:
session.execute("""
CREATE TABLE IF NOT EXISTS cassandrasales.gold_daily_sales (
    transaction_date date PRIMARY KEY,
    total_sales decimal
)
""")


<cassandra.cluster.ResultSet at 0x79a79f1a0dc0>

In [232]:
from collections import defaultdict
import uuid

# Create the table for aggregated data (e.g., Total Sales by Region)
session.execute("""
CREATE TABLE IF NOT EXISTS cassandrasales.gold_sales_by_region (
    region text PRIMARY KEY,
    transaction_count int,
    total_sales decimal
)
""")

# Fetch raw data from silver_sales and aggregate by region (or any other key like order_date, item_type, etc.)
query = "SELECT region, total_revenue FROM cassandrasales.silver_sales"
rows = session.execute(query)

# Aggregate total sales by region using Python
region_sales = defaultdict(lambda: {'transaction_count': 0, 'total_sales': 0.0})
for row in rows:
    # Aggregate sales for this region
    region_sales[row.region]['transaction_count'] += 1
    region_sales[row.region]['total_sales'] += float(row.total_revenue)

# Insert aggregated data into gold_sales_by_region
for region, data in region_sales.items():
    session.execute("""
    INSERT INTO cassandrasales.gold_sales_by_region (region, transaction_count, total_sales)
    VALUES (%s, %s, %s)
    """, (region, data['transaction_count'], data['total_sales']))


In [233]:
# Query the gold_sales_by_region table to retrieve the aggregated data
query = "SELECT region, transaction_count, total_sales FROM cassandrasales.gold_sales_by_region"
rows = session.execute(query)

# Print the result
for row in rows:
    print(f"Region: {row.region}, Transaction Count: {row.transaction_count}, Total Sales: {row.total_sales}")


Region: Australia and Oceania, Transaction Count: 9, Total Sales: 10711258.13
Region: Europe, Transaction Count: 24, Total Sales: 34964749.830000006
Region: Middle East and North Africa, Transaction Count: 10, Total Sales: 24765127.25
Region: Central America and the Caribbean, Transaction Count: 11, Total Sales: 17570835.419999998
Region: Asia, Transaction Count: 19, Total Sales: 28840812.19
Region: Sub-Saharan Africa, Transaction Count: 24, Total Sales: 24225437.420000006
Region: North America, Transaction Count: 2, Total Sales: 3611757.5199999996


In [234]:
# Create the table for aggregated data (e.g., Total Sales by Unit Cost and Unit Price)
session.execute("""
CREATE TABLE IF NOT EXISTS cassandrasales.gold_sales_by_unit_cost_unit_price (
    unit_cost decimal,
    unit_price decimal,
    transaction_count int,
    total_sales decimal,
    PRIMARY KEY (unit_cost, unit_price)
)
""")

# Fetch raw data from silver_sales and aggregate by unit_cost and unit_price
query = "SELECT unit_cost, unit_price, total_revenue FROM cassandrasales.silver_sales"
rows = session.execute(query)

# Aggregate total sales by unit_cost and unit_price using Python
unit_sales = defaultdict(lambda: {'transaction_count': 0, 'total_sales': 0.0})
for row in rows:
    # Aggregate sales for this pair of unit_cost and unit_price
    unit_sales[(row.unit_cost, row.unit_price)]['transaction_count'] += 1
    unit_sales[(row.unit_cost, row.unit_price)]['total_sales'] += float(row.total_revenue)

# Insert aggregated data into gold_sales_by_unit_cost_unit_price
for (unit_cost, unit_price), data in unit_sales.items():
    session.execute("""
    INSERT INTO cassandrasales.gold_sales_by_unit_cost_unit_price (unit_cost, unit_price, transaction_count, total_sales)
    VALUES (%s, %s, %s, %s)
    """, (unit_cost, unit_price, data['transaction_count'], data['total_sales']))


In [251]:
# Query the gold_sales_by_unit_cost_unit_price table to retrieve the aggregated data
query = "SELECT unit_cost, unit_price, transaction_count, total_sales FROM cassandrasales.gold_sales_by_unit_cost_unit_price"
rows = session.execute(query)

# Print the result
for row in rows:
    print(f"Unit Cost: {row.unit_cost}, Unit Price: {row.unit_price}, Transaction Count: {row.transaction_count}, Total Sales: {row.total_sales}")


Unit Cost: 56.67, Unit Price: 81.73, Transaction Count: 8, Total Sales: 3191147.8499999996
Unit Cost: 31.79, Unit Price: 47.45, Transaction Count: 11, Total Sales: 2145024.7
Unit Cost: 159.42, Unit Price: 255.28, Transaction Count: 6, Total Sales: 5200564.159999999
Unit Cost: 117.11, Unit Price: 205.7, Transaction Count: 8, Total Sales: 9416123.2
Unit Cost: 6.92, Unit Price: 9.33, Transaction Count: 13, Total Sales: 615033.6
Unit Cost: 97.44, Unit Price: 152.58, Transaction Count: 3, Total Sales: 2193642.66
Unit Cost: 263.33, Unit Price: 437.2, Transaction Count: 12, Total Sales: 28727100.400000002
Unit Cost: 35.84, Unit Price: 109.28, Transaction Count: 10, Total Sales: 4387373.4399999995
Unit Cost: 90.93, Unit Price: 154.06, Transaction Count: 2, Total Sales: 1135114.08
Unit Cost: 524.96, Unit Price: 651.21, Transaction Count: 8, Total Sales: 27880904.94
Unit Cost: 364.69, Unit Price: 421.89, Transaction Count: 8, Total Sales: 21278865.930000003
Unit Cost: 502.54, Unit Price: 668.27,