# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
# Importing Python packages 
import pandas as pd, numpy as np
import cassandra, re, os, glob, json, csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# checking your current working directory
print(os.getcwd())

# Get your current folder and subfolder event data
filepath = os.getcwd() + '\event_data'
# print(filepath)

file_path_list = []

for root, dirs, files in os.walk(filepath): # for loop to create a list of files and collect each filepath
    file_path_list = glob.glob(os.path.join(root,'*')) # join the file path and roots with the subdirectories using glob based on pattern, '*' meaning all files
#     print(file_path_list)

C:\Users\harik\Desktop\Udacity Data Modeling\project-1b-project-template.ipynb


#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
full_data_rows_list = [] # initiating an empty list of rows that will be generated from each file
line_number = 0 # Created this variable to know which line might have an issue in a certain file 

for f in file_path_list: # for every filepath in the file path list 
    try:
        with open(f, 'r', encoding = 'utf8', newline='') as csvfile: # reading csv file
            csvreader = csv.reader(csvfile) # creating a csv reader object 
            next(csvreader) # Reading the next line in the csv file
            
            for line in csvreader: # Extracting each data row one by one and append it        
                line_number += 1 # Incrementing the line number
                #print(line)
                full_data_rows_list.append(line) 
                
    except Exception as e:
        print("Check if there might be an issue with file: {} in line number {} else look at the system error below".format(f, line_number),"\n")
        print(e)
            
print(len(full_data_rows_list)) # To get total number of rows 
# print(full_data_rows_list) # Uncomment the code below if you would like to check to see what the list of event data rows will look like

# The function register_dialect will creates our own dialect with the name "myDialect"
# QUOTE_ALL will enable quotes around all fields 
# skipinitialspace will ignore the spaces right after the delimiter
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Creating a event_datafile_full csv that will be used to insert data into the apache Cassandra tables with exception handling
try:
    with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
        writer = csv.writer(f, dialect='myDialect') # Using our new dialect to csv writer
        writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                    'level','location','sessionId','song','userId']) # Creating columns in the csv in the first row
        
        for row in full_data_rows_list:
            
            if (row[0] == ''): #Excluding rows without artist name
                continue
            
            writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))

except Exception as e:
    print(e)


8056


In [4]:
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f: # check the number of rows in your csv file
    print(sum(1 for line in f))

6821


# Part II. Apache Cassandra coding 

## The CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster

In [6]:
# Creating a cassandra local connection with exception handling
from cassandra.cluster import Cluster

try:
    cluster = Cluster(['127.0.0.1']) #If you have a locally installed Apache Cassandra instance
    session = cluster.connect() # To establish connection and begin executing queries, need a session
    print("Connection to local host successful and session created")
except Exception as e:
    print(f"Check you connection details and session active or not or check below \n {e}")

Connection to local host successful and session created


#### Create Keyspace

In [7]:
# Creating a Keyspace with exception handling 
try:
    create_query = """CREATE KEYSPACE IF NOT EXISTS sparkify 
                    with REPLICATION = 
                    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
                """
    session.execute(create_query)
    print("Key Space created")
except Exception as e:
    print(f"Key Space creation failed \n {e}")

Key Space created


#### Set Keyspace

In [8]:
try:
    session.set_keyspace('sparkify') # Set KEYSPACE to the keyspace specified above, this is similar to USE DataBase in SQL
    print("Key space set successfully")
except Exception as e:
    print(f"Setting Key space failed \n {e}")

Key space set successfully


## With Apache Cassandra we model the database tables based on the queries we want to run and creating the tables according to the below queries is our requirement
#### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4
        > Based on the above requirement we will need to set primary key sessionId, itemInSession. The create table will need to be in the order of primary keys, cluster keys and other columns that are in the select statment i.e., artist, song, length 
#### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
        > Based on the above requirement the create table will need to be in the order of composite primary keys (userId, sessionId), and cluster key itemInSession and then remaining columns in the select statement in proposed order i.e., artist, song, firstname, lastname. The order of select, insert and create will be same. This is the requirement of Cassandra
#### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
        > This query also follows a similar structure as above. Song would be primary key, but we are considering a scenario where multiple users will be accessing the same song and this will result in duplicates, hence we will create a composite primary key (song, userId). The create table will be in the order song, userId, firstname, lastname. Insert will also follow the same order


In [9]:
# Select statements for above requirement
select_query1_table1 = "select artist, song, length from music_app_history_by_session_item where sessionId = 338 and itemInSession = 4"
select_query1_table2 = "select artist, song, firstname, lastname from music_app_history_by_user_session where userId = 10 and sessionId = 182 order by itemInSession Asc "
select_query1_table3 = "select firstname, lastname from music_app_history_by_song where song = 'All Hands Against His Own'"                  

### Now we create tables and insert data from "event_datafile_new" to run the above queries

In [29]:
# Query 1:  Give me the artist, song title and song's length in the music app history that was heard during
create_table1 = "CREATE TABLE IF NOT EXISTS music_app_history_by_session_item (sessionId INT, itemInSession INT, artist TEXT, song TEXT, length FLOAT, PRIMARY KEY (sessionId, itemInSession))"

