# Relational Data Modelling for Cassandra Database

## Import Python Packages

In [1]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

## Extract Data from CSV Files

### Creating list of file paths to process event csv data files

In [5]:
filepath = os.getcwd() + "/event_data"

for root, dirs, files in os.walk(filepath):
    file_path_list = glob.glob(os.path.join(root, "*"))

### Extract data from event files

In [6]:
# Initiate an empty list of rows that will be generated from each file
full_data_rows_list = []

# Loop thru every filepath in the file path list

for f in file_path_list:

    # Read CSV File
    with open(f, "r", encoding='utf8', newline="") as csvfile:
        
        # Create new csv reader object
        csv_reader = csv.reader(csvfile)
        next(csv_reader)
    
        # Extract Data row by row and append to the list
        for row in csv_reader:
            full_data_rows_list.append(row)

# Print length of data list   
print(len(full_data_rows_list))

# Print actual data list
#print(full_data_rows_list[0:5])

8056


In [11]:
print(full_data_rows_list[0])

['', 'Logged In', 'Walter', 'M', '0', 'Frye', '', 'free', 'San Francisco-Oakland-Hayward, CA', 'GET', 'Home', '1.54092E+12', '38', '', '200', '1.54111E+12', '39']


###  Load extracted data into single csv file to be used to load Cassanda DB

