# Data modeling with Apache Cassandra

## Part I. ETL Pipeline for pre-processing files

### Setup

In [33]:
from pathlib import Path
import pandas as pd
import cassandra
from helpers import *
from queries import *

In [2]:
%load_ext autoreload
%autoreload 2

### Create a list of filepaths to process original event data csv files

In [3]:
root_path = Path.cwd()
data_path = root_path.joinpath("event_data")

In [4]:
file_path_list = [e for e in data_path.rglob("*.csv")]

In [5]:
print(f"Found {len(file_path_list)} CSV files in: {data_path}")

Found 30 CSV files in: /home/workspace/event_data


### Process individual files to create a single file that will be used for Apache Casssandra tables

Let's take a look at one individual csv file first

In [6]:
first_file = file_path_list[0]

In [7]:
!head $first_file

artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userId
Muse,Logged In,Harper,M,1,Barrett,209.50159,paid,"New York-Newark-Jersey City, NY-NJ-PA",PUT,NextSong,1.54069E+12,275,Supermassive Black Hole (Twilight Soundtrack Version),200,1.54172E+12,42
Beastie Boys,Logged In,Harper,M,2,Barrett,161.56689,paid,"New York-Newark-Jersey City, NY-NJ-PA",PUT,NextSong,1.54069E+12,275,Lighten Up,200,1.54172E+12,42
Shakira,Logged In,Harper,M,3,Barrett,145.84118,paid,"New York-Newark-Jersey City, NY-NJ-PA",PUT,NextSong,1.54069E+12,275,Pienso En Ti,200,1.54172E+12,42
Selena,Logged In,Harper,M,4,Barrett,172.66893,paid,"New York-Newark-Jersey City, NY-NJ-PA",PUT,NextSong,1.54069E+12,275,Amor Prohibido,200,1.54172E+12,42
Kid Cudi Vs Crookers,Logged In,Harper,M,5,Barrett,162.97751,paid,"New York-Newark-Jersey City, NY-NJ-PA",PUT,NextSong,1.54069E+12,275,Day 'N' Nite,200,1.54172E+12,42
Rise Against,Logged In,Harper,M,6,Barrett,179.591

In [8]:
first_df = pd.read_csv(first_file)

In [9]:
first_df.shape

(283, 17)