try:
    session.execute(create_table1)
    print("music_app_history_by_session_item table created")
except Exception as e:
    print(f"music_app_history_by_session_item table Creation Failed \n {e}")        
    
file = 'event_datafile_new.csv' # Input file name

# Insert code for music_app_history_by_session_item
try:
    with open(file, encoding = 'utf8') as f:
        csvreader = csv.reader(f)
        next(csvreader) # skip header
        
        for line in csvreader:
            query = "INSERT INTO music_app_history_by_session_item (sessionId, itemInSession,  artist, song, length)"
            query = query + "VALUES (%s, %s, %s, %s, %s)"
            session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))
    print("music_app_history_by_session_item insert completed")
except Exception as e:
    print(f"music_app_history_by_session_item insert failed \n {e}")

music_app_history_by_session_item table created
music_app_history_by_session_item insert completed


In [30]:
# Query 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name)
create_table2 = "CREATE TABLE IF NOT EXISTS music_app_history_by_user_session (userId INT,sessionId INT,itemInSession INT,artist TEXT,song TEXT,firstName TEXT,lastName TEXT,PRIMARY KEY ((userId, sessionid), itemInSession)) "
try:
    session.execute(create_table2)
    print("music_app_history_by_user_session table created")
except Exception as e:
    print(f"music_app_history_by_user_session table Creation Failed \n {e}") 
    
# Insert code for music_app_history_by_user_session
try:
    with open(file, encoding = 'utf8') as f:
        csvreader = csv.reader(f)
        next(csvreader) # skip header
        
        for line in csvreader:
            query = "INSERT INTO music_app_history_by_user_session (userId,sessionId,itemInSession,artist,song,firstName,lastName)"
            query = query + "VALUES (%s, %s, %s, %s, %s, %s, %s)"
            session.execute(query, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))
    print("music_app_history_by_user_session insert completed")
except Exception as e:
    print(f"music_app_history_by_user_session insert failed \n {e}")

music_app_history_by_user_session table created
music_app_history_by_user_session insert completed


In [33]:
# Query 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
create_table3 = "CREATE TABLE IF NOT EXISTS music_app_history_by_song (song TEXT, userId INT, firstname TEXT, lastname TEXT, PRIMARY KEY((song), userId))"
try:
    session.execute(create_table3)
    print("music_app_history_by_song table created")
except Exception as e:
    print(f"music_app_history_by_song table Creation Failed \n {e}") 

    
# Insert code for music_app_history_by_song
try:
    with open(file, encoding = 'utf8') as f:
        csvreader = csv.reader(f)
        next(csvreader) # skip header
        
        for line in csvreader:
            query = "INSERT INTO music_app_history_by_song (song, userId, firstname, lastname)"
            query = query + "VALUES (%s, %s ,%s, %s)"
            session.execute(query, (line[9], int(line[10]),line[1], line[4]))
    print("music_app_history_by_song insert completed")
except Exception as e:
    print(f"music_app_history_by_song insert failed \n {e}")

music_app_history_by_song table created
music_app_history_by_song insert completed


#### Doing a SELECT to verify that the data have been inserted into each table

In [34]:
# Select for music_app_history_by_song
try:
    rows = session.execute(select_query1_table1)
except Exception as e:
    print(f"Select for music_app_history_by_song Failed \n {e}")
    
for row in rows:
    print(row)

Row(artist='Faithless', song='Music Matters (Mark Knight Dub)', length=495.30731201171875)


In [35]:
# Select for music_app_history_by_user_session
try:
    rows = session.execute(select_query1_table2)
except Exception as e:
    print(f"Select for music_app_history_by_user_session Failed \n {e}")
    
for row in rows:
    print(row)

Row(artist='Down To The Bone', song="Keep On Keepin' On", firstname='Sylvie', lastname='Cruz')
Row(artist='Three Drives', song='Greece 2000', firstname='Sylvie', lastname='Cruz')
Row(artist='Sebastien Tellier', song='Kilometer', firstname='Sylvie', lastname='Cruz')
Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', firstname='Sylvie', lastname='Cruz')


In [36]:
# Select for music_app_history_by_song
try:
    rows = session.execute(select_query1_table3)
except Exception as e:
    print(f"Select for music_app_history_by_song Failed \n {e}")
    
for row in rows:
    print(row)

Row(firstname='Jacqueline', lastname='Lynch')
Row(firstname='Tegan', lastname='Levine')
Row(firstname='Sara', lastname='Johnson')


### Drop the tables before closing out the sessions

In [37]:
## Drop the table before closing out the sessions
drop1 = "DROP TABLE IF EXISTS music_app_history_by_session_item"
drop2 = "DROP TABLE IF EXISTS music_app_history_by_user_session"
drop3 = "DROP TABLE IF EXISTS music_app_history_by_song"
try:
    session.execute(drop1)
    session.execute(drop2)
    session.execute(drop3)
    print("Drop completed")
except Exception as e:
    print(f"Drop failed \n {e}")

Drop completed


### Close the session and cluster connection

In [38]:
session.shutdown()
cluster.shutdown()