# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [95]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [96]:
filepath = os.getcwd() + '/event_data'

for root, dirs, files in os.walk(filepath):
    #skip jupyter notebook checkpoint files
    if dirs != []:
        file_path_list = glob.glob(os.path.join(root,'*'))
        print(file_path_list)

['/home/workspace/event_data/2018-11-27-events.csv', '/home/workspace/event_data/2018-11-04-events.csv', '/home/workspace/event_data/2018-11-07-events.csv', '/home/workspace/event_data/2018-11-09-events.csv', '/home/workspace/event_data/2018-11-19-events.csv', '/home/workspace/event_data/2018-11-05-events.csv', '/home/workspace/event_data/2018-11-22-events.csv', '/home/workspace/event_data/2018-11-16-events.csv', '/home/workspace/event_data/2018-11-26-events.csv', '/home/workspace/event_data/2018-11-24-events.csv', '/home/workspace/event_data/2018-11-29-events.csv', '/home/workspace/event_data/2018-11-15-events.csv', '/home/workspace/event_data/2018-11-20-events.csv', '/home/workspace/event_data/2018-11-06-events.csv', '/home/workspace/event_data/2018-11-18-events.csv', '/home/workspace/event_data/2018-11-21-events.csv', '/home/workspace/event_data/2018-11-10-events.csv', '/home/workspace/event_data/2018-11-23-events.csv', '/home/workspace/event_data/2018-11-02-events.csv', '/home/work

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [97]:
full_data_rows_list = []
    
for f in file_path_list:
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        csvreader = csv.reader(csvfile)
        next(csvreader)   
        for line in csvreader:
            full_data_rows_list.append(line)
            
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        #skip rows where artists is not provided
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))



In [98]:
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


# Part II. Setup Apache Cassandra, define and execute required queries.

## The CSV file titled <font color=red>event_datafile_new.csv</font>, located within the directory now contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

#### Creating a Cluster

In [99]:
from cassandra.cluster import Cluster
cluster = Cluster()

session = cluster.connect()

#### Create Keyspace

In [100]:
try: 
    session.execute(
        "CREATE KEYSPACE IF NOT EXISTS practice WITH REPLICATION = {'class' : 'SimpleStrategy','replication_factor': 1}")
except Exception as e:
    print(f"Failed to create keyspace. Error message: {e}")

#### Set Keyspace

In [101]:
try: 
    session.set_keyspace('practice')
except Exception as e:
    print(f"Failed to set keyspace. Error message: {e}")

### Create a generic error handling functions

In [102]:
def error(query, e):
    print(f"Failed to execute query {query}. Error: {e}")

## Create three queries that return data meeting the following requirements: 
1. Returns artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

2. Returns only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

3. Returns every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'


### Query 1

**Requirement**: Returns artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

**Description:** Our PRIMARY KEY will consist of 'sessionID', which will be used as the PARTITIONING KEY allowing filtering of our data by this value, and 'itemInSession' which will be used as the CLUSTERING column, allowing the query to return the nth song of a given session sorted in DESC order. 

In [103]:
query = "CREATE TABLE IF NOT EXISTS sessions \
        (sessionId int, itemInSession int, artist text, song text, length decimal, \
        PRIMARY KEY (sessionId, itemInSession))"

try: 
    session.execute(query)
except Exception as e:
    error(query, e)
    

query1 = "SELECT * FROM system_schema.columns WHERE keyspace_name = 'practice' AND table_name = 'sessions'"


try: 
    rows = session.execute(query1)
except Exception as e:
    error(query1, e)

for row in rows:
    print(row)

Row(keyspace_name='practice', table_name='sessions', column_name='artist', clustering_order='none', column_name_bytes=b'artist', kind='regular', position=-1, type='text')
Row(keyspace_name='practice', table_name='sessions', column_name='iteminsession', clustering_order='asc', column_name_bytes=b'iteminsession', kind='clustering', position=0, type='int')
Row(keyspace_name='practice', table_name='sessions', column_name='length', clustering_order='none', column_name_bytes=b'length', kind='regular', position=-1, type='decimal')
Row(keyspace_name='practice', table_name='sessions', column_name='sessionid', clustering_order='none', column_name_bytes=b'sessionid', kind='partition_key', position=0, type='int')
Row(keyspace_name='practice', table_name='sessions', column_name='song', clustering_order='none', column_name_bytes=b'song', kind='regular', position=-1, type='text')


In [104]:
file = 'event_datafile_new.csv'
line_ct = 0

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = """INSERT INTO sessions (sessionId, itemInSession, artist, song, length)"""
        query = query + """ VALUES (%s, %s, %s, %s, %s);"""
        try: 
            session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))
            line_ct +=1
        except Exception as e:
            print(line)
            print(e)
