# Part I. ETL Pipeline for Pre-Processing the Files

##### PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [79]:
# 1. Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

In [80]:
def duplicate_check(in_df):
    """ 
    Summary line. 
    Check for duplicates
  
    Parameters: 
    arg1 (dataframe)
  
    Returns: 
    duplicates.shape
    """    
    bool_series = in_df.duplicated() 
    # display data 
    #print(df_clean[bool_series].shape)
    return in_df[bool_series].shape

def remove_duplicates(in_df):
    """ 
    Summary line. 
    Removes duplicates
  
    Parameters: 
    arg1 (in_df)
  
    Returns: 
    return unique dataframe
    """    
    bool_series = in_df.duplicated() 
    # removing duplicates
    df_ndup = in_df[~bool_series]
    return df_ndup

def show_duplicates(in_df):
    """ 
    Summary line. 
    Show duplicates
  
    Parameters: 
    arg1 (in_df)
  
    Returns: 
    returns rows having duplicates
    """        
    bool_series = in_df.duplicated()
    return in_df[bool_series]

def run_cql(query, session):
    """ 
    Summary line. 
    Run Cassandra Queries
  
    Parameters: 
    arg1 (query) : CQL
    arg2 (session)
  
    Returns: 
    None
    """        
    try:
        rs1 = session.execute(query)
    except Exception as e:
        print(e)

    for row in rs1:
        print (row)


# GATHER

#### Creating list of filepaths to process original event csv data files

In [81]:
# 2. checking your current working directory
print('Current Working Directory : ',os.getcwd())

# Get your current folder and subfolder event data
filepath = os.getcwd() + '/event_data'
print(filepath)

# Create a for loop to create a list of files and collect each filepath
# 3. join the file path and roots with the subdirectories using glob
#    get all files matching extension from directory
all_files = []
for root, dirs, files in os.walk(filepath):
    files = glob.glob(os.path.join(root,'*.csv'))
    for f in files :
        all_files.append(os.path.abspath(f))

# get total number of files found
num_files = len(all_files)
print('{} files found in {}'.format(num_files, filepath))

Current Working Directory :  /home/workspace
/home/workspace/event_data
33 files found in /home/workspace/event_data


#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [82]:
# 4. Read CSV file one by one and append to list

# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 

print('Total Files = ',len(all_files))
# for every filepath in the file path list 
count = 0
for f in all_files:
    print("File {} : {}".format(count, f))
# reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile:         
        count+=1
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
 # extracting each data row one by one and append it        
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 
            
# uncomment the code below if you would like to get total number of rows 
print('Total Records = ',len(full_data_rows_list))
# uncomment the code below if you would like to check to see what the list of event data rows will look like
print('Sample Record')
print(full_data_rows_list[0])

# 5. creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

#Not Added : ts
with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId', 'ts'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16], row[15]))                

Total Files =  33
File 0 : /home/workspace/event_data/2018-11-04-events.csv
File 1 : /home/workspace/event_data/2018-11-01-events.csv
File 2 : /home/workspace/event_data/2018-11-22-events.csv
File 3 : /home/workspace/event_data/2018-11-27-events.csv
File 4 : /home/workspace/event_data/2018-11-24-events.csv
File 5 : /home/workspace/event_data/2018-11-18-events.csv
File 6 : /home/workspace/event_data/2018-11-03-events.csv
File 7 : /home/workspace/event_data/2018-11-20-events.csv
File 8 : /home/workspace/event_data/2018-11-11-events.csv
File 9 : /home/workspace/event_data/2018-11-29-events.csv
File 10 : /home/workspace/event_data/2018-11-02-events.csv
File 11 : /home/workspace/event_data/2018-11-26-events.csv
File 12 : /home/workspace/event_data/2018-11-13-events.csv
File 13 : /home/workspace/event_data/2018-11-28-events.csv
File 14 : /home/workspace/event_data/2018-11-25-events.csv
File 15 : /home/workspace/event_data/2018-11-30-events.csv
File 16 : /home/workspace/event_data/2018-11-12-

