# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv
from cassandra.query import dict_factory

#### Create list of filepaths to process original event csv data files
- Get the event data from the current folder and its sub-folders
- Create a for loop to create list of files and filepath

In [2]:
# Collect the path of every event file under ./event_data, including files
# stored in sub-folders.
filepath = os.path.join(os.getcwd(), 'event_data')

file_path_list = []
for root, dirs, files in os.walk(filepath):
    # Skip hidden folders (e.g. notebook checkpoint copies) so their files are
    # not read twice; this also matches glob('*'), which ignores dot-entries.
    dirs[:] = [d for d in dirs if not d.startswith('.')]
    # Accumulate across every directory os.walk visits. Reassigning the list
    # here would keep only the last directory's files. `files` never contains
    # directories, so every entry can be opened as a file.
    file_path_list.extend(
        os.path.join(root, name) for name in sorted(files) if not name.startswith('.')
    )


#### Process the files to create the 'event_datafile_new.csv' data file that will be used for the Apache Cassandra tables
- Read csv files and extract each data row one by one and append it to list
- Write data from the list to a new event data file

In [3]:
# Read every data row (each file's header line excluded) from all event files.
full_data_rows_list = []

for file_path in file_path_list:
    with open(file_path, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header; the default keeps an empty file from raising StopIteration.
        next(csvreader, None)
        full_data_rows_list.extend(csvreader)

csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Positions (in the source rows) of the columns written out, in the same order
# as the header row below.
KEPT_COLUMNS = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # Drop rows with an empty artist column. A blank CSV line parses as [],
        # so guard against that before indexing.
        if not row or row[0] == '':
            continue
        writer.writerow([row[i] for i in KEPT_COLUMNS])


In [4]:
# check the number of rows in your csv file
# Report how many physical lines (header line included) the generated file has.
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    line_count = len(f.readlines())
print(line_count)

6821


# Part II. ETL Pipeline to import data into Apache Cassandra tables.

#### Create a Cluster session to make connection to Cassandra instance

In [5]:
# Open a connection to the Cassandra node on localhost and start a session on it.
# A connection failure is only printed, not raised.
from cassandra.cluster import Cluster

contact_points = ['127.0.0.1']
try:
    cluster = Cluster(contact_points)
    session = cluster.connect()
except Exception as err:
    print(err)

#### Create Keyspace 'udacity'

In [6]:
# Create the 'udacity' keyspace (SimpleStrategy, replication factor 1)
# unless it already exists; errors are printed rather than raised.
create_keyspace_cql = """
    CREATE KEYSPACE IF NOT EXISTS udacity 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""

try:
    session.execute(create_keyspace_cql)
except Exception as err:
    print(err)



#### Set the session to 'udacity' Keyspace

In [7]:
# Make 'udacity' the session's default keyspace, and have every result row
# come back as a dict (dict_factory) so it can be fed straight into pandas.
try:
    session.set_keyspace('udacity')
    session.row_factory = dict_factory
except Exception as err:
    print(err)


#### Create table 'songplayed_session' to answer below query. The query gets artist, song title and song length for a particular Session ID and Item in Session
- select artist, song, length from songplayed_session WHERE sessionid=338 AND itemInSession = 4

##### Primary Key:
A compound primary key is used to uniquely identify the rows, with 'sessionId' as the partition key and 'itemInSession' as the clustering key.


In [8]:
# songplayed_session: partitioned by sessionId and clustered by itemInSession,
# so each (sessionId, itemInSession) pair identifies exactly one row.
query = (
    "CREATE TABLE IF NOT EXISTS songplayed_session "
    "(sessionId int, itemInSession int, artist text, song text, length double, "
    "PRIMARY KEY (sessionId, itemInSession))"
)
try:
    session.execute(query)
except Exception as err:
    print(err)

#### The below code extracts data from 'event_datafile_new' csv file and inserts into the songplayed_session

In [9]:
# Insert every data row of the pre-processed CSV into songplayed_session.
event_file = 'event_datafile_new.csv'

# The INSERT text is identical for every row, so prepare it once outside the
# loop; each execute then only binds the row's values.
insert_songplayed = session.prepare(
    "INSERT INTO songplayed_session (sessionid, itemInSession, artist, song, length)"
    " VALUES (?, ?, ?, ?, ?)"
)

# newline='' is what the csv module expects for files handed to csv.reader.
with open(event_file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        # CSV columns used: 8 sessionId, 3 itemInSession, 0 artist, 9 song, 5 length
        session.execute(
            insert_songplayed,
            (int(line[8]), int(line[3]), line[0], line[9], float(line[5])),
        )

#### The SELECT query below verifies the data: rows come back as dictionaries (dict_factory) and are converted to a pandas DataFrame for display

In [10]:
# Artist, song and length of the song played at itemInSession 4 of session 338,
# shown as a DataFrame built from the dict rows.
songplayed_session_query = "select artist, song, length from songplayed_session WHERE sessionid=338 AND itemInSession = 4"
try:
    rows = session.execute(songplayed_session_query)
    df = pd.DataFrame.from_dict(rows)
    print(df)
except Exception as err:
    print(err)

      artist    length                             song
0  Faithless  495.3073  Music Matters (Mark Knight Dub)


#### Create table 'songplaylist_session' to answer below query. The query gets artist name, song title ordered by itemInSession and user's first and last name for a particular userid and sessionid.
- select artist, song, firstname, lastname from songplaylist_session WHERE userid=10 AND sessionid=182

##### Primary Key:
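A compound primary key is used to uniquely identify the rows, with 'userid' and 'sessionId' as a composite partition key and 'itemInSession' as a clustering key in descending order. Within a partition, rows are stored on disk in clustering order, and reading them in that order is the efficient path (ascending by default). Declaring 'itemInSession' DESC stores the rows in reverse, so descending reads (latest item in the session first) become the fast ones. It also means the query returns the songs in descending itemInSession order.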
A composite partition key is used to break the data into smaller chunks/logical sets. This facilitates data retrieval and avoids hotspotting from repeatedly writing data to a single node.

In [11]:
# songplaylist_session: partitioned by (userid, sessionid) and clustered by
# itemInSession, stored in descending itemInSession order within each partition.
# The statement is built from adjacent string literals. A backslash
# continuation inside a single literal would also embed the next line's
# indentation in the CQL.
query = (
    "CREATE TABLE IF NOT EXISTS songplaylist_session "
    "(userid int, sessionId int, itemInSession int, artist text, song text, "
    "firstname varchar, lastname varchar, "
    "PRIMARY KEY ((userid, sessionid), itemInSession)) "
    "WITH CLUSTERING ORDER BY (itemInSession DESC)"
)
try:
    session.execute(query)
except Exception as e:
    print(e)

#### The below code extracts data from 'event_datafile_new' csv file and inserts into the songplaylist_session

In [12]:
# Insert every data row of the pre-processed CSV into songplaylist_session.
event_file = 'event_datafile_new.csv'

# The INSERT text is identical for every row, so prepare it once outside the
# loop; each execute then only binds the row's values.
insert_songplaylist = session.prepare(
    "INSERT INTO songplaylist_session (userid, sessionid, itemInSession, artist, song, firstname, lastname)"
    " VALUES (?, ?, ?, ?, ?, ?, ?)"
)

# newline='' is what the csv module expects for files handed to csv.reader.
with open(event_file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        # CSV columns used: 10 userId, 8 sessionId, 3 itemInSession,
        # 0 artist, 9 song, 1 firstName, 4 lastName
        session.execute(
            insert_songplaylist,
            (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]),
        )

#### The SELECT query below verifies the data: rows come back as dictionaries (dict_factory) and are converted to a pandas DataFrame for display

In [13]:
# Artist, song and full user name for every item of session 182 of user 10,
# shown as a DataFrame built from the dict rows.
songplaylist_session_query = "select artist, song, firstname, lastname from songplaylist_session WHERE userid=10 AND sessionid=182"
try:
    rows = session.execute(songplaylist_session_query)
    df = pd.DataFrame.from_dict(rows)
    first_names = df['firstname']
    last_names = df['lastname']
    df['user'] = first_names + ' ' + last_names
    columns_to_show = ['artist', 'song', 'user']
    print(df[columns_to_show])
except Exception as err:
    print(err)

              artist                                               song  \
0      Lonnie Gordon  Catch You Baby (Steve Pitron & Max Sanna Radio...   
1  Sebastien Tellier                                          Kilometer   
2       Three Drives                                        Greece 2000   
3   Down To The Bone                                 Keep On Keepin' On   

          user  
0  Sylvie Cruz  
1  Sylvie Cruz  
2  Sylvie Cruz  
3  Sylvie Cruz  


#### Create table 'userlist_ahaho' ('ahaho' stands for the song 'All Hands Against His Own') to answer the query below. The query gets the first and last name of every user who played the song 'All Hands Against His Own'.
- select firstname, lastname from userlist_ahaho WHERE song = 'All Hands Against His Own'

##### Primary Key:
A compound primary key is used to uniquely identify the rows, with 'song' as the partition key and 'userid' as the clustering key.

#### The below code extracts data from 'event_datafile_new' csv file and inserts into the userlist_ahaho

#### The SELECT query below verifies the data: rows come back as dictionaries (dict_factory) and are converted to a pandas DataFrame for display

               user
0  Jacqueline Lynch
1      Tegan Levine
2      Sara Johnson


### Drop the tables before closing out the sessions

### Close the session and cluster connection

In [18]:
# Release the session first, then the cluster object that created it.
for resource in (session, cluster):
    resource.shutdown()