# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# Show the current working directory; the event data folder is resolved
# relative to it
print(os.getcwd())

# Folder that holds the original event csv files (it may contain subfolders)
filepath = os.getcwd() + '/event_data'

# Walk the folder tree and collect the path of every event csv file.
# Only regular '*.csv' files are kept: a bare '*' pattern also returns
# subdirectories and unrelated files, which cannot be opened and parsed as
# event data by the processing step.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # prune hidden folders in place (such as notebook checkpoint folders) so
    # os.walk does not descend into them and pick up copies of the csv files
    dirs[:] = [d for d in dirs if not d.startswith('.')]
    file_path_list += [
        path for path in glob.glob(os.path.join(root, '*.csv'))
        if os.path.isfile(path)
    ]

/Users/antoniolechuga/Developer/udacity/data-engineering/dend-p2


#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [3]:
# Accumulates the data rows of every event file (header lines excluded)
full_data_rows_list = []

# for every filepath in the file path list
for f in file_path_list:

    # reading csv file; newline='' lets the csv module handle line endings
    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        # creating a csv reader object
        csvreader = csv.reader(csvfile)
        # removing the header line; the None default keeps an empty file
        # from raising StopIteration
        next(csvreader, None)

        # appending every remaining data row of this file
        full_data_rows_list.extend(csvreader)

# creating a smaller event data csv file called event_data_new.csv
# that will be used to insert data into the Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_data_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow([
        'artist',
        'firstName',
        'gender',
        'itemInSession',
        'lastName',
        'length',
        'level',
        'location',
        'sessionId',
        'song',
        'userId'
    ])
    for row in full_data_rows_list:
        # skip blank csv lines (parsed as empty lists, so row[0] would raise
        # IndexError) and rows whose first column (written as 'artist') is empty
        if not row or row[0] == '':
            continue
        # keep only the source columns that match the header written above
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [4]:
# Count the lines of the generated csv file (the header line is counted too)
with open('event_data_new.csv', 'r', encoding='utf8') as f:
    line_count = len(f.readlines())
print(line_count)

6821


# Part II. Apache Cassandra Data Modeling

##  The event_data_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should look like in the <font color=red>**event_data_new.csv**</font> file after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

#### Creating a Cluster

Creating the cluster that will be used for this project. It is assumed that the default authentication credentials are being used. If this is not your case, change the `username` and `password` parameters in line 5 of the cell below. 

In [5]:
# Connect to the Cassandra instance running on the local machine (127.0.0.1)
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

credentials = {'username': 'cassandra', 'password': 'cassandra'}
auth_provider = PlainTextAuthProvider(**credentials)
cluster = Cluster(auth_provider=auth_provider)

# A session is required to establish the connection and execute queries
try:
    session = cluster.connect()
except Exception as e:
    print(f"Exception encountered while trying to connect with the db backend: {e}")

#### Create Keyspace

Creating the `sparkifydb` keyspace. A simple strategy and a replication of 1 is being used for testing purposes. These parameters need to be modified in production.

In [6]:
# Replication settings for the keyspace (single replica, for testing only)
keyspace_config = "{'class': 'SimpleStrategy', 'replication_factor': 1}"
create_keyspace_query = f"CREATE KEYSPACE IF NOT EXISTS sparkifydb WITH REPLICATION = {keyspace_config}"

try:
    session.execute(create_keyspace_query)
except Exception as e:
    print(f"Exception encountered while trying to create keyspace: {e}")

#### Set Keyspace

Setting `sparkifydb` as session's keyspace. 

In [7]:
# Make sparkifydb the keyspace used by the queries run through this session
keyspace_name = 'sparkifydb'
try:
    session.set_keyspace(keyspace_name)
except Exception as e:
    print(f"Exception encountered while trying to set keyspace: {e}")

## Data Modeling

In this section, the tables required to efficiently perform the following three kinds of queries will be created:
1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = `<session_id>`, and itemInSession = `<item_in_session>`
1. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = `<user_id>`, sessionid = `<session_id>`
1. Give me every user name (first and last) in my music app history who listened to the song `<name_of_song>`

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

Here the table `songs_per_session_and_item` will be created. This table helps the analytics team find a song when they know the session and the item in session, which enables them to understand what songs in the music library are being heard the most.

It has the following structure:

|       Column      |   Type  |
|:-----------------:|:-------:|
|      `artist`     |  `text` |
|      `title`      |  `text` |
|      `length`     | `float` |
|    `session_id`   |  `int`  |
| `item_in_session` |  `int`  |

`PRIMARY KEY` is `(session_id, item_in_session)`. This primary key was necessary to uniquely identify each row in the csv file, and also to make it easy to look up a given session id.

