# Part I. ETL Pipeline for Pre-Processing the Files

In [1]:
import numpy as np
import pandas as pd
import cassandra
import re
import os
import glob
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# Current working directory
print(os.getcwd())

/home/workspace


In [3]:
# event data 
filepath = os.getcwd() + '/event_data'

# Create list of files to collect each filepath
for root, dirs, files in os.walk(filepath):
    # glob enables joining of file path and roots with subdirectories
    file_path_list = glob.glob(os.path.join(root,'*'))

print(len(file_path_list), "files")

30 files


#### Processing files to create a central data file csv to be used for Apache Casssandra keyspace

In [4]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:
    # reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
        # extracting each data row one by one and append it        
        for line in csvreader:
            full_data_rows_list.append(line) 

print("sample data:\n", full_data_rows_list[0], '\n\n', full_data_rows_list[1])

sample data:
 ['Barry Tuckwell/Academy of St Martin-in-the-Fields/Sir Neville Marriner', 'Logged In', 'Mohammad', 'M', '0', 'Rodriguez', '277.15873', 'paid', 'Sacramento--Roseville--Arden-Arcade, CA', 'PUT', 'NextSong', '1.54051E+12', '961', 'Horn Concerto No. 4 in E flat K495: II. Romance (Andante cantabile)', '200', '1.54328E+12', '88'] 

 ['Jimi Hendrix', 'Logged In', 'Mohammad', 'M', '1', 'Rodriguez', '239.82975', 'paid', 'Sacramento--Roseville--Arden-Arcade, CA', 'PUT', 'NextSong', '1.54051E+12', '961', 'Woodstock Inprovisation', '200', '1.54328E+12', '88']


In [5]:
print(len(full_data_rows_list), "rows in all files")

8056 rows in all files


create event_datafile_new.csv

In [6]:
# smaller event data csv file will be used to insert data into Apache Cassandra keyspace
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))

# checking number of rows in new csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


# Part II. Setting up Apache Cassandra keyspace.

### The file  <font color=red>event_datafile_new.csv</font>, located within the Workspace directory, contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

A screenshot of the denormalized data appears here <br>

<img src="images/image_event_datafile_new.jpg" height=1050 width=1050>


#### Create a Cluster, then Keyspace

In [7]:
# Connection to a Cassandra instance on local machine (127.0.0.1)

from cassandra.cluster import Cluster
cluster = Cluster()

# To establish connection and begin executing queries, a session is needed
session = cluster.connect()

In [8]:
# Create Keyspace
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS udacity
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

except Exception as e:
    print(e)

# Set Keyspace

try:
    session.set_keyspace('udacity')
except Exception as e:
    print(e)

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

### Primary Key for Query 1

The PK selected was based on the filter of `sessionId=338 and itemInSession=4`.
For this, these two columns serve as a proper composite PK.

In [9]:
query = "CREATE TABLE IF NOT EXISTS song_info_item4_session338"
query += "(sessionId int, itemInSession int, artist text, song text, length float, PRIMARY KEY (sessionId, itemInSession))"
try:
    session.execute(query)
except Exception as e:
    print(e)

In [10]:
# set up the CSV file
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO song_info_item4_session338 (sessionId , itemInSession , artist , song , length )"
        query += "VALUES (%s, %s, %s, %s, %s)"
        ## Map corresponding column elements to named column in the INSERT statement.
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], round(float(line[5]), 2)))

#### verify proper insertion of data into each table

In [11]:
query = "SELECT artist, song, length FROM song_info_item4_session338 WHERE sessionId = 338 AND itemInSession = 4"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

# preview
for row in rows:
    print(row.artist , row.song , round(row.length, 2) , sep='\t\t')

Faithless		Music Matters (Mark Knight Dub)		495.31


In [12]:
# display in Pandas DataFrame
rows = session.execute(query)

for row in rows:
    df1 = pd.DataFrame(
        [[row.artist , row.song , round(row.length, 1)]],
        index=[0],
        columns=['artist name' , 'song title', 'song length']
    )

df1

Unnamed: 0,artist name,song title,song length
0,Faithless,Music Matters (Mark Knight Dub),495.3


### Primary Key for Query 2

For a similar reason, the composite PK columns here were selected based on the filter of `userId=10 and sessionId=182`.
Keep in mind that the order of PK columns matters.