# ASSESS

In [83]:
# check the number of rows in your csv file
print('Total records after removing empty lines')
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

Total records after removing empty lines
7235


In [84]:
# 6. Read New Event Data File : event_datafile_new.csv : df
df = pd.read_csv('event_datafile_new.csv')
print('(row, column) = ', df.shape)

(row, column) =  (7234, 12)


In [85]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7234 entries, 0 to 7233
Data columns (total 12 columns):
artist           7234 non-null object
firstName        7234 non-null object
gender           7234 non-null object
itemInSession    7234 non-null int64
lastName         7234 non-null object
length           7234 non-null float64
level            7234 non-null object
location         7234 non-null object
sessionId        7234 non-null int64
song             7234 non-null object
userId           7234 non-null int64
ts               7234 non-null float64
dtypes: float64(2), int64(3), object(7)
memory usage: 678.3+ KB


In [86]:
df.head()

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId,ts
0,Professor Longhair,Ann,F,0,Banks,214.20363,free,"Salt Lake City, UT",124,Mean Ol'World,99,1541290000000.0
1,Gary Hobbs,Jahiem,M,1,Miles,245.52444,free,"San Antonio-New Braunfels, TX",42,En Mi Mundo,43,1541300000000.0
2,Lifehouse,Jahiem,M,2,Miles,203.59791,free,"San Antonio-New Braunfels, TX",42,We'll Never Know,43,1541300000000.0
3,Olivia Ruiz,Jahiem,M,3,Miles,254.74567,free,"San Antonio-New Braunfels, TX",42,Cabaret Blanco,43,1541300000000.0
4,Jordan Rudess,Cecilia,F,1,Owens,1367.84934,free,"Atlanta-Sandy Springs-Roswell, GA",225,Tarkus,6,1541310000000.0


In [87]:
bool_series = df.duplicated() 
  
# display data 
print(df[bool_series].shape)
df[bool_series].head()

dups = df[bool_series].copy()
dups.sort_values(['artist', 'firstName', 'itemInSession'], axis = 0, ascending = True, inplace = True, na_position ='first') 
dups.head()

(414, 12)


Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId,ts
7037,+ / - {Plus/Minus},Matthew,M,11,Jones,318.98077,paid,"Janesville-Beloit, WI",998,The Queen of Nothing,36,1543580000000.0
6935,1 Mile North,Ryan,M,0,Smith,352.73098,free,"San Jose-Sunnyvale-Santa Clara, CA",1068,Black Lines,26,1543550000000.0
7116,12 Stones,Jayden,M,5,Fox,184.0322,free,"New Orleans-Metairie, LA",1060,Anthem For The Underdog,101,1543590000000.0
7134,3 Doors Down,Rylan,M,8,George,233.74322,paid,"Birmingham-Hoover, AL",1076,Kryptonite,16,1543590000000.0
6990,3OH!3,Chloe,F,20,Cuevas,192.522,paid,"San Francisco-Oakland-Hayward, CA",1079,My First Kiss (Feat. Ke$ha) [Album Version],49,1543560000000.0


In [88]:
# Duplicate Verification 1
df[(df.artist == '12 Stones') & (df.firstName == 'Jayden') & (df.userId == 101)]

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId,ts
3085,12 Stones,Jayden,M,5,Fox,184.0322,free,"New Orleans-Metairie, LA",1060,Anthem For The Underdog,101,1543590000000.0
7116,12 Stones,Jayden,M,5,Fox,184.0322,free,"New Orleans-Metairie, LA",1060,Anthem For The Underdog,101,1543590000000.0


In [89]:
# Duplicate Verification 2
df[(df.artist == '3 Doors Down') & (df.firstName == 'Rylan') & (df.userId == 16)]

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId,ts
3103,3 Doors Down,Rylan,M,8,George,233.74322,paid,"Birmingham-Hoover, AL",1076,Kryptonite,16,1543590000000.0
7134,3 Doors Down,Rylan,M,8,George,233.74322,paid,"Birmingham-Hoover, AL",1076,Kryptonite,16,1543590000000.0


