# Part I. ETL Pipeline for Pre-Processing the Files

## The following code is to preprocess the .CSV files containing the data.

#### Import Python packages 

In [71]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [74]:
# Get your current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Create a for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    
# join the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [75]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:

# reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
 # extracting each data row one by one and append it        
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 

# creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


# Part II. Create tables to answer specific question in Apache Cassandra and load data into those tables. 

## The event_datafile_new.csv created in PART-I contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Apache Cassandra code starts below

#### Creating a Cluster

In [78]:
# Connect to Cassandra cluster
# (127.0.0.1)

from cassandra.cluster import Cluster
try:
    #connect to the local cluster and create a session to execute queries
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
except Exception as e:
    print("Error while trying to create cluster")
    print(e)

#### Create Keyspace

In [79]:
# Create the Keyspace:sparkify 
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS sparkify
    WITH REPLICATION = 
    {'class':'SimpleStrategy','replication_factor':1}"""
)
except Exception as e:
    print("Error while trying to create keyspace")
    print(e)

#### Set Keyspace

In [80]:
# Set keyspace to sparkify
try:
    session.set_keyspace('sparkify')
except Exception as e:
    print("Error while setting keyspace")
    print(e)

## The first table created below will be used to search the song that was played in a specific session and in a specific sequence number in that session. For example, 
## *Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4*

### Table Name: *song_played*

* The table ***song_played*** will be created with ***session_id*** as the PARTITION KEY since this query will be mainly searching based on Session IDs.
* The table will have ***item_in_session*** as the CLUSTERING COLUMN, because multiple songs can be played within a session. 
* The combination of ***session_id*** and ***item_in_session*** will result in a unique identifier of each song played in a session.  

In [81]:
#build and execute CREATE query
query = "CREATE TABLE IF NOT EXISTS song_played"
query = query + "(session_id int,item_in_session int,title text,artist text,length double, PRIMARY KEY(session_id,item_in_session))"
try:
    session.execute(query)
except Exception as e:
    print("Error while creating song_played table")
    print(e)                    

In [82]:
file = 'event_datafile_new.csv'

#open file
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
#read content line by line
    for line in csvreader:
        #build INSERT query
        query = "INSERT INTO song_played (session_id,item_in_session,title,artist,length)"
        query = query + "VALUES (%s,%s,%s,%s,%s)"
        try:
            #execute INSERT with values from the current line read from the file
            session.execute(query,(int(line[8]),int(line[3]),line[9],line[0],float(line[5])))
        except Exception as e:
            print("Error while inserting SessionID: {} and ItemId: {}".format(line[8],line[3]))
            print(e)

### Query the table created and loaded above to validate data

In [86]:
## Build SELECT query to validate data loaded above
query = "SELECT session_id,item_in_session,title,artist,length "
query = query + "FROM song_played "
query = query + "WHERE session_id=139 "
query = query + "AND item_in_session=4"

#execute query
try:
    rows = session.execute(query)
except Exception as e:
    print("Error while querying song_played")
    print(e)

#convert results inta a Data Frame
result = pd.DataFrame(list(rows),columns=['Session','Item in Session','Song','Artist','Song'])
result

Unnamed: 0,Session,Item in Session,Song,Artist,Song.1
0,139,4,Quem Quiser Encontrar O Amor,Tamba Trio,177.18812


## The second table created below will be used to get the list all songs played  by a specific user in a specific session. For example, 
## *Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182*

### Table Name: *all_songs_played_in_session*

* The table ***all_songs_played_in_session*** will be created with ***user_id & session_id*** as the PARTITION KEY since this query will be mainly searching based on these columns.
* The table will have ***item_in_session*** as the CLUSTERING COLUMN, because multiple songs could be played within a session by a user. It also serves the purpose of sorting the songs played by their sequence number. 
* The combination of ***session_id***, ***user_id***, and ***item_in_session*** will result in a unique identifier of each song played in a session by a user. 

In [87]:
#build and execute CREATE query
query = "CREATE TABLE IF NOT EXISTS all_songs_played_in_session"
query = query + "(session_id int,user_id int,item_in_session int,artist text,title text,fname text, lname text, "
query = query + "PRIMARY KEY((user_id,session_id),item_in_session))"
try:
    session.execute(query)
except Exception as e:
    print("Error while creating all_songs_played_in_session table")
    print(e)                    

In [88]:
#open file
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
#read content line by line
    for line in csvreader:
        #build INSERT query
        query = "INSERT INTO all_songs_played_in_session (session_id,user_id,item_in_session,artist,title,fname,lname)"
        query = query + "VALUES (%s,%s,%s,%s,%s,%s,%s)"
        try:
            #execute INSERT with values from the current line read from the file
            session.execute(query,(int(line[8]),int(line[10]),int(line[3]),line[0],line[9],line[1],line[4]))
        except Exception as e:
            print("Error while inserting SessionID: {}, UserID: {}, and Item:{}".format(line[8],line[10],line[3]))
            print(e)

### Query the table created and loaded above to validate data

In [90]:
## Build SELECT query to validate data loaded above
query = "SELECT artist,title,fname,lname "
query = query + "FROM all_songs_played_in_session "
query = query + "WHERE user_id=8 "
query = query + "AND session_id=139"

#execute query
try:
    rows = session.execute(query)
except Exception as e:
    print("Error while querying song_played")
    print(e)

#convert results inta DataFrame for better presentation
result = pd.DataFrame(list(rows),columns=['Artist','Song Title','User First Name','User Last Name'])
result

Unnamed: 0,Artist,Song Title,User First Name,User Last Name
0,Des'ree,You Gotta Be,Kaylee,Summers
1,Mr Oizo,Flat 55,Kaylee,Summers
2,Tamba Trio,Quem Quiser Encontrar O Amor,Kaylee,Summers
3,The Mars Volta,Eriatarka,Kaylee,Summers
4,Infected Mushroom,Becoming Insane,Kaylee,Summers
5,Blue October / Imogen Heap,Congratulations,Kaylee,Summers
6,Girl Talk,Once again,Kaylee,Summers


## The third table created below will be used to query list of all users who played a specific song. For example, 
## *Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'*

### Table Name: *all_users_who_played_song*

* The table ***all_users_who_played_song*** will be created with ***title*** as the CLUSTERING KEY since this query will be mainly searching based on specific song.
* The table will have ***session_id*** as the CLUSTERING COLUMN. 
* Though there is a possibility of a user playing a song multiple times within a session there is no need to capture that detail. 
* This table is only meant to search which users played a song; how times they played a song does not matter in this context. 

In [91]:
#build and execute CREATE query
query = "CREATE TABLE IF NOT EXISTS all_users_who_played_song"
query = query + "(song text,session_id int,fname text,lname text, "
query = query + "PRIMARY KEY(song,session_id))"
try:
    session.execute(query)
except Exception as e:
    print("Error while creating all_users_who_played_song table")
    print(e)                    

In [92]:
#open file
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
#read content line by line
    for line in csvreader:
        #build INSERT query
        query = "INSERT INTO all_users_who_played_song (song,session_id,fname,lname)"
        query = query + "VALUES (%s,%s,%s,%s)"
        try:
            #execute INSERT with values from the current line read from the file
            session.execute(query,(line[9],int(line[8]),line[1],line[4]))
        except Exception as e:
            print("Error while inserting Song: {} and Session: {}".format(line[9],line[8]))
            print(e)

### Query the table created and loaded above to validate data

In [94]:
## Build SELECT query to validate data loaded above
query = "SELECT fname,lname "
query = query + "FROM all_users_who_played_song "
query = query + "WHERE song='Secrets'"

#execute query
try:
    rows = session.execute(query)
except Exception as e:
    print("Error while querying song_played")
    print(e)

#convert results inta DataFrame for better presentation
result = pd.DataFrame(list(rows),columns=['User First Name','User Last Name'])
result

Unnamed: 0,User First Name,User Last Name
0,Sara,Johnson
1,Lily,Koch


### Drop the tables before closing out the sessions

In [95]:
#create DROP statements for all the three tables
drop_table1 = "DROP TABLE IF EXISTS song_played"
drop_table2 = "DROP TABLE IF EXISTS all_songs_played_in_session"
drop_table3 = "DROP TABLE IF EXISTS all_users_who_played_song"

#create list of all the above queries
drop_table_queries = [drop_table1,drop_table2,drop_table3]

#iterate through the list and execute each DROP statement
for drop_table in drop_table_queries:
    try:
        session.execute(drop_table)
    except Excetion as e:
        print("Error while executing: {}".format(drop_table))
        print(e)


### Close the session and cluster connection¶

In [96]:
session.shutdown()
cluster.shutdown()