# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [None]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [None]:
# Collect the event CSV files under ./event_data, including any subfolders.
filepath = os.path.join(os.getcwd(), 'event_data')

# Accumulate across every directory os.walk visits (reassigning the list on
# each step would keep only the last directory's files). Hidden directories
# such as .ipynb_checkpoints are pruned in place so notebook checkpoint copies
# are not loaded as duplicate events, and only .csv files are kept so
# subdirectories and stray files are never handed to the CSV reader.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    dirs[:] = [d for d in dirs if not d.startswith('.')]
    file_path_list.extend(
        os.path.join(root, name)
        for name in files
        if name.endswith('.csv') and not name.startswith('.')
    )

# Sort for a deterministic processing order (os.walk/listdir order is arbitrary).
file_path_list.sort()

#### Processing the files to create the CSV data file that will be used for the Apache Cassandra tables

In [None]:
full_data_rows_list = []

# Read every data row (header skipped) from each event file into memory.
for f in file_path_list:
    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Default of None lets a completely empty file through instead of
        # raising StopIteration.
        next(csvreader, None)
        full_data_rows_list.extend(csvreader)

csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Positions, in the raw event rows, of the columns kept for the output file,
# listed in output order: artist, firstName, gender, itemInSession, lastName,
# length, level, location, sessionId, song, userId.
kept_columns = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')

    # Header of the denormalised file read by the INSERT cells below.
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])

    for row in full_data_rows_list:
        # Skip blank lines (csv.reader yields []) and events without an artist.
        if not row or not row[0]:
            continue
        writer.writerow([row[i] for i in kept_columns])


#### Creating a Cluster

In [None]:
# Open a connection to the Cassandra node listening on localhost.
from cassandra.cluster import Cluster

contact_points = ['127.0.0.1']
cluster = Cluster(contact_points)
session = cluster.connect()

#### Create Keyspace

In [None]:
# Create the project keyspace (SimpleStrategy, replication factor 1: no redundancy).
create_keyspace = """
    CREATE KEYSPACE IF NOT EXISTS musical_events
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
"""
session.execute(create_keyspace)

#### Set Keyspace

In [None]:
# Make musical_events the default keyspace for all following statements.
keyspace_name = 'musical_events'
session.set_keyspace(keyspace_name)

## Table 1

### This table is set up to answer queries that will retrieve an artist, song title and song length based on a given sessionId and the itemInSession number.

sessionId and itemInSession were chosen for the primary key because together they uniquely identify a row, and they are exactly the columns used to retrieve rows from this table.

In [None]:
# Table 1: look up a single song play by session and position in that session.
# sessionId is the partition key, so all plays from one session share a
# partition; itemInSession is the clustering column that orders them and makes
# each row unique. Partitioning on itemInSession instead would be skewed:
# it is a position within a session, so the same values recur in every
# session and each of those partitions keeps growing as sessions are added.
query = """
    CREATE TABLE IF NOT EXISTS artist_library (
        itemInSession int,
        sessionId int,
        artist text,
        song_title text,
        song_length double,
        PRIMARY KEY(sessionId, itemInSession))
"""
session.execute(query)
                    

In [None]:
file = 'event_datafile_new.csv'

# Prepare the INSERT once and reuse it for every row: avoids rebuilding the
# query string per row and lets the server parse the statement only once.
insert_artist_library = session.prepare(
    "INSERT INTO artist_library (itemInSession, sessionId, artist, song_title, song_length)"
    " VALUES (?, ?, ?, ?, ?)"
)

with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        # File columns used: 3 itemInSession, 8 sessionId, 0 artist, 9 song, 5 length.
        session.execute(
            insert_artist_library,
            (int(line[3]), int(line[8]), line[0], line[9], float(line[5])),
        )

### Example Query

In [None]:
# Example: the song played as item 4 of session 338.
song_in_session_query = """
    SELECT
        artist,
        song_title,
        song_length
    FROM
        artist_library
    WHERE itemInSession=4 AND sessionId=338
"""
rows = session.execute(song_in_session_query)