In [90]:
# Duplicate Verification 3
df[(df.artist == 'Survivor') & (df.userId == 101)]

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId,ts
154,Survivor,Jayden,M,0,Fox,245.36771,free,"New Orleans-Metairie, LA",100,Eye Of The Tiger,101,1541110000000.0
6830,Survivor,Jayden,M,0,Fox,245.36771,free,"New Orleans-Metairie, LA",100,Eye Of The Tiger,101,1541110000000.0


In [91]:
df[(df.song == 'Float On') & (df.userId == 97)]

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId,ts
197,Modest Mouse,Kate,F,5,Harrell,209.52771,paid,"Lansing-East Lansing, MI",828,Float On,97,1542890000000.0
199,Modest Mouse,Kate,F,7,Harrell,209.52771,paid,"Lansing-East Lansing, MI",828,Float On,97,1542890000000.0
3440,Modest Mouse,Kate,F,9,Harrell,209.52771,paid,"Lansing-East Lansing, MI",293,Float On,97,1541530000000.0


### Quality
1. Remove duplicates from df
2. Check of duplicates & remove after splitting df

### Tidiness
1. Change ts column from float to timestamp to string.  
   a. When trying to load cassandra table, encountered error for this timestamp column. But it seemed to load successfully when dataframe timestamp column is of type(string). So converting this timestamp column to string.  
2. Split df into 3 separate dataframes(appHist, songHist, userHist) to store rows as per queries

# CLEAN

In [92]:
#Create copies of dataframe
df_clean = df.copy()

#### Issue 1 : Convert ts from float64 to timestamp
##### Define
Tidiness : 1. Change ts column from float to timestamp

##### Code

In [93]:
df_clean['ts'] = pd.to_datetime(df_clean['ts'], unit='ms')
df_clean['ts'] = df_clean.ts.astype(str)

##### Test

In [94]:
df_clean.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7234 entries, 0 to 7233
Data columns (total 12 columns):
artist           7234 non-null object
firstName        7234 non-null object
gender           7234 non-null object
itemInSession    7234 non-null int64
lastName         7234 non-null object
length           7234 non-null float64
level            7234 non-null object
location         7234 non-null object
sessionId        7234 non-null int64
song             7234 non-null object
userId           7234 non-null int64
ts               7234 non-null object
dtypes: float64(1), int64(3), object(8)
memory usage: 678.3+ KB


#### Issue 2 : Removing duplicates  

##### Define
Quality : 1. Remove duplicates from df  

##### Code

In [95]:
print('df : Duplicate Check : ',duplicate_check(df_clean))

df_ndup = df_clean.copy()

# removing duplicates
df_ndup = remove_duplicates(df_clean)

print('Unique Rows : ',df_ndup.shape)

df_clean = df_ndup.copy()

df : Duplicate Check :  (414, 12)
Unique Rows :  (6820, 12)


##### Test

In [96]:
# Duplicate Verification 1
df_clean[(df_clean.artist == '12 Stones') & (df_clean.firstName == 'Jayden') & (df_clean.userId == 101)]

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId,ts
3085,12 Stones,Jayden,M,5,Fox,184.0322,free,"New Orleans-Metairie, LA",1060,Anthem For The Underdog,101,2018-11-30 15:00:00


#### Issue 3 : Split df to separate dataframes
##### Define
Tidiness : 2. Split df into 3 separate dataframes(appHist, songHist, userHist) to store rows as per queries

##### Code

In [97]:
appHist_df = df_clean[['sessionId', 'itemInSession', 'artist', 'song', 'length']]
userHist_df = df_clean[['userId', 'sessionId', 'itemInSession', 'firstName', 'lastName', 'artist', 'song']]
songHist_df = df_clean[['song', 'userId', 'firstName', 'lastName', 'ts']]

##### Test