The expected result is the following:

|   artist  |              title              |   length   |
|:---------:|:-------------------------------:|:----------:|
| Faithless | Music Matters (Mark Knight Dub) | 495.307312 |

In [8]:
# Table backing query 1: one row per (session_id, item_in_session) pair
query_1_table_create = """
    CREATE TABLE IF NOT EXISTS songs_per_session_and_item (
        artist text,
        title text,
        length float,
        session_id int,
        item_in_session int,
        PRIMARY KEY (session_id, item_in_session)
    );
"""

try:
    session.execute(query_1_table_create)
except Exception as e:
    print(f"Exception encountered while creating table 1: {e}")
else:
    # only reached when the CREATE statement went through
    print("Table 1 created successfully...")

Table 1 created successfully...


In [9]:
# csv file produced by the pre-processing step
file = 'event_data_new.csv'
query_1_table_insert = """
    INSERT INTO songs_per_session_and_item (artist, title, length, session_id, item_in_session)
    VALUES (%s, %s, %s, %s, %s);
"""

# newline='' is what the csv module expects of a file handed to csv.reader,
# and it matches how this file was opened when it was written
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    # skip header
    next(csvreader)
    for line in csvreader:
        # csv columns used: 0 = artist, 9 = song, 5 = length,
        # 8 = sessionId, 3 = itemInSession (numeric values are cast from text)
        session.execute(query_1_table_insert, (line[0], line[9], float(line[5]), int(line[8]), int(line[3])))

#### SELECT to verify that the data have been inserted into each table

In [10]:
# Query 1: artist, song title and length for one (session, item) pair
select_query_1 = """
    SELECT artist, title, length FROM songs_per_session_and_item 
    WHERE session_id = 338 AND item_in_session = 4;
"""
try:
    rows = session.execute(select_query_1)
    # materialise the result set before handing it to pandas for display
    records = list(rows)
    q1_df = pd.DataFrame(records)
    display(q1_df)
except Exception as e:
    print(f"Exception encountered while querying table 1: {e}")

Unnamed: 0,artist,title,length
0,Faithless,Music Matters (Mark Knight Dub),495.307312


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

Here the table `songs_per_user_and_session` will be created. This table helps the analytics team find the name of the artist, song and user when they know the user id and the session id. The results are returned in ascending order of the item in session column. These results will enable the analytics team to make better recommendations to a specific user on what song to listen to next. 

It has the following structure:

| Column            | Type   |
|:-----------------:|:------:|
| `artist`          | `text` |
| `title`           | `text` |
| `first_name`      | `text` |
| `last_name`       | `text` |
| `user_id`         | `int`  |
| `session_id`      | `int`  |
| `item_in_session` | `int`  |

`PRIMARY KEY` is `((user_id, session_id), item_in_session)`. The composite partition key, i.e. `(user_id, session_id)`, was designed this way so that all the rows of a given user's session reside in the same partition (and therefore on the same node), which avoids a performance issue when the database is very large. Additionally, `item_in_session` was used as a clustering key to be able to return the results in ascending order of the item in session column. 

The expected result is the following:

