# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
import pandas as pd
import cassandra
import os
import glob
import csv

from etl import *
from create_tables import *
from cql_queries import *

#### Creating list of filepaths to process original event csv data files

In [2]:
# Build the path of the raw event data folder under the current working
# directory and ask get_filepath_list for the files to process.
# `root`, `directory` and `files_path_list` are read again by the next cell.
directory = 'event_data'
root = os.getcwd()
filepath = '/'.join([root, directory])
files_path_list = get_filepath_list(filepath=filepath)

#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [3]:
# Collect the rows returned by read_csv for every event file, then hand them
# to to_csv together with the output folder, base file name and column header.
filename = 'event_datafile_new'
header = [
    'artist', 'firstName', 'gender', 'itemInSession',
    'lastName', 'length', 'level', 'location', 'sessionId',
    'song', 'userId',
]

path = '/'.join([root, directory])
files_path_list = get_filepath_list(path)

full_data_rows_list = []
for filepath in files_path_list:
    # In-place concatenation: appends every row of this file to the list
    full_data_rows_list += read_csv(filepath)

to_csv(full_data_rows_list, root, filename, header)

In [4]:
# Sanity check: count the lines (header line included) of the generated csv file
with open('event_datafile_new.csv', 'r', encoding='utf8') as csv_file:
    line_count = sum(1 for _ in csv_file)
print(line_count)

6821


# Part II. Apache Cassandra

## The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should look like in <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

#### Create and Set Keyspace

In [5]:
# create_keyspace() is one of the star-imported project helpers; it returns a
# pair that is unpacked into `cluster` and `session`. `session` runs every
# statement in the cells below and both handles are shut down in the last cell.
# NOTE(review): presumably this also creates and selects the keyspace — confirm in the helper.
cluster, session = create_keyspace()

#### Drop Tables (if they exist)

In [6]:
# Start from a clean slate before the tables are (re)created below.
# NOTE(review): presumably drop_tables issues DROP TABLE IF EXISTS for the three query tables — confirm in the helper.
drop_tables(session)

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




In [7]:
## Query 1: Give me the artist, song title and song's length in the music app
## history that was heard during sessionId = 338, and itemInSession = 4

## Create table 1
session.execute(song_info_by_session_table_create)

## Load table 1 from the denormalized csv file
csv_path = 'event_datafile_new.csv'

with open(csv_path, encoding='utf8') as events:
    rows = csv.reader(events)
    next(rows)  # the first row is the column header, not data
    for row in rows:
        artist = row[0]
        if artist != '':
            # csv columns: 8 = sessionId, 3 = itemInSession, 0 = artist,
            # 9 = song, 5 = length
            params = [int(row[8]), int(row[3]), artist, row[9], float(row[5])]
            session.execute(song_info_by_session_table_insert, params)

#### Do a SELECT to verify that the data have been inserted into table 1

In [8]:
# Run the table 1 SELECT with the parameters (338, 4) — the sessionId and
# itemInSession values from query 1 — and show the rows as a DataFrame.
try:
    result_set = session.execute(song_info_by_session_table_select, (338, 4))
    fetched_rows = [record for record in result_set]
    display(pd.DataFrame(fetched_rows, columns=result_set.column_names))
except Exception as err:
    # Any failure is printed instead of raised so the notebook keeps running
    print(err)

Unnamed: 0,artist,song,length
0,Faithless,Music Matters (Mark Knight Dub),495.307312


In [9]:
## Query 2: Give me only the following: name of artist, song (sorted by
## itemInSession) and user (first and last name) for userid = 10, sessionid = 182

## Create table 2
session.execute(song_user_info_by_user_session_table_create)

## Load table 2 from the denormalized csv file
csv_path = 'event_datafile_new.csv'

with open(csv_path, encoding='utf8') as events:
    rows = csv.reader(events)
    next(rows)  # the first row is the column header, not data
    for row in rows:
        artist = row[0]
        if artist != '':
            # csv columns: 10 = userId, 8 = sessionId, 3 = itemInSession,
            # 0 = artist, 9 = song, 1 = firstName, 4 = lastName
            params = [
                int(row[10]), int(row[8]), int(row[3]),
                artist, row[9], row[1], row[4],
            ]
            session.execute(song_user_info_by_user_session_table_insert, params)

#### Do a SELECT to verify that the data have been inserted into table 2

In [10]:
# Run the table 2 SELECT with the parameters (10, 182) — the userid and
# sessionid values from query 2 — and show the rows as a DataFrame.
try:
    result_set = session.execute(song_user_info_by_user_session_table_select, (10, 182))
    fetched_rows = [record for record in result_set]
    display(pd.DataFrame(fetched_rows, columns=result_set.column_names))
except Exception as err:
    # Any failure is printed instead of raised so the notebook keeps running
    print(err)

Unnamed: 0,artist,song,firstname,lastname
0,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
1,Three Drives,Greece 2000,Sylvie,Cruz
2,Sebastien Tellier,Kilometer,Sylvie,Cruz
3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


In [11]:
## Query 3: Give me every user name (first and last) in my music app history
## who listened to the song 'All Hands Against His Own'

## Create table 3
session.execute(user_info_by_given_song_table_create)

## Load table 3 from the denormalized csv file
csv_path = 'event_datafile_new.csv'

with open(csv_path, encoding='utf8') as events:
    rows = csv.reader(events)
    next(rows)  # the first row is the column header, not data
    for row in rows:
        if row[0] != '':  # rows with an empty artist column are not inserted
            # csv columns: 9 = song, 10 = userId, 1 = firstName, 4 = lastName
            params = [row[9], int(row[10]), row[1], row[4]]
            session.execute(user_info_by_given_song_table_insert, params)

#### Do a SELECT to verify that the data have been inserted into table 3

In [12]:
# Run the table 3 SELECT for the song title from query 3 and show the rows
# as a DataFrame.
try:
    song_title = 'All Hands Against His Own'
    result_set = session.execute(user_info_by_given_song_table_select, (song_title,))
    fetched_rows = [record for record in result_set]
    display(pd.DataFrame(fetched_rows, columns=result_set.column_names))
except Exception as err:
    # Any failure is printed instead of raised so the notebook keeps running
    print(err)

Unnamed: 0,firstname,lastname
0,Jacqueline,Lynch
1,Tegan,Levine
2,Sara,Johnson


### Drop the tables before closing out the sessions

In [13]:
# Clean up the tables created by this notebook while the session is still open.
# NOTE(review): presumably drop_tables removes all three query tables — confirm in the helper.
drop_tables(session)

### Close the session and cluster connection

In [14]:
# Release the handles obtained from create_keyspace(): the session first,
# then the cluster. Nothing can be executed on `session` after this cell.
session.shutdown()
cluster.shutdown()