# The requirement is to provide a Cassandra-based platform to analyze the Music Application user session data

## The following are the business requirements.


### For a given session and session items provide a report on the artist, song title and song length
##### e.g artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### For a given user and session information provide a report on artist, song, user information sorted by session items
#### e.g. name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### For a given song provide me a report of the user first and last name
#### e.g user name (first and last) who listened to the song 'All Hands Against His Own'



# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# Show the current working directory; the event data is looked up relative to it.
print(os.getcwd())

# Root folder that holds the raw event CSV files (may contain subfolders).
filepath = os.getcwd() + '/event_data'

# Collect the CSV files from the root folder and from every subfolder.
# The list is created up front so it exists (empty) even when the folder is
# missing, and it is extended per directory so the files found in one
# directory are not discarded when os.walk moves on to the next one.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # Prune hidden folders (e.g. .ipynb_checkpoints) in place so os.walk does
    # not descend into them and pick up stale copies of the data files.
    dirs[:] = [d for d in dirs if not d.startswith('.')]
    # Match only CSV files so subdirectory paths never end up in the list;
    # sort to get a deterministic processing order.
    file_path_list.extend(sorted(glob.glob(os.path.join(root, '*.csv'))))

/home/workspace


#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [3]:
# Accumulates the data rows (header excluded) read from every event file.
full_data_rows_list = []

# Read each event file and collect its rows.
for f in file_path_list:
    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header row; the default keeps an empty file from raising
        # StopIteration.
        next(csvreader, None)
        full_data_rows_list.extend(csvreader)

# Write the reduced event file, event_datafile_new.csv, that is used to insert
# data into the Apache Cassandra tables. Every field is quoted.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # Skip blank lines (csv.reader yields an empty list for them) and rows
        # whose first column, the artist, is empty.
        if not row or row[0] == '':
            continue
        # Keep only the source columns that correspond to the header above.
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [4]:
# Count the lines of the generated csv file and print the total
# (every line is counted, so the header line is part of the number).
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    line_total = 0
    for _ in f:
        line_total += 1
    print(line_total)

6821


# Part 2. Database pipeline to process data and provide desired output

#### Creating a Cluster

In [5]:
# Connect to the Cassandra instance on the local machine (127.0.0.1).
from cassandra.cluster import Cluster

cluster = Cluster(contact_points=['127.0.0.1'])

# A session is needed to establish the connection and execute queries.
session = cluster.connect()

#### Create Keyspace

In [6]:
# Create the project2 keyspace unless it already exists
# (SimpleStrategy, replication factor 1). Errors are printed, not raised.
create_keyspace_cql = (
    "\n    CREATE KEYSPACE IF NOT EXISTS project2 \n"
    "    WITH REPLICATION = \n"
    "    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"
)
try:
    session.execute(create_keyspace_cql)
except Exception as e:
    print(e)

#### Set Keyspace

In [7]:
# Switch the session to the project2 keyspace; a failure is printed, not raised.
keyspace_name = 'project2'
try:
    session.set_keyspace(keyspace_name)
except Exception as e:
    print(e)


### For a given session and session items provide a report on the artist, song title and song length
##### e.g artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

music_app_session_artist table is created with session_id, item_number_in_session, artist, song_length, song_title  
The partition key is session_id, which uniquely identifies the partition.  
The clustering column is item_number_in_session, which uniquely identifies the row within the partition.  
This table returns the artist, song length and song title for a given session and session item number.

In [8]:
## Table music_app_session_artist
##   columns           : session_id, item_number_in_session, artist, song_length, song_title
##   partition key     : session_id
##   clustering column : item_number_in_session (identifies the row within the partition)
## Serves lookups of artist, song length and song title by session and session item number.
query = (
    "CREATE TABLE IF NOT EXISTS music_app_session_artist "
    "(session_id int, item_number_in_session int, artist text,  song_length decimal,  song_title text, "
    "PRIMARY KEY ((session_id), item_number_in_session))"
)
try:
    session.execute(query)
except Exception as e:
    # Report the failure and carry on.
    print(e)


                    

In [9]:
file = 'event_datafile_new.csv'

## The INSERT statement is identical for every row, so it is built once
## outside the loop; the values are bound per row by the driver.
query = ("INSERT INTO music_app_session_artist(session_id,item_number_in_session,artist,song_length,song_title)"
         " VALUES (%s, %s, %s, %s, %s)")

# newline='' lets the csv module handle line endings itself, matching how the
# file was written.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader, None)  # skip header; the default tolerates an empty file
    for line in csvreader:
        # CSV columns used: 8=sessionId, 3=itemInSession, 0=artist, 5=length, 9=song
        session.execute(query, (int(line[8]), int(line[3]), line[0], float(line[5]), line[9]))


#### SELECT to verify that the data have been inserted into each table

In [10]:
## SELECT statement to verify the data was entered into the table:
## artist, song length and song title for session 338, item 4.
query = ("select artist, song_length, song_title from music_app_session_artist"
         " WHERE session_id = %s and item_number_in_session = %s")
try:
    rows = session.execute(query, (338, 4))
except Exception as e:
    print(e)
else:
    # Iterate only when the query succeeded; after a failure `rows` would be
    # undefined or left over from an earlier query.
    for row in rows:
        print(row.artist, row.song_length, row.song_title)

