In [1]:
# Install the Cassandra python driver
!pip install cassandra-driver

Collecting cassandra-driver
  Downloading cassandra_driver-3.29.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.2 kB)
Collecting geomet<0.3,>=0.1 (from cassandra-driver)
  Downloading geomet-0.2.1.post1-py3-none-any.whl.metadata (1.0 kB)
Downloading cassandra_driver-3.29.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.9/3.9 MB[0m [31m31.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading geomet-0.2.1.post1-py3-none-any.whl (18 kB)
Installing collected packages: geomet, cassandra-driver
Successfully installed cassandra-driver-3.29.2 geomet-0.2.1.post1


In [2]:
# Import the necessary libraries
import json

import pandas as pd
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

In [3]:
# This secure connect bundle (SCB) is autogenerated when you download it;
# if yours has a different file name, update it below.
cloud_config = {
    'secure_connect_bundle': 'secure-connect-bigdata.zip'
}

# This token JSON file is autogenerated when you download your token;
# if yours has a different file name, update it below.
with open("BIGDATA-token.json") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)

# Cluster.connect() raises on failure rather than returning a falsy value, so
# a separate "error" branch can never run: reaching the print below already
# means the session was created.
session = cluster.connect()
print('Connected!')



Connected!


In [8]:
# Point the existing session at the keyspace used by the rest of the notebook.
# Calling cluster.connect() again would only open a second session with no
# default keyspace, leaving the unqualified table names in later queries
# unresolved.
session.set_keyspace('bigdata')

# Load the CSV data into a pandas DataFrame
df = pd.read_csv('sales_100.csv')



In [19]:
# Re-express both date columns from MM/DD/YYYY text as YYYY-MM-DD strings.
for date_column in ('Order Date', 'Ship Date'):
    parsed_dates = pd.to_datetime(df[date_column], format='%m/%d/%Y')
    df[date_column] = parsed_dates.dt.strftime('%Y-%m-%d')

In [12]:
# Create the bronze-level table, keyed by OrderID, unless it already exists.
create_bronze_cql = """Create table if not exists bigdata.sales_orders(region text, country text, itemType text, salesChannel text, orderPriority text, orderDate Date, OrderID int PRIMARY KEY, shipDate Date)"""
session.execute(create_bronze_cql)

<cassandra.cluster.ResultSet at 0x7f057cba7f70>

In [20]:
# Prepare the INSERT once, then bind one DataFrame row per execution.
insert_query = session.prepare("INSERT INTO bigdata.sales_orders(region, country, itemType, salesChannel, orderPriority, orderDate, OrderID, shipDate) VALUES (?, ?, ?, ?, ?, ?, ?, ?)")

# CSV column names, listed in the same order as the placeholders above.
csv_columns = ['Region', 'Country', 'Item Type', 'Sales Channel',
               'Order Priority', 'Order Date', 'Order ID', 'Ship Date']

for _, record in df.iterrows():
    bound_values = tuple(record[name] for name in csv_columns)
    session.execute(insert_query, bound_values)

print("Data inserted successfully.")


Data inserted successfully.


In [None]:
# The data is now at the bronze level; next we need to take it to the silver level.

In [48]:
# Read the bronze table back into pandas. The table name is keyspace-qualified,
# like the CREATE TABLE statement, so the query does not depend on the session
# having a default keyspace set.
select_query = "SELECT * FROM bigdata.sales_orders;"
rows = session.execute(select_query)

if rows:
    # Each result row becomes one DataFrame row.
    data = pd.DataFrame(list(rows))
    print(data)
else:
    print("No data found in the sales_orders table.")


      orderid             country   itemtype   orderdate orderpriority  \
0   571997869             Vanuatu     Fruits  2013-11-03             C   
1   349235904          Mauritius     Clothes  2012-11-17             M   
2   440306556               India     Snacks  2012-10-10             L   
3   667593514             Morocco    Clothes  2013-09-14             M   
4   520480573           Indonesia  Household  2011-09-28             C   
..        ...                 ...        ...         ...           ...   
94  252889239            Thailand       Meat  2015-02-04             C   
95  830192887           Sri Lanka     Fruits  2011-11-07             L   
96  925136649              Serbia    Clothes  2016-07-06             L   
97  824714744  Dominican Republic  Baby Food  2011-08-25             H   
98  572335612             Vanuatu     Cereal  2014-06-20             C   

                               region saleschannel    shipdate  
0               Australia and Oceania       On

In [50]:
# Show the column labels before and after trimming surrounding whitespace.
print("Raw columns:", data.columns)
stripped_labels = data.columns.str.strip()
data.columns = stripped_labels
print("Cleaned columns:", data.columns)

Raw columns: Index(['orderid', 'country', 'itemtype', 'orderdate', 'orderpriority',
       'region', 'saleschannel', 'shipdate'],
      dtype='object')
Cleaned columns: Index(['orderid', 'country', 'itemtype', 'orderdate', 'orderpriority',
       'region', 'saleschannel', 'shipdate'],
      dtype='object')


In [51]:
# Flag rows whose ship date is on or before the order date.
data['is_invalid'] = data['shipdate'].le(data['orderdate'])

In [52]:
# Display the per-row flag computed from the ship/order date comparison.
print(data.loc[:, 'is_invalid'])

0     False
1     False
2     False
3     False
4      True
      ...  
94    False
95    False
96    False
97    False
98    False
Name: is_invalid, Length: 99, dtype: bool


In [53]:
# Keep only the rows that were not flagged as invalid.
data = data.loc[~data['is_invalid']]

In [None]:
# The data is clean now, so we can write it to a separate table that serves as the silver-level table.

In [58]:
# Create the silver table with the same columns as the bronze table.
session.execute("""Create table if not exists bigdata.sales_orders_silver(region text, country text, itemType text, salesChannel text, orderPriority text, orderDate Date, OrderID int PRIMARY KEY, shipDate Date)""")

# Prepare the INSERT once, as the bronze load does, instead of sending an
# unprepared statement for every row. The table is keyspace-qualified so the
# statement does not rely on a session-level default keyspace.
insert_query = session.prepare("""
    INSERT INTO bigdata.sales_orders_silver(region, country, itemtype, saleschannel, orderpriority, orderdate,
                              orderid, shipdate)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""")

# Write every remaining (valid) row; the helper column is_invalid is not stored.
for _, row in data.iterrows():
    session.execute(insert_query, (
        row['region'], row['country'], row['itemtype'], row['saleschannel'], row['orderpriority'],
        # int() makes sure a plain Python int is bound to the CQL int column.
        row['orderdate'], int(row['orderid']), row['shipdate'])
    )

In [None]:
# Now we create three gold tables in Cassandra.
# First: gold1.

In [61]:
# Gold 1 source: silver rows whose country is Italy. The table name is
# keyspace-qualified so the query does not depend on a session-level default
# keyspace. ALLOW FILTERING is kept because country is not the table's
# primary key.
query = "SELECT * FROM bigdata.sales_orders_silver where country = 'Italy' ALLOW FILTERING"
rows = session.execute(query)
df = pd.DataFrame(list(rows))

In [62]:
# Create the first gold table, keyed by OrderID, unless it already exists.
create_gold1_cql = """Create table if not exists bigdata.sales_orders_gold1(region text, country text, itemType text, salesChannel text, orderPriority text, orderDate Date, OrderID int PRIMARY KEY, shipDate Date)"""
session.execute(create_gold1_cql)

<cassandra.cluster.ResultSet at 0x7f057bbb6c50>

In [63]:
# Prepare the INSERT once, as the bronze load does, instead of sending an
# unprepared statement for every row. The table is keyspace-qualified so the
# statement does not rely on a session-level default keyspace.
insert_query = session.prepare("""
    INSERT INTO bigdata.sales_orders_gold1(region, country, itemtype, saleschannel, orderpriority, orderdate,
                              orderid, shipdate)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""")

# Copy every row of the queried frame into the gold1 table.
for _, row in df.iterrows():
    session.execute(insert_query, (
        row['region'], row['country'], row['itemtype'], row['saleschannel'], row['orderpriority'],
        # int() makes sure a plain Python int is bound to the CQL int column.
        row['orderdate'], int(row['orderid']), row['shipdate'])
    )

In [None]:
# Next: gold2.
# With this table we can estimate how fresh the fruits can be.

In [65]:
# Gold 2 source: order and ship dates of silver rows whose item type is Fruits.
# The table name is keyspace-qualified so the query does not depend on a
# session-level default keyspace. ALLOW FILTERING is kept because itemtype is
# not the table's primary key.
query = "SELECT itemtype, orderdate,orderid, shipdate FROM bigdata.sales_orders_silver where itemtype = 'Fruits' ALLOW FILTERING"
rows = session.execute(query)
df = pd.DataFrame(list(rows))

In [64]:
# Create the second gold table, keyed by OrderID, unless it already exists.
create_gold2_cql = """Create table if not exists bigdata.sales_orders_gold2(itemType text, orderDate Date, OrderID int PRIMARY KEY, shipDate Date)"""
session.execute(create_gold2_cql)

<cassandra.cluster.ResultSet at 0x7f057bbb6f20>

In [66]:
# Prepare the INSERT once, as the bronze load does, instead of sending an
# unprepared statement for every row. The table is keyspace-qualified so the
# statement does not rely on a session-level default keyspace.
insert_query = session.prepare("""
    INSERT INTO bigdata.sales_orders_gold2(itemtype, orderdate,
                              orderid, shipdate)
    VALUES (?, ?, ?, ?)
""")

# Copy every row of the queried frame into the gold2 table.
for _, row in df.iterrows():
    session.execute(insert_query, (
        row['itemtype'],
        # int() makes sure a plain Python int is bound to the CQL int column.
        row['orderdate'], int(row['orderid']), row['shipdate'])
    )

In [None]:
# Next: gold3.
# In this table we try to find which region has the most recent orders.

In [68]:
# Gold 3 source: silver orders placed after 2016-01-01 (the boundary date itself
# is excluded by the strict '>'). The table name is keyspace-qualified so the
# query does not depend on a session-level default keyspace. ALLOW FILTERING is
# kept because orderdate is not the table's primary key.
query = "select orderdate , region ,OrderID from bigdata.sales_orders_silver where orderdate > '2016-01-01' ALLOW FILTERING;"
rows = session.execute(query)
df = pd.DataFrame(list(rows))

In [69]:
# Create the third gold table, keyed by OrderID, unless it already exists.
create_gold3_cql = """Create table if not exists bigdata.sales_orders_gold3(orderDate Date,region text, OrderID int PRIMARY KEY)"""
session.execute(create_gold3_cql)

<cassandra.cluster.ResultSet at 0x7f057cd84670>

In [71]:
# Prepare the INSERT once, as the bronze load does, instead of sending an
# unprepared statement for every row. The table is keyspace-qualified so the
# statement does not rely on a session-level default keyspace.
insert_query = session.prepare("""
    INSERT INTO bigdata.sales_orders_gold3(orderdate , region ,OrderID)
    VALUES (?, ?, ?)
""")

# Copy every row of the queried frame into the gold3 table.
for _, row in df.iterrows():
    session.execute(insert_query, (
        # int() makes sure a plain Python int is bound to the CQL int column.
        row['orderdate'], row['region'], int(row['orderid']))
    )