In [98]:
#Query 1
appHist_df[(appHist_df.sessionId == 338) & (appHist_df.itemInSession == 4)]

Unnamed: 0,sessionId,itemInSession,artist,song,length
3728,338,4,Faithless,Music Matters (Mark Knight Dub),495.3073


In [99]:
#Query 2
userHist_df[(userHist_df.userId == 10) & (userHist_df.sessionId == 182)]

Unnamed: 0,userId,sessionId,itemInSession,firstName,lastName,artist,song
1899,10,182,0,Sylvie,Cruz,Down To The Bone,Keep On Keepin' On
1900,10,182,1,Sylvie,Cruz,Three Drives,Greece 2000
1901,10,182,2,Sylvie,Cruz,Sebastien Tellier,Kilometer
1902,10,182,3,Sylvie,Cruz,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...


In [100]:
#Query 3
songHist_df[(songHist_df.song =='All Hands Against His Own')]

Unnamed: 0,song,userId,firstName,lastName,ts
998,All Hands Against His Own,95,Sara,Johnson,2018-11-03 18:33:20
4491,All Hands Against His Own,29,Jacqueline,Lynch,2018-11-14 12:53:20
6106,All Hands Against His Own,80,Tegan,Levine,2018-11-15 11:06:40


#### Issue 4 : Check & Remove duplicates in new dataframes

##### Define
Quality : 2. Check of duplicates & remove after splitting df

##### Code

In [101]:
print(duplicate_check(appHist_df))
print(duplicate_check(userHist_df))
print(duplicate_check(songHist_df))

(0, 5)
(0, 7)
(19, 5)


In [102]:
# Removing duplicates from song_history
songHist_df = remove_duplicates(songHist_df)

In [103]:
songHist_df[(songHist_df.song == 'Float On') & (songHist_df.userId == 97)]

Unnamed: 0,song,userId,firstName,lastName,ts
197,Float On,97,Kate,Harrell,2018-11-22 12:33:20
3440,Float On,97,Kate,Harrell,2018-11-06 18:46:40


##### Test

In [104]:
show_duplicates(songHist_df)

Unnamed: 0,song,userId,firstName,lastName,ts


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster

In [105]:
from cassandra.cluster import Cluster
try: 
    # This should make a connection to a Cassandra instance your local machine 
    # (127.0.0.1)   
    cluster = Cluster(['127.0.0.1']) #If you have a locally installed Apache Cassandra instance
    # To establish connection and begin executing queries, need a session    
    session = cluster.connect()
except Exception as e:
    print(e)

#### Create Keyspace

In [106]:
# TO-DO: Create a Keyspace 
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS udacity 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)
except Exception as e:
    print(e)

#### Set Keyspace

In [107]:
# TO-DO: Set KEYSPACE to the keyspace specified above
try:
    session.set_keyspace('udacity')
except Exception as e:
    print(e)

#### Create Apache Cassandra tables

In [108]:
# DROP QUERIES
appHist_DROP = """DROP TABLE IF EXISTS app_history;"""
userHist_DROP = """DROP TABLE IF EXISTS user_history;"""
songHist_DROP = """DROP TABLE IF EXISTS song_history;"""

# CREATE QUERIES
appHist_CREATE = (
"""
CREATE TABLE IF NOT EXISTS app_history(
sessionId INT,
itemInSession INT,
artist VARCHAR,
song VARCHAR,
length DECIMAL,
PRIMARY KEY(sessionId, itemInSession)
);
""")

userHist_CREATE = (
"""
CREATE TABLE IF NOT EXISTS user_history (
userId INT,
sessionId INT,
itemInSession INT,
firstName VARCHAR,
lastName VARCHAR,
artist VARCHAR,
song VARCHAR,
PRIMARY KEY(userId, sessionId, itemInSession)
);
""")

songHist_CREATE = (
"""
CREATE TABLE IF NOT EXISTS song_history (
song VARCHAR,
userId INT,
firstName VARCHAR,
lastName VARCHAR,
ts timestamp,
PRIMARY KEY(song, userId, ts)
);
""")

