# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import csv
from cassandra.cluster import Cluster

#### Create an Output Folder to Save CSVs

In [2]:
# Create the folder that will hold the generated CSV. exist_ok=True makes the
# call safe to re-run and avoids the gap between checking for the folder and
# creating it.
os.makedirs('./output', exist_ok=True)

#### Creating list of filepaths to process original event csv data files

In [3]:
# Locate the event data folder under the current working directory
filepath = os.path.join(os.getcwd(), 'event_data')

# Collect the CSV files from the folder and each of its subfolders.
# The list is created up front so it exists (empty) even when the folder is
# missing, and it is extended per directory so files found in one directory are
# not overwritten by the next directory visited.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # Do not descend into hidden folders (e.g. notebook checkpoint folders),
    # whose contents are not source data
    dirs[:] = [d for d in dirs if not d.startswith('.')]
    # sorted() keeps the processing order deterministic
    file_path_list.extend(sorted(glob.glob(os.path.join(root, '*.csv'))))


#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [4]:
# Collect the data rows (header excluded) from every source file
full_data_rows_list = []

for source_path in file_path_list:
    # newline='' lets the csv module handle line endings itself
    with open(source_path, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header row; the None default keeps an empty file from
        # raising StopIteration
        next(csvreader, None)
        full_data_rows_list.extend(csvreader)

# Write the reduced event file, event_datafile_new.csv, which is the source
# for the Apache Cassandra inserts
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Positions of the source columns to keep, in output order; they line up
# one-to-one with the header written below
kept_columns = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

with open('./output/event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # Skip blank lines (read as empty lists) and rows with an empty
        # first column (written out as 'artist')
        if not row or row[0] == '':
            continue
        writer.writerow([row[i] for i in kept_columns])

In [5]:
# Report how many lines (header included) the generated CSV file contains
with open('./output/event_datafile_new.csv', 'r', encoding='utf8') as f:
    row_total = len(f.readlines())
    print(f'The new CSV file has {row_total} rows.')

The new CSV file has 6821 rows.


# Part II. ETL Pipeline for Transferring Data from CSV to Apache Cassandra Database

## Setting up Apache Cassandra

#### Create a Cluster and Initiate Session

In [6]:
# Point the driver at the node on localhost, then open a session on it
cluster = Cluster(contact_points=['127.0.0.1'])
session = cluster.connect()

#### Create Keyspace

In [7]:
# Define the sparkify keyspace (SimpleStrategy, replication factor 1) unless
# it already exists
query = (
    "CREATE KEYSPACE IF NOT EXISTS sparkify\n"
    "           WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor' : 1}"
)

session.execute(query);


#### Set Keyspace

In [8]:
# Make sparkify the session's keyspace so later statements can use
# unqualified table names
session.set_keyspace('sparkify')

## Create Data Tables Based on Queries

In non-relational data modeling, tables should be designed around the queries that they will be used for. Therefore, prior to building our tables, we will think about the queries that we will ask and decide how to partition our data based on those queries.

### Query 1:
### Return the artist, song title, and song length that was heard as item 4 in session 338. 

#### Write Query

In [9]:
# CQL for query 1: artist, song and length for item 4 of session 338
query1 = (
    "SELECT artist, song, length "
    "FROM music_library_by_session_item "
    "WHERE sessionID = 338 AND itemInSession = 4"
)

Based on this query, we should use `sessionId` as the partition key and `itemInSession` as the clustering column; together they form the primary key.

#### Create Table for Query 1

In [10]:
# Table for query 1: sessionId is the partition key and itemInSession the
# clustering column
create_query = (
    'CREATE TABLE IF NOT EXISTS music_library_by_session_item'
    '(sessionId INT, itemInSession INT, artist TEXT, length DECIMAL, song TEXT, '
    'PRIMARY KEY(sessionId, itemInSession))'
)
session.execute(create_query);

#### Insert Data into Table for Query 1

In [11]:
# CSV file created in Part I
file = './output/event_datafile_new.csv'

# The statement is identical for every row, so it is built once, outside the
# loop (with a space separating the column list from VALUES)
query = (
    "INSERT INTO music_library_by_session_item (artist, itemInSession, length, sessionId, song) "
    "VALUES (%s, %s, %s, %s, %s)"
)

# Read the CSV row by row and insert each row into the table.
# newline='' lets the csv module handle line endings itself, as its
# documentation recommends.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader, None)  # skip header
    for line in csvreader:
        # CSV fields are text, so numeric columns are converted before binding.
        # Columns used: 0 artist, 3 itemInSession, 5 length, 8 sessionId, 9 song
        session.execute(query, (line[0], int(line[3]), float(line[5]), int(line[8]), line[9]))

#### Perform Query
In the cell below, we will query the `music_library_by_session_item` table to get the `artist_name`, `song_title`, and `song_length` for Item 4 of Session 338.

***Query:***
```CQL
SELECT artist, song, length 
FROM music_library_by_session_item 
WHERE sessionID = 338 AND itemInSession = 4
```

In [12]:
# Run query 1 and materialise the result set so it can be read column by column
rows = session.execute(query1)
fetched = list(rows)

# One list per output column, in result order
artists = [record.artist for record in fetched]
titles = [record.song for record in fetched]
lengths = [record.length for record in fetched]

session_338_item_04_df = pd.DataFrame({
    'artist_name': artists,
    'song_title': titles,
    'song_length': lengths,
})

***Output:***

In [13]:
# Last expression in the cell: the notebook renders the query 1 DataFrame
session_338_item_04_df

Unnamed: 0,artist_name,song_title,song_length
0,Faithless,Music Matters (Mark Knight Dub),495.3073


As shown above, item 4 of session 338 was Music Matters (Mark Knight Dub) by Faithless.

### Query 2:
### Return the artist name, song title, and user (first and last name) for the songs that user 10 heard during session 182, sorted by item number within the session.

#### Write Query

In [14]:
# CQL for query 2: songs heard by user 10 during session 182
query2 = (
    'SELECT artist, song, firstName, lastName '
    'FROM music_library_by_user_session_item '
    'WHERE userId = 10 AND sessionId = 182'
)

From the above query, we know that the data should be partitioned by `userId` and `sessionId`. In order to sort the data by the item in the session, we can cluster on `itemInSession`.

#### Create Table for Query 2

In [15]:
# Table for query 2: composite partition key (userId, sessionId) with
# itemInSession as the clustering column
create_query = (
    'CREATE TABLE IF NOT EXISTS music_library_by_user_session_item'
    '(userId INT, sessionId INT, itemInSession INT, artist TEXT, song TEXT, '
    'firstName TEXT, lastName TEXT, '
    'PRIMARY KEY((userId, sessionId), itemInSession))'
)
session.execute(create_query);

#### Insert Values into Table for Query 2

In [16]:
# CSV file created in Part I
file = './output/event_datafile_new.csv'

# The statement is identical for every row, so it is built once, outside the
# loop (with a space separating the column list from VALUES)
query = (
    "INSERT INTO music_library_by_user_session_item "
    "(artist, firstName, itemInSession, lastName, sessionId, song, userId) "
    "VALUES (%s, %s, %s, %s, %s, %s, %s)"
)

# newline='' lets the csv module handle line endings itself, as its
# documentation recommends
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader, None)  # skip header
    for line in csvreader:
        # CSV fields are text, so numeric columns are converted before binding.
        # Columns used: 0 artist, 1 firstName, 3 itemInSession, 4 lastName,
        # 8 sessionId, 9 song, 10 userId
        session.execute(
            query,
            (line[0], line[1], int(line[3]), line[4], int(line[8]), line[9], int(line[10])),
        )

#### Perform Query
In the cell below, we will query the `music_library_by_user_session_item` table to get the `artist_name`, `song_title`, and `user_name` for user 10 during session 182.

***Query:***
```CQL
SELECT artist, song, firstName, lastName
FROM music_library_by_user_session_item
WHERE userId = 10 AND sessionId = 182
```

In [17]:
# Run query 2 and materialise the result set so it can be read column by column
rows = session.execute(query2)
fetched = list(rows)

# One list per output column; the user's name is "<first> <last>"
artists = [record.artist for record in fetched]
songs = [record.song for record in fetched]
users = [' '.join((record.firstname, record.lastname)) for record in fetched]

user_10_session_182_df = pd.DataFrame({
    'artist_name': artists,
    'song_title': songs,
    'user_name': users,
})

***Output:***

In [18]:
# Last expression in the cell: the notebook renders the query 2 DataFrame
user_10_session_182_df

Unnamed: 0,artist_name,song_title,user_name
0,Down To The Bone,Keep On Keepin' On,Sylvie Cruz
1,Three Drives,Greece 2000,Sylvie Cruz
2,Sebastien Tellier,Kilometer,Sylvie Cruz
3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie Cruz


From the above output, we can see that `userId = 10` is Sylvie Cruz, and the four songs that she listened to during `sessionId = 182` are shown.

### Query 3:
### Return the names of all users who listened to the song 'All Hands Against His Own'

#### Write Query

In [19]:
# CQL for query 3: users who listened to 'All Hands Against His Own'
query3 = (
    "SELECT firstName, lastName "
    "FROM all_hands_song_user_library "
    "WHERE song = 'All Hands Against His Own'"
)

From the above query, we know that the data should be partitioned by `song`. In order to make the Primary Key unique, we will cluster by `userId`.

#### Create Table for Query 3

In [20]:
# Table for query 3: song is the partition key and userId the clustering column
create_query = (
    'CREATE TABLE IF NOT EXISTS all_hands_song_user_library'
    '(song TEXT, userId INT, firstName TEXT, lastName TEXT, '
    'PRIMARY KEY((song), userId))'
)
session.execute(create_query);

#### Insert Data into Table for Query 3

In [21]:
# CSV file created in Part I
file = './output/event_datafile_new.csv'

# The statement is identical for every row, so it is built once, outside the
# loop (with a space separating the column list from VALUES)
query = (
    "INSERT INTO all_hands_song_user_library (firstName, lastName, song, userId) "
    "VALUES (%s, %s, %s, %s)"
)

# newline='' lets the csv module handle line endings itself, as its
# documentation recommends
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader, None)  # skip header
    for line in csvreader:
        # CSV fields are text, so userId is converted before binding.
        # Columns used: 1 firstName, 4 lastName, 9 song, 10 userId
        session.execute(query, (line[1], line[4], line[9], int(line[10])))

