# ETL Pipeline for Sparkify Apache Cassandra Database

## Part 1. Data Pre-Processing

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# Show the current working directory so the event_data location is easy to verify
print(os.getcwd())

# Root folder that holds the raw event data files
filepath = os.getcwd() + '/event_data'

# Collect every regular file found under event_data, including subfolders.
# The list starts empty so it is defined even when event_data does not exist,
# and matches are accumulated instead of being overwritten on each iteration.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # Prune hidden folders (e.g. .ipynb_checkpoints) in place so os.walk does
    # not descend into them; this mirrors glob's '*', which skips dot entries
    dirs[:] = [name for name in dirs if not name.startswith('.')]

    # glob returns sub-directories as well as files, so keep regular files only
    matches = glob.glob(os.path.join(root, '*'))
    file_path_list.extend(path for path in matches if os.path.isfile(path))
#print(file_path_list)

/home/workspace


#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [3]:
# Accumulates the data rows (header rows excluded) read from every event file
full_data_rows_list = []

# Read each file in the file path list in turn
for f in file_path_list:
    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header row; the None default keeps an empty file from
        # raising StopIteration and aborting the whole load
        next(csvreader, None)
        # Append all remaining rows of this file in one call
        full_data_rows_list.extend(csvreader)
            
# uncomment the code below if you would like to get total number of rows 
#print(len(full_data_rows_list))
# uncomment the code below if you would like to check to see what the list of event data rows will look like
#print(full_data_rows_list)

# Write a smaller event data csv file called event_datafile_new.csv that will be
# used to insert data into the Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # csv.reader yields [] for blank lines, so guard before indexing; also
        # skip events whose first column (written out as 'artist') is empty
        if not row or row[0] == '':
            continue
        # Keep only the source columns that map onto the header written above
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))

In [4]:
# Count the lines of the generated csv file (the header line is counted too)
line_total = 0
with open('event_datafile_new.csv', 'r', encoding='utf8') as f:
    for _ in f:
        line_total += 1
print(line_total)

6821


## Part 2. Create Apache Cassandra Tables and Insert Data

### 2.1 Data Description

The data in the CSV comprises the following columns and data types:

| Field                     | Data Type  |
|-------------------------- | ---------- |
| artist                    | text       |
| firstname                 | text       |
| gender                    | text       |
| item number in session    | int        |
| last name of user         | text       |
| length of the song        | float      |
| level (paid or free song) | text       |
| location of the user      | text       |
| sessionId                 | int        |
| song title                | text       |
| userId                    | int        |

Below is a screenshot of the csv

![top five](./images/image_event_datafile_new.jpg)


### 2.2 Create Cluster, Keyspace, and set Keyspace

In [5]:
# Build the Cluster object, using the local address 127.0.0.1 as its only contact point
from cassandra.cluster import Cluster
cluster = Cluster(contact_points=['127.0.0.1'])

In [8]:
# Connect to the cluster and establish a session.
# The Cluster object built in the "Create Cluster" cell is reused here: building
# a second one would orphan the first instance, which would then never be shut down.
session = cluster.connect()

In [9]:
# Create the sparkifydb keyspace if it is not there yet
# (SimpleStrategy with a replication factor of 1)
create_keyspace_cql = (
    "\nCREATE KEYSPACE IF NOT EXISTS sparkifydb \n"
    "WITH REPLICATION = \n"
    "{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"
)
session.execute(create_keyspace_cql)

<cassandra.cluster.ResultSet at 0x7fea2a85e4a8>

In [10]:
# Set Keyspace: make sparkifydb the keyspace used by this session
session.set_keyspace('sparkifydb')

### 2.3 Create Table, which upon query will return artist, song title, song's length during a session and item in session. (Query 1)

#### 2.3.1 Create table.
The Primary Key is the sessionId and the itemInSession (sessionId is the partition key and itemInSession is the clustering column).  These are the fields that will be queried against, so they were designated as the Primary Key.  The remaining fields were created in order based upon their usage.  For example, the query asks to return the artist, song title, and song length, so these are created after the Primary Key fields of sessionId and itemInSession.

