# WEO4: Cassandra Python Programing

## lib imports

In [1]:
import pandas as pd 
from cassandra.cluster import Cluster


import random


In [2]:
## ! conda install -c conda-forge cassandra-cluster

## Create a session for cassandra

In [2]:
session = Cluster().connect()

In [3]:
rows = session.execute("desc keyspaces")

for row in rows:
    print(row)

Row(keyspace_name='m14', type='keyspace', name='m14')
Row(keyspace_name='system', type='keyspace', name='system')
Row(keyspace_name='system_auth', type='keyspace', name='system_auth')
Row(keyspace_name='system_distributed', type='keyspace', name='system_distributed')
Row(keyspace_name='system_schema', type='keyspace', name='system_schema')
Row(keyspace_name='system_traces', type='keyspace', name='system_traces')
Row(keyspace_name='system_views', type='keyspace', name='system_views')
Row(keyspace_name='system_virtual_schema', type='keyspace', name='system_virtual_schema')


## Delete existing keyspace

In [5]:
delete_keySpace= "DROP KEYSPACE m14; "
session.execute(delete_keySpace)

InvalidRequest: Error from server: code=2200 [Invalid query] message="Keyspace 'm14' doesn't exist"

## Create keySpace

In [6]:
create_keySpace = """
CREATE KEYSPACE IF NOT EXISTS m14 
WITH REPLICATION = {
    'class' : 'SimpleStrategy',
    'replication_factor' : 1
};
"""

_ = session.execute(create_keySpace)



In [9]:
rows = session.execute('desc keyspaces')

for row in rows:
    print(row)

Row(keyspace_name='m14', type='keyspace', name='m14')
Row(keyspace_name='system', type='keyspace', name='system')
Row(keyspace_name='system_auth', type='keyspace', name='system_auth')
Row(keyspace_name='system_distributed', type='keyspace', name='system_distributed')
Row(keyspace_name='system_schema', type='keyspace', name='system_schema')
Row(keyspace_name='system_traces', type='keyspace', name='system_traces')
Row(keyspace_name='system_views', type='keyspace', name='system_views')
Row(keyspace_name='system_virtual_schema', type='keyspace', name='system_virtual_schema')


##  Create table: Inventory 

In [10]:
create_table = """
CREATE TABLE IF NOT EXISTS  m14.inventory_by_sku (
sku int, 
name text, 
description text,
warehouse_num int,
PRIMARY KEY ((sku) )
);
"""

_= session.execute(create_table)

# Mock data for inventory table 


In [11]:
sku = [1001, 1002, 1003, 1004, 1005]
name = ['Red Dead Redemption 2', 'The Witcher 3: Wild Hunt', 'Grand Theft Auto V', 'Assassins Creed Odyssey', 'Fallout 4']
description = ['Action-adventure', 'Action RPG', 'Open world', 'Action RPG', 'Post-apocalyptic']
warehouse_num = [123, 456, 789]

mockup_data = [(sku[i], name[i], description[i], 123 if i < 3 else random.choice(warehouse_num)) for i in range(5)]

df = pd.DataFrame(mockup_data, columns=['sku', 'name', 'description', 'warehouse_num'])
df.head

<bound method NDFrame.head of     sku                      name       description  warehouse_num
0  1001     Red Dead Redemption 2  Action-adventure            123
1  1002  The Witcher 3: Wild Hunt        Action RPG            123
2  1003        Grand Theft Auto V        Open world            123
3  1004   Assassins Creed Odyssey        Action RPG            123
4  1005                 Fallout 4  Post-apocalyptic            456>

In [12]:
# insert statements 
print("Printing Insert queries...")
for _, row in df.iterrows():
    insert_statement = "INSERT INTO m14.inventory_by_sku (sku, name, description, warehouse_num) VALUES ({}, '{}', '{}', {});".format(row['sku'], row['name'].replace("'", "''"), row['description'].replace("'", "''"), row['warehouse_num'])
    print(insert_statement)

Printing Insert queries...
INSERT INTO m14.inventory_by_sku (sku, name, description, warehouse_num) VALUES (1001, 'Red Dead Redemption 2', 'Action-adventure', 123);
INSERT INTO m14.inventory_by_sku (sku, name, description, warehouse_num) VALUES (1002, 'The Witcher 3: Wild Hunt', 'Action RPG', 123);
INSERT INTO m14.inventory_by_sku (sku, name, description, warehouse_num) VALUES (1003, 'Grand Theft Auto V', 'Open world', 123);
INSERT INTO m14.inventory_by_sku (sku, name, description, warehouse_num) VALUES (1004, 'Assassins Creed Odyssey', 'Action RPG', 123);
INSERT INTO m14.inventory_by_sku (sku, name, description, warehouse_num) VALUES (1005, 'Fallout 4', 'Post-apocalyptic', 456);