# INSERT QUERIES
appHist_INSERT = ("""
INSERT INTO app_history (
sessionId, itemInSession, artist, song, length)
VALUES (%s, %s, %s, %s, %s);
""")

userHist_INSERT = ("""
INSERT INTO user_history (
userId, sessionId, itemInSession, firstName, lastName, artist, song)
VALUES (%s, %s, %s, %s, %s, %s, %s);
""")

songHist_INSERT = ("""
INSERT INTO song_history (
song, userId, firstName, lastName, ts)
VALUES (%s, %s, %s, %s, %s);
""")

try:
    session.execute(appHist_DROP)
    session.execute(userHist_DROP)
    session.execute(songHist_DROP)

    session.execute(appHist_CREATE)
    session.execute(userHist_CREATE)
    session.execute(songHist_CREATE)
except Exception as e:
    print(e)

#### Confirm table creation

In [109]:
# Confirm Table Creation
Q1 = "select count(*) as appHist from app_history ;"
Q2 = "select count(*) as userHist from user_history ;"
Q3 = "select count(*) as songHist from song_history ;"

run_cql(Q1, session)
run_cql(Q2, session)
run_cql(Q3, session)

Row(apphist=0)
Row(userhist=0)
Row(songhist=0)


#### Load table app_history

In [110]:
for i, row in appHist_df.iterrows():
    try:
        row_data = (row.sessionId, row.itemInSession, row.artist, row.song, row.length)
        session.execute(appHist_INSERT, row_data)
    except Exception as e:
        print(e)    
        print(i, row)

#### Load table user_history

In [111]:
for i, row in userHist_df.iterrows():
    try:
        row_data = (row.userId, row.sessionId, row.itemInSession, row.firstName, row.lastName, row.artist, row.song)
        session.execute(userHist_INSERT, row_data)
    except Exception as e:
        print(e)    
        print(i, row)

#### Load table song_history

In [112]:
for i, row in songHist_df.iterrows():
    try:
        row_data = (row.song, row.userId, row.firstName, row.lastName, row.ts)
        session.execute(songHist_INSERT, row_data)
    except Exception as e:
        print(e)    
        print(i, row)

In [113]:
# Check load counts
Q1 = "select count(*) as appHist from app_history ;"
Q2 = "select count(*) as userHist from user_history ;"
Q3 = "select count(*) as songHist from song_history ;"

run_cql(Q1, session)
run_cql(Q2, session)
run_cql(Q3, session)

Row(apphist=6820)
Row(userhist=6820)
Row(songhist=6801)


## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




In [114]:
## TO-DO: Query 1:  Give me the artist, song title and song's length in the music app history that was heard during \
## sessionId = 338, and itemInSession = 4
Q1 = "select artist, song from app_history where sessionId = 338 and itemInSession = 4"
run_cql(Q1, session)

Row(artist='Faithless', song='Music Matters (Mark Knight Dub)')


In [115]:
## TO-DO: Query 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name)\
## for userid = 10, sessionid = 182
Q2 = "select artist, song from user_history where userId = 10 and sessionId = 182"
run_cql(Q2, session)                   

Row(artist='Down To The Bone', song="Keep On Keepin' On")
Row(artist='Three Drives', song='Greece 2000')
Row(artist='Sebastien Tellier', song='Kilometer')
Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)')


In [116]:
## TO-DO: Query 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
Q3 = "select firstName, lastName from song_history where song = 'All Hands Against His Own'"
run_cql(Q3, session)

Row(firstname='Jacqueline', lastname='Lynch')
Row(firstname='Tegan', lastname='Levine')
Row(firstname='Sara', lastname='Johnson')


### Drop the tables before closing out the sessions

In [117]:
run_cql(appHist_DROP, session)
run_cql(userHist_DROP, session)
run_cql(songHist_DROP, session)

### Close the session and cluster connection¶

In [118]:
session.shutdown()
cluster.shutdown()