# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv
import decimal
from prettytable import PrettyTable

#### Creating list of filepaths to process original event csv data files

In [2]:
"""
The code retrieves a list of file paths from the "event_data" directory in the current working directory 
and prints the current working directory and the first two file paths in the list.
"""
filepath = os.getcwd() + '/event_data'

for root, dirs, files in os.walk(filepath):
    file_path_list = glob.glob(os.path.join(root,'*'))

print(os.getcwd(), "\n")
print(file_path_list[0:2])

/home/workspace 

['/home/workspace/event_data/.ipynb_checkpoints/2018-11-15-events-checkpoint.csv', '/home/workspace/event_data/.ipynb_checkpoints/2018-11-05-events-checkpoint.csv']


#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
"""
The code reads data from each file in the file_path_list and 
appends each row of data to the 'full_data_rows_list' list.
"""

full_data_rows_list = [] 

for f in file_path_list:
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        csvreader = csv.reader(csvfile) 
        #The first row is just the names of the columns, so skip it
        next(csvreader)    
        for line in csvreader:
            full_data_rows_list.append(line) 

            
print("Number of rows in full_data_rows_list: ")
print(len(full_data_rows_list),"\n")

print("The data inside the full_data_rows_list looks like:")
print(full_data_rows_list[0:2])


Number of rows in full_data_rows_list: 
2937 

The data inside the full_data_rows_list looks like:
[['Harmonia', 'Logged In', 'Ryan', 'M', '0', 'Smith', '655.77751', 'free', 'San Jose-Sunnyvale-Santa Clara, CA', 'PUT', 'NextSong', '1.54102E+12', '583', 'Sehr kosmisch', '200', '1.54224E+12', '26'], ['The Prodigy', 'Logged In', 'Ryan', 'M', '1', 'Smith', '260.07465', 'free', 'San Jose-Sunnyvale-Santa Clara, CA', 'PUT', 'NextSong', '1.54102E+12', '583', 'The Big Gundown', '200', '1.54224E+12', '26']]


In [4]:
"""
In summary, this code creates a CSV file with a specific format, writes a header row to the file, 
and then writes data rows to the file based on the contents of a list of data rows. 
The if statement is used to skip any rows that don't contain data in the first column.
The 'event_datafile_new.csv' file will be used to insert data into Cassandra tables
"""
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))

In [5]:
# Count the number of lines in a CSV file called "event_datafile_new.csv"
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print("Number of rows in even_datafile_new file:")
    print(sum(1 for line in f))

Number of rows in even_datafile_new file:
2532


# Part II.The Apache Cassandra 

## The <font color=red>event_datafile_new.csv</font> contains the following columns: 

- 0 ----- artist 
- 1 ----- firstName of user
- 2 ----- gender of user
- 3 ----- item number in session
- 4 ----- last name of user
- 5 ----- length of the song
- 6 ----- level (paid or free song)
- 7 ----- location of the user
- 8 ----- sessionId
- 9 ----- song title
- 10 ---- userId



The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> file:<br>

<img src="images/image_event_datafile_new.jpg">

## Apache Cassandra code

#### Creating a Cluster

In [6]:
"""
The code creates a connection to a Cassandra cluster running on the local machine 
and prints any exceptions that occur during the connection process.
"""
from cassandra.cluster import Cluster
try:
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
except Exception as e:
    print(e)

#### Create Keyspace

In [7]:
"""
The code tries to execute a CQL query to create a keyspace named "project_songs" with a 
replication strategy of SimpleStrategy and a replication factor of 1, and if there's an exception, 
it prints the error message.
"""
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS project_songs
    WITH REPLICATION = 
    {'class': 'SimpleStrategy', 'replication_factor':1}"""
)
    
except Exception as e:
    print(e)

#### Set Keyspace

In [8]:
"""
The code attempts to set the keyspace to "project_songs" using the set_keyspace method of 
a session object, and if an exception is raised, it prints the error message to the console.
"""
try:
    session.set_keyspace('project_songs')
except Exception as e:
    print(e)

## With Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


In [9]:
"""
The PRIMARY KEY is defined as (session_id, item_in_session). In Apache Cassandra, the PRIMARY KEY 
is used to uniquely identify each row in a table.

In this case, the PRIMARY KEY is composed of two columns: session_id and item_in_session. 
This means that each row in the music_app_history table will be uniquely identified by a 
combination of these two columns.

The decision to use (session_id, item_in_session) as the PRIMARY KEY was based on the 
requirements of the application. In this case, the application needs to be able 
to query the 'song_info_by_session' table based on both session_id and item_in_session. By using 
these two columns as the PRIMARY KEY, the table will be optimized for queries that filter on these columns.

