# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [2]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [3]:
# Show the working directory so a reader can confirm where 'event_data' is resolved from
print(os.getcwd())

# Root folder that holds the raw event CSV files (possibly spread over subfolders)
filepath = os.getcwd() + '/event_data'

# Collect every CSV found under the root folder. The list is accumulated across
# all directories visited by os.walk rather than reassigned per directory, so the
# result does not depend on which folder happens to be walked last, and it is an
# empty list (not an undefined name) when the folder does not exist.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # Prune hidden folders (e.g. .ipynb_checkpoints) in place so os.walk does not
    # descend into them and pick up stale copies of the data files
    dirs[:] = [d for d in dirs if not d.startswith('.')]

    # Match only '*.csv' so subdirectories and stray files are never opened as data;
    # sorting keeps the processing order deterministic across runs
    file_path_list.extend(sorted(glob.glob(os.path.join(root, '*.csv'))))

/workspace/home


#### Processing the files to create the combined data CSV file that will be used for the Apache Cassandra tables

In [4]:
# Accumulates the data rows (header rows excluded) of every raw event file
full_data_rows_list = []

# Read each raw event file and append its data rows to the combined list
for event_file in file_path_list:
    with open(event_file, 'r', encoding = 'utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header row; the default stops an empty file from raising StopIteration
        next(csvreader, None)

        for line in csvreader:
            full_data_rows_list.append(line)

# Write the reduced file 'event_datafile_new.csv' that is later used to insert
# data into the Apache Cassandra tables. Every field is quoted on output.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as out_file:
    writer = csv.writer(out_file, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        # csv.reader yields [] for a blank line, so guard before indexing;
        # rows whose first column (written out as 'artist') is empty are dropped
        if not row or row[0] == '':
            continue
        # Keep only the 11 source columns listed in the header above, in that order
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [5]:
# check the number of rows in your csv file
# Count the lines of the combined CSV file (the header line is part of the total)
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as combined_file:
    line_count = 0
    for _ in combined_file:
        line_count += 1
    print(line_count)

6821


#### Creating a Cluster

The following code imports the Cassandra library, creates a cluster and connects to a new session.

In [6]:
from cassandra.cluster import Cluster

# Connect to the Cassandra node at 127.0.0.1 and open a session.
# On failure the error is printed and then re-raised unchanged: retrying
# cluster.connect() inside the handler would raise NameError (hiding the real
# cause) whenever Cluster(...) itself was the call that failed, and every later
# cell needs a working `session` anyway.
try:
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
except Exception as e:
    print(e)
    raise

#### Create Keyspace

Create a Keyspace and replicate the data to a single node


In [7]:
# CQL that creates the user_music keyspace (if absent) with SimpleStrategy
# replication and a replication factor of 1
create_keyspace_cql = """
    CREATE KEYSPACE IF NOT EXISTS user_music
    WITH REPLICATION = 
    {'class' : 'SimpleStrategy',
    'replication_factor' : 1}    
    """

try:
    session.execute(create_keyspace_cql)
except Exception as err:
    # Report the problem and let the notebook continue
    print(err)

#### Set Keyspace

Setting a keyspace user_music to save the tables

In [8]:
# Point the session at the user_music keyspace created above
try:
    session.set_keyspace('user_music')
except Exception as err:
    # Report the problem and let the notebook continue
    print(err)


### Query1

#### The first query requires a list of the artists, song title and length where session_id = 338 and itemInSession  = 4

#### Create a new table for first query requirements


Approach:

1. I drop any existing table with the same name.
2. Next, I create a new table, songs_played, holding the columns needed by the select query later.
3. I have defined the appropriate data type for each column, depending on the values it will store.
4. For the primary key, I have used sessionId as the partition key and itemInSession as the clustering column.
5. Since sessionId is the partition key, Cassandra creates one partition per sessionId value, while the clustering column determines the sort order of the rows within each partition.

In [9]:
# Statements that rebuild the songs_played table from scratch:
# partition key sessionid, clustering column itemInSession
drop_songs_played = "Drop table if exists songs_played"
create_songs_played = "CREATE TABLE IF NOT EXISTS songs_played (sessionId int, itemInSession int, artist text, song text, length float, primary key (sessionid, itemInSession))"

try:
    # Run in order; a failure on the drop skips the create, as both share one try block
    for statement in (drop_songs_played, create_songs_played):
        session.execute(statement)
except Exception as err:
    print(err)

#### Insert data into the newly created table as per first query's requirements.


Approach: 
1. In this section, I read the data from the combined CSV file created earlier.
2. Next, I insert the required columns into the table created in the previous step.

In [10]:
file = 'event_datafile_new.csv'

# Prepare the INSERT once instead of rebuilding the statement text for every row;
# prepared statements use '?' bind markers and are parsed by the server a single time
insert_songs_played = session.prepare(
    "insert into songs_played (sessionId, itemInSession, artist, song, length) "
    "values (?, ?, ?, ?, ?)"
)

# newline='' lets the csv module handle line endings inside quoted fields itself
with open(file, encoding = 'utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader, None) # skip header (no error if the file is empty)
    for line in csvreader:
        # CSV columns used: 8 = sessionId, 3 = itemInSession, 0 = artist, 9 = song, 5 = length
        session.execute(
            insert_songs_played,
            (int(line[8]), int(line[3]), line[0], line[9], float(line[5]))
        )

#### Verify the data with a select statement

Approach: 
1. I use a select statement with the required columns and filter the data as needed.
2. sessionId is the partition key, so filtering on sessionId = 338 lets Cassandra go straight to that partition; itemInSession = 4 then selects the matching row within it.
3. Finally, the results are printed.

In [11]:
## Verifying the data was entered into the table

try:
    rows=session.execute("Select artist, song, length from songs_played where sessionId = 338 and itemInSession = 4")
except Exception as e:
    print(e)
else:
    # Only iterate when the query succeeded; otherwise `rows` would be undefined
    # or, worse, still hold the result of a previously executed cell
    for row in rows:
        print(row.artist, row.song, row.length)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


### Query 2

#### The second query requires the name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

1. I have used a composite partition key consisting of userid and sessionid. Each (userid, sessionid) pair maps to its own partition, so the query, which filters on both columns, reads a single partition.
2. itemInSession is the clustering column, which sorts the rows within each partition, as the requirements ask.

In [35]:
# Statements that rebuild the artist_filters table from scratch:
# composite partition key (userid, sessionId), clustering column itemInSession
drop_artist_filters = "Drop table if exists artist_filters"
create_artist_filters = (
    "CREATE TABLE IF NOT EXISTS artist_filters (userid int, sessionid int, itemInSession int, "
    "song text, artist text, firstName text, lastName text,  primary key ((userid, sessionId), itemInSession))"
)

try:
    # Run in order; a failure on the drop skips the create, as both share one try block
    for statement in (drop_artist_filters, create_artist_filters):
        session.execute(statement)
except Exception as err:
    print(err)

#### Inserting values into the table

In [13]:
file = 'event_datafile_new.csv'

# Prepare the INSERT once instead of rebuilding the statement text for every row;
# prepared statements use '?' bind markers and are parsed by the server a single time
insert_artist_filters = session.prepare(
    "insert into artist_filters (firstName, lastName, artist, sessionid, itemInSession, song, userid) "
    "values (?, ?, ?, ?, ?, ?, ?)"
)

# newline='' lets the csv module handle line endings inside quoted fields itself
with open(file, encoding = 'utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader, None) # skip header (no error if the file is empty)
    for line in csvreader:
        # CSV columns used: 1 = firstName, 4 = lastName, 0 = artist, 8 = sessionId,
        # 3 = itemInSession, 9 = song, 10 = userId
        session.execute(
            insert_artist_filters,
            (line[1], line[4], line[0], int(line[8]), int(line[3]), line[9], int(line[10]))
        )

#### Verify the data with a select statement

In [14]:
# Rows come back ordered by the table's clustering column within the
# (userid, sessionid) partition selected here
query = "Select artist, song, firstname, lastname from artist_filters where userid = 10 and sessionid = 182"

try:
    rows=session.execute(query)
except Exception as e:
    print(e)
else:
    # Only iterate when the query succeeded; otherwise `rows` would be undefined
    # or, worse, still hold the result of a previously executed cell
    for row in rows:
        print(row.artist, row.song, row.firstname, row.lastname)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


### Query 3

#### List every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'


In [30]:
# Statements that rebuild the music_users table from scratch:
# partition key song, clustering column user_id
drop_music_users = "Drop table if exists music_users"
create_music_users = (
    "CREATE TABLE IF NOT EXISTS music_users (song text, user_id int, firstName text, lastName text, artist text, "
    "primary key (song, user_id))"
)

try:
    # Run in order; a failure on the drop skips the create, as both share one try block
    for statement in (drop_music_users, create_music_users):
        session.execute(statement)
except Exception as err:
    print(err)

#### Inserting values into the table

In [32]:
file = 'event_datafile_new.csv'

# Prepare the INSERT once instead of rebuilding the statement text for every row;
# prepared statements use '?' bind markers and are parsed by the server a single time
insert_music_users = session.prepare(
    "insert into music_users (song, user_id, firstName, lastName, artist) "
    "values (?, ?, ?, ?, ?)"
)

# newline='' lets the csv module handle line endings inside quoted fields itself
with open(file, encoding = 'utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader, None) # skip header (no error if the file is empty)
    for line in csvreader:
        # CSV columns used: 9 = song, 10 = userId, 1 = firstName, 4 = lastName, 0 = artist
        session.execute(
            insert_music_users,
            (line[9], int(line[10]), line[1], line[4], line[0])
        )

#### Verifying the data with a select statement

In [33]:
# song is the partition key of music_users, so this reads a single partition
query3 = "select firstname, lastname from music_users where song = 'All Hands Against His Own'"

try:
    rows = session.execute(query3)
except Exception as e:
    print(e)
else:
    # Only iterate when the query succeeded; otherwise `rows` would be undefined
    # or, worse, still hold the result of a previously executed cell
    for row in rows:
        print(row.firstname, row.lastname)

Jacqueline Lynch
Tegan Levine
Sara Johnson


### Drop the tables before closing out the sessions

In [86]:
# Dropping tables created above

In [61]:
drop_tbl1 = "Drop table if exists songs_played"
drop_tbl2 = "Drop table if exists artist_filters"
drop_tbl3 = "Drop table if exists music_users"

# Defining the statements alone leaves the tables in place, so execute each one.
# Each drop gets its own try block so one failure does not prevent the others.
for drop_statement in (drop_tbl1, drop_tbl2, drop_tbl3):
    try:
        session.execute(drop_statement)
    except Exception as e:
        print(e)

### Close the session and cluster connection

In [62]:
# Release the connection resources: the session first, then the cluster it came from
for resource in (session, cluster):
    resource.shutdown()