# Design the ETL pipeline
- The purpose of this notebook is to design and test the etl pipeline to sparkfy database

In [1]:
import cassandra
import pandas as pd
import os

from CQL_quries import *
from CreateTables import ConnectToCassandra , createKeySpace

In [2]:
# Get the paths of every csv file in the event_data directory
file_paths = []

for root,dirs,files in os.walk('event_data' , topdown=True):
    for file in files:
        file_paths.append(os.path.join(root , file))

## Concatenate every csv file into one csv file

In [3]:
# Create empty dataframs has only the name of the columns
full_df_cols = ['artist', 'auth', 'firstName', 'gender', 'itemInSession', 'lastName',
               'length', 'level', 'location', 'method', 'page', 'registration',
               'sessionId', 'song', 'status', 'ts', 'userId']
full_df = pd.DataFrame(columns = full_df_cols)

# Concatenate every csv file into one csv file
for file in file_paths:
    df = pd.read_csv(file)
    full_df = pd.concat([full_df , df] , axis=0)
    
# Drop the columns which we will not use in queries
full_df.drop(['auth' , 'method' , 'page' , 'registration' , 'ts' , 'status'], axis = 1,inplace = True)
full_df.dropna(inplace = True)

In [4]:
full_df.shape

(6820, 11)

In [5]:
full_df.head()

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
2,Des'ree,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",139,You Gotta Be,8
4,Mr Oizo,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",139,Flat 55,8
5,Tamba Trio,Kaylee,F,4,Summers,177.18812,free,"Phoenix-Mesa-Scottsdale, AZ",139,Quem Quiser Encontrar O Amor,8
6,The Mars Volta,Kaylee,F,5,Summers,380.42077,free,"Phoenix-Mesa-Scottsdale, AZ",139,Eriatarka,8
7,Infected Mushroom,Kaylee,F,6,Summers,440.2673,free,"Phoenix-Mesa-Scottsdale, AZ",139,Becoming Insane,8


In [6]:
# Replace evey single apostrophe by a two apostrophes to make it easy to insert string type data into database
full_df['artist'] = full_df['artist'].str.replace("'" , "''")
full_df['song'] = full_df['song'].str.replace("'" , "''")
full_df['firstName'] = full_df['firstName'].str.replace("'" , "''")
full_df['firstName'] = full_df['firstName'].str.replace("'" , "''")
full_df.head()

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
2,Des''ree,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",139,You Gotta Be,8
4,Mr Oizo,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",139,Flat 55,8
5,Tamba Trio,Kaylee,F,4,Summers,177.18812,free,"Phoenix-Mesa-Scottsdale, AZ",139,Quem Quiser Encontrar O Amor,8
6,The Mars Volta,Kaylee,F,5,Summers,380.42077,free,"Phoenix-Mesa-Scottsdale, AZ",139,Eriatarka,8
7,Infected Mushroom,Kaylee,F,6,Summers,440.2673,free,"Phoenix-Mesa-Scottsdale, AZ",139,Becoming Insane,8


# Connect to cassandra

In [7]:
session , cluster = ConnectToCassandra()
createKeySpace(session)

# Create Sessions  Table
- Create Sessions Table be able to run the following query
- Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

In [8]:
session_col_names = "sessionId int,  itemInSession int ,artist text , song_title text ,song_length  float"
create_table(session,
             table_name = 'sessions',
             column_names= session_col_names,
             primary_keys= "(sessionId , itemInSession)")

In [9]:
for _ , row in full_df.iterrows():
    insert_into_table(session , table_name = 'sessions' , 
                 values = f"{row['sessionId']},{row['itemInSession']},'{row['artist']} ', '{row['song']}' , {row['length']}")

In [10]:
query = "SELECT artist , song_title ,  song_length FROM sessions WHERE  sessionId = 338 AND  itemInSession = 4"



try:
    rows = session.execute(query)
except Exception as e:
    print(e)

    
for row in rows:
    print(row)

Row(artist='Faithless ', song_title='Music Matters (Mark Knight Dub)', song_length=495.30731201171875)


# Create Usesrs Table
- Create Users Table to be able to run the following query
- Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

In [11]:
users_col_names = "userId int,  sessionId int, itemInSession int,artist text , song_title text , firstName text ,lastName text "
create_table(session,
             table_name = 'users',
             column_names= users_col_names,
             primary_keys= "(userId , sessionId) , itemInSession")

In [12]:
for _ , row in full_df.iterrows():
    insert_into_table(session , table_name = 'users' ,
                     values=f"{int(row['userId'])},{int(row['sessionId'])},{int(row['itemInSession'])},'{row['artist']}','{row['song']}','{row['firstName']}','{row['lastName']}'")

In [13]:
query = "SELECT itemInSession , artist , song_title , firstName , lastName FROM users  WHERE  userId = 10 AND sessionId = 182"

try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print(row.iteminsession , row.artist , row.song_title , row.firstname , row.lastname)
    

0 Down To The Bone Keep On Keepin' On Sylvie Cruz
1 Three Drives Greece 2000 Sylvie Cruz
2 Sebastien Tellier Kilometer Sylvie Cruz
3 Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


# Create songs Table
- Create Artists Table to be able to run the following query
- Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [14]:
songs_col_names = "song_title text , userId int, firstName text ,lastName text "
create_table(session,
             table_name = 'songs',
             column_names= songs_col_names,
             primary_keys= "(song_title) , userId")

In [15]:
for _ , row in full_df.iterrows():
    insert_into_table(session , table_name = 'songs' ,
                      values=f"'{row['song']}'  , {int(row['userId'])}, '{row['firstName']}' , '{row['lastName']}' ")

In [16]:
query = "SELECT firstName , lastName FROM songs  WHERE song_title = 'All Hands Against His Own'"

try:
    rows = session.execute(query)
except Exception as e:
    print(e)

    
for row in rows:
    print( row.firstname , row.lastname)
    

Jacqueline Lynch
Tegan Levine
Sara Johnson


In [17]:
query = "DROP TABLE sessions "
session.execute(query)

query = "DROP TABLE users "
session.execute(query)

query = "DROP TABLE songs "
session.execute(query)

<cassandra.cluster.ResultSet at 0x1e1422dae88>

In [18]:
#Close the connecion
session.shutdown()
cluster.shutdown()