## Insert mockup data

In [13]:
for _, row in df.iterrows():
    insert_statement = "INSERT INTO m14.inventory_by_sku (sku, name, description, warehouse_num) VALUES ({}, '{}', '{}', {});".format(row['sku'], row['name'], row['description'], row['warehouse_num'])
    _ = session.execute(insert_statement)
    print(_)


<cassandra.cluster.ResultSet object at 0x7f073a74d4b0>
<cassandra.cluster.ResultSet object at 0x7f073a7c11e0>
<cassandra.cluster.ResultSet object at 0x7f0740b85690>
<cassandra.cluster.ResultSet object at 0x7f0740b85780>
<cassandra.cluster.ResultSet object at 0x7f0740b868f0>


## Read our inventory table 

In [16]:
read_table = """
SELECt * from m14.inventory_by_sku
WHERE warehouse_num = 123;
"""

rows = session.execute(read_table)

for row in rows: 
    print(row)

InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING"

## Create Index on Warehouse_num column

In [17]:
create_index = "CREATE  INDEX  warh_num_idx  on m14.inventory_by_sku(warehouse_num);"
_ = session.execute(create_index)

In [18]:
# run select again

rows = session.execute(read_table)
for row in rows:
    print(row)


Row(sku=1004, description='Action RPG', name='Assassins Creed Odyssey', warehouse_num=123)
Row(sku=1001, description='Action-adventure', name='Red Dead Redemption 2', warehouse_num=123)
Row(sku=1003, description='Open world', name='Grand Theft Auto V', warehouse_num=123)
Row(sku=1002, description='Action RPG', name='The Witcher 3: Wild Hunt', warehouse_num=123)


## Obseravations: 

1.The concept of partition key and clustory key is crucial in the cassandra database, we need to create partitions carefully.
2. In the above example, we have created the table with a 'sku' column as the partition key, this table is very useful for accessing a specific sku from the data.
3. However, if we try to access the warehouse_num we will get an error, becuase the table is now structured is, that the 'sku' is the partition key and all the rows are divided into partitions of sku values and stored on different nodes. we only have access to the top level of the data that is the 'SKU' column , we dont see warehouse_num , so this will trigger a full scan on the table,which causes read operations on every node in the cluster, which is quite resource intensive. 

4. Another appraoch is to create another verion of table ,this time warehouse_num as a partition key . so we can access individual warehouse_num without triggering a full scan on the cluster. 

# Create table with warehouse as parition key 

In [21]:
create_table_warehnum = create_table.replace('_by_sku', '_by_warehouse_num').replace('PRIMARY KEY ((sku) )', 'PRIMARY KEY ((warehouse_num) )')

print(create_table_warehnum)
_ = session.execute(create_table_warehnum)


CREATE TABLE IF NOT EXISTS  m14.inventory_by_warehouse_num (
sku int, 
name text, 
description text,
warehouse_num int,
PRIMARY KEY ((warehouse_num) )
);



In [22]:
# insert mockup data
for _, row in df.iterrows():
    insert_statement = "INSERT INTO m14.inventory_by_warehouse_num(sku, name, description, warehouse_num) VALUES ({}, '{}', '{}', {});".format(row['sku'], row['name'].replace("'", "''"), row['description'].replace("'", "''"), row['warehouse_num'])
    _ = session.execute(insert_statement)

In [23]:
# Read data 

read_table_warehnum = "Select * from m14.inventory_by_warehouse_num where warehouse_num = 123;"
rows = session.execute(read_table_warehnum)

for row in rows:
    print(row)

Row(warehouse_num=123, description='Action RPG', name='Assassins Creed Odyssey', sku=1004)


## Obserations: 

1. Notice ,we didn't get an error this time, nor we needed an index. this is becuase we are filtering over the partition key ,and it is very fast
2. Another point to mention is  that, this time the result of the read is different, we only got 1 record with warehouse_num 123, unline the previus(we got 4 rows).
3. The reason for this is that the cassandry follows are relaxed consistency and by defualt the consistency level is set to 1, which means if one of the node acknowledges the partition insert then the insert query operation is succesful, this means that the cassandra will reach eventual consistency over time. hence the results are not same for both the read queries. 