In [10]:
first_df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userId
0,Muse,Logged In,Harper,M,1,Barrett,209.50159,paid,"New York-Newark-Jersey City, NY-NJ-PA",PUT,NextSong,1540690000000.0,275,Supermassive Black Hole (Twilight Soundtrack V...,200,1541720000000.0,42.0
1,Beastie Boys,Logged In,Harper,M,2,Barrett,161.56689,paid,"New York-Newark-Jersey City, NY-NJ-PA",PUT,NextSong,1540690000000.0,275,Lighten Up,200,1541720000000.0,42.0
2,Shakira,Logged In,Harper,M,3,Barrett,145.84118,paid,"New York-Newark-Jersey City, NY-NJ-PA",PUT,NextSong,1540690000000.0,275,Pienso En Ti,200,1541720000000.0,42.0
3,Selena,Logged In,Harper,M,4,Barrett,172.66893,paid,"New York-Newark-Jersey City, NY-NJ-PA",PUT,NextSong,1540690000000.0,275,Amor Prohibido,200,1541720000000.0,42.0
4,Kid Cudi Vs Crookers,Logged In,Harper,M,5,Barrett,162.97751,paid,"New York-Newark-Jersey City, NY-NJ-PA",PUT,NextSong,1540690000000.0,275,Day 'N' Nite,200,1541720000000.0,42.0


Load all csv files into one dataframe and save it as CSV

In [11]:
try:
    df = pd.read_csv(f"{root_path}/event_datafile_new.csv")
    print("Loaded file from disk")
except FileNotFoundError:
    columns = ["artist", "firstName", "gender", "itemInSession", "lastName", "length", "level", "location", "sessionId", "song", "userId"]
    df = load_all_records(file_path_list, columns)
finally:
    print(f"Shape: {df.shape}")

Loaded file from disk
Shape: (8056, 17)


In [12]:
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userId
0,Miami Horror,Logged In,Kate,F,88,Harrell,250.8273,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540470000000.0,293,Sometimes,200,1541550000000.0,97.0
1,The White Stripes,Logged In,Kate,F,89,Harrell,241.8673,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540470000000.0,293,My Doorbell (Album Version),200,1541550000000.0,97.0
2,Juan Carmona,Logged In,Kate,F,90,Harrell,331.44118,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540470000000.0,293,Panales de Algodon,200,1541550000000.0,97.0
3,Alison Krauss / Union Station,Logged In,Kate,F,91,Harrell,171.04934,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540470000000.0,293,Restless,200,1541550000000.0,97.0
4,Bullet For My Valentine,Logged In,Kate,F,92,Harrell,235.65016,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540470000000.0,293,Begging For Mercy,200,1541550000000.0,97.0


In [13]:
df.dtypes

artist            object
auth              object
firstName         object
gender            object
itemInSession      int64
lastName          object
length           float64
level             object
location          object
method            object
page              object
registration     float64
sessionId          int64
song              object
status             int64
ts               float64
userId           float64
dtype: object

Set correct dtypes for ts (timestamp) and userId (int)

In [14]:
# Although userId should be of type int, because of a limitation of pandas < 0.24 (Series of type int can not hold NaNs) we leave it as int
# df["userId"] = df["userId"].fillna(0).astype("int64")
df["ts"] = df["ts"].astype("datetime64[ms]")

In [15]:
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userId
0,Miami Horror,Logged In,Kate,F,88,Harrell,250.8273,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540470000000.0,293,Sometimes,200,2018-11-07 00:20:00,97.0
1,The White Stripes,Logged In,Kate,F,89,Harrell,241.8673,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540470000000.0,293,My Doorbell (Album Version),200,2018-11-07 00:20:00,97.0
2,Juan Carmona,Logged In,Kate,F,90,Harrell,331.44118,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540470000000.0,293,Panales de Algodon,200,2018-11-07 00:20:00,97.0
3,Alison Krauss / Union Station,Logged In,Kate,F,91,Harrell,171.04934,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540470000000.0,293,Restless,200,2018-11-07 00:20:00,97.0
4,Bullet For My Valentine,Logged In,Kate,F,92,Harrell,235.65016,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540470000000.0,293,Begging For Mercy,200,2018-11-07 00:20:00,97.0


In [16]:
if not root_path.joinpath("event_datafile_new.csv").exists():
    df.to_csv(f"{root_path}/event_datafile_new.csv", index=False)
else:
    print("File event_datafile_new.csv does already exist")

File event_datafile_new.csv does already exist


## Part II. Data Modelling with Apache Cassandra

Now we are ready to work with the CSV file titled <font color=red>**event_datafile_new.csv**</font>, located within the W\workspace directory.  
The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

#### Create Cluster

In [17]:
from cassandra.cluster import Cluster

cluster = Cluster()
session = cluster.connect()

#### Create Keyspace

In [18]:
# http://cassandra.apache.org/doc/latest/cql/ddl.html#create-keyspace
session.execute(cluster_create_keyspace)

<cassandra.cluster.ResultSet at 0x7f7116ddc710>

#### Set Keyspace

In [19]:
session.set_keyspace("sparkify")

## Create queries to find answers to the following three questions

1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4
2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

### Query 1

In [20]:
session.execute(q1_drop_table)

<cassandra.cluster.ResultSet at 0x7f7116ddc198>

In [21]:
session.execute(q1_create_table)

<cassandra.cluster.ResultSet at 0x7f7116dd96d8>

#### Insert data

In [22]:
table = "song_playlist_session"
cols = ["sessionId", "itemInSession", "artist", "song", "length"]
q1_cql = f"INSERT INTO {table} ({cols[0]}, {cols[1]}, {cols[2]}, {cols[3]}, {cols[4]}) VALUES (?, ?, ?, ?, ?)"
print(q1_cql)

INSERT INTO song_playlist_session (sessionId, itemInSession, artist, song, length) VALUES (?, ?, ?, ?, ?)


In [23]:
batch_insert(cql=q1_cql, cols=cols, data=df, size=500, session=session)

Starting batch insert for 8056 rows in 17 batches of size 500
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 473 rows of data
Inserted 473 rows of data
Batch insert finished


#### Perform sanity check

In [25]:
query(f"SELECT * FROM {table} LIMIT 5", session, print_result=False)

[Row(sessionid=23, iteminsession=0, artist='Regina Spektor', length=191.08526611328125, song='The Calculation (Album Version)'),
 Row(sessionid=23, iteminsession=1, artist='Octopus Project', length=250.95791625976562, song='All Of The Champs That Ever Lived'),
 Row(sessionid=23, iteminsession=2, artist='Tegan And Sara', length=180.06158447265625, song='So Jealous'),
 Row(sessionid=23, iteminsession=3, artist='Dragonette', length=153.39056396484375, song='Okay Dolores'),
 Row(sessionid=23, iteminsession=4, artist='Lil Wayne / Eminem', length=229.58975219726562, song='Drop The World')]

#### Execute query
_Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4_

In [26]:
q1 = query(q1_query, session)

[Row(artist='Faithless', song='Music Matters (Mark Knight Dub)', length=495.30731201171875)]


### Query 2

In [34]:
session.execute(q2_drop_table)

<cassandra.cluster.ResultSet at 0x7f7116dc3588>

In [35]:
session.execute(q2_create_table)

<cassandra.cluster.ResultSet at 0x7f7116dcda20>

#### Insert data

In [36]:
table = "song_playlist_user"
cols = ["userId", "sessionId", "itemInSession", "artist", "song", "firstName", "lastName"]
q2_cql = f"INSERT INTO {table} ({cols[0]}, {cols[1]}, {cols[2]}, {cols[3]}, {cols[4]}, {cols[5]}, {cols[6]}) VALUES (?, ?, ?, ?, ?, ?, ?)"
print(q2_cql)

INSERT INTO song_playlist_user (userId, sessionId, itemInSession, artist, song, firstName, lastName) VALUES (?, ?, ?, ?, ?, ?, ?)


In [37]:
batch_insert(cql=q2_cql, cols=cols, data=df, size=250, session=session)

Starting batch insert for 8056 rows in 33 batches of size 250
Inserted 245 rows of data
Inserted 245 rows of data
Inserted 245 rows of data
Inserted 245 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Inserted 244 rows of data
Batch insert finished


#### Perform sanity check

In [39]:
query(f"SELECT * FROM {table} LIMIT 5", session, print_result=False)

[Row(userid=30.0, sessionid=872, iteminsession=0, artist='Pepper', firstname='Avery', lastname='Watkins', song='Dry Spell (LP Version)'),
 Row(userid=95.0, sessionid=222, iteminsession=32, artist='Britt Nicole', firstname='Sara', lastname='Johnson', song='Like A Star'),
 Row(userid=95.0, sessionid=222, iteminsession=31, artist='Britt Nicole', firstname='Sara', lastname='Johnson', song='Like A Star'),
 Row(userid=95.0, sessionid=222, iteminsession=30, artist='ZZ Top', firstname='Sara', lastname='Johnson', song='Avalon Hideaway (LP Version)'),
 Row(userid=95.0, sessionid=222, iteminsession=29, artist='The Knack', firstname='Sara', lastname='Johnson', song='Let Me Out')]

#### Execute query
_Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182_

In [40]:
# explicitly select itemInSession to show that sorting works
q2 = query(q2_query, session)

[Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', firstname='Sylvie', lastname='Cruz', iteminsession=3),
 Row(artist='Sebastien Tellier', song='Kilometer', firstname='Sylvie', lastname='Cruz', iteminsession=2),
 Row(artist='Three Drives', song='Greece 2000', firstname='Sylvie', lastname='Cruz', iteminsession=1),
 Row(artist='Down To The Bone', song="Keep On Keepin' On", firstname='Sylvie', lastname='Cruz', iteminsession=0)]


### Query 3

In [41]:
session.execute(q3_drop_table)

<cassandra.cluster.ResultSet at 0x7f7116dd9c88>

In [42]:
session.execute(q3_create_table)

<cassandra.cluster.ResultSet at 0x7f7116dabf28>

#### Insert data

In [43]:
table = "song_user_name"
cols = ["song", "userId", "firstName", "lastName"]
q3_cql = f"INSERT INTO {table} ({cols[0]}, {cols[1]}, {cols[2]}, {cols[3]}) VALUES (?, ?, ?, ?)"
print(q3_cql)

INSERT INTO song_user_name (song, userId, firstName, lastName) VALUES (?, ?, ?, ?)


In [44]:
batch_insert(cql=q3_cql, cols=cols, data=df, size=500, session=session)

Starting batch insert for 8056 rows in 17 batches of size 500
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 474 rows of data
Inserted 473 rows of data
Inserted 473 rows of data
Batch insert finished


#### Perform sanity check

In [46]:
query(f"SELECT * FROM {table} LIMIT 5", session, print_result=False)

[Row(song="Wonder What's Next", userid=49.0, firstname='Chloe', lastname='Cuevas'),
 Row(song="In The Dragon's Den", userid=49.0, firstname='Chloe', lastname='Cuevas'),
 Row(song='Too Tough (1994 Digital Remaster)', userid=44.0, firstname='Aleena', lastname='Kirby'),
 Row(song='Rio De Janeiro Blue (Album Version)', userid=49.0, firstname='Chloe', lastname='Cuevas'),
 Row(song='My Place', userid=15.0, firstname='Lily', lastname='Koch')]

#### Execute query
_Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'_

In [47]:
q3 = query(q3_query, session)

[Row(firstname='Jacqueline', lastname='Lynch'),
 Row(firstname='Tegan', lastname='Levine'),
 Row(firstname='Sara', lastname='Johnson')]


### Drop tables before closing out session

In [48]:
[session.execute(q) for q in [q1_drop_table, q2_drop_table, q3_drop_table]]

[<cassandra.cluster.ResultSet at 0x7f711b390748>,
 <cassandra.cluster.ResultSet at 0x7f7117e12668>,
 <cassandra.cluster.ResultSet at 0x7f7116db5898>]

### Close session and cluster connection¶

In [49]:
session.shutdown()
cluster.shutdown()