### Sparkify Analysis Queries using Cassandra


#### Prepared by Robin Fladebo, September 2019

### Purpose: Use Cassandra to query the history from the Sparkify music application.

In [1]:
# Import Python packages
import cassandra
import os
import glob
import csv

### Process input files (code supplied by Udacity)

In [2]:
# Create a list of filepaths to process original event csv data files

# Check current working directory
print(os.getcwd())

# Get the current folder and subfolder containing event data
filepath = os.getcwd() + '/event_data'

# Use a for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    # Join the file path and root path with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))

# Process the files to create the comma-separated data file that will be
# used for Apache Casssandra tables

# Initiate an empty list of rows to be generated from each file
full_data_rows_list = []

# Read every file in the file path list
for f in file_path_list:
    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        # creating a csv reader object
        csvreader = csv.reader(csvfile)
        next(csvreader)
        # Append each line in the file to the full list of rows        
        for line in csvreader:
            full_data_rows_list.append(line)

csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL,
    skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding='utf8', 
newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist_name', 'user_first_name', 'user_gender', 'item_in_session',
                     'user_last_name','song_length', 'user_level','location','session_id',
                     'song_title','user_id'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], 
                         row[7], row[8], row[12], row[13], row[16]))

/home/workspace


In [3]:
# Report the number of rows in the csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


### Create and connect to Cassandra cluster, session and keyspace. Keyspace name is sparkify.

In [4]:
# Connect to a Cassandra instance
from cassandra.cluster import Cluster
cluster = Cluster()

# Initiate a session
session = cluster.connect()

# Create a keyspace for sparkify
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS sparkify 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)
except Exception as e:
    print(e)
    
# Connect to the keyspace for sparkify
try:
    session.set_keyspace('sparkify')
except Exception as e:
    print(e)

### Create tables and insert data

The music_history_session table is designed to support queries that select based on session ID and item within session. Specifically: From the music app history file, show the artist, song title and song length that was played as item 4 in session 338.

In [5]:
# Create history_session table
create = "CREATE TABLE IF NOT EXISTS history_session "
create = create + "(session_id int, item_in_session int, artist_name text, song_title text, song_length float, PRIMARY KEY (session_id, item_in_session))"
try:
    session.execute(create)
except Exception as e:
    print(e)

In [6]:
file = 'event_datafile_new.csv'

In [7]:
# Insert data to the history_session table
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        ins = "INSERT INTO history_session (session_id, item_in_session, artist_name, song_title, song_length) "
        ins = ins + "VALUES (%s, %s, %s, %s, %s)"
        session.execute(ins, (int(line[8]), int(line[3]), line[0], 
            line[9], float(line[5])))

The history_session_user table is designed to support queries that select based on user ID and session ID. Specifically: Return the name of the artist, song title and first and last name of the user for userid = 10, sessionid = 182; sort by item in session.

In [8]:
# Create history_session_user table
# Compound partition key will determine how data is divided among nodes. Clustering key will determine how data is 
# sorted within a partition (Reference: https://stackoverflow.com/questions/24949676  dated 2014)
create = "CREATE TABLE IF NOT EXISTS history_session_user "
create = create + "(user_id int, session_id int, item_in_session int, artist_name text, song_title text, user_first_name text, user_last_name text, PRIMARY KEY ((user_id, session_id), item_in_session))"
try:
    session.execute(create)
except Exception as e:
    print(e)

In [9]:
# Insert data into the history_session_user table
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        ins = "INSERT INTO history_session_user (user_id, session_id, item_in_session, artist_name, song_title, user_first_name, user_last_name) "
        ins = ins + "VALUES (%s, %s, %s, %s, %s, %s, %s)"
        session.execute(ins, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))

The history_song_user table is designed to support queries that select based on song title. Specifically: From the music app history files, display the user name (first and last) who listened to the song 'All Hands Against His Own'.

In [10]:
# Create history_song_user table
# Session ID is part of the key to satisfy uniqueness where user listens to same song more than once
create = "CREATE TABLE IF NOT EXISTS history_song_user "
create = create + "(song_title text, user_id int, session_id int, user_first_name text, user_last_name text, PRIMARY KEY (song_title, user_id))"
try:
    session.execute(create)
except Exception as e:
    print(e)   

In [11]:
# Insert data into the history_song_user table
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        ins = "INSERT INTO history_song_user (user_id, song_title, user_first_name, user_last_name) "
        ins = ins + "VALUES (%s, %s, %s, %s)"
        session.execute(ins, (int(line[10]), line[9], line[1], line[4]))

### Run queries and validate results

#### Query 1: Show the artist name, song title and song length which were played 4th in session 338

In [12]:
query = "SELECT artist_name, song_title, song_length from history_session WHERE session_id = 338 and item_in_session = 4"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.artist_name, row.song_title, row.song_length)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


#### .

#### Query 2: Return the name of the artist, the song title and the first and last name of the user for userid = 10, sessionid = 182; sort by item in session

In [13]:
query = "SELECT artist_name, song_title, user_first_name, user_last_name FROM history_session_user WHERE user_id = 10 AND session_id = 182 ORDER BY item_in_session"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.artist_name, row.song_title, row.user_first_name, row.user_last_name)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


#### .

#### Query 3: From the music app history files, display the user name (first and last) who listened to the song 'All Hands Against His Own'.

In [14]:
query = "SELECT user_first_name, user_last_name FROM history_song_user WHERE song_title = 'All Hands Against His Own'"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.user_first_name, row.user_last_name)

Jacqueline Lynch
Tegan Levine
Sara Johnson


### Drop tables. Close session and cluster connection

In [15]:
drop = "DROP TABLE history_session"
try:
    rows = session.execute(drop)
except Exception as e:
    print(e)
    
drop = "DROP TABLE history_session_user"
try:
    rows = session.execute(drop)
except Exception as e:
    print(e)
    
drop = "DROP TABLE history_song_user"
try:
    rows = session.execute(drop)
except Exception as e:
    print(e)

In [16]:
session.shutdown()
cluster.shutdown()