# Project - Data Modeling with Apache Cassandra
---
>  
> **Stephanie Anderton**  
> DEND Project 1B  
> April 27, 2019  
>  

## Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Create list of filepaths to process original event CSV data files

In [2]:
# check the current working directory
print(os.getcwd())

# Get the current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Create a for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    # join the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))
    
    # print the file names to check that we're getting them all
    print(files)


/home/workspace
['2018-11-09-events.csv', '2018-11-17-events.csv', '2018-11-08-events.csv', '2018-11-23-events.csv', '2018-11-18-events.csv', '2018-11-25-events.csv', '2018-11-27-events.csv', '2018-11-07-events.csv', '2018-11-28-events.csv', '2018-11-03-events.csv', '2018-11-15-events.csv', '2018-11-22-events.csv', '2018-11-10-events.csv', '2018-11-19-events.csv', '2018-11-30-events.csv', '2018-11-12-events.csv', '2018-11-14-events.csv', '2018-11-06-events.csv', '2018-11-13-events.csv', '2018-11-04-events.csv', '2018-11-20-events.csv', '2018-11-01-events.csv', '2018-11-05-events.csv', '2018-11-11-events.csv', '2018-11-16-events.csv', '2018-11-26-events.csv', '2018-11-29-events.csv', '2018-11-02-events.csv', '2018-11-21-events.csv', '2018-11-24-events.csv']


#### Process the event files to create the data file CSV that will be used for Apache Casssandra tables

In [3]:
# initiate an empty list of rows that will be populated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:

    # read CSV file 
    with open(f, 'r', encoding = 'utf8', newline = '') as csvfile: 
        # create a CSV reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
        # extract each data row one by one and append it
        for line in csvreader:
            full_data_rows_list.append(line) 

# get total number of rows 
print(len(full_data_rows_list))

# create a file called 'event_datafile_new.csv' that will be used to
# insert data into the Apache Cassandra tables
csv.register_dialect('myDialect', quoting = csv.QUOTE_ALL, 
                     skipinitialspace = True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline = '') as f:
    writer = csv.writer(f, dialect = 'myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 
                     'lastName', 'length', 'level', 'location', 'sessionId',
                     'song', 'userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], 
                         row[7], row[8], row[12], row[13], row[16]))


8056


In [4]:
# check the number of rows in the CSV file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


## Part II. Process data and build the Apache Cassandra database

The CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory, contains the following columns:  
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data looks like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

### Optional: Load and check the data in a DataFrame
**NOTE:**  I have added this section to use the dataframe in order to check the output from the CQL queries.

In [5]:
df = pd.read_csv('event_datafile_new.csv', encoding = 'utf-8')
df.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6820 entries, 0 to 6819
Data columns (total 11 columns):
artist           6820 non-null object
firstName        6820 non-null object
gender           6820 non-null object
itemInSession    6820 non-null int64
lastName         6820 non-null object
length           6820 non-null float64
level            6820 non-null object
location         6820 non-null object
sessionId        6820 non-null int64
song             6820 non-null object
userId           6820 non-null int64
dtypes: float64(1), int64(3), object(7)
memory usage: 586.2+ KB


In [6]:
print("Unique number within features:")
print("artist:     ", df['artist'].unique().size)
print("sessionId:  ", df['sessionId'].unique().size)
print("song:       ", df['song'].unique().size)
print("userId:     ", df['userId'].unique().size)


Unique number within features:
artist:      3148
sessionId:   776
song:        5190
userId:      96


### Create the Cluster and Keyspace

In [7]:
# Make a connection to a Cassandra instance on the local machine 
# (127.0.0.1)

from cassandra.cluster import Cluster

try:
    cluster = Cluster()
    # instantiate session by establishing connection to begin executing queries
    session = cluster.connect()
except Exception as e:
    print(e)


