# Part I. ETL Pipeline for Pre-Processing the Files

## The following code pre-processes the event logs to produce a combined CSV file

#### Import Python packages 

In [1]:
import pandas as pd
import cassandra
import re
import os
import glob
import csv

#### Creating a list of file paths to the original event CSV data files

In [2]:
filepath = os.path.join(os.getcwd(), 'event_data', '*.csv')
file_path_list = glob.glob(filepath)
print(f"Found {len(file_path_list)} CSV files in {filepath}")

Found 30 CSV files in /home/workspace/event_data/*.csv


#### Processing the files to create the datafile csv that will be used for creating the Apache Cassandra tables

In [3]:
data = pd.concat([pd.read_csv(filename) for filename in file_path_list])

output_columns = [
        'artist',
        'firstName',
        'gender',
        'itemInSession',
        'lastName',
        'length',
        'level',
        'location',
        'sessionId',
        'song',
        'userId'
    ]
output_data = data[data.artist.notnull()][output_columns]
output_data = output_data.astype({'userId': 'int64'})

output_data.to_csv('event_datafile_new.csv', index=False)
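
The filter-and-project step above can be illustrated on a tiny in-memory sample. The sketch below is a minimal standard-library approximation, not the notebook's pandas code: the helper name `clean_rows` and the sample rows are hypothetical. It drops rows whose `artist` is empty, keeps only the requested columns, and converts `userId` to an integer, assuming that IDs may have been written out as floats such as `50.0` (which can happen when a numeric column contains missing values).

```python
import csv
import io

# Hypothetical sample in the shape of the raw event files (columns trimmed).
SAMPLE = """artist,page,song,userId
Faithless,NextSong,Music Matters,50.0
,Home,,26.0
Van Halen,NextSong,Good Enough,10
"""

def clean_rows(text, columns):
    """Keep rows with a non-empty artist and only the given columns."""
    cleaned = []
    for row in csv.DictReader(io.StringIO(text)):
        if not row["artist"]:
            continue  # skip events that carry no artist
        record = {col: row[col] for col in columns}
        if "userId" in record:
            # IDs may appear as "50.0"; normalise them to integers.
            record["userId"] = int(float(record["userId"]))
        cleaned.append(record)
    return cleaned

rows = clean_rows(SAMPLE, ["artist", "song", "userId"])
for record in rows:
    print(record)
```

Only the two rows that name an artist survive, and both end up with an integer `userId`.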

In [4]:
# Checking the number of lines in the csv file (the count includes the header line)
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(len(f.readlines()))

6821
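
Note that `readlines()` counts physical lines, so the figure above includes the header line. A minimal sketch of counting data rows instead is shown below; the helper name `count_data_rows` and the demo file contents are hypothetical. It uses `csv.reader`, which also treats a quoted field containing a line break as part of a single record.

```python
import csv
import os
import tempfile

def count_data_rows(path):
    """Return the number of CSV records in the file, excluding the header."""
    with open(path, newline="", encoding="utf8") as f:
        reader = csv.reader(f)
        next(reader, None)  # skip the header row, if there is one
        return sum(1 for _ in reader)

# Demonstrate on a throwaway file: one header and two records,
# the second of which contains a quoted line break.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo.csv")
    with open(demo_path, "w", newline="", encoding="utf8") as f:
        writer = csv.writer(f)
        writer.writerow(["artist", "song"])
        writer.writerow(["Artist A", "Song A"])
        writer.writerow(["Artist B", "Line one\nline two"])
    demo_count = count_data_rows(demo_path)

print(demo_count)
```

Calling `count_data_rows('event_datafile_new.csv')` in the notebook would report the number of records rather than the number of lines.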


# Part II. Using Apache Cassandra for data modelling and querying.

## Once pre-processing is done, the CSV file <font color=red>event_datafile_new.csv</font> will be located in the Workspace directory. It contains the following columns:
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level of the user's subscription (paid or free)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should look like in <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Connecting to Apache Cassandra and setting the keyspace

#### Creating a Cluster

In [5]:
from cassandra.cluster import Cluster
cluster = Cluster()
session = cluster.connect()

#### Create Keyspace

In [6]:
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS udacity
    WITH REPLICATION = 
    {'class': 'SimpleStrategy', 'replication_factor': 1}
    """)

<cassandra.cluster.ResultSet at 0x7f04a04211d0>

#### Set Keyspace

In [7]:
session.set_keyspace("udacity")

## The following section creates separate tables for the queries that we are interested in.

### When using Apache Cassandra we need to model the database tables on the queries we want to run.

The following two helper functions are defined for convenience.
The first inserts data from the CSV file into a table, and the second reads the result of a query into a DataFrame.

In [8]:
def insert_to_table(table, columns, file='event_datafile_new.csv'):
    """
    Insert data from a CSV file to an Apache Cassandra table.
    
    Note that the table should already exist in the database.
    """
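    # Identifiers are interpolated into the query text (placeholders only
    # cover values), so special characters in them are backslash-escaped first.
    table = re.escape(table)
    columns = [re.escape(col).lower() for col in columns]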

    columns_str = ','.join(columns)
    values_str = ','.join(['%s'] * len(columns))
    
    query = f"""INSERT INTO {table}
                ( {columns_str} )
                VALUES ( {values_str} )"""

    with open(file, encoding = 'utf8') as f:
        csvreader = csv.reader(f)
        next(csvreader) # skip header
        for line in csvreader:
            data = {
                "artist": line[0],
                "firstname": line[1],
                "gender": line[2],
                "iteminsession": int(line[3]),
                "lastname":line[4],
                "length": float(line[5]),
                "level": line[6],
                "location": line[7],
                "sessionid": int(line[8]),
                "song": line[9],
                "userid": int(line[10]),
            }
            session.execute(query, [data[col] for col in columns])

def read_to_df(query):
    """
    Execute a query and return the resulting rows as a pandas DataFrame.
    """
    rows = session.execute(query)
    return pd.DataFrame(list(rows))
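
As an alternative to sending the same INSERT text with every row, the DataStax Python driver supports prepared statements through `session.prepare()`, which use `?` placeholders instead of `%s`. The sketch below is one possible variation, not the notebook's implementation: the function name `insert_rows_prepared` and its `rows` argument (an iterable of dictionaries keyed by lower-case column name) are assumptions made for illustration.

```python
def insert_rows_prepared(session, table, columns, rows):
    """Insert dict-shaped rows into an existing table via a prepared statement.

    `session` is expected to be a connected cassandra.cluster.Session.
    `table` and `columns` must be trusted identifiers, because they are
    interpolated into the query text.
    """
    columns = [col.lower() for col in columns]
    placeholders = ", ".join("?" for _ in columns)
    query = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"

    # The statement is prepared once and then reused for every row.
    prepared = session.prepare(query)
    count = 0
    for row in rows:
        session.execute(prepared, [row[col] for col in columns])
        count += 1
    return count
```

Prepared statements are strict about value types, so each value should already have the Python type that matches its column, for example `int` for an `INT` column.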

#### First we create a table as an example and insert data from the CSV file

In [9]:
session.execute("""CREATE TABLE IF NOT EXISTS song_info_session_example (
                   sessionId INT,
                   itemInSession INT,
                   artist TEXT,
                   song TEXT,
                   length FLOAT,
                   PRIMARY KEY (sessionId, itemInSession))""")   
columns = ['sessionId', 'itemInSession', 'artist', 'song', 'length']
insert_to_table("song_info_session_example", columns=columns)

#### Next we execute a SELECT to verify that the data have been inserted into the table

In [10]:
read_to_df("SELECT artist, song, length FROM song_info_session_example LIMIT 10")

| | artist | song | length |
|---|---|---|---|
| 0 | Regina Spektor | The Calculation (Album Version) | 191.085266 |
| 1 | Octopus Project | All Of The Champs That Ever Lived | 250.957916 |
| 2 | Tegan And Sara | So Jealous | 180.061584 |
| 3 | Dragonette | Okay Dolores | 153.390564 |
| 4 | Lil Wayne / Eminem | Drop The World | 229.589752 |
| 5 | Soulja Boy Tell'em | Let Me Get Em | 201.116287 |
| 6 | Bodo Wartke | Liebeslied (Sprachen: Deutsch_ Englisch_ Franz... | 645.276306 |
| 7 | Evanescence | Bring Me To Life | 237.113022 |
| 8 | Van Halen | Good Enough | 243.173416 |
| 9 | The Academy Is... | Paper Chase (Album Version) | 209.762817 |


### The following cells create separate tables for each of the queries that we are interested in.
We are storing the data in Apache Cassandra, denormalized and in a separate table for each query, in order to optimize for fast reads.

#### Question 1: Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

We need to read metadata about the songs that were played during a particular session.
Each session can have many items, and we want to filter on both columns in the *WHERE* clause.

Hence, both the *sessionId* and the *itemInSession* need to be part of the Primary Key.

The *sessionId* is the partition key, which determines the node on which a session's rows are stored, and the *itemInSession* is the clustering column, which uniquely identifies and orders the rows within a session.
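
To make the roles of the two key parts concrete, the toy model below mimics a table with `PRIMARY KEY (sessionId, itemInSession)` using plain Python dictionaries. It is only a mental model under simplifying assumptions, not how Cassandra stores data internally: the class name `ToyTable` and the second sample row are hypothetical. The outer dictionary is keyed by the partition key, and the rows inside a partition are returned in clustering order.

```python
from collections import defaultdict

class ToyTable:
    """Toy stand-in for a table with PRIMARY KEY (partition_key, clustering_key)."""

    def __init__(self):
        # partition key -> {clustering key -> row}
        self._partitions = defaultdict(dict)

    def insert(self, partition_key, clustering_key, row):
        # Writing the same primary key twice replaces the row (an upsert).
        self._partitions[partition_key][clustering_key] = row

    def select(self, partition_key, clustering_key=None):
        partition = self._partitions.get(partition_key, {})
        if clustering_key is not None:
            row = partition.get(clustering_key)
            return [] if row is None else [row]
        # Without a clustering restriction, rows come back in clustering order.
        return [partition[key] for key in sorted(partition)]

table = ToyTable()
table.insert(338, 4, {"artist": "Faithless", "song": "Music Matters (Mark Knight Dub)"})
table.insert(338, 1, {"artist": "Hypothetical Artist", "song": "Hypothetical Song"})

print(table.select(338, 4))  # a single row, addressed by the full primary key
print(table.select(338))     # the whole session, ordered by item number
```

Looking up a row requires the partition key first, which is why the query for this table restricts *sessionId* before *itemInSession*.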

In [11]:
session.execute("""CREATE TABLE IF NOT EXISTS song_info_session (
                   sessionId INT,
                   itemInSession INT,
                   artist TEXT,
                   song TEXT,
                   length FLOAT,
                   PRIMARY KEY (sessionId, itemInSession))""")
insert_to_table("song_info_session", columns=['sessionId', 'itemInSession', 'artist', 'song', 'length'])
read_to_df("SELECT artist, song, length FROM song_info_session WHERE sessionId = 338 AND itemInSession = 4")

| | artist | song | length |
|---|---|---|---|
| 0 | Faithless | Music Matters (Mark Knight Dub) | 495.307312 |


#### Question 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

For this query, both the *userId* and the *sessionId* are included in the *WHERE* clause, so they both need to be part of the Primary Key.
We also need the result to be sorted by *itemInSession*, so this will be the clustering column.

Because we are going to query using both the *userId* and the *sessionId*, we will use the two of them together as a composite Partition Key, so that:
1. all the rows of a single user session are stored in the same partition, and therefore on the same node
2. sessions are spread evenly among the cluster nodes
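
The placement rule behind a composite partition key can be sketched with a toy hash function. This is a simplified illustration under stated assumptions, not Cassandra's actual mechanism: Cassandra uses a partitioner (Murmur3 by default) and a token ring, whereas the hypothetical `node_for` helper below simply takes an MD5-based token modulo a made-up list of three nodes. The point it shows is that every row sharing the same `(userId, sessionId)` pair maps to the same node, while each distinct pair is placed independently.

```python
import hashlib

NODES = ["node-a", "node-b", "node-c"]  # hypothetical three-node cluster

def node_for(partition_key):
    """Map a partition key to a node with a stable hash (toy placement rule)."""
    digest = hashlib.md5(repr(partition_key).encode("utf8")).digest()
    token = int.from_bytes(digest[:8], "big")
    return NODES[token % len(NODES)]

# Rows of one session share the composite key, so they always land together.
same_session = {node_for((10, 182)) for _item in range(4)}

# Other sessions of the same user are hashed independently of session 182.
placement = {session_id: node_for((10, session_id)) for session_id in range(180, 190)}

print(same_session)
print(placement)
```

Because the whole pair is hashed, a user's sessions are not tied to a single node, which helps spread the data of very active users.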

In [12]:
session.execute("""CREATE TABLE IF NOT EXISTS song_playlist_session (
                   userId INT,
                   sessionId INT,
                   itemInSession INT,
                   artist TEXT,
                   song TEXT,
                   firstName TEXT,
                   lastName TEXT,
                   PRIMARY KEY ((userId, sessionId), itemInSession))""")
insert_to_table(
    "song_playlist_session",
    columns=['userId', 'sessionId', 'itemInSession', 'artist', 'song', 'firstName', 'lastName']
)
read_to_df("SELECT artist, song, firstName, lastName FROM song_playlist_session WHERE userId = 10 AND sessionId = 182") 

| | artist | song | firstname | lastname |
|---|---|---|---|---|
| 0 | Down To The Bone | Keep On Keepin' On | Sylvie | Cruz |
| 1 | Three Drives | Greece 2000 | Sylvie | Cruz |
| 2 | Sebastien Tellier | Kilometer | Sylvie | Cruz |
| 3 | Lonnie Gordon | Catch You Baby (Steve Pitron & Max Sanna Radio... | Sylvie | Cruz |


#### Question 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

For this query we need to use both the *song* and the *userId* as the Primary Key, so that we can
guarantee that each row is unique.

If we don't include the *userId* as part of the Primary Key, then Cassandra will overwrite rows with the same song
and we won't get the full result back.

Additionally, including the *userId* in the Primary Key means that the same *firstName* and *lastName*
can appear multiple times in the result, once for each distinct user with that name who has listened to the song.
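
The overwrite behaviour described above can be reproduced with a plain dictionary, since an insert with an existing primary key replaces the earlier row. The sketch below is a toy model with hypothetical events and a hypothetical `load` helper. It compares a key made of the song alone with a key made of the song and the user ID.

```python
# Hypothetical listening events: (song, userId, firstName, lastName).
EVENTS = [
    ("Song X", 1, "Alex", "Smith"),
    ("Song X", 2, "Alex", "Smith"),   # a different user with the same name
    ("Song X", 3, "Robin", "Jones"),
    ("Song X", 1, "Alex", "Smith"),   # user 1 plays the song again
]

def load(events, key_of):
    """Upsert events into a dict keyed by the chosen primary key."""
    table = {}
    for song, user_id, first, last in events:
        # A later write with the same key replaces the earlier row.
        table[key_of(song, user_id)] = (first, last)
    return table

by_song = load(EVENTS, lambda song, user_id: song)
by_song_and_user = load(EVENTS, lambda song, user_id: (song, user_id))

print(len(by_song))           # 1: every event overwrote the previous one
print(len(by_song_and_user))  # 3: one row per distinct listener
```

With the composite key, the repeated play by user 1 collapses into a single row, while the two users named Alex Smith are both kept.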

In [13]:
session.execute("""CREATE TABLE IF NOT EXISTS song_listeners (
                   song TEXT,
                   userId INT,
                   firstName TEXT,
                   lastName TEXT,
                   PRIMARY KEY (song, userId))""")
insert_to_table("song_listeners", columns=['song', 'userId', 'firstName', 'lastName'])
read_to_df("SELECT firstName, lastName FROM song_listeners WHERE song = 'All Hands Against His Own'")

| | firstname | lastname |
|---|---|---|
| 0 | Jacqueline | Lynch |
| 1 | Tegan | Levine |
| 2 | Sara | Johnson |


### We can now drop the tables from this demo.

In [14]:
session.execute("DROP TABLE IF EXISTS song_info_session_example")

session.execute("DROP TABLE IF EXISTS song_info_session")
session.execute("DROP TABLE IF EXISTS song_playlist_session")
session.execute("DROP TABLE IF EXISTS song_listeners")

<cassandra.cluster.ResultSet at 0x7f047ac6af60>

### Finally we close the session and cluster connection.

In [15]:
session.shutdown()
cluster.shutdown()