# Project - Data Modeling with Cassandra

### _Import required packages_

In [1]:
import glob
import pandas as pd
import os

from cassandra.cluster import Cluster
from prettytable import PrettyTable

## Part I. ETL Pipeline for Pre-Processing the Files

### _Load Dataset_

In [2]:
# Get the data directory with current directory as the workplace.
data_dir = os.path.join(os.getcwd(), 'event_data')

# Get all the filename to be loaded.
filename_list = glob.glob(os.path.join(data_dir,'*'), recursive=True)

# Load dataset
df_list = [pd.read_csv(filename) for filename in filename_list]
raw_data = pd.concat(df_list)

### _Filter Dataset_

In [3]:
# Define the target columns
target_columns = [
    'artist', 
    'firstName', 
    'gender', 
    'itemInSession', 
    'lastName', 
    'length', 
    'level', 
    'location', 
    'sessionId', 
    'song', 
    'userId'
]

# Remove the data which artist is missing.
data = raw_data[raw_data['artist'].notna()][target_columns]

# Cast the type of userId to int
data['userId'] = data['userId'].astype(int)

# Dump filtered data to `event_datafile_new.csv` in workplace.
data.to_csv('event_datafile_new.csv', index=False)

## Part II. ETL Pipeline for Cassandra Database Operations. 

### _Define the Utility Methods_

In [4]:
def execute_query(session, query, data=None):
    """Execute the given query and data
    
    Keyword arguments:
    session (cassandra.cluster.Connection) -- cassandra connection session.
    query (str) -- the query to be executed.
    data (list) -- the data corresponding to the given query.
    
    Reture:
    results (cassandra.cluster.ResultSet) -- the results of the execution.
    """
    try:
        results = session.execute(query, data)
    except Exception as e:
        print(e)
    
    return results


def insert_data(session, table_name, columns, data):
    """Insert the data into table with the given information and pre-defined execute_query method
    
    Keyword arguments:
    session (cassandra.connection) -- cassandra connection session.
    table_name (str) -- the name of the table where the data is inserted into.
    columns (List[str]) -- the column names of the table.
    data (dataframe) -- the data corresponding to the given columns.
    """
    insert_query = f'insert into {table_name} ({",".join(columns)}) values ({",".join(["%s"]*len(columns))})'
    for row in data.values:
        execute_query(session, insert_query, row.tolist())


### _Establish the Connection to Crassandra Database Cluster_

In [5]:
# Create a cluster object.
cluster = Cluster()

# Establish the connection and create a session for the operations.
session = cluster.connect()

### _Create a Keyspace_

In [6]:
# Define the query for creating the keyspace.
query = """create keyspace if not exists udacity
with replication = {'class': 'SimpleStrategy', 'replication_factor': 1}"""

# Execute the query to create the keyspace.
try:
    session.execute(query)
except Exception as e:
    print(e)

### _Set KeySpace to Session_

In [7]:
try:
    session.set_keyspace('udacity')
except Exception as e:
    print(e)

### _Create Queries to Ask the following 3 questions of the data_

### Query 1

**Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4**

### Query 2

**Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182**

### Query 3

**Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'**


### _ETL Pipeline for Query 1_

**Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4**

The session_id and item_in_session are used to create the primary key becasue the data is uniquely identified by them.
And the session_id is choosen as the partition key because I expected the data can be evenly distributed, 
which item_in_session could be skewed towards 0.
As the result, the query will looks like follows:

```
select artist, song_title, song_length 
from session_history 
where sessionId = 338 and itemInSession = 4
```

**Create Table and Insert Data**

In [8]:
# Drop the table to prevent the exception.
execute_query(session, "drop table if exists session_history")

# According to the explanation, session_id and item_in_session_id are used to create the primary key.
# And the session_id is partition key and item_in_session_id is clustering key.
# The asked information is artist, song title and song length so that they are the clustering columns.
create_table_query = """
create table if not exists session_history
(session_id int, item_in_session int, artist text, song_title text, song_length float, 
primary key (session_id, item_in_session))
"""
execute_query(session, create_table_query)

# Insert Data
# Define the source columns (dataframe) and target columns (table).
columns_in_dataframe = ['sessionId', 'itemInSession', 'artist', 'song', 'length']
columns_in_table = ['session_id', 'item_in_session', 'artist', 'song_title', 'song_length']
insert_data(session, 'session_history', columns_in_table, data[columns_in_dataframe])

**Execute the Query to Ask the Question**

In [9]:
query = """
select artist, song_title, song_length 
from session_history 
where session_id = 338 and item_in_session = 4
"""

# execute the query
rows = execute_query(session, query)

# show the results
pretty_table = PrettyTable(['artist', 'song_title', 'song_length'])
for row in rows:
    pretty_table.add_row(row)

print("Table 1. Song Info for Session ID: 338, Item: 4")
print(pretty_table)

Table 1. Song Info for Session ID: 338, Item: 4
+-----------+---------------------------------+--------------------+
|   artist  |            song_title           |    song_length     |
+-----------+---------------------------------+--------------------+
| Faithless | Music Matters (Mark Knight Dub) | 495.30731201171875 |
+-----------+---------------------------------+--------------------+


