# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [9]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv
from datetime import datetime

#### Creating list of filepaths to process original event csv data files

In [10]:
# Build the path to the provided event_data folder
filepath = os.path.join(os.getcwd(), 'event_data')
print('{} is your root directory. \n'.format(filepath))

# Gather the paths of every .csv file under filepath. os.walk also visits
# sub-directories, so matches are accumulated across all of them instead of
# being overwritten on each iteration (which kept only the last directory and
# left file_path_list undefined when nothing was walked).
file_path_list = []
for root, _dirs, _files in os.walk(filepath):
    # Jupyter creates hidden .ipynb_checkpoints folders while the project is
    # being worked on; skip them so checkpoint copies are not processed.
    if '.ipynb_checkpoints' in root:
        continue
    file_path_list.extend(glob.glob(os.path.join(root, '*.csv')))

# glob returns paths in arbitrary order; sort for reproducible runs.
file_path_list = sorted(file_path_list)

path_len = len(file_path_list)
print('{} files found. \n'.format(path_len))
   

/home/workspace/event_data is your root directory. 

30 files found. 



#### Processing the files to create the CSV data file that will be used for the Apache Cassandra tables

In [11]:
# Create a list containing all of the data rows (without each file's header)
# from the list of files that were found.

full_data_rows_list = []
for path in file_path_list:
    with open(path, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header row; the default keeps an empty file from raising
        # StopIteration.
        next(csvreader, None)
        full_data_rows_list.extend(csvreader)
print('File data has been processed into a single list data type. \n')

# If a data file already exists we will rename it with todays date and
# place it into an archive folder. If this is run multiple times in a
# day the archived file will be overwritten. os.replace overwrites an
# existing destination on every platform (os.rename fails on Windows).

if os.path.exists('event_datafile_new.csv'):
    tday = datetime.now().strftime('%Y_%m_%d')
    os.makedirs('Archive', exist_ok=True)
    os.replace('event_datafile_new.csv', 'Archive/{}.csv'.format(tday))
    print('event_datafile_new.csv already exists and has been archived as {}.csv. \n'.format(tday))


# Register a dialect that quotes every field (QUOTE_ALL) and use it to write a
# new csv file (event_datafile_new.csv). The first row is a header row and
# only the relevant columns are pulled from each row of full_data_rows_list.
# Rows whose first column (artist) is empty are known not to be relevant
# event data and are skipped, as are blank lines (csv.reader yields them as
# empty lists).

csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Indices into each source row, in the same order as the header written below.
KEPT_COLUMN_INDICES = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName',
                     'length', 'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        if not row or row[0] == '':
            continue
        writer.writerow([row[idx] for idx in KEPT_COLUMN_INDICES])

print('event_data has been processed and stored in event_datafile_new. \n')

File data has been processed into a single list data type. 

event_datafile_new.csv already exists and has been archived as 2021_11_13.csv. 

event_data has been processed and stored in event_datafile_new. 



# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should look like in <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster

In [12]:
from cassandra.cluster import Cluster

# Build a cluster handle with no arguments (the driver's defaults) and open a
# session on it; every later cell runs its CQL through `session`.
cluster = Cluster()
session = cluster.connect()

#### Create Keyspace

In [13]:
# Create the sparkify keyspace only if it is missing. SimpleStrategy with a
# replication_factor of 1 keeps a single replica of each row.
keyspace_cql = """
    CREATE KEYSPACE IF NOT EXISTS sparkify 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
session.execute(keyspace_cql)

<cassandra.cluster.ResultSet at 0x7f787172b390>

#### Set Keyspace

In [14]:
# Make sparkify the session's default keyspace so the table names used
# below need no keyspace prefix.
session.set_keyspace('sparkify')
print('sparkify keyspace is created and set. \n')

sparkify keyspace is created and set. 



### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




## Query Design

1. Query 1 was designed as the "session" table. We need to filter by sessionId and itemInSession, so these two columns form a composite primary key (sessionId as the partition key, itemInSession as the clustering column): the combination is unique, and both are required filters for this query. Artist, song title and song length were all added as data columns as well.

2. Query 2 was designed as the "user" table. We need to filter by userId and sessionId, so these two columns form a composite partition key, which keeps all of a user's plays within one session in the same partition; both are necessary filters. itemInSession is then used as a clustering column so that the results are sorted by it and each row's primary key is unique. Artist, song title, and the user's first and last name are all added as data columns as well.

3. Query 3 was designed as the "song" table. We need to filter by song, but the song alone does not uniquely identify a row: many users may have listened to the song, and a user may have listened to it more than once. So that rows sharing a song do not overwrite one another, sessionId and itemInSession are added as clustering columns, making each row's primary key unique.

In [17]:
# CREATE TABLES
# Each table is modelled on one of the three questions (query1..query3).

# Query 1: sessionId is the partition key and itemInSession the clustering
# column, so one (sessionId, itemInSession) pair identifies one row.
# length is stored as double because the loader passes Python floats; a
# decimal column would store Decimal(float) binary-expansion artifacts.
session_create = ("""
CREATE TABLE IF NOT EXISTS
session_library(
sessionId int,
itemInSession int,
artist text, 
length double,
song text,
PRIMARY KEY (sessionId, itemInSession)
)
""")

# Query 2: (userId, sessionId) is a composite partition key; itemInSession
# clusters (and therefore sorts) the rows within each partition.
user_create = ("""
CREATE TABLE IF NOT EXISTS 
user_library( 
userId int,  
sessionId int,
itemInSession int,
artist text, 
song text, 
firstName text,  
lastName text,
PRIMARY KEY ((userId,sessionId), itemInSession)
)
""")

# Query 3: partitioned by song; sessionId and itemInSession are clustering
# columns so that different plays of the same song get distinct keys.
song_create = ("""
CREATE TABLE IF NOT EXISTS
song_library( 
song text,
sessionId int,
itemInSession int,
firstName text,
lastName text,
PRIMARY KEY (song,sessionId,itemInSession)
)
""")

# DROP TABLES

session_drop = ("""DROP TABLE IF EXISTS session_library""")

user_drop = ("""DROP TABLE IF EXISTS user_library""")

song_drop = ("""DROP TABLE IF EXISTS song_library""")


# INSERT RECORDS
# The column order of each INSERT matches the order of the value tuples built
# when loading event_datafile_new.csv (positional %s binding), e.g.
# session_insert receives (artist, itemInSession, length, sessionId, song).

session_insert = ("""
INSERT INTO 
session_library(
artist, 
itemInSession, 
length, 
sessionId, 
song
) 
VALUES(%s,%s,%s,%s,%s)
""")

user_insert = ("""
INSERT INTO user_library(
artist, 
song, 
firstName, 
itemInSession, 
lastName, 
sessionId, 
userId
) 
VALUES(%s,%s,%s,%s,%s,%s,%s)
""")

song_insert = ("""
INSERT INTO song_library(
firstName, 
lastName, 
song, 
sessionId, 
itemInSession
) 
VALUES(%s,%s,%s,%s,%s)
""")

# TEST QUERIES

session_query = (""" 
SELECT artist, song, length 
FROM session_library 
WHERE sessionId = 338 AND itemInSession = 4
""")

user_query = ("""
SELECT artist, song, firstName, lastName 
FROM user_library 
WHERE userId = 10 AND sessionId = 182
""")

song_query = ("""
SELECT firstName, lastName 
FROM song_library 
WHERE song = 'All Hands Against His Own'
""")

query1 = ("""
Give me the artist, song title and song's length in the music app history
that was heard during sessionId = 338, and itemInSession = 4
""")

query2 = ("""
Give me only the following: name of artist, song (sorted by itemInSession)
and user (first and last name) for userid = 10, sessionid = 182
""")

query3 = ("""
Give me every user name (first and last) in my music app history who listened
to the song 'All Hands Against His Own'
""")

# Lookup tables used by the cells below; keys double as display labels.
drop_table_queries = {'session_drop': session_drop, 'user_drop': user_drop, 'song_drop': song_drop}
create_table_queries = {'session_create': session_create, 'user_create': user_create, 'song_create': song_create}
insert_queries = {'session_insert': session_insert, 'user_insert': user_insert, 'song_insert': song_insert}
check_queries = {query1: session_query, query2: user_query, query3: song_query}

In [18]:
# Create any pre-defined tables from the sql_queries 'library'
for query in create_table_queries:
    session.execute(create_table_queries[query])
    print('{} table in the sparkify keyspace has been added.'.format(query[:-7]))

session table in the sparkify keyspace has been added.
user table in the sparkify keyspace has been added.
song table in the sparkify keyspace has been added.


In [19]:
# Open our event data file and read it line by line, uploading relevant data
with open('event_datafile_new.csv', encoding = 'utf8') as f:
    rowtot = sum(1 for lines in f)
with open('event_datafile_new.csv', encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # Skip header
    printcount = 0
    for i, line in enumerate(csvreader,1):
        session.execute(insert_queries['session_insert'], (line[0], int(line[3]),\
                        float(line[5]), int(line[8]), line[9]))
        session.execute(insert_queries['user_insert'], (line[0], line[9], line[1],\
                        int(line[3]), line[4], int(line[8]), int(line[10])))
        session.execute(insert_queries['song_insert'], (line[1], line[4], line[9], \
                        int(line[8]), int(line[3])))
        printcount += 1
        if printcount == 1000 or i == rowtot:
            print('{}/{} rows uploaded to keyspace'.format(i,rowtot), end = "\r", flush = True)
            printcount = 0

print('Relevant data from event_datafile_new.csv has been uploaded to the sparkify keyspace. \n')

Relevant data from event_datafile_new.csv has been uploaded to the sparkify keyspace. 



#### Do a SELECT to verify that the data have been inserted into each table

In [20]:
# Execute each verification query and print its result as a DataFrame,
# preceded by the question it answers.
for query_no, (question, cql) in enumerate(check_queries.items(), start=1):
    result = session.execute(cql)
    print('Query #{}:\n{}:\n'.format(query_no, question))
    frame = pd.DataFrame(list(result))
    print('\n{}\n'.format(frame))
print('\nAll queries have ran\n')

Query #1:

Give me the artist, song title and song's length in the music app history
that was heard during sessionId = 338, and itemInSession = 4
:


      artist                             song    length
0  Faithless  Music Matters (Mark Knight Dub)  495.3073

Query #2:

Give me only the following: name of artist, song (sorted by itemInSession)
and user (first and last name) for userid = 10, sessionid = 182
:


              artist                                               song  \
0   Down To The Bone                                 Keep On Keepin' On   
1       Three Drives                                        Greece 2000   
2  Sebastien Tellier                                          Kilometer   
3      Lonnie Gordon  Catch You Baby (Steve Pitron & Max Sanna Radio...   

  firstname lastname  
0    Sylvie     Cruz  
1    Sylvie     Cruz  
2    Sylvie     Cruz  
3    Sylvie     Cruz  

Query #3:

Give me every user name (first and last) in my music app history who listened
to

### Drop the tables before closing out the sessions

In [21]:
# Drop any pre-defined tables from the sql_queries 'library'
for query in drop_table_queries:
    session.execute(drop_table_queries[query])
    print('{} table in the sparkify keyspace has been dropped.'.format(query[:-5]))

session table in the sparkify keyspace has been dropped.
user table in the sparkify keyspace has been dropped.
song table in the sparkify keyspace has been dropped.


### Close the session and cluster connection

In [22]:
# Shut down the session first, then the cluster, reporting each step.
for handle, label in ((session, 'session'), (cluster, 'cluster')):
    handle.shutdown()
    print('{} is shutdown \n'.format(label))

session is shutdown 

cluster is shutdown 