It's worth noting that the order of the columns in the PRIMARY KEY is important. In this case, 
session_id is listed first, which means that rows will be partitioned based on session_id. 
This can be useful for queries that filter on session_id, as it allows Cassandra to efficiently 
retrieve all rows with a given session_id. The item_in_session column is listed second, which 
means that it will be used as a clustering column. Clustering columns determine the order in 
which rows are stored within a partition, and can be used to sort and filter data within a partition.
"""

query = "CREATE TABLE IF NOT EXISTS song_info_by_session "
query = query + "(session_id int, item_in_session int, artist text, song_title text, song_length decimal, \
                    PRIMARY KEY (session_id,item_in_session))"

try:
    session.execute(query)
except Exception as e:
    print(e)

In [10]:
"""   
The code reads in data from a CSV file called "event_datafile_new.csv", skips the header row, 
and then iterates through each row of data, inserting specific columns of data into a 
table called "song_info_by_session" in Apache Cassandra.
"""

file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) 
    for line in csvreader:
        query = "INSERT INTO song_info_by_session (session_id, item_in_session, artist, song_title, song_length)"
        query = query + " VALUES (%s, %s, %s, %s, %s)"
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], decimal.Decimal(line[5])))
        

#### Do a SELECT to verify that the data have been inserted into each table

In [11]:
query = "SELECT artist, song_title, song_length FROM song_info_by_session limit 5"

try: 
    rows = session.execute(query)
except Exception as e:
    print(e)

query_table = PrettyTable(['Artist', 'Song', 'Length'])
for row in rows:
    query_table.add_row([row.artist, row.song_title, row.song_length])

print("Five rows from the 'song_info_by_session' table:")
print(query_table, "\n")



query2 = "SELECT artist, song_title, song_length FROM song_info_by_session WHERE session_id=338 AND item_in_session=4"
try: 
    rows2 = session.execute(query2)
except Exception as e:
    print(e)

if rows2:
    query_table.clear_rows()
    for row in rows2:
        query_table.add_row([row.artist, row.song_title, row.song_length])
        
    print("The artist, song title and song's length for session_id 338, and item_in_session 4:")
    print(query_table)    
else:
    print("The record for session_id 338, item_in_session 4 is missing on the 'song_info_by_session' table")


Five rows from the 'song_info_by_session' table:
+--------------------+-----------------------------------+-----------+
|       Artist       |                Song               |   Length  |
+--------------------+-----------------------------------+-----------+
|   Regina Spektor   |  The Calculation (Album Version)  | 191.08526 |
|  Octopus Project   | All Of The Champs That Ever Lived | 250.95791 |
|   Tegan And Sara   |             So Jealous            | 180.06159 |
|     Dragonette     |            Okay Dolores           | 153.39057 |
| Lil Wayne / Eminem |           Drop The World          | 229.58975 |
+--------------------+-----------------------------------+-----------+ 

The artist, song title and song's length for session_id 338, and item_in_session 4:
+-----------+---------------------------------+----------+
|   Artist  |               Song              |  Length  |
+-----------+---------------------------------+----------+
| Faithless | Music Matters (Mark Knight Dub) | 4

## Create queries to ask the following of the data

### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

In [12]:
"""
In this case, the PRIMARY KEY is composed of three columns: user_id, session_id, and item_in_session.
This means that data will be partitioned based on the values in these columns. The user_id, and session_id is the
the partition key, meaning that data will be grouped together based on the value in these columns. 
Within each partition, data will be sorted based on the values in the item_in_session column.

By using user_id and session_id as the partition key, all of a user's listening history will be stored 
together on a single node, making queries for that user's history fast and efficient. 
The item_in_session column is included in the PRIMARY KEY to ensure that  data is sorted correctly
within each partition.

The Partition Key is responsible for data distribution across the nodes.
The Clustering Key is responsible for data sorting within the partition.

"""

query = "CREATE TABLE IF NOT EXISTS artist_user_info_by_id "
query = query + "(user_id int, session_id int, item_in_session int, artist text, song_title text, first_name text, \
                    last_name text, PRIMARY KEY ((user_id, session_id), item_in_session))"

try:
    session.execute(query)
except Exception as e:
    print(e)          

In [13]:
"""
This code reads data from a CSV file called "event_datafile_new.csv" and inserts the 
data into a table called "artist_user_info_by_id" in Apache Cassandra using the 
execute method of a session object.
"""
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    # skip header
    next(csvreader) 
    for line in csvreader:
        query = "INSERT INTO artist_user_info_by_id (user_id, session_id, item_in_session, artist, song_title, \
                                                        first_name, last_name)"
        query = query + " VALUES (%s, %s, %s, %s, %s, %s, %s)"
        session.execute(query, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))

#### Do a SELECT to verify that the data have been inserted into each table

In [14]:
query = "SELECT artist, song_title, first_name, last_name FROM artist_user_info_by_id limit 5"

try: 
    rows = session.execute(query)
except Exception as e:
    print(e)

query_table = PrettyTable(['Artist', 'Song', 'First Name', 'Last Name'])
for row in rows:
    query_table.add_row([row.artist, row.song_title, row.first_name, row.last_name])

print("Five rows from the 'artist_user_info_by_id' table:")
print(query_table, "\n")



query2 = "SELECT artist, song_title, first_name, last_name FROM artist_user_info_by_id WHERE \
                user_id=10 AND session_id=182"
try: 
    rows2 = session.execute(query2)
except Exception as e:
    print(e)

if rows2:
    query_table.clear_rows()
    for row in rows2:
        query_table.add_row([row.artist, row.song_title, row.first_name, row.last_name])
        
    print("The name of artist, song and user (first & last name) for userid = 10, and sessionid = 182:")
    print(query_table)    
else:
    print("The record for user_id 10 and session_id 182 is missing on the 'artist_user_info_by_id' table")


Five rows from the 'artist_user_info_by_id' table:
+--------------------------------------+-------------------------------------------+------------+-----------+
|                Artist                |                    Song                   | First Name | Last Name |
+--------------------------------------+-------------------------------------------+------------+-----------+
|             1 Mile North             |                Black Lines                |    Ryan    |   Smith   |
|   USS (Ubiquitous Synergy Seeker)    |             Man Makes The Zoo             |    Ryan    |   Smith   |
| EsmÃÂ©e Denters / Justin Timberlake | Love Dealer (Featuring Justin Timberlake) |    Ryan    |   Smith   |
|                Train                 |              Hey_ Soul Sister             |    Ryan    |   Smith   |
|   The Pussycat Dolls / Snoop Dogg    |                 Bottle Pop                |    Ryan    |   Smith   |
+--------------------------------------+-----------------------------

## Create queries to ask the following of the data
### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [15]:
"""
In Cassandra, the primary key is used to determine the partition key and the clustering columns. 
The partition key is used to distribute the data across the cluster, while the clustering columns
are used to sort the data within each partition.