|       artist      |                       title                       | first_name | last_name |
|:-----------------:|:-------------------------------------------------:|:----------:|:---------:|
| Down To The Bone  | Keep On Keepin' On                                | Sylvie     | Cruz      |
| Three Drives      | Greece 2000                                       | Sylvie     | Cruz      |
| Sebastien Tellier | Kilometer                                         | Sylvie     | Cruz      |
| Lonnie Gordon     | Catch You Baby (Steve Pitron & Max Sanna Radio... | Sylvie     | Cruz      |

In [11]:
# Table backing query 2: partitioned by (user_id, session_id) and clustered
# by item_in_session
query_2_table_create = """
    CREATE TABLE IF NOT EXISTS songs_per_user_and_session (
        artist text,
        title text,
        first_name text,
        last_name text,
        user_id int,
        session_id int,
        item_in_session int,
        PRIMARY KEY ((user_id, session_id), item_in_session)
    );
"""

try:
    session.execute(query_2_table_create)
except Exception as e:
    print(f"Exception encountered while creating table 2: {e}")
else:
    # only reached when the CREATE statement went through
    print("Table 2 created successfully...")

Table 2 created successfully...


In [12]:
# csv file produced by the pre-processing step
file = 'event_data_new.csv'
query_2_table_insert = """
    INSERT INTO songs_per_user_and_session (artist, title, first_name, last_name, user_id, session_id, item_in_session)
    VALUES (%s, %s, %s, %s, %s, %s, %s);
"""

# newline='' is what the csv module expects of a file handed to csv.reader,
# and it matches how this file was opened when it was written
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    # skip header
    next(csvreader)
    for line in csvreader:
        # csv columns used: 0 = artist, 9 = song, 1 = firstName, 4 = lastName,
        # 10 = userId, 8 = sessionId, 3 = itemInSession (ids are cast to int)
        session.execute(query_2_table_insert, (line[0], line[9], line[1], line[4], int(line[10]), int(line[8]), int(line[3])))

#### SELECT to verify that the data have been inserted into each table

In [13]:
# Query 2: artist, song title and user name for one user session. The rows of
# a partition come back ordered by the clustering column item_in_session.
select_query_2 = """
    SELECT artist, title, first_name, last_name FROM songs_per_user_and_session 
    WHERE user_id = 10 AND session_id = 182;
"""
try:
    rows = session.execute(select_query_2)
    q2_df = pd.DataFrame(list(rows))
    display(q2_df)
except Exception as e:
    # this cell queries table 2 (the message previously named table 3)
    print(f"Exception encountered while querying table 2: {e}")

Unnamed: 0,artist,title,first_name,last_name
0,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
1,Three Drives,Greece 2000,Sylvie,Cruz
2,Sebastien Tellier,Kilometer,Sylvie,Cruz
3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

Here the table `users_per_song` will be created. This table helps the analytics team find the first and last name of the users that have listened to a specific song. 

It has the following structure:

| Column            | Type   |
|:-----------------:|:------:|
| `first_name`      | `text` |
| `last_name`       | `text` |
| `user_id`         | `int`  |
| `title`           | `text` |

`PRIMARY KEY` is `(title, user_id)`. This PRIMARY KEY might not result in a unique record for every log in the csv files, but given the nature of the query (to obtain a user's first and last name if they have listened to a song), we do not need a unique record per csv line. What we need is only to remember every person that has listened to a given song at least once. This is taken care of by this table design.

The expected result is the following:

| first_name | last_name |
|:----------:|:---------:|
| Jacqueline |     Lynch |
|      Tegan |    Levine |
|       Sara |   Johnson |

In [14]:
# Table backing query 3: partitioned by song title, one row per user_id
query_3_table_create = """
    CREATE TABLE IF NOT EXISTS users_per_song (
        first_name text,
        last_name text,
        user_id int,
        title text,
        PRIMARY KEY (title, user_id)
    );
"""

try:
    session.execute(query_3_table_create)
except Exception as e:
    print(f"Exception encountered while creating table 3: {e}")
else:
    # only reached when the CREATE statement went through
    print("Table 3 created successfully...")

Table 3 created successfully...


In [15]:
# csv file produced by the pre-processing step
file = 'event_data_new.csv'
query_3_table_insert = """
    INSERT INTO users_per_song (first_name, last_name, user_id, title)
    VALUES (%s, %s, %s, %s);
"""

# newline='' is what the csv module expects of a file handed to csv.reader,
# and it matches how this file was opened when it was written
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    # skip header
    next(csvreader)
    for line in csvreader:
        # csv columns used: 1 = firstName, 4 = lastName, 10 = userId (cast to
        # int), 9 = song
        session.execute(query_3_table_insert, (line[1], line[4], int(line[10]), line[9]))

#### SELECT to verify that the data have been inserted into each table

In [16]:
# Query 3: first and last name of every user stored for one song title
select_query_3 = """
    SELECT first_name, last_name FROM users_per_song 
    WHERE title = 'All Hands Against His Own';
"""
try:
    rows = session.execute(select_query_3)
    # materialise the result set before handing it to pandas for display
    records = list(rows)
    q3_df = pd.DataFrame(records)
    display(q3_df)
except Exception as e:
    print(f"Exception encountered while querying table 3: {e}")

Unnamed: 0,first_name,last_name
0,Jacqueline,Lynch
1,Tegan,Levine
2,Sara,Johnson


### Drop the tables before closing out the sessions

In [17]:
# DROP statements for the three query tables, all built from one template
drop_table_template = "DROP TABLE IF EXISTS {};"
query_1_table_drop = drop_table_template.format("songs_per_session_and_item")
query_2_table_drop = drop_table_template.format("songs_per_user_and_session")
query_3_table_drop = drop_table_template.format("users_per_song")

In [18]:
# Drop the three query tables in order; the first failure stops the loop and
# is reported
try:
    for drop_query in (query_1_table_drop, query_2_table_drop, query_3_table_drop):
        session.execute(drop_query)
except Exception as e:
    print(f"Exception encountered while trying to drop tables: {e}")

### Close the session and cluster connection

In [19]:
# Release the driver resources: the session first, then the cluster that
# created it
for connection in (session, cluster):
    connection.shutdown()