print(f"lines added: {line_ct}")


lines added: 6820


#### Verify that the data have been inserted into each table

In [105]:
check_q1 = """SELECT artist, song, length from sessions WHERE sessionId = 338 AND itemInSession = 4;"""
try: 
    res = session.execute(check_q1)
except Exception as e:
    error(check_q1, e)
    
for r in res:
    print(r)

Row(artist='Faithless', song='Music Matters (Mark Knight Dub)', length=Decimal('495.3073'))


### Query 2

**Requirement**: Returns only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

**Description:** For this query, we'll create a COMPOSITE PARTITIONING KEY using 'userId' and 'sessionId'. This will allow rows to be uniquely identified (and filtered) by a combination of both columns. 'itemInSession' will be used as the CLUSTERING COLUMN allowing the query to return the nth song of a given session for a specific user. 'itemInSession' will be omitted from our query statement, to accomodate the requirement that our query returns **ONLY** the columns representing the artist's name, song title, and user identification details. 

In [106]:
query = "CREATE TABLE IF NOT EXISTS user_sessions \
        (userId int, sessionId int, itemInSession int, lastName text, firstName text, artist text, song text, \
        PRIMARY KEY ((userId, sessionid), itemInSession));"

try: 
    session.execute(query)
except Exception as e:
    error(query, e)
    

file = 'event_datafile_new.csv'
line_ct = 0

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = """INSERT INTO user_sessions (userId, sessionId, itemInSession, lastName, firstName, artist, song)"""
        query = query + """ VALUES (%s, %s, %s, %s, %s, %s, %s);"""
        try: 
            session.execute(query, (int(line[3]), int(line[8]), int(line[3]), line[4], line[1], line[8], line[9],))
            line_ct +=1
        except Exception as e:
            print(e)
print(f"lines added: {line_ct}")
                    

lines added: 6820


In [107]:
check_q2 = """SELECT userId, lastName, firstName, artist, song from user_sessions WHERE userId = 10 and sessionId = 182;"""

try: 
    res = session.execute(check_q2)
    if not res.current_rows:
        print("None")
    for r in res:
        print(r)
except Exception as e:
    error(check_q2, e)

None


* Note: The try and except block was altered to return 'None' after it was determined that the requested data does not exists within our data. 

### Query 3

**Requirement**: Returns every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

**Description:** For this query, we'll PRIMARY KEY from a combination the 'song' and 'userId' columns. 'song' will be our PARTITIONING KEY. 'userId' will be our CLUSTERING KEY allowing query results to be sorted and returned in DESC order. 

In [108]:
query = "CREATE TABLE IF NOT EXISTS user_songs \
        (song text, lastName text, firstName text, userId int, \
        PRIMARY KEY (song, userId))"

try: 
    session.execute(query)
except Exception as e:
    error(query, e)
    

file = 'event_datafile_new.csv'
line_ct = 0

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = """INSERT INTO user_songs (song, userId, lastName, firstName)"""
        query = query + """ VALUES (%s, %s, %s, %s);"""
        try: 
            session.execute(query, (line[9],int(line[10]), line[4], line[1]))
            line_ct +=1
        except Exception as e:
            print(e)
print(f"lines added: {line_ct}")

check_q3 = """SELECT userId, lastName, firstName from user_songs WHERE song = 'All Hands Against His Own';"""
try: 
    res = session.execute(check_q3)
except Exception as e:
    error(check_q3, e)
    
for r in res:
    print(r)
                    

lines added: 6820
Row(userid=29, lastname='Lynch', firstname='Jacqueline')
Row(userid=80, lastname='Levine', firstname='Tegan')
Row(userid=95, lastname='Johnson', firstname='Sara')


### Drop the tables before closing out the sessions

In [109]:
tbl_ls = ['sessions', 'user_sessions', 'user_songs']

for t in tbl_ls:
    try: 
        session.execute(f"DROP TABLE {t};")
        print(f"Dropped tabled {t}")
    except Exception as e:
        print(e)

Dropped tabled sessions
Dropped tabled user_sessions
Dropped tabled user_songs


### Close the session and cluster connection¶

In [110]:
session.shutdown()
cluster.shutdown()

### Resources Used 
1. Casting data into the correct dtypes when executing a CQL statement : https://stackoverflow.com/questions/55290883/invalid-string-constant-error-in-apache-cassandra-using-python


2. Handling empty ReturnSets: https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/cluster/#cassandra.cluster.ResultSet



3. Finding a better way to check that a give table has been created: https://docs.datastax.com/en/dse/5.1/cql/cql/cql_using/useQuerySystemTable.html