# Part I. ETL Pipeline for Pre-Processing the Files

#### Import required Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Create list of filepaths to process original event csv data files

In [2]:
# Get the current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    
# join the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))
    

#### Process the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:

# reading the csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
# extracting each data row one by one and append it        
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 
            
# creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [4]:
# check the number of rows in your csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


# Part II. Apache Cassandra coding portion of the project

## The new csv file <font color=red>event_datafile_new.csv</font> is generated with below fields is used as source for loading all the tables: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

Below is the screenshot of the same file <font color=red>**event_datafile_new.csv**</font>

<img src="images/image_event_datafile_new.jpg">

#### Create the Cluster

In [5]:
# Create a connection to the local Cassandra instance

from cassandra.cluster import Cluster
cluster = Cluster()

# Establish the connection return a session
session = cluster.connect()

#### Create Keyspace

In [6]:
# Create the Keyspace "songplay" for the tables
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS songplay 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

except Exception as e:
    print(e)


#### Set Keyspace

In [7]:
# Set KEYSPACE to "songplay"

try:
    session.set_keyspace('songplay')
except Exception as e:
    print(e)

### Next step - create the tables to run the following queries.

## Tables are created to model to answer below queries

#### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


#### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

#### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'


#### Query 1: Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

The table that will be created is names as "song_session" to show case that the query is centered around song and the sesson.

Below will be the Keys for the table - 

PRIMARY KEY - sessionid, iteminsession (Since the user query is filtered on these columns, these are included in Primary Key)

COMPOSITE KEY - artistname, songtitle, songlength (Since the query wants these columns in the result set, these are added as Composite Keys)

### Create the table in Apache Cassandra

In [8]:
# Query 1:  Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4
# Below commands create the table "song_session" in the keyspace

query = "CREATE TABLE IF NOT EXISTS song_session "
query = query + "(session_id int, item_in_session int, artist text, song_title text, song_length float, \
PRIMARY KEY ((session_id, item_in_session), artist, song_title, song_length))"
try:
    session.execute(query)
except Exception as e:
    print(e)


### Insert the data into the table

In [9]:
# Below commands insert the data into the table - song_session

file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO song_session (session_id, item_in_session, artist, song_title, song_length) "
        query = query + "VALUES (%s, %s, %s, %s, %s)"
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))


#### Do a SELECT to verify that the data in the table

In [10]:
# SELECT statement to validate the data that was entered into the table - song_session for Query 1

query = "SELECT artist, song_title, song_length FROM song_session WHERE session_id = 338 AND item_in_session = 4"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)


for row in rows:
    print ("Artist:",row.artist, "\tSong Title:", row.song_title, "\tSong Length:", row.song_length)


Artist: Faithless 	Song Title: Music Matters (Mark Knight Dub) 	Song Length: 495.30731201171875


#### Query 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

The table that will be created is names as "user_session" to show case that the query is centered around user and the sesson.

Below will be the Keys for the table - 

PRIMARY KEY - user_id, sessionid (Since the user query is filtered on these columns, these are included in Primary Key)

COMPOSITE KEY - artistname, iteminsessio, songtitle, username (firstname and lastname) (Since the query wants these columns in the result set, these are added as Composite Keys)

### Create the table in Apache Cassandra

In [11]:
# Query 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
# Below commands create the table - user_session in the keyspace

query = "CREATE TABLE IF NOT EXISTS user_session "
query = query + "(user_id int, session_id int, artist_name text, item_in_session int, song_title text, user_first_name text, user_last_name text, \
PRIMARY KEY ((user_id, session_id), artist_name, item_in_session, song_title, user_first_name, user_last_name))"
try:
    session.execute(query)
except Exception as e:
    print(e)


### Insert the data into the table

In [12]:
# Below commands insert rows into the table - user_session from the event_datafile_new.csv

file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO user_session (user_id, session_id, artist_name, item_in_session, song_title, user_first_name, user_last_name) "
        query = query + "VALUES (%s, %s, %s, %s, %s, %s, %s)"
        session.execute(query, (int(line[10]), int(line[8]), line[0], int(line[3]), line[9], line[1], line[4]))


#### Do a SELECT to verify that the data in the table

In [13]:
# Validate the Query 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

query = "SELECT artist_name, song_title, user_first_name, user_last_name FROM user_session WHERE user_id = 10 AND session_id = 182"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

for row in rows:
    print ("Artist Name:",row.artist_name, "\tSong Title:",row.song_title, "\tUser First Name:",row.user_first_name, "\tUser Last Name:",row.user_last_name)


Artist Name: Down To The Bone 	Song Title: Keep On Keepin' On 	User First Name: Sylvie 	User Last Name: Cruz
Artist Name: Lonnie Gordon 	Song Title: Catch You Baby (Steve Pitron & Max Sanna Radio Edit) 	User First Name: Sylvie 	User Last Name: Cruz
Artist Name: Sebastien Tellier 	Song Title: Kilometer 	User First Name: Sylvie 	User Last Name: Cruz
Artist Name: Three Drives 	Song Title: Greece 2000 	User First Name: Sylvie 	User Last Name: Cruz


#### Query 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

The table that will be created is names as "user_song" to show case that the query is centered around user and the song.

Below will be the Keys for the table - 

PRIMARY KEY - songtitle (Since the user query is filtered based on songtitle, it is included in Primary Key)

COMPOSITE KEY - username (firstname and lastname), user_id (Since the query wants these columns in the result set, these are added as Composite Keys. userid is included in the key just to make the rows as unique; it is not being queried here)

### Create the table in Apache Cassandra

In [14]:
# Query 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
# Below commands create the table user_song to answer Query 3

query = "CREATE TABLE IF NOT EXISTS user_song "
query = query + "(song_title text, user_first_name text, user_last_name text, user_id int, \
PRIMARY KEY ((song_title), user_first_name, user_last_name, user_id))"

try:
    session.execute(query)
except Exception as e:
    print(e)

### Insert the data into the table

In [15]:
# Below commands insert data into user_song table from event_datafile_new.csv file

file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO user_song (song_title, user_first_name, user_last_name, user_id) "
        query = query + "VALUES (%s, %s, %s, %s)"
        session.execute(query, (line[9], line[1], line[4], int(line[10])))


#### Do a SELECT to verify that the data in the table

In [16]:
# Query 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
# SELECT statement to validate the data for user_song table to validate Query 3

query = "SELECT user_first_name, user_last_name FROM user_song WHERE song_title = 'All Hands Against His Own'"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

for row in rows:
    print ("User First Name:",row.user_first_name, "\tUser Last Name:",row.user_last_name)


User First Name: Jacqueline 	User Last Name: Lynch
User First Name: Sara 	User Last Name: Johnson
User First Name: Tegan 	User Last Name: Levine


### Drop the tables before closing out the sessions

In [17]:
# Drop the table - song_session (for Query 1) before closing out the sessions

query = "DROP TABLE song_session"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)


In [18]:
# Drop the table user_session (for Query 2) before closing out the sessions

query = "DROP TABLE user_session"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

In [19]:
# Drop the table user_song (for Query 3) before closing out the sessions

query = "DROP TABLE user_song"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)


In [20]:
# Close the session and cluster connection

session.shutdown()
cluster.shutdown()