### _ETL Pipeline for Query 2_

**Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182**

Similar approach to query 1, the data is identified by user_id and session_id so that they are choosen as the primary key. And both user_id and session_id looks ok for evently distributed so that they are used as the partition key. 
Besides, as it mentioned that song is going to be sorted by item_in_session, item_in_session is used as clustering key. 
As the result, the primary key consists of user_id and seesion_id as partition key and item_in_session as the clustering key.
And the query would look like follows:

```
select artist, song_title, first_name, last_name
from user_history 
where user_id = 10 and session_id = 182
```

**Create Table and Insert Data**

In [10]:
# Drop the table to prevent the exception.
execute_query(session, "drop table if exists user_history")

# According to the explanation above, user_id and session_id are used to create the partition key, 
# and item_in_session is used as the clustering key.
# The asked information is artist, song_title, first_name, last_name so that they are the clustering columns.
create_table_query = """
create table if not exists user_history
(user_id int, session_id int, item_in_session int, artist text, song_title text, first_name text, last_name text, 
primary key ((user_id, session_id), item_in_session))
"""
execute_query(session, create_table_query)

# Insert Data
# Define the source columns (dataframe) and target columns (table).
columns_in_dataframe = ['userId', 'sessionId','itemInSession', 'artist', 'song', 'firstName', 'lastName']
columns_in_table = ['user_id', 'session_id', 'item_in_session', 'artist', 'song_title', 'first_name', 'last_name']
insert_data(session, 'user_history', columns_in_table, data[columns_in_dataframe])

**Execute the Query to Ask the Question**

In [11]:
query = """
select artist, song_title, first_name, last_name
from user_history 
where user_id = 10 and session_id = 182
"""

# execute the query
rows = execute_query(session, query)

# show the results
pretty_table = PrettyTable(['artist', 'song_title', 'first_name', 'last_name'])
for row in rows:
    pretty_table.add_row(row)

print("Table 2. Songs Info Played By User ID: 10, Session ID: 4")
print(pretty_table)

Table 2. Songs Info Played By User ID: 10, Session ID: 4
+-------------------+------------------------------------------------------+------------+-----------+
|       artist      |                      song_title                      | first_name | last_name |
+-------------------+------------------------------------------------------+------------+-----------+
|  Down To The Bone |                  Keep On Keepin' On                  |   Sylvie   |    Cruz   |
|    Three Drives   |                     Greece 2000                      |   Sylvie   |    Cruz   |
| Sebastien Tellier |                      Kilometer                       |   Sylvie   |    Cruz   |
|   Lonnie Gordon   | Catch You Baby (Steve Pitron & Max Sanna Radio Edit) |   Sylvie   |    Cruz   |
+-------------------+------------------------------------------------------+------------+-----------+


### _ETL Pipeline for Query 3_

**Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'**

The song is used to identified the data to find the user who has listened to.
Obviously, song is going to be used as the partition key. 
On the other hand, the combination of last_name and fist_name is not guaranteed to be unique. 
user_id is used as the unique identifier although it's not required actually.
As the result, the primary key consists of song as the partition key and user_id as the clustering key.
And the query would looks like follows:

```
select first_name, last_name 
from song_history
where song_title = 'All Hands Against His Own'
```

**Create Table and Insert Data**

In [12]:
# Drop the table to prevent the exception.
execute_query(session, "drop table if exists song_history")

# According to the explanation, song_title and user_id are used to create the primary key, 
# which song_title is the partition key and user_id is the clustering key.
# The user list will be sorted by user_id because user_id is the clustering key.
# The asked information is first_name, last_name so that they are the clustering columns.
create_table_query = """
create table if not exists song_history
(song_title text, user_id int, first_name text, last_name text, PRIMARY KEY (song_title, user_id))
"""
execute_query(session, create_table_query)

# Insert Data
# Define the source columns (dataframe) and target columns (table).
columns_in_dataframe = ['song', 'userId', 'firstName', 'lastName']
columns_in_table = ['song_title', 'user_id', 'first_name', 'last_name']
insert_data(session, 'song_history', columns_in_table, data[columns_in_dataframe])

**Execute the Query to Ask the Question**

In [13]:
query = """
select first_name, last_name 
from song_history
where song_title = 'All Hands Against His Own'
"""

# execute the query
rows = execute_query(session, query)

# show the results
pretty_table = PrettyTable(['first_name', 'last_name'])
for row in rows:
    pretty_table.add_row(row)

print("Table 2. Users Who Listened Song: All Hands Against His Own")
print(pretty_table)

Table 2. Users Who Listened Song: All Hands Against His Own
+------------+-----------+
| first_name | last_name |
+------------+-----------+
| Jacqueline |   Lynch   |
|   Tegan    |   Levine  |
|    Sara    |  Johnson  |
+------------+-----------+


### _Drop the Tables Before Closing_

In [14]:
execute_query(session, "drop table if exists session_history")
execute_query(session, "drop table if exists user_history")
execute_query(session, "drop table if exists song_history")

<cassandra.cluster.ResultSet at 0x7f8892c0ce10>

### _Close the Session and the Cluster Connection_

In [15]:
session.shutdown()
cluster.shutdown()