In [7]:
csv.register_dialect("my_dialect", quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open("event_data.csv", 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='my_dialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        
        #Columns to be used are specified in the index
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [9]:
# check the number of rows in your csv file
with open('event_data.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


## Apache Cassandra Provisioning and Set-Up

## The event_datafile.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=orange>**event_data.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

### Create a Cluster and Connect to DB

In [12]:
from cassandra.cluster import Cluster

try:
    cluster = Cluster(['localhost'])  # Use the Cassandra container's IP address
    session = cluster.connect()
    print("Cassandra DB Cluster Successfully Connected!!!")
except Exception as e:
    print(e)

Cassandra DB Cluster Successfully Connected!!!


### Create KeySpace

In [13]:
try:
    session.execute("""CREATE KEYSPACE IF NOT EXISTS sparkify WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }""")
    print("Keyspace Created Successfully")
except Exception as e:
    print(e)

Keyspace Created Successfully


### Set Keyspace

In [14]:
try:
    session.set_keyspace("sparkify")
    print("Sparkify Keyspace set up successfully")
except Exception as e:
    print(e)

Sparkify Keyspace set up successfully


## Cassandra DB Query Set-Up

### Query #1

#### Give me the artist, song title and song's length in the database where the sessionId = 338, and itemInSession = 4

To answer this question we will need to obtain (select) the artist name, song name, and song length from out table, and we will need to filter by sessionId and itemInSession.
In CQL our query looks like:

*SELECT artist, song_title, song_length FROM session_songs WHERE sessionId = 338 AND itemInSession = 4*

- We will name our table **session_songs**
- Our primary key will consist of partition key sessionId, and clustering key itemInSession so that we can filter by this attributes later on.
- The columns of our table will be: sessionId, itemInSession, artist, song_title and song_length.


#### Create Table

In [40]:
try:
    session.execute("""
    CREATE TABLE IF NOT EXISTS session_songs
    (sessionId int, itemInSession int, artist text, song_title text, song_length float,
    PRIMARY KEY(sessionId, itemInSession))
    """)
    print("Table Created Successfully")
except Exception as e:
    print(e)



Table Created Successfully


#### Insert Data into DB Table

In [23]:
file = 'event_data.csv'

with open(file, encoding='utf-8') as f:
    csv_reader = csv.reader(f)
    next(csv_reader)

    for row in csv_reader:
        query = "INSERT INTO session_songs (sessionId, itemInSession, artist, song_title, song_length)"
        query = query + " VALUES (%s, %s, %s, %s, %s)"
        
        # Extracts columns from CSV file and attaches them to variables
        artist_name, user_name, gender, itemInSession, user_last_name, length, level, location, sessionId, song, userId = row
        session.execute(query, (int(sessionId), int(itemInSession), artist_name, song, float(length)))



#### Verify that Data is in DB Table

In [31]:
query = "SELECT * FROM session_songs"
result = session.execute(query)

# Convert the result to a Pandas DataFrame
df = pd.DataFrame(result)

print(df.head())


   sessionid  iteminsession              artist  song_length  \
0         23              0      Regina Spektor   191.085266   
1         23              1     Octopus Project   250.957916   
2         23              2      Tegan And Sara   180.061584   
3         23              3          Dragonette   153.390564   
4         23              4  Lil Wayne / Eminem   229.589752   

                          song_title  
0    The Calculation (Album Version)  
1  All Of The Champs That Ever Lived  
2                         So Jealous  
3                       Okay Dolores  
4                     Drop The World  


#### Execute Query 1

In [26]:
rows = session.execute("""SELECT artist, song_title, song_length FROM session_songs WHERE sessionId = 338 AND itemInSession = 4""")

for row in rows:
    print(row.artist, row.song_title, row.song_length)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


### Query #2

#### Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

To answer this question we will need to obtain (select) the artist name, song name, user name and user lastname from out table, we will need to filter by userId and sessionId, and order by itemInSession. 

In CQL our query looks like:

*SELECT itemInSession, artist, song, firstName, lastName FROM user_songs WHERE userId = 10 AND sessionId = 182*

- We will name our table **user_songs**
- Our primary key will consist of composite partition key userId, sessionId. The reason for this is that if we only use userId as partition key, the sessionid which belongs to the same user will be put into different nodes, which will have the performance issue when the volume of data is large.
- Our clustering key will be itemInSession so that our results are order by it.
- The columns of our table will be: userId, sessionId, itemInSession, artist, song and firstName and lastName.

#### Create Table

In [32]:
try:
    session.execute("""
    CREATE TABLE IF NOT EXISTS user_songs
    (userId int, sessionId int, artist text, song text, firstName text, lastName text, itemInSession int,
    PRIMARY KEY((userId, sessionId), itemInSession))
    """)
    print("Table Created Successfully")

except Exception as e:
    print(e)

Table Created Successfully


#### Insert Data into DB Table

In [33]:
file = 'event_data.csv'

with open(file, encoding = 'utf8') as f:
    csv_reader = csv.reader(f)
    next(csv_reader) 
    for row in csv_reader:
        query = "INSERT INTO user_songs (userId, sessionId, artist, song, firstName, lastName, itemInSession)"
        query = query + " VALUES (%s, %s, %s, %s, %s, %s, %s)"
        
        # Extracts columns from CSV file and attaches them to variables
        artist, firstName, gender, itemInSession, lastName, length, level, location, sessionId, song, userId = row
        session.execute(query, (int(userId), int(sessionId), artist, song, firstName, lastName, int(itemInSession)))

#### Verify that Data is in DB Table

In [34]:
query = "SELECT * FROM user_songs"
result = session.execute(query)

# Convert the result to a Pandas DataFrame
df = pd.DataFrame(result)

print(df.head())


   userid  sessionid  iteminsession                 artist firstname lastname  \
0      58        768              0       System of a Down     Emily   Benson   
1      58        768              1  Ghostland Observatory     Emily   Benson   
2      58        768              2      Evergreen Terrace     Emily   Benson   
3      85        776              2               Deftones   Kinsley    Young   
4      85        776              3   The Notorious B.I.G.   Kinsley    Young   

                            song  
0                     Sad Statue  
1                 Stranger Lover  
2                           Zero  
3           Head Up (LP Version)  
4  Playa Hater (Amended Version)  


#### Execute Query

In [35]:
rows = session.execute("""SELECT itemInSession, artist, song, firstName, lastName FROM user_songs WHERE userId = 10 AND sessionId = 182""")

for row in rows:
    print(row.iteminsession, row.artist, row.song, row.firstname, row.lastname)

0 Down To The Bone Keep On Keepin' On Sylvie Cruz
1 Three Drives Greece 2000 Sylvie Cruz
2 Sebastien Tellier Kilometer Sylvie Cruz
3 Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


### Query #3

#### Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

To answer this question we will need to obtain (select) the user first name and lastname from out table, and we will need to filter by song name. As user name and lastname, in large datasets, are not unique, we will add the column userId to uniquely identify users.

In CQL our query looks like:

*SELECT firstName, lastName FROM app_history WHERE song = 'All Hands Against His Own'*

- We will name our table **app_history**
- Our primary key will consist of partition key song, and clustering key userId. This uniquely identifies our rows.
- The columns of our table will be: song, firstName, lastName and userId.

#### Create Table

In [36]:
try:
    session.execute("""
    CREATE TABLE IF NOT EXISTS app_history
    (song text, firstName text, lastName text, userId int,
    PRIMARY KEY(song, userId))
    """)
    print("Table Created Successfully")
except Exception as e:
    print(e)

Table Created Successfully


#### Insert Data into DB Table

In [37]:
file = 'event_data.csv'

with open(file, encoding = 'utf8') as f:
    csv_reader = csv.reader(f)
    next(csv_reader) # skip header
    for line in csv_reader:
        query = "INSERT INTO app_history (song, firstName, lastName, userId)"
        query = query + " VALUES (%s, %s, %s, %s)"
        artist, firstName, gender, itemInSession, lastName, length, level, location, sessionId, song, userId = line
        session.execute(query, (song, firstName, lastName, int(userId)))

#### Verify that the Data is in the DB Table

In [38]:
query = "SELECT * FROM app_history"
result = session.execute(query)

# Convert the result to a Pandas DataFrame
df = pd.DataFrame(result)

print(df.head())

                                  song  userid firstname lastname
0                   Wonder What's Next      49     Chloe   Cuevas
1                  In The Dragon's Den      49     Chloe   Cuevas
2    Too Tough (1994 Digital Remaster)      44    Aleena    Kirby
3  Rio De Janeiro Blue (Album Version)      49     Chloe   Cuevas
4                             My Place      15      Lily     Koch


#### Execute Query

In [39]:
rows = session.execute("""SELECT firstName, lastName FROM app_history WHERE song = 'All Hands Against His Own'""")

for row in rows:
    print(row.firstname, row.lastname)

Jacqueline Lynch
Tegan Levine
Sara Johnson


## Drop Tables and Close Session

In [41]:
try:
    session.execute("""DROP TABLE user_songs""")
    print("Table: `user_songs` dropped successfully")
except Exception as e:
    print(e)

try:
    session.execute("""DROP TABLE session_songs""")
    print("Table: `session_songs` dropped successfully")
except Exception as e:
    print(e)

try:
    session.execute("""DROP TABLE app_history""")
    print("Table: `app_history` dropped successfully")
except Exception as e:
    print(e)


Table: `user_songs` dropped successfully
Table: `session_songs` dropped successfully
Table: `app_history` dropped successfully


In [42]:
try:
    session.shutdown()
    print("Apache Cassandra DB Session Shut Down Successful")
    cluster.shutdown()
    print("Apache Cassandra DB Cluster Shut Down Successful")
except Exception as e:
    print(e)

Apache Cassandra DB Session Shut Down Successful
Apache Cassandra DB Cluster Shut Down Successful