for found in rows:
    print(found)

## Table 2

### This table is set up to answer queries that will retrieve an artist, user (first and last name) and song title sorted by the item number in the session based on a given user id and session id.

user_id and session_id form the composite partition key because they are the columns filtered on in this query's WHERE clause. itemInSession is added as a clustering column to make the PRIMARY KEY unique and to sort the retrieved rows in the order the user listened to them during the session.

In [None]:
# Table 2: partition (user_id, session_id); rows ordered by itemInSession.
create_artist_user_lib = """
    CREATE TABLE IF NOT EXISTS artist_user_lib (
        user_id int,
        session_id int,
        itemInSession int,
        artist text,
        song_title text,
        user_first_name text,
        user_last_name text,
        PRIMARY KEY((user_id, session_id), itemInSession)
    ) WITH CLUSTERING ORDER BY (itemInSession ASC)
"""
session.execute(create_artist_user_lib)
                    

In [None]:
file = 'event_datafile_new.csv'

# Prepare the INSERT once and reuse it for every row instead of rebuilding
# the query string inside the loop.
insert_artist_user_lib = session.prepare("""
    INSERT INTO artist_user_lib (
        user_id,
        session_id,
        itemInSession,
        artist,
        song_title,
        user_first_name,
        user_last_name
    )
    VALUES (?, ?, ?, ?, ?, ?, ?)
""")

with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        # File columns used: 10 userId, 8 sessionId, 3 itemInSession,
        # 0 artist, 9 song, 1 firstName, 4 lastName.
        session.execute(insert_artist_user_lib, (
            int(line[10]), int(line[8]), int(line[3]),
            line[0], line[9], line[1], line[4],
        ))

### Example Query

In [None]:
# Example: what user 10 played in session 182, in listening order.
user_session_query = """
    SELECT
        artist,
        song_title,
        user_first_name,
        user_last_name
    FROM
        artist_user_lib
    WHERE user_id=10 AND session_id=182
"""
rows = session.execute(user_session_query)

for found in rows:
    print(found)

## Table 3

### This table is set up to answer queries that retrieve users (first and last name) for a given song_title.

song_title is the partition key because it is the column the query filters on. user_id is added as a clustering column so that each row is unique. user_id was preferred over user_first_name and user_last_name because two different users can share the same name, and user_id distinguishes them.

In [None]:
# Table 3: partition by song_title; user_id clusters and de-duplicates users.
create_user_library = """
    CREATE TABLE IF NOT EXISTS user_library (
        song_title text,
        user_id int,
        user_first_name text,
        user_last_name text,
        PRIMARY KEY(song_title, user_id))
"""
session.execute(create_user_library)
                    

In [None]:
file = 'event_datafile_new.csv'

# Prepare the INSERT once and reuse it for every row instead of rebuilding
# the query string inside the loop.
insert_user_library = session.prepare(
    "INSERT INTO user_library (song_title, user_id, user_first_name, user_last_name)"
    " VALUES (?, ?, ?, ?)"
)

with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        # File columns used: 9 song, 10 userId, 1 firstName, 4 lastName.
        session.execute(
            insert_user_library,
            (line[9], int(line[10]), line[1], line[4]),
        )

### Example Query

In [None]:
# Example: every user recorded against the song 'All Hands Against His Own'.
song_listeners_query = """
    SELECT
        user_first_name,
        user_last_name
    FROM
        user_library
    WHERE song_title='All Hands Against His Own'
"""
rows = session.execute(song_listeners_query)

for found in rows:
    print(found)

### Drop the tables before closing out the sessions

In [None]:
# Drop the three tables created above, in creation order, before disconnecting.
drop_query = """DROP TABLE IF EXISTS {}"""

for table_name in ('artist_library', 'artist_user_lib', 'user_library'):
    session.execute(drop_query.format(table_name))

### Close the session and cluster connection

In [None]:
# Close the session first, then the cluster object that owns it.
for connection in (session, cluster):
    connection.shutdown()