# Part I. ETL Pipeline for Pre-Processing Event Data Files

## Purpose
Since in Apache Cassandra we model tables around the queries they must answer, we often load subsets of the same data into several different tables. Therefore, if we have a set of data files (daily event files in this case), it is more efficient to first combine them into a single file, and then read that single file once to load its contents into every data table. Additionally, since the event/log files may contain data points that we do not need to capture in the data tables, we can drop them while creating the new, combined data file. Given the expected size of these (potentially streaming) event logs, reading the source data only once and carrying forward only the needed columns can save a significant amount of I/O.

## Process

#### Import Python packages 

```python
# Import Python packages
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import csv
```

#### Create a list of filepaths to process original event csv data files

```python
# Path to the folder that holds the daily event data files
filepath = os.path.join(os.getcwd(), 'event_data')

# Walk the folder (and any subfolders) and collect the path of every CSV file.
# Extending the list, rather than reassigning it on each pass, keeps the files
# found in every directory, not only those in the last directory visited.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    file_path_list.extend(glob.glob(os.path.join(root, '*.csv')))
```

#### Process event data files and merge them into a single file that will be used for Apache Cassandra tables

```python
# Initialize an empty list to hold the rows read from every file
full_data_rows_list = []

# For every file path in the list
for f in file_path_list:
    # Read the CSV file
    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        # Create a CSV reader object and skip the header row
        csvreader = csv.reader(csvfile)
        next(csvreader)

        # Extract each data row one by one and append it
        for line in csvreader:
            full_data_rows_list.append(line)

# Create a smaller event data CSV file, event_datafile_new.csv, that will be used
# to insert data into the Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as out_file:
    writer = csv.writer(out_file, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # Skip events with no artist; they carry no song data for our tables
        if row[0] == '':
            continue
        # Keep only the 11 columns named in the header above
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7],
                         row[8], row[12], row[13], row[16]))
```
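The cell above holds every row from every file in `full_data_rows_list` before writing anything. Given the potentially large, streaming event logs described under Purpose, a streaming variant may be worth considering: it writes each qualifying row as soon as it is read, and selects columns by header name instead of by position. This is a minimal sketch, assuming every source file has a header row that contains the column names listed in `KEEP_COLUMNS`, and that the output file lives outside the input folder (otherwise the walk could pick it up). The function name `merge_event_files` is hypothetical.

```python
import csv
import glob
import os

# Columns to keep, in output order (assumed to match the source files' header names)
KEEP_COLUMNS = ['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                'level', 'location', 'sessionId', 'song', 'userId']


def merge_event_files(input_dir, output_path):
    """Stream every CSV under input_dir into output_path, keeping only KEEP_COLUMNS
    and skipping rows with an empty artist. Returns the number of rows written."""
    written = 0
    with open(output_path, 'w', encoding='utf8', newline='') as out_file:
        writer = csv.writer(out_file, quoting=csv.QUOTE_ALL)
        writer.writerow(KEEP_COLUMNS)
        for root, _dirs, _files in os.walk(input_dir):
            for path in sorted(glob.glob(os.path.join(root, '*.csv'))):
                with open(path, 'r', encoding='utf8', newline='') as in_file:
                    # DictReader consumes the header row and maps names to values
                    for record in csv.DictReader(in_file):
                        # Skip rows whose artist is empty (or missing entirely)
                        if not record.get('artist'):
                            continue
                        writer.writerow([record[col] for col in KEEP_COLUMNS])
                        written += 1
    return written
```

Usage would look like `merge_event_files('event_data', 'event_datafile_new.csv')`. Memory use then depends on the size of one row rather than on the size of all files combined.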


## Expected Result

The image below is a screenshot of what the denormalized data in <font color=red>**event_datafile_new.csv**</font> should look like: <br>

<img src="images/image_event_datafile_new.jpg">
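Besides comparing the file against the screenshot by eye, a quick programmatic check can confirm the header and that every row has the expected width and a non-empty artist. This is a minimal sketch; the helper name `check_event_datafile` is hypothetical, and it assumes the header written by the merge step above.

```python
import csv

# Header written by the merge step above
EXPECTED_HEADER = ['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                   'level', 'location', 'sessionId', 'song', 'userId']


def check_event_datafile(path='event_datafile_new.csv'):
    """Return the number of data rows in path.

    Raises ValueError if the header differs from EXPECTED_HEADER, or if any row
    has the wrong number of fields or an empty artist.
    """
    with open(path, encoding='utf8', newline='') as f:
        reader = csv.reader(f)
        header = next(reader)
        if header != EXPECTED_HEADER:
            raise ValueError(f'unexpected header: {header}')
        count = 0
        for row in reader:
            if len(row) != len(EXPECTED_HEADER) or row[0] == '':
                # reader.line_num is the number of source lines read so far
                raise ValueError(f'bad row ending at line {reader.line_num}: {row}')
            count += 1
    return count
```

Running `print(check_event_datafile())` after the merge cell prints the number of rows that will be loaded into each table.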

# Part II. Apache Cassandra Workflow

## Problem

The business team has provided the queries that our Apache Cassandra data model needs to answer using the event data. To satisfy the request, we need to evaluate each query and identify which table(s) to create, which data points each table should hold, and which PRIMARY KEY, PARTITION KEY, and CLUSTERING column(s) each table needs, so that the queries can be answered correctly and efficiently.

## Design

Let's begin by evaluating each query to identify the table structure it needs, along with its primary/partition keys and clustering column(s).

**Query 1:** Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338 and itemInSession = 4

To satisfy this query, our "song_playlist_session_item" table should have the following columns (with data types matching those in the data file):
- sessionId
- itemInSession
- artist
- song
- length

sessionId + itemInSession should be our composite PRIMARY KEY (to uniquely identify rows), and both columns together also form the PARTITION KEY, so the query's WHERE clause on exactly these two columns resolves to a single-partition lookup.
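To make this key choice concrete, here is a toy in-memory model of `song_playlist_session_item` in plain Python (not Cassandra). Each row is addressed by the full `(sessionId, itemInSession)` tuple, and writing the same key twice replaces the row, mirroring the fact that a Cassandra INSERT overwrites an existing row with the same primary key. The class `SongPlaylistSessionItem` is hypothetical and exists only for illustration.

```python
class SongPlaylistSessionItem:
    """Toy model: one row per (session_id, item_in_session) partition key."""

    def __init__(self):
        self._partitions = {}

    def insert(self, session_id, item_in_session, artist, song, length):
        # Like a Cassandra INSERT, writing an existing key overwrites the row
        self._partitions[(session_id, item_in_session)] = (artist, song, length)

    def select(self, session_id, item_in_session):
        # The lookup needs the whole partition key, just as the CQL WHERE clause
        # must restrict both sessionId and itemInSession
        return self._partitions.get((session_id, item_in_session))
```

For example, after `t = SongPlaylistSessionItem()` and `t.insert(338, 4, 'Faithless', 'Music Matters (Mark Knight Dub)', '495.3073')`, the call `t.select(338, 4)` returns that single row.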

**Query 2:** Give me the name of the artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

To satisfy this query, our "song_playlist_session" table should have the following columns (with data types matching those in the data file):
- userId
- sessionId
- itemInSession
- artist
- song
- firstName
- lastName

userId + sessionId + itemInSession should be our composite PRIMARY KEY (to uniquely identify rows). userId + sessionId should be our composite PARTITION KEY (so the query reads a single partition), and itemInSession should be our CLUSTERING COLUMN: rows within a partition are stored sorted by their clustering column(s), so they come back in the itemInSession order the query requests.
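The effect of the clustering column can be sketched with another toy in-memory model (plain Python, not Cassandra): each `(userId, sessionId)` partition keeps its rows sorted by `itemInSession`, so reading a partition returns them already in order, regardless of insertion order. The class `SongPlaylistSession` is hypothetical, and keeping a sorted list with `bisect` is only an illustration of the ordering, not how Cassandra stores data on disk.

```python
import bisect


class SongPlaylistSession:
    """Toy model: partitions keyed by (user_id, session_id), rows kept sorted by
    the clustering column item_in_session."""

    def __init__(self):
        self._partitions = {}

    def insert(self, user_id, session_id, item_in_session, artist, song, first_name, last_name):
        partition = self._partitions.setdefault((user_id, session_id), [])
        row = (item_in_session, artist, song, first_name, last_name)
        keys = [r[0] for r in partition]
        idx = bisect.bisect_left(keys, item_in_session)
        if idx < len(partition) and partition[idx][0] == item_in_session:
            partition[idx] = row  # same full primary key: overwrite (upsert)
        else:
            partition.insert(idx, row)  # keep rows ordered by item_in_session

    def select(self, user_id, session_id):
        # Reading one partition returns its rows already in clustering order
        return list(self._partitions.get((user_id, session_id), []))
```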

**Query 3:** Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

To satisfy this query, our "song_user" table should have the following columns (with data types matching those in the data file):
- song
- userId
- firstName
- lastName

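song + userId should be our composite PRIMARY KEY (to uniquely identify rows), where song is the PARTITION KEY (so the query reads a single partition) and userId is our CLUSTERING COLUMN. Including userId matters because an INSERT in Cassandra overwrites any existing row with the same primary key: with song alone as the key, each new listener of a song would replace the previous one, whereas song + userId keeps one row per listener (and repeated listens by the same user collapse into that user's single row).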
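The de-duplication behavior can be sketched with a toy in-memory model (plain Python, not Cassandra) of `song_user`: the partition is the song, and within it each `userId` maps to exactly one row. The class `SongUser` is hypothetical and exists only for illustration.

```python
class SongUser:
    """Toy model of song_user: partition key song, clustering column user_id."""

    def __init__(self):
        self._partitions = {}

    def insert(self, song, user_id, first_name, last_name):
        # (song, user_id) is the primary key, so a repeated listen by the same
        # user overwrites that user's row instead of adding a new one
        self._partitions.setdefault(song, {})[user_id] = (first_name, last_name)

    def select(self, song):
        # Rows come back ordered by the clustering column user_id
        users = self._partitions.get(song, {})
        return [users[uid] for uid in sorted(users)]
```

If the inner dictionary were replaced by a single value per song (that is, song alone as the primary key), only the most recent listener would survive, which is exactly what the composite key avoids.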

## Solution

We will follow these steps to implement and test the solution:

1. Create Apache Cassandra cluster
2. Create a keyspace and connect to it
3. Create 3 tables
4. Load data into 3 tables
5. Run 3 queries to ensure we are getting expected results

#### Create Apache Cassandra cluster

```python
# Import the Cluster class from the DataStax Python driver
from cassandra.cluster import Cluster

try:
    # Define a cluster (assumes a locally installed Apache Cassandra instance)
    cluster = Cluster(['127.0.0.1'])

    # A session is needed to establish a connection and begin executing queries
    session = cluster.connect()
except Exception as e:
    print(e)
```

#### Create Keyspace

```python
# Create a keyspace (SimpleStrategy with a replication factor of 1 suits a single local node)
try:
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS udacity
        WITH REPLICATION =
        { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
    """)
except Exception as e:
    print(e)
```

#### Set Keyspace

```python
# Set KEYSPACE to the keyspace specified above
try:
    session.set_keyspace('udacity')
except Exception as e:
    print(e)
```

#### Create Tables

```python
# Create table for query 1
try:
    session.execute("""
        CREATE TABLE IF NOT EXISTS song_playlist_session_item (
            sessionId      INT,
            itemInSession  INT,
            artist         TEXT,
            song           TEXT,
            length         TEXT,
            PRIMARY KEY ((sessionId, itemInSession))
        );
    """)
except Exception as e:
    print(e)

# Create table for query 2
try:
    session.execute("""
        CREATE TABLE IF NOT EXISTS song_playlist_session (
            userId         INT,
            sessionId      INT,
            itemInSession  INT,
            artist         TEXT,
            song           TEXT,
            firstName      TEXT,
            lastName       TEXT,
            PRIMARY KEY ((userId, sessionId), itemInSession)
        );
    """)
except Exception as e:
    print(e)

# Create table for query 3
try:
    session.execute("""
        CREATE TABLE IF NOT EXISTS song_user (
            song           TEXT,
            userId         INT,
            firstName      TEXT,
            lastName       TEXT,
            PRIMARY KEY (song, userId)
        );
    """)
except Exception as e:
    print(e)
```
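All three CREATE TABLE statements follow the same pattern and differ only in their columns and keys. As a minimal sketch (the helper `create_table_cql` is hypothetical, not part of the driver), the PRIMARY KEY clause can be generated from a partition-key list and a clustering-column list. This makes the difference between a composite partition key and a clustering column explicit: the partition key always sits in its own inner parentheses.

```python
def create_table_cql(name, columns, partition_key, clustering=()):
    """Build a CREATE TABLE IF NOT EXISTS statement.

    columns: list of (column_name, cql_type) pairs, in declaration order.
    partition_key: list of column names forming the partition key.
    clustering: optional list of clustering column names.
    """
    col_defs = ',\n    '.join(f'{col} {cql_type}' for col, cql_type in columns)
    # Wrap the partition key in its own parentheses so a multi-column
    # partition key is not mistaken for partition key + clustering columns
    key = '(' + ', '.join(partition_key) + ')'
    primary_key = ', '.join([key, *clustering])
    return (f'CREATE TABLE IF NOT EXISTS {name} (\n    {col_defs},\n'
            f'    PRIMARY KEY ({primary_key})\n)')
```

For example, `session.execute(create_table_cql('song_user', [('song', 'TEXT'), ('userId', 'INT'), ('firstName', 'TEXT'), ('lastName', 'TEXT')], ['song'], ['userId']))` would issue a statement equivalent to the one for Query 3, since `PRIMARY KEY ((song), userId)` and `PRIMARY KEY (song, userId)` define the same key.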


#### Load Data

```python
# Open the data file once, read it once, and load data into all three tables at the same time

# Name of the data file to read
file = 'event_datafile_new.csv'

# Insert statements; the driver fills in the %s placeholders from the values tuple
# Query 1
query1 = "INSERT INTO song_playlist_session_item (sessionId, itemInSession, artist, song, length)"
query1 = query1 + " VALUES (%s, %s, %s, %s, %s)"

# Query 2
query2 = "INSERT INTO song_playlist_session (userId, sessionId, itemInSession, artist, song, firstName, lastName)"
query2 = query2 + " VALUES (%s, %s, %s, %s, %s, %s, %s)"

# Query 3
query3 = "INSERT INTO song_user (song, userId, firstName, lastName)"
query3 = query3 + " VALUES (%s, %s, %s, %s)"

# Read the file row by row, extract the needed data points, and insert them into the tables.
# Column positions in event_datafile_new.csv:
# 0 artist, 1 firstName, 2 gender, 3 itemInSession, 4 lastName, 5 length,
# 6 level, 7 location, 8 sessionId, 9 song, 10 userId
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        # Insert into the Query 1 table
        try:
            session.execute(query1, (int(line[8]), int(line[3]), line[0], line[9], line[5]))
        except Exception as e:
            print(e)
        # Insert into the Query 2 table
        try:
            session.execute(query2, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))
        except Exception as e:
            print(e)
        # Insert into the Query 3 table
        try:
            session.execute(query3, (line[9], int(line[10]), line[1], line[4]))
        except Exception as e:
            print(e)
```
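The cell above sends each INSERT as a simple statement with `%s` placeholders, so Cassandra parses the statement text again for every row. The DataStax driver also supports prepared statements (`session.prepare`, with `?` placeholders), which are parsed once and then bound to new values for each row. Below is a minimal sketch, assuming the same three tables and the `event_datafile_new.csv` column order; the names `INSERT_CQL`, `row_to_params` and `load_with_prepared` are hypothetical, and per-row error handling is left out for brevity.

```python
import csv

# One prepared-statement text per table (? marks a bind variable)
INSERT_CQL = {
    'song_playlist_session_item':
        'INSERT INTO song_playlist_session_item (sessionId, itemInSession, artist, song, length) '
        'VALUES (?, ?, ?, ?, ?)',
    'song_playlist_session':
        'INSERT INTO song_playlist_session (userId, sessionId, itemInSession, artist, song, firstName, lastName) '
        'VALUES (?, ?, ?, ?, ?, ?, ?)',
    'song_user':
        'INSERT INTO song_user (song, userId, firstName, lastName) VALUES (?, ?, ?, ?)',
}


def row_to_params(line):
    """Map one event_datafile_new.csv row to the bind values for each table."""
    artist, first_name, _gender, item, last_name, length = line[:6]
    item = int(item)
    session_id, song, user_id = int(line[8]), line[9], int(line[10])
    return {
        'song_playlist_session_item': (session_id, item, artist, song, length),
        'song_playlist_session': (user_id, session_id, item, artist, song, first_name, last_name),
        'song_user': (song, user_id, first_name, last_name),
    }


def load_with_prepared(session, path='event_datafile_new.csv'):
    """Prepare each INSERT once, then bind and execute it for every row."""
    prepared = {table: session.prepare(cql) for table, cql in INSERT_CQL.items()}
    with open(path, encoding='utf8', newline='') as f:
        reader = csv.reader(f)
        next(reader)  # skip header
        for line in reader:
            for table, params in row_to_params(line).items():
                session.execute(prepared[table], params)
```

Called as `load_with_prepared(session)`, this still reads the data file only once; keeping the row-to-values mapping in one function also means the column positions are defined in a single place.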

#### Test Queries

##### Query 1

```python
# Give me the artist, song title and song's length in the music app history
# that was heard during sessionId = 338 and itemInSession = 4
query = "SELECT artist, song, length FROM song_playlist_session_item WHERE sessionId = 338 AND itemInSession = 4"

# Execute the query and output the results
try:
    rows = session.execute(query)
    for row in rows:
        print(row.artist, row.song, row.length)
except Exception as e:
    print(e)
```

```
Faithless Music Matters (Mark Knight Dub) 495.3073
```
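The test queries in this section hard-code their filter values. A minimal sketch of passing them as parameters instead follows, using the same `%s` placeholder style as the INSERT statements above, so the same query can be reused for any session and item. The helper name `song_in_session` is hypothetical.

```python
def song_in_session(session, session_id, item_in_session):
    """Return (artist, song, length) tuples for one session item (Query 1)."""
    query = ("SELECT artist, song, length FROM song_playlist_session_item "
             "WHERE sessionId = %s AND itemInSession = %s")
    # The driver substitutes the values into the %s placeholders
    rows = session.execute(query, (session_id, item_in_session))
    return [(row.artist, row.song, row.length) for row in rows]
```

For example, `print(song_in_session(session, 338, 4))` runs the same lookup as the cell above.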


##### Query 2

```python
# Give me the name of the artist, song (sorted by itemInSession) and user (first and last name)
# for userId = 10, sessionId = 182
query = "SELECT artist, song, firstname, lastname FROM song_playlist_session WHERE userId = 10 AND sessionId = 182"

# Execute the query and output the results
try:
    rows = session.execute(query)
    for row in rows:
        print(row.artist, row.song, row.firstname, row.lastname)
except Exception as e:
    print(e)
```

```
Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz
```
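Because itemInSession is the clustering column, rows in the (userId, sessionId) partition are returned in ascending itemInSession order without any ORDER BY clause. The output above does not show that column, so a minimal sketch that also selects it can make the order visible. The helper name `session_playlist` is hypothetical; it relies on unquoted CQL identifiers being lowercased, which is why the rows expose attributes such as `iteminsession` and `firstname`, as the cell above already assumes.

```python
def session_playlist(session, user_id, session_id):
    """Return (itemInSession, artist, song, 'First Last') tuples for Query 2."""
    query = ("SELECT itemInSession, artist, song, firstname, lastname "
             "FROM song_playlist_session WHERE userId = %s AND sessionId = %s")
    rows = session.execute(query, (user_id, session_id))
    # Unquoted identifiers come back lowercase (iteminsession, firstname, lastname)
    return [(row.iteminsession, row.artist, row.song, f'{row.firstname} {row.lastname}')
            for row in rows]
```

Printing `session_playlist(session, 10, 182)` should list the same four songs as above, each prefixed by its itemInSession value in ascending order.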


##### Query 3

```python
# Give me every user name (first and last) in my music app history who listened
# to the song 'All Hands Against His Own'
query = "SELECT firstname, lastname FROM song_user WHERE song = 'All Hands Against His Own'"

# Execute the query and output the results
try:
    rows = session.execute(query)
    for row in rows:
        print(row.firstname, row.lastname)
except Exception as e:
    print(e)
```

```
Jacqueline Lynch
Tegan Levine
Sara Johnson
```


#### Drop the tables before closing out the session

```python
# Drop the tables created above before closing out the session
for table in ('song_playlist_session_item', 'song_playlist_session', 'song_user'):
    try:
        session.execute(f"DROP TABLE IF EXISTS {table}")
    except Exception as e:
        print(e)
```

#### Close the session and cluster connection

```python
# Close the session, then shut down the cluster connection
session.shutdown()
cluster.shutdown()
```