# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# checking your current working directory
print(os.getcwd())

# Get current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Create a for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    
# join the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))
    print(file_path_list)

/workspace/home
['/workspace/home/event_data/2018-11-10-events.csv', '/workspace/home/event_data/2018-11-04-events.csv', '/workspace/home/event_data/2018-11-23-events.csv', '/workspace/home/event_data/2018-11-03-events.csv', '/workspace/home/event_data/2018-11-28-events.csv', '/workspace/home/event_data/2018-11-14-events.csv', '/workspace/home/event_data/2018-11-22-events.csv', '/workspace/home/event_data/2018-11-15-events.csv', '/workspace/home/event_data/2018-11-01-events.csv', '/workspace/home/event_data/2018-11-06-events.csv', '/workspace/home/event_data/2018-11-19-events.csv', '/workspace/home/event_data/2018-11-18-events.csv', '/workspace/home/event_data/2018-11-16-events.csv', '/workspace/home/event_data/2018-11-07-events.csv', '/workspace/home/event_data/2018-11-25-events.csv', '/workspace/home/event_data/2018-11-21-events.csv', '/workspace/home/event_data/2018-11-17-events.csv', '/workspace/home/event_data/2018-11-08-events.csv', '/workspace/home/event_data/2018-11-12-events.c

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
full_data_rows_list = [] 
    
for f in file_path_list:

    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
         for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 
            

print(len(full_data_rows_list))
print(full_data_rows_list)

# creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


8056


In [50]:
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print('number of rows in csv file', sum(1 for line in f))

number of rows in csv file 6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster

In [51]:
from cassandra.cluster import Cluster
cluster = Cluster()

session = cluster.connect()

#### Create Keyspace

In [54]:
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS soundify 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

except Exception as e:
    print(e)

#### Set Keyspace

In [55]:
try:
    session.set_keyspace('soundify')
except Exception as e:
    print(e)

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




In [138]:
file = 'event_datafile_new.csv'

In [165]:
session.execute("DROP TABLE IF EXISTS session_history")
c1_query = "CREATE TABLE IF NOT EXISTS session_history(artist TEXT, song TEXT, length FLOAT, ItemlnSession INT, firstName TEXT, lastName TEXT, userId INT, sessionId INT, primary key(sessionId, itemlnsession))"
try:
    session.execute(c1_query)
except Exception as e:
    print(e)


In [166]:
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        i1_query = "INSERT INTO session_history (artist, song, length, ItemlnSession, firstName, lastName, userId, sessionId)"
        i1_query = i1_query + " VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
        
        session.execute(i1_query, (line[0], line[9], float(line[5]), int(line[3]), line[1], line[4], int(line[10]), int(line[8])))

In [141]:
session.execute("DROP TABLE IF EXISTS music_history")
c2_query = "CREATE TABLE IF NOT EXISTS music_history(artist TEXT, song TEXT, firstName TEXT, lastName TEXT, user TEXT, userId INT, sessionId INT, ItemlnSession INT, primary key((sessionId, userId), ItemlnSession))"
try:
    session.execute(c2_query)
except Exception as e:
    print(e)

In [142]:
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        user_name = f"{line[1]} {line[4]}"
        i2_query = "INSERT INTO music_history (artist, song, firstName, lastName, user, userId, sessionId, ItemlnSession)"
        i2_query = i2_query + " VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
        session.execute(i2_query, (line[0], line[9], line[1], line[4], user_name, int(line[10]), int(line[8]), int(line[3])))

In [147]:
session.execute("DROP TABLE IF EXISTS music_app_history")
c3_query = "CREATE TABLE IF NOT EXISTS music_app_history(song TEXT, firstName TEXT, lastName TEXT, user_name TEXT, userId INT, primary key(song, user_name))"
try:
    session.execute(c3_query)
except Exception as e:
    print(e)

In [148]:
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        full_name = f"{line[1]} {line[4]}"
        i3_query = "INSERT INTO music_app_history (song, firstName, lastName, user_name, userId)"
        i3_query = i3_query + " VALUES (%s, %s, %s, %s, %s)"
        
        session.execute(i3_query, (line[9], line[1], line[4], full_name, int(line[10])))

### Validation that the data has been inserted into each table

In [89]:
rows = session.execute("SELECT * FROM session_history;")
for i in rows:
    print(i)

Row(sessionid=23, userid=24, artist='Max Richter', firstname='Layla', itemlnsession=33, lastname='Griffin', length=193.384033203125, song='Organum')
Row(sessionid=660, userid=26, artist='Mariah Carey', firstname='Ryan', itemlnsession=2, lastname='Smith', length=261.8248596191406, song='Bye Bye')
Row(sessionid=893, userid=80, artist='After 7', firstname='Tegan', itemlnsession=21, lastname='Levine', length=241.94566345214844, song='Nights Like This')
Row(sessionid=53, userid=54, artist='Bloodhound Gang', firstname='Kaleb', itemlnsession=0, lastname='Cook', length=260.20526123046875, song='Uhn Tiss Uhn Tiss Uhn Tiss')
Row(sessionid=987, userid=49, artist='Black Eyed Peas', firstname='Chloe', itemlnsession=18, lastname='Cuevas', length=229.61587524414062, song="Let's Get It Started")
Row(sessionid=91, userid=92, artist='Usher featuring will.i.am', firstname='Ryann', itemlnsession=2, lastname='Smith', length=395.728515625, song='OMG')
Row(sessionid=128, userid=25, artist='Edward Sharpe & Th

In [90]:
rows = session.execute("SELECT * FROM music_history;")
for i in rows:
    print(i)

Row(sessionid=1049, userid=73, itemlnsession=0, artist='Manowar', firstname='Jacob', lastname='Klein', song='Shell Shock')
Row(sessionid=1049, userid=73, itemlnsession=1, artist='Morcheeba', firstname='Jacob', lastname='Klein', song='Women Lose Weight (Feat: Slick Rick)')
Row(sessionid=1049, userid=73, itemlnsession=2, artist='Maroon 5', firstname='Jacob', lastname='Klein', song="Won't Go Home Without You")
Row(sessionid=1049, userid=73, itemlnsession=3, artist='Train', firstname='Jacob', lastname='Klein', song='Hey_ Soul Sister')
Row(sessionid=1049, userid=73, itemlnsession=4, artist='LMFAO', firstname='Jacob', lastname='Klein', song="I'm In Miami Bitch")
Row(sessionid=1049, userid=73, itemlnsession=5, artist='DJ Dizzy', firstname='Jacob', lastname='Klein', song='Sexy Bitch')
Row(sessionid=1049, userid=73, itemlnsession=6, artist='Fish Go Deep & Tracey K', firstname='Jacob', lastname='Klein', song='The Cure & The Cause (Dennis Ferrer Remix)')
Row(sessionid=1049, userid=73, itemlnsessi

Row(sessionid=70, userid=71, itemlnsession=3, artist='OutKast', firstname='Ayleen', lastname='Wise', song='Ova Da Wudz')
Row(sessionid=541, userid=66, itemlnsession=0, artist='Method Man / Busta Rhymes', firstname='Kevin', lastname='Arellano', song="What's Happenin'")
Row(sessionid=541, userid=66, itemlnsession=1, artist='MGMT', firstname='Kevin', lastname='Arellano', song='Time To Pretend')
Row(sessionid=486, userid=29, itemlnsession=1, artist='The Mossie', firstname='Jacqueline', lastname='Lynch', song='Interlude #1 (Mama)')
Row(sessionid=486, userid=29, itemlnsession=2, artist='Anastacia', firstname='Jacqueline', lastname='Lynch', song='One Day In Your Life')
Row(sessionid=486, userid=29, itemlnsession=3, artist='John Mayer', firstname='Jacqueline', lastname='Lynch', song='Good Love Is On The Way')
Row(sessionid=486, userid=29, itemlnsession=4, artist='In Flames', firstname='Jacqueline', lastname='Lynch', song='My sweet shadow')
Row(sessionid=486, userid=29, itemlnsession=5, artist=

Row(sessionid=436, userid=85, itemlnsession=1, artist='Frumpies', firstname='Kinsley', lastname='Young', song='Fuck Kitty')
Row(sessionid=436, userid=85, itemlnsession=2, artist='Sheena Easton', firstname='Kinsley', lastname='Young', song='For Your Eyes Only')
Row(sessionid=436, userid=85, itemlnsession=3, artist='Arctic Monkeys', firstname='Kinsley', lastname='Young', song='Still Take You Home')
Row(sessionid=436, userid=85, itemlnsession=4, artist='Justin Timberlake', firstname='Kinsley', lastname='Young', song="Let's Take A Ride")
Row(sessionid=436, userid=85, itemlnsession=5, artist='Coldplay', firstname='Kinsley', lastname='Young', song='Clocks')
Row(sessionid=436, userid=85, itemlnsession=6, artist='Kat DeLuna', firstname='Kinsley', lastname='Young', song='Como Un SueÃ\x83Â±o (Am I Dreaming)')
Row(sessionid=436, userid=85, itemlnsession=7, artist='Todd Rundgren', firstname='Kinsley', lastname='Young', song="Lord Chancellor's Nightmare Song")
Row(sessionid=436, userid=85, itemlnse

Row(sessionid=393, userid=16, itemlnsession=1, artist='AndrÃ\x83Â©s Cepeda', firstname='Rylan', lastname='George', song='CÃ\x83Â³mo Puede Ser')
Row(sessionid=393, userid=16, itemlnsession=2, artist='Irish Tenors', firstname='Rylan', lastname='George', song='Danny Boy')
Row(sessionid=393, userid=16, itemlnsession=3, artist='Justin Bieber', firstname='Rylan', lastname='George', song='Runaway Love')
Row(sessionid=549, userid=66, itemlnsession=0, artist='Marc Et Claude', firstname='Kevin', lastname='Arellano', song="I Need Your Lovin' (Like The Sunshine) (Radio Edit)")
Row(sessionid=312, userid=88, itemlnsession=0, artist='Spoon', firstname='Mohammad', lastname='Rodriguez', song='The Underdog (Album version)')
Row(sessionid=312, userid=88, itemlnsession=1, artist='Blind Pilot', firstname='Mohammad', lastname='Rodriguez', song='Things I Cannot Recall')
Row(sessionid=312, userid=88, itemlnsession=2, artist='Delirious?', firstname='Mohammad', lastname='Rodriguez', song='History Maker')
Row(se

In [91]:
rows = session.execute("SELECT * FROM music_app_history;")
for i in rows:
    print(i)

Row(song="Wonder What's Next", userid=49, firstname='Chloe', lastname='Cuevas')
Row(song="In The Dragon's Den", userid=49, firstname='Chloe', lastname='Cuevas')
Row(song='Too Tough (1994 Digital Remaster)', userid=44, firstname='Aleena', lastname='Kirby')
Row(song='Rio De Janeiro Blue (Album Version)', userid=49, firstname='Chloe', lastname='Cuevas')
Row(song='My Place', userid=15, firstname='Lily', lastname='Koch')
Row(song='My Place', userid=73, firstname='Jacob', lastname='Klein')
Row(song='The Lucky Ones', userid=24, firstname='Layla', lastname='Griffin')
Row(song='I Want You Now', userid=80, firstname='Tegan', lastname='Levine')
Row(song='Why Worry', userid=88, firstname='Mohammad', lastname='Rodriguez')
Row(song='TvÃ\x83Â¡rÃ\x83Â\xad v TvÃ\x83Â¡r', userid=97, firstname='Kate', lastname='Harrell')
Row(song="Lord Chancellor's Nightmare Song", userid=85, firstname='Kinsley', lastname='Young')
Row(song='Misfit Love', userid=25, firstname='Jayden', lastname='Graves')
Row(song='Eat To 

Row(song='Fear', userid=73, firstname='Jacob', lastname='Klein')
Row(song='Black Hole', userid=88, firstname='Mohammad', lastname='Rodriguez')
Row(song='332', userid=86, firstname='Aiden', lastname='Hess')
Row(song='Lotus', userid=16, firstname='Rylan', lastname='George')
Row(song='Lotus', userid=85, firstname='Kinsley', lastname='Young')
Row(song='Someday', userid=16, firstname='Rylan', lastname='George')
Row(song='Someday', userid=80, firstname='Tegan', lastname='Levine')
Row(song='Beer And Bones', userid=16, firstname='Rylan', lastname='George')
Row(song='Taste In Men', userid=42, firstname='Harper', lastname='Barrett')
Row(song='Nada Que Perder (Live)', userid=80, firstname='Tegan', lastname='Levine')
Row(song='Guest List', userid=73, firstname='Jacob', lastname='Klein')
Row(song='Teenage Crush', userid=72, firstname='Hayden', lastname='Brock')
Row(song='Dusty', userid=97, firstname='Kate', lastname='Harrell')
Row(song='Boom', userid=29, firstname='Jacqueline', lastname='Lynch')
Ro

Row(song="Rockin' My Boogie", userid=86, firstname='Aiden', lastname='Hess')
Row(song='White & Nerdy (Parody of "Ridin\'" by Chamillionaire featuring Krayzie Bone)', userid=84, firstname='Shakira', lastname='Hunt')
Row(song='Definitive', userid=16, firstname='Rylan', lastname='George')
Row(song='Checking On My Baby', userid=15, firstname='Lily', lastname='Koch')
Row(song='The Ghost Of Tom Joad', userid=29, firstname='Jacqueline', lastname='Lynch')
Row(song='Living In Love', userid=82, firstname='Avery', lastname='Martinez')
Row(song='Dirty Word', userid=29, firstname='Jacqueline', lastname='Lynch')
Row(song='The Cynic', userid=97, firstname='Kate', lastname='Harrell')
Row(song='Ripping Flesh', userid=29, firstname='Jacqueline', lastname='Lynch')
Row(song='Mary (Album)', userid=97, firstname='Kate', lastname='Harrell')
Row(song='In The Flowers', userid=26, firstname='Ryan', lastname='Smith')
Row(song='Me In You', userid=29, firstname='Jacqueline', lastname='Lynch')
Row(song='Persephone'

Row(song='The Gift', userid=82, firstname='Avery', lastname='Martinez')
Row(song='The Gift', userid=88, firstname='Mohammad', lastname='Rodriguez')
Row(song='The Gift', userid=101, firstname='Jayden', lastname='Fox')
Row(song='Michael', userid=97, firstname='Kate', lastname='Harrell')
Row(song='All Will Be Forgotten (Album Version)', userid=97, firstname='Kate', lastname='Harrell')
Row(song='the king of wishful thinking', userid=33, firstname='Bronson', lastname='Harris')
Row(song='the king of wishful thinking', userid=50, firstname='Ava', lastname='Robinson')
Row(song='Same Old Thing', userid=36, firstname='Matthew', lastname='Jones')
Row(song='Song Sung Blue', userid=97, firstname='Kate', lastname='Harrell')
Row(song='Song Away', userid=44, firstname='Aleena', lastname='Kirby')
Row(song='Seduction (Album Version)', userid=35, firstname='Molly', lastname='Taylor')
Row(song='Wake Up', userid=29, firstname='Jacqueline', lastname='Lynch')
Row(song='Wake Up', userid=80, firstname='Tegan',

### Question 1: Artist, Song title, Song length for:
#### -sessionId = 338
#### -itemInSession = 4

In [167]:
try:
    Q1 = session.execute("SELECT artist, song, length FROM session_history WHERE sessionId = 338 AND itemlnsession = 4")
    for i in Q1:
        print(f"Artist: {i.artist}, Song: {i.song}, Length: {i.length}")
except Exception as e: 
    print (e)
                    

Artist: Faithless, Song: Music Matters (Mark Knight Dub), Length: 495.30731201171875


### Question 2: artist, song, name of users for:
#### userid = 10
#### sessionid = 182
#### ordered by itemInSession

In [162]:
try:
    Q2 = session.execute("SELECT artist, song, user, ItemlnSession FROM music_history WHERE sessionId = 182 AND userid = 10 ORDER BY ItemlnSession")
    for i in Q2:
        print(f"Artist: {i.artist}; Song: {i.song}; User: {i.user}")
except Exception as e: 
    print (e)

Artist: Down To The Bone; Song: Keep On Keepin' On; User: Sylvie Cruz
Artist: Three Drives; Song: Greece 2000; User: Sylvie Cruz
Artist: Sebastien Tellier; Song: Kilometer; User: Sylvie Cruz
Artist: Lonnie Gordon; Song: Catch You Baby (Steve Pitron & Max Sanna Radio Edit); User: Sylvie Cruz


### Question 3: Name of users who listened to song 'All Hands Against His Own':

In [151]:
try:
    Q3 = session.execute("SELECT user_name FROM music_app_history WHERE song = 'All Hands Against His Own'")
    for i in Q3:
        print(i.user_name)
except Exception as e: 
    print (e)

Jacqueline Lynch
Sara Johnson
Tegan Levine


### Drop the tables before closing out the sessions

In [134]:
session.execute("DROP TABLE IF EXISTS session_history")
session.execute("DROP TABLE IF EXISTS music_history")
session.execute("DROP TABLE IF EXISTS music_app_history")

<cassandra.cluster.ResultSet at 0x7a4d1c3e1208>

### Close the session and cluster connection¶

In [None]:
session.shutdown()
cluster.shutdown()