In [8]:
# Create a KEYSPACE to work in
try:
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS stephanie
        WITH REPLICATION = 
        { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
    """)
except Exception as e:
    print(e)


In [9]:
# Connect to our KEYSPACE
try:
    session.set_keyspace('stephanie')
except Exception as e:
    print(e)


---
### The following three questions will be asked of the data

**1.** Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession = 4  
**2.** Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182  
**3.** Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'  


---
### Query 1:
- Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

**PRIMARY KEY:**  
This is a compound key consisting of a PARTITION by **sessionId** with CLUSTERING on the **itemInSession** column. These fields are used because the question asks for a group of records from a specific item in a session of a particular session Id.

**Table Data:**  
The data inserted into the tables are **sessionId** and **itemInSession** for the PRIMARY KEY, with **artist**, **song**, and **length** for the fields to be retrieved by the query.


#### Create and load the table

In [10]:
query = "CREATE TABLE IF NOT EXISTS music_1_song_item_in_session "
query = query + "(sessionId int, itemInSession int,  \
                  artist text, song text, length float,  \
                  PRIMARY KEY (sessionId, itemInSession))"
try:
    session.execute(query)
except Exception as e:
    print(e)


In [11]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)    # skip header

    for line in csvreader:
        # insert the music app event record
        query = "INSERT INTO music_1_song_item_in_session (  \
                                     sessionId, itemInSession,  \
                                     artist, song, length)"
        query = query + " VALUES (%s, %s, %s, %s, %s)"
        values = (int(line[8]), int(line[3]), line[0], line[9], float(line[5]))
        try:
            session.execute(query, values)
        except Exception as e:
            print(e)


#### Run the SELECT query to answer the question

In [12]:
query = """
    SELECT artist, song, length
    FROM   music_1_song_item_in_session 
    WHERE  sessionId = 338 AND 
           itemInSession = 4
"""
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

print("Artist name with song title and song's length for the 4th item heard during sessionId 338")

# create DataFrame for displaying the results
cols = ['artist', 'song', 'length']
results = []

# insert the query results (rows) into the list
for row in rows:
    results.append((row.artist, row.song, round(row.length, 1)))

df_q1 = pd.DataFrame(results, columns = cols)
df_q1


Artist name with song title and song's length for the 4th item heard during sessionId 338


Unnamed: 0,artist,song,length
0,Faithless,Music Matters (Mark Knight Dub),495.3


**Double check with the data from the DataFrame**

In [13]:
df[(df.sessionId == 338) & (df.itemInSession == 4)][cols]

Unnamed: 0,artist,song,length
1258,Faithless,Music Matters (Mark Knight Dub),495.3073


---
### Query 2: 
- Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

**PRIMARY KEY:**  
This is a compound key consisting of a PARTITION by **userId** with CLUSTERING on the **sessionId** and **itemInSession** columns. These fields are used because the question asks for a group of records from a specific session Id of a particular user Id, with the returned songs sorted by their item position in the session.

**Table Data:**  
The data inserted into the tables are **userId**, **sessionId** and **itemInSession** for the PRIMARY KEY, with **artist**, **song**, **firstName** 
and **lastName** for the fields to be retrieved by the query.

#### Create and load the table

In [14]:
query = "CREATE TABLE IF NOT EXISTS music_2_song_item_for_user_session "
query = query + "(userId int, sessionId int, itemInSession int,  \
                  artist text, song text, firstName text, lastName text,  \
                  PRIMARY KEY ((userId), sessionId, itemInSession))"
try:
    session.execute(query)
except Exception as e:
    print(e)


In [15]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)    # skip header

    for line in csvreader:
        # insert the music app event record
        query = "INSERT INTO music_2_song_item_for_user_session (  \
                                     userId, sessionId, itemInSession,  \
                                     artist, song, firstName, lastName)"
        query = query + " VALUES (%s, %s, %s, %s, %s, %s, %s)"
        values = (int(line[10]), int(line[8]), int(line[3]), 
                  line[0], line[9], line[1], line[4])
        try:
            session.execute(query, values)
        except Exception as e:
            print(e)


#### Run the SELECT query to answer the question

In [16]:
query = """
    SELECT artist, song, firstName, lastName
    FROM   music_2_song_item_for_user_session
    WHERE  userId = 10 AND
           sessionId = 182
"""
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

# Print the output:

print("List of songs for userid 10 during sessionid 182 :")
print("(sorted by item in session with artist name, and user's first and last name)\n")

# create DataFrame for displaying the results
cols = ['artist', 'song', 'firstName', 'lastName']
results = []

# insert the query results (rows) into the list
for row in rows:
    results.append((row.artist, row.song, row.firstname, row.lastname))

df_q2 = pd.DataFrame(results, columns = cols)
df_q2


List of songs for userid 10 during sessionid 182 :
(sorted by item in session with artist name, and user's first and last name)



Unnamed: 0,artist,song,firstName,lastName
0,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
1,Three Drives,Greece 2000,Sylvie,Cruz
2,Sebastien Tellier,Kilometer,Sylvie,Cruz
3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


**Double check with the data from the DataFrame**

In [17]:
df[(df.userId == 10) & (df.sessionId == 182)][cols]

Unnamed: 0,artist,song,firstName,lastName
6054,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
6055,Three Drives,Greece 2000,Sylvie,Cruz
6056,Sebastien Tellier,Kilometer,Sylvie,Cruz
6057,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


---
### Query 3: 
- Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

**PRIMARY KEY:**  
This is a compound key consisting of a PARTITION by **song** with CLUSTERING on the **userId** column. These fields are used because the question asks for a list of users who have listened to a particular song. The **userId** column is necessary for clustering in order to distinguish users who may have the same name.

**Table Data:**  
The data inserted into the tables are **song** and **userId** for the PRIMARY KEY, with **firstName** and **lastName** for the fields to be retrieved by the query.

#### Create and load the table

In [18]:
query = "CREATE TABLE IF NOT EXISTS music_3_all_users_with_song "
query = query + "(song text, userId int, firstName text, lastName text,  \
                  PRIMARY KEY (song, userId))"
try:
    session.execute(query)
except Exception as e:
    print(e)


In [19]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)    # skip header

    for line in csvreader:
        # insert the music app event record
        query = "INSERT INTO music_3_all_users_with_song (  \
                                     song, userId, firstName, lastName)"
        query = query + " VALUES (%s, %s, %s, %s)"
        values = (line[9], int(line[10]), line[1], line[4])
        try:
            session.execute(query, values)
        except Exception as e:
            print(e)


#### Run the SELECT query to answer the question

In [20]:
query = """
    SELECT firstName, lastName
    FROM   music_3_all_users_with_song
    WHERE  song = 'All Hands Against His Own'
"""
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

# Print the output:

print("All users who listened to the song 'All Hands Against His Own' :")

# create DataFrame for displaying the results
cols = ['firstName', 'lastName']
results = []

# insert the query results (rows) into the list
for row in rows:
    results.append((row.firstname, row.lastname))

df_q3 = pd.DataFrame(results, columns = cols)
df_q3


All users who listened to the song 'All Hands Against His Own' :


Unnamed: 0,firstName,lastName
0,Jacqueline,Lynch
1,Tegan,Levine
2,Sara,Johnson


**Double check with the data from the DataFrame**

In [21]:
df[df.song == 'All Hands Against His Own'][cols]

Unnamed: 0,firstName,lastName
1804,Sara,Johnson
1935,Tegan,Levine
3541,Jacqueline,Lynch


### Drop the tables before closing out the sessions

In [22]:
query = "DROP TABLE music_1_song_item_in_session"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

In [23]:
query = "DROP TABLE music_2_song_item_for_user_session"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

In [24]:
query = "DROP TABLE music_3_all_users_with_song"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

### Close the session and cluster connection¶

In [25]:
session.shutdown()
cluster.shutdown()