In [13]:
## Query 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name)\
## for userid = 10, sessionid = 182
query = "CREATE TABLE IF NOT EXISTS song_info_session182_user10"
query += "(userId int, sessionId int, itemInSession int, artist varchar, song varchar, firstName varchar, lastName varchar, PRIMARY KEY (userId , sessionId , itemInSession ))"
try:
    session.execute(query)
except Exception as e:
    print(e)

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO song_info_session182_user10 (userId , sessionId , itemInSession , artist , song , firstName , lastName)"
        query += "VALUES (%s, %s, %s, %s, %s, %s, %s)"
        session.execute(query, 
                        (
                            int(line[10]), # userId ,
                            int(line[8]),  # sessionId ,
                            int(line[3]),      # itemInSession ,
                            line[0],       # artist ,
                            line[9],       # song ,
                            line[1],       # firstName ,
                            line[4]       # lastName
                        )
                       )

#### verify data write

In [14]:
query = "SELECT artist, song, firstname, lastname FROM song_info_session182_user10 WHERE userid = 10 AND sessionid = 182"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

# preview
for row in rows:
    print(row.artist , row.song , row.firstname , row.lastname, sep='\t\t\t')

Down To The Bone			Keep On Keepin' On			Sylvie			Cruz
Three Drives			Greece 2000			Sylvie			Cruz
Sebastien Tellier			Kilometer			Sylvie			Cruz
Lonnie Gordon			Catch You Baby (Steve Pitron & Max Sanna Radio Edit)			Sylvie			Cruz


In [15]:
# display in Pandas DataFrame
rows = session.execute(query)

i = -1
df2 = pd.DataFrame(data={'artist name': None , 'song title': None, 'first name': None, 'last name': None},
                  index=[i])

for row in rows:
    i += 1
    dfnew = pd.DataFrame(data={'artist name': row.artist ,
                               'song title': row.song, 
                               'first name': row.firstname , 
                               'last name': row.lastname}, index=[i]
                        )
    df2 = pd.concat([df2, dfnew], axis=0)
    
df2.drop(index=[-1], axis=0, inplace=True)

df2

Unnamed: 0,artist name,song title,first name,last name
0,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
1,Three Drives,Greece 2000,Sylvie,Cruz
2,Sebastien Tellier,Kilometer,Sylvie,Cruz
3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


### Primary Key for Query 3

This query calls for a simple, rather than composite, PK: `song='All Hands Against His Own'`.
Though there is no guarantee that this title is unique,
we can safely infer its uniqueness by observation.

In [16]:
## Query 3: Give me every user name (first and last) in my music app history
## who listened to the song 'All Hands Against His Own'
query = "CREATE TABLE IF NOT EXISTS users_of_song"
query += "(song varchar, userId int, firstName varchar, lastName varchar, PRIMARY KEY (song, userId))"
try:
    session.execute(query)
except Exception as e:
    print(e)

In [17]:
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO users_of_song (song , userId , firstName , lastName )"
        query += "VALUES (%s, %s, %s, %s)"
        session.execute(query, 
                        (
                            line[9],       # song ,
                            int(line[10]), # userId ,
                            line[1],       # firstName ,
                            line[4]       # lastName
                        )
                       )

#### verify data write

In [18]:
query = "SELECT firstname, lastname, song FROM users_of_song WHERE song = 'All Hands Against His Own'"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

for row in rows:
    df3 = pd.DataFrame(
        [[row.firstname , row.lastname, row.song]],
        index=[0],
        columns=['first name' , 'last name', 'song title']
    )

df3

Unnamed: 0,first name,last name,song title
0,Jacqueline,Lynch,All Hands Against His Own


## Outcome

Apache Cassandra keyspace has been *successfully* employed to perform data modeling to support the 3 above queries:
* retrieval of artists and songs heard during a given sessionId and itemInSession.
* retrieval of artists, songs, and user based on given userId and sessionId.
* retrieval of users with history of having accessed a given song title.

# Close connection

### Drop the tables before closing out the sessions

In [19]:
for table in ['song_info_item4_in_session338', 'song_info_session182_user10', 'users_of_song']:
    query = "drop table {}".format(table)
    try:
        rows = session.execute(query)
    except Exception as e:
        print(e)

Error from server: code=2200 [Invalid query] message="unconfigured table song_info_item4_in_session338"


### Close the session and cluster connection¶

In [20]:
session.shutdown()
cluster.shutdown()