In [11]:
# Table for Query 1: the primary key is (sessionId, itemInSession)
query = (
    "CREATE TABLE IF NOT EXISTS music_by_seesionId "
    "(artist text, firstName text, gender text, itemInSession int, "
    "lastName text, length float, level text, location text, "
    "sessionId int, song text, userId int, "
    "PRIMARY KEY (sessionId, itemInSession))"
)
session.execute(query)

<cassandra.cluster.ResultSet at 0x7fea2a858828>

#### 2.3.2 Insert data into the table. Insert columns are ordered from the Primary Key fields, to the fields selected by the query, to the remaining fields.

In [12]:
# Read the dataset (the event_datafile_new.csv file written in Part 1) into a pandas DataFrame
df= pd.read_csv('event_datafile_new.csv')

In [13]:
# Build the INSERT statement: primary key columns first, then the columns
# Query 1 selects, then the rest; one %s placeholder per column
column_names = ["sessionId", "itemInSession", "artist", "song", "length", "firstName",
                "gender", "lastName", "level", "location", "userId"]
placeholders = ", ".join(["%s"] * len(column_names))
query = ("INSERT INTO music_by_seesionId (" + ", ".join(column_names) + ")"
         + " VALUES (" + placeholders + ")")

In [14]:
# Insert every DataFrame row into the table, one execute() call per row.
# value_order lists the DataFrame columns in the order the values are bound.
value_order = ['sessionId', 'itemInSession', 'artist', 'song', 'length', 'firstName',
               'gender', 'lastName', 'level', 'location', 'userId']
for index, row in df.iterrows():
    session_data = [row[column] for column in value_order]
    session.execute(query, session_data)

#### 2.3.3 Query the music_by_seesionId table to see if it returns the expected results

In [15]:
# Build the Query 1 SELECT: one row identified by sessionId 338 and itemInSession 4
query = (
    " select artist, song, length, location"
    " from music_by_seesionId"
    " WHERE sessionid=338 and iteminsession=4"
)

In [16]:
# Run the query, materialize the returned rows, and show them as a DataFrame
result_rows = list(session.execute(query))
qdf = pd.DataFrame(result_rows)
qdf

Unnamed: 0,artist,song,length,location
0,Faithless,Music Matters (Mark Knight Dub),495.307312,"New Haven-Milford, CT"


### 2.4 Create Table, which upon query will return the artist, song (sorted by itemInSession) and user (firstName and lastName) based on userId and sessionId. (Query 2)

#### 2.4.1 Create table.
The Primary Key is the userId and sessionId (together forming the composite partition key) with a clustering key of itemInSession.  These are the fields that will be queried against or sorted on, so they were designated as the Primary Key.  The remaining fields were created in order based upon their usage.  For example, the query asks to return the artist, song title, and user, so these are created after the Primary Key fields of userId, sessionId, and itemInSession.

In [17]:
# Table for Query 2: composite partition key (userId, sessionId),
# clustering column itemInSession
query = (
    "CREATE TABLE IF NOT EXISTS music_by_userid "
    "(userId int, sessionId int, itemInSession int, "
    "artist text, song text, firstName text, lastName text, "
    "length float, gender text, level text, location text, "
    "PRIMARY KEY ((userId, sessionId), itemInSession))"
)
session.execute(query)

<cassandra.cluster.ResultSet at 0x7fea006655c0>

#### 2.4.2 Insert data into the table. Insert columns are ordered from the Primary Key fields, to the clustering key, to the fields selected by the query, to the remaining fields.

In [18]:
# Read the dataset (the event_datafile_new.csv file written in Part 1) into a pandas DataFrame
df= pd.read_csv('event_datafile_new.csv')

In [19]:
# Build the INSERT statement: key columns first, then the columns Query 2
# selects, then the rest; one %s placeholder per column
column_names = ["userId", "sessionId", "itemInSession", "artist", "song", "firstName",
                "lastName", "length", "gender", "level", "location"]
placeholders = ", ".join(["%s"] * len(column_names))
query = ("INSERT INTO music_by_userid ( " + ", ".join(column_names) + ")"
         + " VALUES (" + placeholders + ")")

