IMPORTING REQUIRED LIBRARIES

In [43]:
!pip install cassandra-driver
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json
import csv
from datetime import datetime
import pandas as pd
from cassandra.query import SimpleStatement



CONNECTING TO CASSSANDRA

In [9]:
def connect_to_cassandra():
  cloud_config= {
  'secure_connect_bundle': '/secure-connect-saicharan-database.zip'
  }
  with open('/Saicharan_Database-token.json') as f:
      secrets = json.load(f)

  CLIENT_ID = secrets["clientId"]
  CLIENT_SECRET = secrets["secret"]

  auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
  cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
  session = cluster.connect()
  if session:
    print("Session Created Successfully")
    return session
  else:
    print("Session is not established")

In [10]:
session = connect_to_cassandra()
keyspace = "medallion_architecture"
session.set_keyspace(keyspace)
print("keyspace is set to medallion architecture")



Session Created Successfully
keyspace is set to medallion architecture


BRONZE TABLE

In [11]:
def list_table_data(session, keyspace, table_name):
  query = "SELECT * FROM %s LIMIT 10"
  data = session.execute(query,(table_name))
  for each in data:
    print(each)

def drop_table(session, keyspace, table_name):
    query = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s AND table_name = %s"
    result = session.execute(query, (keyspace, table_name))

    if result.current_rows:
        print(f"Table '{table_name}' already exists in keyspace '{keyspace}'. Dropping it...")
        drop_query = f"DROP TABLE {table_name}"
        session.execute(drop_query)
        print(f"Table '{table_name}' dropped successfully.")
    else:
        print(f"Table '{table_name}' does not exist. Ready to create a new table.")

In [50]:
drop_table(session,keyspace,"bronze_data")
session.execute("""
    CREATE TABLE IF NOT EXISTS bronze_data (
        region TEXT,
        country TEXT,
        item_type TEXT,
        sales_channel TEXT,
        order_priority TEXT,
        order_date TEXT,  -- Use TEXT for date; adjust as needed
        order_id TEXT PRIMARY KEY,  -- Unique order ID
        ship_date TEXT,
        units_sold INT,
        unit_price FLOAT,
        unit_cost FLOAT,
        total_revenue FLOAT,
        total_cost FLOAT,
        total_profit FLOAT
    )
""")

print("Connected to Astra DB and Bronze table created successfully!")

csv_file = "/sales_100.csv"