In our example, we have chosen song_title as our partition key and user_id as our clustering column. 
This means that all data for a particular song will be stored together in the same partition, and 
within each partition, the data will be sorted by user ID.

We chose song_title as our partition key because we want to be able to query the data based 
on the song title. We chose user_id as our clustering column because we want to be able to sort 
the data within each partition by user ID.

If you don't involve the user_id column in this case, you won't be able to sort the results by user. 
The query would still return every user who listened to the song "All Hands Against His Own", but the 
results would be in an arbitrary order.

"""

query = "CREATE TABLE IF NOT EXISTS username_by_songtitle "
query = query + "(song_title text, user_id int, first_name text, last_name text, PRIMARY KEY (song_title, user_id))"

try:
    session.execute(query)
except Exception as e:
    print(e)

                    

In [16]:
"""
This code reads data from a CSV file named "event_datafile_new.csv" and inserts the 
data into a table called "username_by_songtitle" in Apache Cassandra.
"""
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        query = "INSERT INTO username_by_songtitle (song_title, user_id, first_name, last_name)"
        query = query + " VALUES (%s, %s, %s, %s)"
        session.execute(query, (line[9], int(line[10]), line[1], line[4]))

#### Do a SELECT to verify that the data have been inserted into each table

In [17]:
query = "SELECT song_title, first_name, last_name FROM username_by_songtitle limit 5"

try: 
    rows = session.execute(query)
except Exception as e:
    print(e)

query_table = PrettyTable(['Song', 'First Name', 'Last Name'])
for row in rows:
    query_table.add_row([row.song_title, row.first_name, row.last_name])

print("Five rows from the 'username_by_songtitle' table:")
print(query_table, "\n")



query2 = "SELECT song_title, first_name, last_name FROM username_by_songtitle WHERE song_title='All Hands Against His Own'"
try: 
    rows2 = session.execute(query2)
except Exception as e:
    print(e)

if rows2:
    query_table.clear_rows()
    for row in rows2:
        query_table.add_row([row.song_title, row.first_name, row.last_name])
        
    print("Users who listened to the song 'All Hands Against His Own'")
    print(query_table)    
else:
    print("The records for the users whoe listened to the song 'All Hands Against His Own' are missing")

Five rows from the 'username_by_songtitle' table:
+-------------------------------------+------------+-----------+
|                 Song                | First Name | Last Name |
+-------------------------------------+------------+-----------+
|  Too Tough (1994 Digital Remaster)  |   Aleena   |   Kirby   |
| Rio De Janeiro Blue (Album Version) |   Chloe    |   Cuevas  |
|             Misfit Love             |   Jayden   |   Graves  |
|           Hey_ Soul Sister          |    Ryan    |   Smith   |
|           Hey_ Soul Sister          |   Carlos   |   Carter  |
+-------------------------------------+------------+-----------+ 

Users who listened to the song 'All Hands Against His Own'
+---------------------------+------------+-----------+
|            Song           | First Name | Last Name |
+---------------------------+------------+-----------+
| All Hands Against His Own |   Tegan    |   Levine  |
| All Hands Against His Own |    Sara    |  Johnson  |
+---------------------------+

### Drop the tables before closing out the sessions

In [18]:
query = "drop table song_info_by_session"

try:
    rows = session.execute(query)
except Exception as e:
    print(e)

query = "drop table artist_user_info_by_id"

try:
    rows = session.execute(query)
except Exception as e:
    print(e)

query = "drop table username_by_songtitle"

try:
    rows = session.execute(query)
except Exception as e:
    print(e)

### Close the session and cluster connection¶

In [19]:
session.shutdown()
cluster.shutdown()