In [20]:
# Insert every DataFrame row into the table, one execute() call per row.
# value_order lists the DataFrame columns in the order the values are bound.
value_order = ['userId', 'sessionId', 'itemInSession', 'artist', 'song', 'firstName',
               'lastName', 'length', 'gender', 'level', 'location']
for idex, row in df.iterrows():
    session_data = [row[column] for column in value_order]
    session.execute(query, session_data)

#### 2.4.3 Query the music_by_userid to see if it returns the expected results

In [21]:
# Build the Query 2 SELECT: all rows for userId 10 within sessionId 182
query = (
    " select artist, song, firstName, lastName"
    " from music_by_userid"
    " WHERE userId=10 and sessionid = 182 "
)

In [22]:
# Run the query, materialize the returned rows, and show them as a DataFrame
result_rows = list(session.execute(query))
qdf = pd.DataFrame(result_rows)
qdf

Unnamed: 0,artist,song,firstname,lastname
0,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
1,Three Drives,Greece 2000,Sylvie,Cruz
2,Sebastien Tellier,Kilometer,Sylvie,Cruz
3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


### 2.5 Create Table, which upon query will return the first and last name of a user who has listened to a song (Query 3)

#### 2.5.1 Create table.
The Primary Key is the song and userId.  The song is the field that will be queried against, but to make each row unique the userId needs to be added, making it a composite key; as such they were designated as the Primary Key.  The remaining fields were created in order based upon their usage.  For example, the query asks to return the first and last name of the user, so these are created after the Primary Key fields of song and userId.

In [23]:
# Table for Query 3: the primary key is (song, userId)
query = (
    "CREATE TABLE IF NOT EXISTS userinfo_by_song "
    "(song text, userId int, firstName text, lastName text, "
    "sessionId int, itemInSession int, artist text, length float, "
    "gender text, level text, location text, "
    "PRIMARY KEY (song, userId))"
)
session.execute(query)

<cassandra.cluster.ResultSet at 0x7fea2a89ab38>

#### 2.5.2 Insert data into the table. Insert columns are ordered from the Primary Key fields, to the fields selected by the query, to the remaining fields.

In [24]:
# Read the dataset (the event_datafile_new.csv file written in Part 1) into a pandas DataFrame
df= pd.read_csv('event_datafile_new.csv')

In [25]:
# Build the INSERT statement: key columns first, then the columns Query 3
# selects, then the rest; one %s placeholder per column
column_names = ["song", "userId", "firstName", "lastName", "sessionId", "itemInSession",
                "artist", "length", "gender", "level", "location"]
placeholders = ", ".join(["%s"] * len(column_names))
query = ("INSERT INTO userinfo_by_song ( " + ", ".join(column_names) + ")"
         + " VALUES (" + placeholders + ")")

In [26]:
# Insert every DataFrame row into the table, one execute() call per row.
# value_order lists the DataFrame columns in the order the values are bound.
value_order = ['song', 'userId', 'firstName', 'lastName', 'sessionId', 'itemInSession',
               'artist', 'length', 'gender', 'level', 'location']
for idex, row in df.iterrows():
    session_data = [row[column] for column in value_order]
    session.execute(query, session_data)

#### 2.5.3 Query the userinfo_by_song to see if it returns the expected results

In [27]:
# Build the Query 3 SELECT: users who listened to 'All Hands Against His Own'
query = (
    " select firstName, lastName"
    " from userinfo_by_song"
    " WHERE song='All Hands Against His Own' "
)

In [28]:
# Run the query, materialize the returned rows, and show them as a DataFrame
result_rows = list(session.execute(query))
qdf = pd.DataFrame(result_rows)
qdf

Unnamed: 0,firstname,lastname
0,Jacqueline,Lynch
1,Tegan,Levine
2,Sara,Johnson


## Part 3. Drop Tables and close session

#### Drop Tables

In [29]:
# Drop the three tables in turn; a failed drop is printed and the
# remaining drops are still attempted
for table_name in ("music_by_seesionId", "music_by_userid", "userinfo_by_song"):
    query = "drop table " + table_name
    try:
        rows = session.execute(query)
    except Exception as e:
        print(e)

#### Close the session and cluster connection

In [30]:
# Shut down the session first, then the cluster object it was created from
session.shutdown()
cluster.shutdown()