with open(csv_file, 'r') as file:
    reader = csv.DictReader(file)
    for row in reader:
        session.execute("""
            INSERT INTO bronze_data (
                region, country, item_type, sales_channel,
                order_priority, order_date, order_id, ship_date,
                units_sold, unit_price, unit_cost,
                total_revenue, total_cost, total_profit
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            row['Region'], row['Country'], row['Item Type'], row['Sales Channel'],
            row['Order Priority'], row['Order Date'], row['Order ID'], row['Ship Date'],
            int(row['UnitsSold']), float(row['UnitPrice']), float(row['UnitCost']),
            float(row['TotalRevenue']), float(row['TotalCost']), float(row['TotalProfit'])
        ))

print("BRONZE TABLE")

print("Data inserted into Bronze table on Astra DB successfully!")

data = session.execute("SELECT * FROM bronze_data LIMIT 10")
for i in data:
    print(i)

Table 'bronze_data' already exists in keyspace 'medallion_architecture'. Dropping it...
Table 'bronze_data' dropped successfully.
Connected to Astra DB and Bronze table created successfully!
BRONZE TABLE
Data inserted into Bronze table on Astra DB successfully!
Row(order_id='940980136', country='New Zealand', item_type='Beverages', order_date='10/11/2012', order_priority='M', region='Australia and Oceania', sales_channel='Online', ship_date='11/4/2012', total_cost=184000.515625, total_profit=90640.078125, total_revenue=274640.59375, unit_cost=31.790000915527344, unit_price=47.45000076293945, units_sold=5788)
Row(order_id='363086831', country='Mali', item_type='Household', order_date='8/19/2010', order_priority='M', region='Sub-Saharan Africa', sales_channel='Offline', ship_date='9/7/2010', total_cost=2169465.25, total_profit=715456.4375, total_revenue=2884921.5, unit_cost=502.5400085449219, unit_price=668.27001953125, units_sold=4317)
Row(order_id='176461303', country='Singapore', item

SILVER TABLE

In [32]:
drop_table(session,keyspace,'silver_sales_data')
create_query = """
CREATE TABLE IF NOT EXISTS silver_sales_data (
    region TEXT,
    country TEXT,
    item_type TEXT,
    total_units_sold INT,
    average_unit_price FLOAT,
    total_revenue FLOAT,
    total_cost FLOAT,
    total_profit FLOAT,
    PRIMARY KEY ((region, country), item_type)
);"""
session.execute(create_query)
print("Silver table created successfully")
fetch_query = "SELECT * FROM bronze_sales_data"
rows = session.execute(fetch_query)

aggregated_data = {}

for row in rows:
    region = row.region
    country = row.country
    item_type = row.item_type
    units_sold = row.units_sold
    unit_price = row.unit_price
    total_revenue = row.total_revenue
    total_cost = row.total_cost
    total_profit = row.total_profit

    key = (region, country, item_type)

    if key not in aggregated_data:
        aggregated_data[key] = {
            'total_units_sold': 0,
            'total_revenue': 0.0,
            'total_cost': 0.0,
            'total_profit': 0.0,
            'total_price': 0.0,
            'count': 0
        }

    aggregated_data[key]['total_units_sold'] += units_sold
    aggregated_data[key]['total_revenue'] += total_revenue
    aggregated_data[key]['total_cost'] += total_cost
    aggregated_data[key]['total_profit'] += total_profit
    aggregated_data[key]['total_price'] += unit_price * units_sold
    aggregated_data[key]['count'] += 1

Table 'silver_sales_data' already exists in keyspace 'medallion_architecture'. Dropping it...
Table 'silver_sales_data' dropped successfully.
Silver table created successfully


In [51]:
print("SILVER TABLE")

for key, data in aggregated_data.items():
    region, country, item_type = key
    total_units_sold = data['total_units_sold']
    total_revenue = data['total_revenue']
    total_cost = data['total_cost']
    total_profit = data['total_profit']
    average_unit_price = data['total_price'] / data['total_units_sold'] if data['total_units_sold'] > 0 else 0

    insert_query = ("""
        INSERT INTO silver_sales_data (region, country, item_type, total_units_sold, average_unit_price, total_revenue, total_cost, total_profit)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """)

    session.execute(insert_query, (region, country, item_type, total_units_sold, average_unit_price, total_revenue, total_cost, total_profit))

data = session.execute("SELECT * FROM silver_sales_data LIMIT 10")
for i in data:
    print(i)

SILVER TABLE
Row(region='Europe', country='United Kingdom', item_type='Cosmetics', average_unit_price=437.20001220703125, total_cost=273336.53125, total_profit=180477.0625, total_revenue=453813.59375, total_units_sold=1038)
Row(region='Europe', country='Estonia', item_type='Household', average_unit_price=668.27001953125, total_cost=5013339.0, total_profit=1653322.5, total_revenue=6666661.5, total_units_sold=9976)
Row(region='Europe', country='Montenegro', item_type='Clothes', average_unit_price=109.27999877929688, total_cost=75873.28125, total_profit=155472.484375, total_revenue=231345.765625, total_units_sold=2117)
Row(region='Australia and Oceania', country='New Zealand', item_type='Beverages', average_unit_price=47.45000076293945, total_cost=184000.515625, total_profit=90640.078125, total_revenue=274640.59375, total_units_sold=5788)
Row(region='Europe', country='Czech Republic', item_type='Cosmetics', average_unit_price=437.20001220703125, total_cost=2411312.75, total_profit=1592127

GOLD TABLE

In [40]:
fetch_query = "SELECT * FROM bronze_sales_data"
rows = session.execute(fetch_query)

# Convert the rows into a pandas DataFrame
data = []
for row in rows:
    data.append({
        'region': row.region,
        'country': row.country,
        'item_type': row.item_type,
        'sales_channel': row.sales_channel,
        'order_priority': row.order_priority,
        'order_date': row.order_date,
        'order_id': row.order_id,
        'ship_date': row.ship_date,
        'total_units_sold': row.units_sold,
        'unit_price': row.unit_price,
        'unit_cost': row.unit_cost,
        'total_revenue': row.total_revenue,
        'total_cost': row.total_cost,
        'total_profit': row.total_profit
    })

# Create a pandas DataFrame from the fetched data
df = pd.DataFrame(data)

In [45]:
drop_table(session,keyspace,'sales_by_region')
CREATE_SALES_BY_REGION = """
CREATE TABLE IF NOT EXISTS sales_by_region (
    region TEXT,
    total_units_sold INT,
    total_revenue FLOAT,
    total_cost FLOAT,
    total_profit FLOAT,
    average_unit_price FLOAT,
    PRIMARY KEY (region)
)
"""
session.execute(CREATE_SALES_BY_REGION)
print("Gold Table sales_by_region created succesfully")

Table 'sales_by_region' does not exist. Ready to create a new table.
Gold Table sales_by_region created succesfully


In [55]:
gold_table_1 = df.groupby('region').agg({
    'total_units_sold': 'sum',
    'total_revenue': 'sum',
    'total_cost': 'sum',
    'total_profit': 'sum',
    'unit_price': 'mean'
}).reset_index()

for _, row in gold_table_1.iterrows():
    insert_query = SimpleStatement("""
        INSERT INTO sales_by_region (region, total_units_sold, total_revenue, total_cost, total_profit, average_unit_price)
        VALUES (%s, %s, %s, %s, %s, %s)
    """)
    session.execute(insert_query, (row['region'], row['total_units_sold'], row['total_revenue'],
                                   row['total_cost'], row['total_profit'], row['unit_price']))
print("GOLD TABLE 1")

data = session.execute("SELECT * FROM sales_by_region LIMIT 10")
for i in data:
    print(i)

GOLD TABLE 1
Row(region='Australia and Oceania', average_unit_price=287.90777587890625, total_cost=7224318.0, total_profit=3486940.0, total_revenue=10711258.0, total_units_sold=42328)
Row(region='Europe', average_unit_price=251.1858367919922, total_cost=23697468.0, total_profit=11267281.0, total_revenue=34964748.0, total_units_sold=121002)
Row(region='Middle East and North Africa', average_unit_price=374.0870056152344, total_cost=18250866.0, total_profit=6514262.0, total_revenue=24765128.0, total_units_sold=60376)
Row(region='Central America and the Caribbean', average_unit_price=307.19000244140625, total_cost=13318535.0, total_profit=4252300.0, total_revenue=17570836.0, total_units_sold=53641)
Row(region='Asia', average_unit_price=259.8636779785156, total_cost=22090916.0, total_profit=6749896.0, total_revenue=28840812.0, total_units_sold=113129)
Row(region='Sub-Saharan Africa', average_unit_price=235.45791625976562, total_cost=16573546.0, total_profit=7651892.0, total_revenue=24225438

In [48]:
drop_table(session,keyspace,'sales_by_item_type')
CREATE_SALES_BY_ITEM_TYPE = """
CREATE TABLE IF NOT EXISTS sales_by_item_type (
    item_type TEXT,
    total_units_sold INT,
    total_revenue FLOAT,
    total_cost FLOAT,
    total_profit FLOAT,
    average_unit_price FLOAT,
    PRIMARY KEY (item_type)
);
"""
session.execute(CREATE_SALES_BY_ITEM_TYPE)
print("Gold Table sales_by_item_type created succesfully")

Table 'sales_by_item_type' already exists in keyspace 'medallion_architecture'. Dropping it...
Table 'sales_by_item_type' dropped successfully.
Gold Table sales_by_item_type created succesfully


In [56]:
gold_table_2 = df.groupby('item_type').agg({
    'total_units_sold': 'sum',
    'total_revenue': 'sum',
    'total_cost': 'sum',
    'total_profit': 'sum',
    'unit_price': 'mean'
}).reset_index()

for _, row in gold_table_2.iterrows():
    insert_query = SimpleStatement("""
        INSERT INTO sales_by_item_type (item_type, total_units_sold, total_revenue, total_cost, total_profit, average_unit_price)
        VALUES (%s, %s, %s, %s, %s, %s)
    """)
    session.execute(insert_query, (row['item_type'], row['total_units_sold'], row['total_revenue'],
                                   row['total_cost'], row['total_profit'], row['unit_price']))
print("GOLD TABLE 2")

data = session.execute("SELECT * FROM sales_by_item_type LIMIT 10")
for i in data:
    print(i)


GOLD TABLE 2
Row(item_type='Household', average_unit_price=668.27001953125, total_cost=28966406.0, total_profit=9552677.0, total_revenue=38519084.0, total_units_sold=57640)
Row(item_type='Office Supplies', average_unit_price=651.2100219726562, total_cost=22475638.0, total_profit=5405267.5, total_revenue=27880904.0, total_units_sold=42814)
Row(item_type='Vegetables', average_unit_price=154.05999755859375, total_cost=669972.25, total_profit=465141.84375, total_revenue=1135114.125, total_units_sold=7368)
Row(item_type='Snacks', average_unit_price=152.5800018310547, total_cost=1400894.875, total_profit=792747.75, total_revenue=2193642.75, total_units_sold=14377)
Row(item_type='Personal Care', average_unit_price=81.7300033569336, total_cost=2212680.0, total_profit=978467.6875, total_revenue=3191147.75, total_units_sold=39045)
Row(item_type='Meat', average_unit_price=421.8900146484375, total_cost=18393870.0, total_profit=2884996.5, total_revenue=21278866.0, total_units_sold=50437)
Row(item_t

In [53]:
drop_table(session,keyspace,'sales_by_order_priority')
CREATE_SALES_BY_ORDER_PRIORITY = """
CREATE TABLE IF NOT EXISTS sales_by_order_priority (
    order_priority TEXT,
    total_units_sold INT,
    total_revenue FLOAT,
    total_cost FLOAT,
    total_profit FLOAT,
    average_unit_price FLOAT,
    PRIMARY KEY (order_priority)
)
"""
session.execute(CREATE_SALES_BY_ORDER_PRIORITY)
print("Gold Table sales_by_order_priority created succesfully")

Table 'sales_by_order_priority' already exists in keyspace 'medallion_architecture'. Dropping it...
Table 'sales_by_order_priority' dropped successfully.
Gold Table sales_by_order_priority created succesfully


In [57]:
gold_table_3 = df.groupby('order_priority').agg({
    'total_units_sold': 'sum',
    'total_revenue': 'sum',
    'total_cost': 'sum',
    'total_profit': 'sum',
    'unit_price': 'mean'
}).reset_index()

for _, row in gold_table_3.iterrows():
    insert_query = SimpleStatement("""
        INSERT INTO sales_by_order_priority (order_priority, total_units_sold, total_revenue, total_cost, total_profit, average_unit_price)
        VALUES (%s, %s, %s, %s, %s, %s)
    """)
    session.execute(insert_query, (row['order_priority'], row['total_units_sold'], row['total_revenue'],
                                   row['total_cost'], row['total_profit'], row['unit_price']))

print("GOLD TABLE 3")
data = session.execute("SELECT * FROM sales_by_order_priority LIMIT 10")
for i in data:
    print(i)


GOLD TABLE 3
Row(order_priority='C', average_unit_price=349.7989501953125, total_cost=25044636.0, total_profit=7649023.0, total_revenue=32693660.0, total_units_sold=92171)
Row(order_priority='M', average_unit_price=269.7085266113281, total_cost=34786864.0, total_profit=14607042.0, total_revenue=49393908.0, total_units_sold=161306)
Row(order_priority='H', average_unit_price=284.71453857421875, total_cost=27728600.0, total_profit=11910944.0, total_revenue=39639544.0, total_units_sold=121706)
Row(order_priority='L', average_unit_price=198.62208557128906, total_cost=15802683.0, total_profit=7160182.5, total_revenue=22962866.0, total_units_sold=119627)
