#  ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# checking current working directory
print(os.getcwd())

# Getting current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Creating a for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    
# joining the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))


/Users/ziyadashraf/Documents/Data Engineering with AWS/Course 1/Project


#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:
# reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
 # extracting each data row one by one and append it        
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 
            
#
#print(len(full_data_rows_list))
#
#print(full_data_rows_list)

# creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [4]:
# checking the number of rows in my csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821




### The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Apache Cassandra code in the cells below

#### Creating a Cluster

In [5]:
from cassandra.cluster import Cluster

try:
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
    
except Exception as e:
    print (e)

#### Create Keyspace

In [6]:
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS sparkify
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1}
    """)
    
except Exception as e:
    print(e)

#### Set Keyspace

In [7]:
try:
    session.set_keyspace('sparkify')
    
except Exception as e:
    print(e)


## Sparkify wants to know the following:

### 1. The artist, song title and song's length in the music app history that was heard during the Session ID: 338, and Item In Session: 4


### 2. Name of artist, song (sorted by Item in Session) and user (first and last name) for User ID: 10  and Session ID: 182
    

### 3. Every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'






# Task 1: The artist, song title and song's length in the music app history

-Created a table with the columns Artist Name, Song Name, Song Length, Session ID, Item in Session because Sparkify wants to know The artist, song title and song's length in a certain Session ID and Item in Session.

-Chose Session ID and Item in Session as the composite key to avoid ALLOW FILTERING and to be able to put them in my WHERE clause without any problems.

In [8]:
#CREATING TABLE artist_songs_length

query = "CREATE TABLE IF NOT EXISTS artist_songs_length "
query = query + "(session_id int, item_in_session int, artist_name text, song text, song_length float, PRIMARY KEY (session_id, item_in_session))"
try:
    session.execute (query)
except Exception as e:
    print (e)                    

In [9]:
#Setting up csv file
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) 
    
    for line in csvreader:
#Assigning the INSERT statements 
        query = "INSERT INTO artist_songs_length (session_id, item_in_session, artist_name, song, song_length) "
        query = query + "VALUES (%s, %s, %s, %s, %s)"
        
        #Assigning which column element should be assigned for each column in the INSERT statement.
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))

### SELECT Statement for Query 1

-This query is just getting the data Sparkify wanted and listing them based on the specific Session ID and Item in Session given.

In [10]:
query = "SELECT artist_name, song, song_length, session_id, item_in_session FROM artist_songs_length \
        WHERE session_id = 338 AND item_in_session = 4"

try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    

df = pd.DataFrame(rows, columns=['Artist Name', 'Song', 'Song length', 'Session ID', 'Item in Session ID'])

print (df)

  Artist Name                             Song  Song length  Session ID  \
0   Faithless  Music Matters (Mark Knight Dub)   495.307312         338   

   Item in Session ID  
0                   4  


# Task 2:  Name of artist, song (sorted by Item in Session) and user (first and last name)

-Created a table with the columns Artist Name, Song Name, user first and last names, User ID, Item in Session ID and Session ID because Sparkify wants to know The artist of each song, the song title sorted with respect to Item in Session ID, the user name based on a given session ID and user ID.

-Chose User ID and Session ID as the partition keys to avoid ALLOW FILTERING and to be able to put them in my WHERE clause without any problems. Added the clustering column Item in Session ID to the primary key to be able to sort my songs with respect to the Item in Session ID.

In [11]:
#CREATING TABLE user_songs

query = "CREATE TABLE IF NOT EXISTS user_songs "
query = query + "(user_id int, session_id int, item_in_session int, artist_name text, song text,  first_name text, last_name text,  PRIMARY KEY ((user_id, session_id), item_in_session))"


try:
    session.execute (query)
except Exception as e:
    print (e)        


                    

In [12]:
#Setting up csv file
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) 
    
    for line in csvreader:
    #Assign the INSERT statements 
        query = "INSERT INTO user_songs (user_id, session_id, item_in_session, artist_name, song,  first_name, last_name )"
    
        query = query + "VALUES (%s, %s, %s, %s, %s, %s, %s)"
        
        #Assigning which column element should be assigned for each column in the INSERT statement.
        session.execute(query, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))

### SELECT Statement for Query 2

-This query is just getting the data Sparkify wanted and listing them based on the specific Session ID and User ID given.

In [13]:
# Give me only the following: name of artist, song (sorted by itemInSession) 
#and user (first and last name) for userid = 10, sessionid = 182


query = "SELECT user_id, session_id, item_in_session, artist_name, song,  first_name, last_name  \
        FROM user_songs \
        WHERE session_id=182 \
        AND user_id = 10"

try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
df = pd.DataFrame(rows, columns=['User ID', 'Session ID', 'Item in Session', 'Artist Name', 'Song', 'First Name' , 'Last Name'])
print(df)


   User ID  Session ID  Item in Session        Artist Name  \
0       10         182                0   Down To The Bone   
1       10         182                1       Three Drives   
2       10         182                2  Sebastien Tellier   
3       10         182                3      Lonnie Gordon   

                                                Song First Name Last Name  
0                                 Keep On Keepin' On     Sylvie      Cruz  
1                                        Greece 2000     Sylvie      Cruz  
2                                          Kilometer     Sylvie      Cruz  
3  Catch You Baby (Steve Pitron & Max Sanna Radio...     Sylvie      Cruz  


# Task 3: Every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

-Created a table with the columns First Name, Last Name, Song Length, User ID because Sparkify wanted a list of their customers who listened to 'All Hands Against His Own'. I added User ID to be able to list all the customers who listened to the song, not just one customer.

-Chose song name and user ID as the primary key to be able to get a list of all the customers who listened to the song. If I put the song name only as the primary key this would result in printing just one customer and not all the customers because the primary key has to be unique.

In [14]:
#CREATING TABLE users

query = "CREATE TABLE IF NOT EXISTS users "
query = query + "( song text, user_id int , first_name text, last_name text, PRIMARY KEY(song, user_id))"


try:
    session.execute (query)
except Exception as e:
    print (e)        

In [15]:
#Setting up csv file
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) 
    
    for line in csvreader:
#Assign the INSERT statements 
        query = "INSERT INTO users (song, user_id, first_name, last_name)"
    
        query = query + "VALUES (%s, %s, %s, %s)"
        
        #Assigning which column element should be assigned for each column in the INSERT statement.
        session.execute(query, (  line[9], int(line[10]), line[1],  line[4]))

-This query is just getting the data Sparkify wanted and listing them based on the User ID and Song Name

In [16]:
#Every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'


query = "SELECT first_name, last_name, song, user_id \
        FROM users \
        WHERE song = 'All Hands Against His Own'"

try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
df = pd.DataFrame(rows, columns=['First Name', 'Last Name', 'Song', 'User ID'])

print (df)

   First Name Last Name                       Song  User ID
0  Jacqueline     Lynch  All Hands Against His Own       29
1       Tegan    Levine  All Hands Against His Own       80
2        Sara   Johnson  All Hands Against His Own       95


### Dropping the tables

In [17]:
query = "drop table artist_songs_length"

try:
    session.execute(query)
except Exception as e:
    print (e)

In [18]:
query = "drop table user_songs"

try:
    session.execute(query)
except Exception as e:
    print (e)

In [19]:
query = "drop table users"

try:
    session.execute(query)
except Exception as e:
    print (e)

### Closing the session and cluster connection

In [20]:
session.shutdown()
cluster.shutdown()