In [1]:
pip install cassandra-driver

Collecting cassandra-driver
  Downloading cassandra_driver-3.29.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.2 kB)
Collecting geomet<0.3,>=0.1 (from cassandra-driver)
  Downloading geomet-0.2.1.post1-py3-none-any.whl.metadata (1.0 kB)
Downloading cassandra_driver-3.29.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.9/3.9 MB[0m [31m31.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading geomet-0.2.1.post1-py3-none-any.whl (18 kB)
Installing collected packages: geomet, cassandra-driver
Successfully installed cassandra-driver-3.29.2 geomet-0.2.1.post1


In [6]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

def load_config():
    cloud_config = {
        'secure_connect_bundle': 'secure-connect-saniko52.zip'
    }

    with open("saniko52-token.json") as f:
        secrets = json.load(f)

    client_id = secrets["clientId"]
    client_secret = secrets["secret"]

    return cloud_config, client_id, client_secret


def connect_to_cluster(cloud_config, client_id, client_secret):
    auth_provider = PlainTextAuthProvider(client_id, client_secret)
    cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
    return cluster.connect()

def main():
    cloud_config, client_id, client_secret = load_config()
    session = connect_to_cluster(cloud_config, client_id, client_secret)

    if session:
        print('Connected!')
    else:
        print("An error occurred.")

if __name__ == "__main__":
    main()




Connected!


In [10]:
import requests
import pandas as pd
from io import StringIO
def download_csv(url):
    response = requests.get(url)
    if response.status_code == 200:
        return response.text
    else:
        raise Exception(f"Failed to download file, status code: {response.status_code}")

def load_csv_to_dataframe(csv_data):
    try:

        return pd.read_csv(StringIO(csv_data), on_bad_lines='skip')
    except Exception as e:
        print(f"Error loading CSV: {e}")
        return None

def main():

    url = "https://raw.githubusercontent.com/gchandra10/filestorage/main/sales_100.csv"
    csv_data = download_csv(url)
    df = load_csv_to_dataframe(csv_data)
    if df is not None:

        print(df.head())
    else:
        print("Failed to load the CSV data.")

if __name__ == "__main__":
    main()


                         Region           Country  Item Type Sales Channel  \
0            Sub-Saharan Africa      South Africa     Fruits       Offline   
1  Middle East and North Africa           Morocco    Clothes        Online   
2         Australia and Oceania  Papua New Guinea       Meat       Offline   
3            Sub-Saharan Africa          Djibouti    Clothes       Offline   
4                        Europe          Slovakia  Beverages       Offline   

  Order Priority  Order Date   Order ID   Ship Date  UnitsSold  UnitPrice  \
0              M   7/27/2012  443368995   7/28/2012       1593       9.33   
1              M   9/14/2013  667593514  10/19/2013       4611     109.28   
2              M   5/15/2015  940995585    6/4/2015        360     421.89   
3              H   5/17/2017  880811536    7/2/2017        562     109.28   
4              L  10/26/2016  174590194   12/4/2016       3973      47.45   

   UnitCost  TotalRevenue  TotalCost  TotalProfit  
0      6.92     

In [13]:
session.execute("""
CREATE TABLE IF NOT EXISTS mydatabase.bronze_sales (
    id UUID PRIMARY KEY,
    transaction_id text,
    customer_id text,
    product_id text,
    amount decimal,
    transaction_date text
)
""")

<cassandra.cluster.ResultSet at 0x7a3bfa60d4e0>

In [18]:
import pandas as pd
from uuid import uuid4
from datetime import datetime
import requests
from io import StringIO
url = "https://raw.githubusercontent.com/gchandra10/filestorage/main/sales_100.csv"
response = requests.get(url)
csv_data = response.text

df = pd.read_csv(StringIO(csv_data))
df.columns = df.columns.str.strip().str.replace(' ', '_')

def insert_data_to_cassandra(row):
    try:

        transaction_date = datetime.strptime(row['Order_Date'], '%m/%d/%Y').strftime('%Y-%m-%d %H:%M:%S')
    except ValueError:
        print(f"Invalid date format: {row['Order_Date']}")
        return None


    session.execute("""
    INSERT INTO mydatabase.bronze_sales (
        id, transaction_id, customer_id, product_id, amount, transaction_date
    ) VALUES (%s, %s, %s, %s, %s, %s)
    """, (
        uuid4(),
        str(row['Order_ID']),
        str(row['Region']),
        str(row['Item_Type']),
        float(row['TotalRevenue']),
        transaction_date
    ))


df.apply(insert_data_to_cassandra, axis=1)


print(df.columns)


Index(['Region', 'Country', 'Item_Type', 'Sales_Channel', 'Order_Priority',
       'Order_Date', 'Order_ID', 'Ship_Date', 'UnitsSold', 'UnitPrice',
       'UnitCost', 'TotalRevenue', 'TotalCost', 'TotalProfit'],
      dtype='object')


In [21]:
from datetime import datetime
def transform_date(transaction_date):
    try:

        return datetime.strptime(transaction_date, '%Y-%m-%d %H:%M:%S').date()
    except ValueError:
        try:

            return datetime.strptime(transaction_date, '%Y-%m-%d').date()
        except ValueError:
            print(f"Invalid date format: {transaction_date}")
            return None


def create_silver_table():
    create_table_query = """
    CREATE TABLE IF NOT EXISTS mydatabase.silver_sales (
        transaction_id text PRIMARY KEY,
        customer_id text,
        product_id text,
        amount decimal,
        transaction_date date
    )
    """
    session.execute(create_table_query)

def insert_to_silver_sales(rows):
    for row in rows:
        transaction_date = transform_date(row.transaction_date)

        if transaction_date is not None:
            session.execute("""
            INSERT INTO mydatabase.silver_sales (transaction_id, customer_id, product_id, amount, transaction_date)
            VALUES (%s, %s, %s, %s, %s)
            """, (
                row.transaction_id,
                row.customer_id,
                row.product_id,
                row.amount,
                transaction_date
            ))

def main():

    create_silver_table()
    query = "SELECT transaction_id, customer_id, product_id, amount, transaction_date FROM mydatabase.bronze_sales"
    rows = session.execute(query)
    insert_to_silver_sales(rows)

if __name__ == "__main__":
    main()


In [23]:
from collections import defaultdict
from decimal import Decimal
def create_table(query):
    session.execute(query)

def aggregate_sales(query, group_by_column):
    rows = session.execute(query)
    sales = defaultdict(float)

    for row in rows:
        sales[getattr(row, group_by_column)] += float(row.amount)

    return sales

def insert_aggregated_data(table, data, column, value_column):
    for key, value in data.items():
        session.execute(f"""
        INSERT INTO {table} ({column}, {value_column})
        VALUES (%s, %s)
        """, (key, value))
create_table("""
CREATE TABLE IF NOT EXISTS mydatabase.gold_sales_by_customer (
    customer_id text PRIMARY KEY,
    total_sales decimal
)
""")
customer_sales = aggregate_sales("SELECT customer_id, amount FROM mydatabase.silver_sales", 'customer_id')

insert_aggregated_data('mydatabase.gold_sales_by_customer', customer_sales, 'customer_id', 'total_sales')
create_table("""
CREATE TABLE IF NOT EXISTS mydatabase.gold_sales_by_product (
    product_id text PRIMARY KEY,
    total_sales decimal
)
""")
product_sales = aggregate_sales("SELECT product_id, amount FROM mydatabase.silver_sales", 'product_id')
insert_aggregated_data('mydatabase.gold_sales_by_product', product_sales, 'product_id', 'total_sales')
create_table("""
CREATE TABLE IF NOT EXISTS mydatabase.gold_daily_sales (
    transaction_date date PRIMARY KEY,
    total_sales decimal
)
""")
daily_sales = aggregate_sales("SELECT transaction_date, amount FROM mydatabase.silver_sales", 'transaction_date')
insert_aggregated_data('mydatabase.gold_daily_sales', daily_sales, 'transaction_date', 'total_sales')


In [25]:
# Fetch data from Gold tables
# Gold Table 1: Total Sales by Customer
result_customer = session.execute("SELECT * FROM mydatabase.gold_sales_by_customer")
for row in result_customer:
    print(row)


Row(customer_id='Australia and Oceania', total_sales=Decimal('10711258.13'))
Row(customer_id='Europe', total_sales=Decimal('34964749.830000006'))
Row(customer_id='Middle East and North Africa', total_sales=Decimal('24765127.25'))
Row(customer_id='Central America and the Caribbean', total_sales=Decimal('17570835.42'))
Row(customer_id='Asia', total_sales=Decimal('28840812.190000005'))
Row(customer_id='Sub-Saharan Africa', total_sales=Decimal('24225437.419999998'))
Row(customer_id='North America', total_sales=Decimal('3611757.5199999996'))


In [26]:
# Gold Table 2: Total Sales by Product
result_product = session.execute("SELECT * FROM mydatabase.gold_sales_by_product")
for row in result_product:
    print(row)


Row(product_id='Household', total_sales=Decimal('38519082.8'))
Row(product_id='Office Supplies', total_sales=Decimal('27880904.94'))
Row(product_id='Vegetables', total_sales=Decimal('1135114.08'))
Row(product_id='Snacks', total_sales=Decimal('2193642.66'))
Row(product_id='Personal Care', total_sales=Decimal('3191147.8499999996'))
Row(product_id='Meat', total_sales=Decimal('21278865.93'))
Row(product_id='Fruits', total_sales=Decimal('615033.6'))
Row(product_id='Beverages', total_sales=Decimal('2145024.7'))
Row(product_id='Cereal', total_sales=Decimal('9416123.2'))
Row(product_id='Cosmetics', total_sales=Decimal('28727100.399999995'))
Row(product_id='Baby Food', total_sales=Decimal('5200564.159999999'))
Row(product_id='Clothes', total_sales=Decimal('4387373.4399999995'))


In [27]:
# Gold Table 3: Daily Sales Summary
result_daily_sales = session.execute("SELECT * FROM mydatabase.gold_daily_sales")
for row in result_daily_sales:
    print(row)

Row(transaction_date=Date(16267), total_sales=Decimal('5608790.11'))
Row(transaction_date=Date(15042), total_sales=Decimal('167640.85'))
Row(transaction_date=Date(15162), total_sales=Decimal('289426.4'))
Row(transaction_date=Date(17303), total_sales=Decimal('61415.36'))
Row(transaction_date=Date(16781), total_sales=Decimal('33410.73'))
Row(transaction_date=Date(16769), total_sales=Decimal('643018.2'))
Row(transaction_date=Date(14710), total_sales=Decimal('1932962.9'))
Row(transaction_date=Date(15293), total_sales=Decimal('1456356.0'))
Row(transaction_date=Date(15962), total_sales=Decimal('503890.08'))
Row(transaction_date=Date(16151), total_sales=Decimal('4003440.4'))
Row(transaction_date=Date(16110), total_sales=Decimal('74957.22'))
Row(transaction_date=Date(15218), total_sales=Decimal('6666661.52'))
Row(transaction_date=Date(15389), total_sales=Decimal('217368.45'))
Row(transaction_date=Date(17135), total_sales=Decimal('70036.2'))
Row(transaction_date=Date(17048), total_sales=Decimal