#### Perform Query
In the cell below, we will query the `all_hands_song_user_library` table to get the names of users who listened to the song "All Hands Against His Own".

***Query:***
```CQL
SELECT firstName, lastName
FROM all_hands_song_user_library
WHERE song = 'All Hands Against His Own'
```

In [22]:
# Run query 3 and materialise the result set so it can be read column by column
rows = session.execute(query3)
fetched = list(rows)

# One list per output column, in result order
first_names = [record.firstname for record in fetched]
last_names = [record.lastname for record in fetched]

all_hands_song_users_df = pd.DataFrame({
    'first_name': first_names,
    'last_name': last_names,
})

***Output***

In [23]:
# Last expression in the cell: the notebook renders the query 3 DataFrame
all_hands_song_users_df

Unnamed: 0,first_name,last_name
0,Jacqueline,Lynch
1,Tegan,Levine
2,Sara,Johnson


From the above output, we can see that Jacqueline Lynch, Tegan Levine, and Sara Johnson all listened to the song "All Hands Against His Own".

### Drop the tables to prevent duplicate data

In [24]:
# Drop the three query tables, in the order they were created
for table_name in (
    'music_library_by_session_item',       # 1st query
    'music_library_by_user_session_item',  # 2nd query
    'all_hands_song_user_library',         # 3rd query
):
    drop_query = f'DROP TABLE IF EXISTS {table_name}'
    session.execute(drop_query);


### Close the session and cluster connection

In [25]:
# Close the session first, then the cluster object it was created from
session.shutdown()
cluster.shutdown()