# Part I. ETL Pipeline for Pre-Processing the Files

This section reads the individual files in the `event_data` folder and combines them into a single `event_datafile_new.csv` file, which is used to populate our Apache Cassandra tables.

#### Import Python packages 

In [88]:
import pandas as pd
from cassandra.cluster import Cluster
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [89]:
curr_dir = os.getcwd()
print('Current working directory: ', curr_dir)

# Get absolute path to event data folder
filepath = os.path.join(curr_dir, 'event_data')

# Collect the path of every csv file directly inside event_data/.
# (Reassigning the list inside an os.walk loop would keep only the files
# of the last folder visited.)
file_path_list = sorted(glob.glob(os.path.join(filepath, '*.csv')))

Current working directory:  C:\src\personal\cassandra_etl


#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [90]:
full_data_rows_list = [] 
    
for f in file_path_list:
    # read csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
        # extract each data row one by one and append it        
        for line in csvreader:
            full_data_rows_list.append(line) 
            
print("Found %s rows of data." %len(full_data_rows_list))

# create a single event data csv file that will be used to insert data into Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

new_filename = 'event_datafile_new.csv'
print('Writing data to %s' %new_filename)

with open(new_filename, 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))

print('Done writing data to new file.')

Found 8056 rows of data.
Writing data to event_datafile_new.csv
Done writing data to new file.
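
The cell above picks columns by position (`row[0]`, `row[2]`, ...), which depends on the column order of the raw files. As a minimal sketch of the same selection done by name, assuming the raw header row uses exactly the column names written to the new file, `csv.DictReader` can be used instead. The function name `select_columns` and the sample rows are hypothetical and only illustrate the idea.

```python
import csv
import io

# Columns kept in event_datafile_new.csv, in output order (same as the header written above).
KEPT_COLUMNS = ('artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                'level', 'location', 'sessionId', 'song', 'userId')


def select_columns(raw_csv_file, kept_columns=KEPT_COLUMNS):
    """Yield one list per row, keeping only kept_columns and skipping rows with no artist."""
    # Assumption: the raw file's header row uses these exact column names.
    for record in csv.DictReader(raw_csv_file):
        if record['artist'] == '':
            continue
        yield [record[name] for name in kept_columns]


# Hypothetical two-row sample; 'page' stands in for the columns that are dropped.
sample = io.StringIO(
    "artist,firstName,gender,itemInSession,lastName,length,level,location,page,sessionId,song,userId\n"
    "Some Artist,Pat,F,4,Doe,200.5,free,\"Springfield, XX\",NextSong,338,Some Song,50\n"
    ",Pat,F,5,Doe,,free,\"Springfield, XX\",Home,338,,50\n"
)
rows = list(select_columns(sample))
print(rows)
```

The second sample row has an empty `artist` and is skipped, mirroring the `row[0] == ''` check in the cell above.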


In [91]:
# count the lines in the new csv file (the count includes the header row)
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    num_rows = sum(1 for line in f)
    print("New data file contains %s rows" %num_rows)

New data file contains 6821 rows


# Part II. Load Data Into Tables and Query the Tables

The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free subscription tier of the user)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data looks like:

<img src="images/image_event_datafile_new.jpg">

#### Create cluster and establish a connection

In [138]:
# This should make a connection to the Cassandra server
cluster = Cluster()

# To establish connection and begin executing queries, need a session
session = cluster.connect()

#### Create Keyspace

In [139]:
session.execute("CREATE KEYSPACE IF NOT EXISTS sparkify WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };")

<cassandra.cluster.ResultSet at 0x158474d2b38>

#### Set Keyspace

In [140]:
session.execute("USE sparkify;")

<cassandra.cluster.ResultSet at 0x15847ac6940>

### Helper Methods
A few methods to help make data insertion easier

In [141]:
from IPython.display import display, clear_output

DATA_FILE = 'event_datafile_new.csv'

def get_data_by_key(row, key):
    """ Helper method which returns the value of the requested column from a
        data row, parsed to the appropriate data type
        
        Arguments:
            row {str[]} -- [represents a single row from event_datafile_new.csv, split by column]
            key {str} -- [the column which you want to parse]
    """
    
    # Column name -> (position in the row, type used to parse the raw string).
    # Named column_map to avoid shadowing the built-in map(); only the
    # requested column is parsed.
    column_map = {
        'artist': (0, str),
        'firstName': (1, str),
        'gender': (2, str),
        'itemInSession': (3, int),
        'lastName': (4, str),
        'length': (5, float),
        'level': (6, str),
        'location': (7, str),
        'sessionId': (8, int),
        'song': (9, str),
        'userId': (10, int)
    }
    index, parse = column_map[key]
    return parse(row[index])

def insert_data_into_table(insertStatement, dataToBeInserted):
    """ Helper method which executes the provided CQL statement, passing in the
        values represented by the keys passed in
        
        Arguments:
            insertStatement {str} -- [CQL insert statement where each value to be inserted has been replaced with %s]
            dataToBeInserted {str[]} -- [array of keys (in order) to be inserted from each row in event_datafile_new.csv]
    """

    with open(DATA_FILE, encoding = 'utf8') as f:
        csvreader = csv.reader(f)
        next(csvreader) # skip header
        for i, line in enumerate(csvreader):
            clear_output(wait=True)
            # num_rows counts the header line, so the number of data rows is num_rows - 1
            display('{}/{} rows processed.'.format(i, num_rows - 1))
            # pass the values as a tuple, in the same order as the %s placeholders
            session.execute(insertStatement, tuple(get_data_by_key(line, key) for key in dataToBeInserted))
        clear_output(wait=True)
        display('All rows processed.')

## Create tables/queries to ask the following three questions of the data

### 1. Get the title, artist, and length of the "i-th" song played by a user during a particular session

#### Table Structure
Because we want to report the `song_title`, `artist`, and `length` of a song, we include these columns in the table. Because we filter on `session_id` and `item_in_session`, together they form the `PRIMARY KEY`, with `session_id` as the partition key and `item_in_session` as a clustering column. This works because the pair uniquely identifies every row in the table, and partitioning by `session_id` means a query reads a single partition while Cassandra can distribute the sessions across the nodes of the cluster.
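
To make the roles of the two key columns concrete, here is a toy in-memory model of the table: the partition key selects a bucket and the clustering column selects a row inside it. This is only an illustration of the lookup path, not how Cassandra stores data, and the rows are hypothetical.

```python
from collections import defaultdict

# Toy model: partition key -> {clustering key -> remaining columns}.
partitions = defaultdict(dict)


def insert(session_id, item_in_session, artist, song_title, length):
    """Write a row; a second write with the same primary key overwrites the first."""
    partitions[session_id][item_in_session] = (artist, song_title, length)


def select(session_id, item_in_session):
    """Look up one row: pick the partition first, then the row inside it."""
    return partitions.get(session_id, {}).get(item_in_session)


# Hypothetical rows, for illustration only.
insert(1, 0, 'Artist A', 'Song A', 210.0)
insert(1, 1, 'Artist B', 'Song B', 185.5)
insert(2, 0, 'Artist C', 'Song C', 240.25)

print(select(1, 1))  # only partition 1 is consulted
```

Because both key columns are fixed in the `WHERE` clause, the lookup touches one partition and returns at most one row.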

#### Validation Query
To demonstrate this query, our `SELECT` statement finds the artist, song title, and song length for sessionId = 338 and itemInSession = 4.

In [96]:
song_sessions_table_drop = "DROP TABLE IF EXISTS song_sessions"

song_sessions_table_create = """
CREATE TABLE IF NOT EXISTS song_sessions(
    session_id INT, 
    item_in_session INT,
    artist VARCHAR,
    song_title VARCHAR,
    length DECIMAL,
    PRIMARY KEY (session_id, item_in_session)
);
"""

song_sessions_insert = """
INSERT INTO song_sessions (session_id, item_in_session, artist, song_title, length)
    VALUES (%s, %s, %s, %s, %s)
"""

song_sessions_select = """
SELECT artist, song_title, length
FROM song_sessions
WHERE session_id = 338
AND item_in_session = 4
"""

In [97]:
# Drop and Create Table
session.execute(song_sessions_table_drop)
session.execute(song_sessions_table_create)

<cassandra.cluster.ResultSet at 0x158473750f0>

In [98]:
# Insert Data into Table
insert_data_into_table(song_sessions_insert, ['sessionId', 'itemInSession', 'artist', 'song', 'length'])

'All rows processed.'

In [99]:
# Query table to answer question
rows = session.execute(song_sessions_select)
for row in rows:
    print(row.artist, row.song_title, row.length)

Faithless Music Matters (Mark Knight Dub) 495.3073


### 2. Get user, song, and artist information for all songs played during a session in order.

#### Table Structure
Because we want to report the `first_name` and `last_name` of a user and the `song_title` and `artist` of each song in a session, we include these columns in the table. Because we filter and order on their values, `user_id`, `session_id`, and `item_in_session` form the `PRIMARY KEY`, with `user_id` as the partition key and `session_id` and `item_in_session` as clustering columns (in that order). This works because the three columns together uniquely identify every row in the table, and partitioning by `user_id` lets us use the data's hierarchy (`user_id` has a one-to-many relationship with `session_id`) to perform additional interesting queries. Furthermore, the `WITH CLUSTERING ORDER BY` clause sorts rows by `session_id` and then `item_in_session` (ascending, which is also Cassandra's default), so that when `user_id` and `session_id` are specified, songs are returned in the order they were played, without the need for an `ORDER BY` clause in the `SELECT` statement.
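
The validation query below hard-codes one user and session. As a sketch of how the same lookup could be reused for any pair, the values can be passed through `%s` placeholders, the same mechanism the insert helper uses. The function name `songs_in_session` is hypothetical, and `session` is assumed to be a connected session with the `sparkify` keyspace set.

```python
SONGS_IN_SESSION_QUERY = """
SELECT artist, song_title, first_name, last_name
FROM user_sessions
WHERE user_id = %s
AND session_id = %s
"""


def songs_in_session(session, user_id, session_id):
    """Return the rows for one user's session, in clustering (play) order."""
    # The driver substitutes the %s placeholders from the tuple, in order.
    return list(session.execute(SONGS_IN_SESSION_QUERY, (user_id, session_id)))
```

With a live connection, `songs_in_session(session, 10, 182)` would run the same query as the validation cell.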

#### Validation Query
To demonstrate this query, our `SELECT` statement finds the artist, song (sorted by itemInSession), and user (first and last name) for userId = 10 and sessionId = 182.

In [126]:
user_sessions_table_drop = "DROP TABLE IF EXISTS user_sessions"

user_sessions_table_create = """
CREATE TABLE IF NOT EXISTS user_sessions(
    user_id INT,
    session_id INT, 
    item_in_session INT,
    artist VARCHAR,
    song_title VARCHAR,
    first_name VARCHAR,
    last_name VARCHAR,
    PRIMARY KEY (user_id, session_id, item_in_session)
) WITH CLUSTERING ORDER BY (session_id ASC, item_in_session ASC);
"""

user_sessions_insert = """
INSERT INTO user_sessions (user_id, session_id, item_in_session, artist, song_title, first_name, last_name)
    VALUES (%s, %s, %s, %s, %s, %s, %s)
"""

user_sessions_select = """
SELECT artist, song_title, first_name, last_name
FROM user_sessions
WHERE user_id = 10
AND session_id = 182 
"""

In [127]:
# Drop and Create Table
session.execute(user_sessions_table_drop)
session.execute(user_sessions_table_create)

<cassandra.cluster.ResultSet at 0x158472726d8>

In [128]:
# Insert Data into Table
insert_data_into_table(user_sessions_insert, ['userId', 'sessionId', 'itemInSession', 'artist', 'song', 'firstName', 'lastName'])

'All rows processed.'

In [129]:
# Query table to answer question
rows = session.execute(user_sessions_select)
for row in rows:
    print("%20s\t%55s\t%10s\t%10s" %(row.artist, row.song_title, row.first_name, row.last_name))

    Down To The Bone	                                     Keep On Keepin' On	    Sylvie	      Cruz
        Three Drives	                                            Greece 2000	    Sylvie	      Cruz
   Sebastien Tellier	                                              Kilometer	    Sylvie	      Cruz
       Lonnie Gordon	   Catch You Baby (Steve Pitron & Max Sanna Radio Edit)	    Sylvie	      Cruz


### 3. Get all users who have listened to a particular song.

#### Table Structure
Because we want to report the `first_name` and `last_name` of each user, we include these columns in the table. Because we filter on its value, `song_title` is the partition key. We add `user_id` as a clustering column because names can be shared between users, so `first_name` and `last_name` alone would not uniquely identify a listener. With `(song_title, user_id)` as the primary key, each user appears once per song.
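
The effect of this primary key can be sketched with a plain dictionary keyed by `(song_title, user_id)`: repeated plays by the same user collapse into one row, while two users who share a name stay distinct. This is a toy model with hypothetical data, not the driver's behavior in detail.

```python
# Toy model of the user_songs table: primary key -> remaining columns.
user_songs = {}


def insert(song_title, user_id, first_name, last_name):
    """Writing an existing (song_title, user_id) key replaces the row instead of adding one."""
    user_songs[(song_title, user_id)] = (first_name, last_name)


# Hypothetical plays: user 1 listens to 'Song A' twice, and user 2 shares user 1's name.
insert('Song A', 1, 'Pat', 'Doe')
insert('Song A', 1, 'Pat', 'Doe')
insert('Song A', 2, 'Pat', 'Doe')
insert('Song B', 1, 'Pat', 'Doe')

listeners = [name for (title, _), name in user_songs.items() if title == 'Song A']
print(len(listeners))
```

Had the key been `(song_title, first_name, last_name)`, the two users named Pat Doe would have been merged into a single listener.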

#### Validation Query
To demonstrate this query, our `SELECT` statement finds the name (first and last) of every user who listened to the song 'All Hands Against His Own'.

In [146]:
user_songs_table_drop = "DROP TABLE IF EXISTS user_songs"

user_songs_table_create = """
CREATE TABLE IF NOT EXISTS user_songs(
    song_title VARCHAR,
    user_id INT,
    first_name VARCHAR,
    last_name VARCHAR,
    PRIMARY KEY (song_title, user_id)
);
"""

user_songs_insert = """
INSERT INTO user_songs (song_title, user_id, first_name, last_name)
    VALUES (%s, %s, %s, %s)
    IF NOT EXISTS
"""

user_songs_select = """
SELECT first_name, last_name
FROM user_songs
WHERE song_title = 'All Hands Against His Own'
"""

In [147]:
# Drop and Create Table
session.execute(user_songs_table_drop)
session.execute(user_songs_table_create)

<cassandra.cluster.ResultSet at 0x15848507518>

In [148]:
# Insert Data into Table
insert_data_into_table(user_songs_insert, ['song', 'userId', 'firstName', 'lastName'])

'All rows processed.'

In [149]:
# Query table to answer question
rows = session.execute(user_songs_select)
for row in rows:
    print("%s %s" %(row.first_name, row.last_name))

Jacqueline Lynch
Tegan Levine
Sara Johnson


### Drop the tables to clean up

In [150]:
drop_statements = [song_sessions_table_drop, user_sessions_table_drop, user_songs_table_drop]
for drop in drop_statements:
    session.execute(drop)

### Close the session and cluster connection

In [151]:
session.shutdown()
cluster.shutdown()