Faithless 495.3073 Music Matters (Mark Knight Dub)


### For a given user and session information provide a report on artist, song, user information sorted by session items

music_app_session_artist_user table is created with session_id, user_id, item_number_in_session, artist, first_name, last_name, song_title  
The partition key is the composite (session_id, user_id), which uniquely identifies the partition.  
The clustering column is item_number_in_session, which uniquely identifies the row within the partition.  
This table returns the artist, song title, and user first and last name for a given session and user id, sorted by session item number in descending order  

In [11]:
## Table music_app_session_artist_user
##   columns           : session_id, user_id, item_number_in_session,
##                       artist, first_name, last_name, song_title
##   partition key     : (session_id, user_id)
##   clustering column : item_number_in_session, stored in descending order
## Serves lookups of artist, song title and user first/last name by session
## and user id, returned by session item number in descending order.
query = (
    "CREATE TABLE IF NOT EXISTS music_app_session_artist_user"
    "(session_id int, user_id int, item_number_in_session int, artist text,  first_name text,  last_name text,  song_title text, "
    "PRIMARY KEY ((session_id, user_id), item_number_in_session))"
    "WITH CLUSTERING ORDER BY (item_number_in_session DESC)"
)
try:
    session.execute(query)
except Exception as e:
    # Report the failure and carry on.
    print(e)

In [12]:
## The INSERT statement is identical for every row, so it is built once
## outside the loop; the values are bound per row by the driver.
query = ("INSERT INTO music_app_session_artist_user(session_id,user_id,item_number_in_session,artist,first_name, last_name,song_title)"
         " VALUES (%s, %s, %s, %s, %s, %s, %s)")

# newline='' lets the csv module handle line endings itself.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader, None)  # skip header; the default tolerates an empty file
    for line in csvreader:
        # CSV columns used: 8=sessionId, 10=userId, 3=itemInSession, 0=artist,
        # 1=firstName, 4=lastName, 9=song
        session.execute(query, (int(line[8]), int(line[10]), int(line[3]), line[0], line[1], line[4], line[9]))


#### SELECT to verify that the data have been inserted into each table

In [13]:
## SELECT statement to verify the data was entered into the table:
## artist, song title and user name for session 182, user 10.
query = ("select artist, song_title, first_name, last_name from music_app_session_artist_user"
         " WHERE session_id = %s and user_id = %s")
try:
    rows = session.execute(query, (182, 10))
except Exception as e:
    print(e)
else:
    # Iterate only when the query succeeded; after a failure `rows` would be
    # undefined or left over from an earlier query.
    for row in rows:
        print(row.artist, row.song_title, row.first_name, row.last_name)
                    

Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Down To The Bone Keep On Keepin' On Sylvie Cruz


### For a given song provide me a report of the user first and last name
#### e.g user name (first and last) who listened to the song 'All Hands Against His Own'

music_app_song_user table is created with song_title, user_id, first_name, last_name  
The partition key is song_title, which uniquely identifies the partition.  
The clustering column is user_id, which uniquely identifies the row within the partition.  
This table returns the user first and last name for a given song title.  

In [14]:
## Table music_app_song_user
##   columns           : song_title, user_id, first_name, last_name
##   partition key     : song_title
##   clustering column : user_id (identifies the row within the partition)
## Serves lookups of user first and last name by song title.
query = (
    "CREATE TABLE IF NOT EXISTS music_app_song_user"
    "(song_title text, user_id int, first_name text, last_name text, "
    "PRIMARY KEY ((song_title), user_id))"
)
try:
    session.execute(query)
except Exception as e:
    # Report the failure and carry on.
    print(e)

In [15]:
## The INSERT statement is identical for every row, so it is built once
## outside the loop; the values are bound per row by the driver.
query = ("INSERT INTO music_app_song_user(song_title, user_id, first_name, last_name)"
         " VALUES (%s, %s, %s, %s)")

# newline='' lets the csv module handle line endings itself.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader, None)  # skip header; the default tolerates an empty file
    for line in csvreader:
        # CSV columns used: 9=song, 10=userId, 1=firstName, 4=lastName
        session.execute(query, (line[9], int(line[10]), line[1], line[4]))

#### SELECT to verify that the data have been inserted into each table

In [16]:
## SELECT statement to verify the data was entered into the table:
## first and last name of the users stored for the given song title.
query = ("select first_name, last_name from music_app_song_user"
         " WHERE song_title=%s ")
try:
    rows = session.execute(query, ["All Hands Against His Own"])
except Exception as e:
    print(e)
else:
    # Iterate only when the query succeeded; after a failure `rows` would be
    # undefined or left over from an earlier query.
    for row in rows:
        print(row.first_name, row.last_name)

Jacqueline Lynch
Tegan Levine
Sara Johnson


### Drop the tables before closing out the sessions

In [17]:
# Drop the three tables one after another. A failed drop is printed and the
# remaining tables are still attempted.
for table_name in ("music_app_session_artist",
                   "music_app_session_artist_user",
                   "music_app_song_user"):
    query = "drop table " + table_name
    try:
        rows = session.execute(query)
    except Exception as e:
        print(e)

### Close the session and cluster connection

In [18]:
# Shut down the session first, then the cluster.
for connection in (session, cluster):
    connection.shutdown()