# Project: Data Modeling with Apache Cassandra

## Part 1. ETL pipeline for preprocessing the files

### Import libraries

In [1]:
import csv
import os
from decimal import Decimal
from glob import glob

import cassandra
from cassandra.cluster import Cluster
from prettytable import PrettyTable

### Create a list of file paths for the original event CSV data files

In [8]:
def collect_file_paths(directory):
    """Return the sorted paths of every visible regular file under ``directory``.

    The tree is walked recursively, so files directly in ``directory`` and in
    all of its non-hidden subfolders are collected. Names starting with a dot
    are skipped (``glob("*")`` already ignores dot-files, and hidden folders
    such as ``.ipynb_checkpoints`` are pruned from the walk). Directories are
    never returned, because they cannot be opened as CSV files.

    Parameters
    ----------
    directory : str
        Root folder to search. A missing folder yields an empty list.

    Returns
    -------
    list of str
        Sorted file paths, so the processing order is reproducible.
    """
    file_paths = []
    for root, dirs, _files in os.walk(directory):
        # Prune hidden subfolders in place so os.walk does not descend into them.
        dirs[:] = [name for name in dirs if not name.startswith(".")]
        # Accumulate across folders; reassigning here would keep only the
        # last folder visited by the walk.
        file_paths.extend(
            candidate
            for candidate in glob(os.path.join(root, "*"))
            if os.path.isfile(candidate)
        )
    return sorted(file_paths)


# Event data lives in ./event_data (and any subfolders) under the current directory
path = os.path.join(os.getcwd(), "event_data")

# Collect every event file path
files_path_list = collect_file_paths(path)

### Process the files to create a single CSV data file for Apache Cassandra data modeling

In [13]:
def read_event_rows(file_paths):
    """Read the data rows of every CSV file in ``file_paths``.

    The first row of each file is treated as a header and discarded. An empty
    file contributes no rows instead of raising ``StopIteration``.

    Parameters
    ----------
    file_paths : iterable of str
        CSV files to read, in the order their rows should appear.

    Returns
    -------
    list of list of str
        All data rows, concatenated file after file.
    """
    rows = []
    for file_path in file_paths:
        # newline="" is what the csv module expects, so quoted fields that
        # contain line breaks are parsed correctly.
        with open(file_path, "r", encoding="utf8", newline="") as csvfile:
            csvreader = csv.reader(csvfile)
            next(csvreader, None)  # skip the header (no-op on an empty file)
            rows.extend(csvreader)
    return rows


# Gather the data rows of all event files into one list
full_data_rows_list = read_event_rows(files_path_list)

print(len(full_data_rows_list))

8056


In [14]:
# Output column name -> index of that field in the raw event rows, listed in
# the order the columns are written to event_datafile.csv.
EVENT_DATAFILE_COLUMNS = [
    ("artist_name", 0),
    ("user_first_name", 2),
    ("user_gender", 3),
    ("item_in_session", 4),
    ("user_last_name", 5),
    ("song_length", 6),
    ("level", 7),
    ("user_location", 8),
    ("session_id", 12),
    ("song_title", 13),
    ("user_id", 16),
]

# Create a smaller event data csv file called event_datafile.csv that will be used to insert data into the
# Apache Cassandra tables. Rows with an empty artist_name are dropped.
csv.register_dialect("myDialect", quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open("event_datafile.csv", "w", encoding="utf8", newline="") as f:
    writer = csv.writer(f, dialect="myDialect")
    writer.writerow([name for name, _ in EVENT_DATAFILE_COLUMNS])
    for row in full_data_rows_list:
        # Skip blank lines (csv yields []) as well as rows without an artist.
        if not row or row[0] == "":
            continue
        writer.writerow([row[index] for _, index in EVENT_DATAFILE_COLUMNS])

# Check the number of records in the csv file (the header row is included).
# Counting csv records rather than physical lines stays correct even if a
# quoted field contains a line break.
with open("event_datafile.csv", "r", encoding="utf8", newline="") as f:
    print(sum(1 for _ in csv.reader(f)))

6821


________________________________________
_________________________________________

## Part 2. Data modeling and queries

Now we are able to work with the `event_datafile.csv` file that we've just created. This file contains the following columns:
* artist_name
* user_first_name
* user_gender
* item_in_session
* user_last_name
* song_length
* level (paid or free song)
* user_location
* session_id
* song_title
* user_id

The image below is a screenshot of what the denormalized data look like in the `event_datafile.csv` file once preprocessing is complete:

![event_datafile](images/event_datafile.png)

### Create a connection to the database

In [2]:
# Connect to the Cassandra node running on the local machine (127.0.0.1).
# Every later cell needs `session`, so a failure here is fatal: report it,
# release the cluster object, and re-raise rather than letting later cells
# fail with confusing NameErrors about `session`.
cluster = Cluster(["127.0.0.1"])
try:
    session = cluster.connect()
except Exception as e:
    print(e)
    cluster.shutdown()
    raise

### Create a keyspace to do the work in and connect to it

In [3]:
# Create (if needed) the keyspace that holds every table in this notebook,
# keeping a single replica of each row. Then make it the session's default
# keyspace, so later statements can use unqualified table names.
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS udacity_keyspace
    WITH REPLICATION = 
    {'class': 'SimpleStrategy', 'replication_factor': 1}
    """)
    session.set_keyspace("udacity_keyspace")
except Exception as keyspace_error:
    # Report the driver error without stopping the notebook.
    print(keyspace_error)

### Music library

In Apache Cassandra you model the database tables on the queries you want to run. So in this case we need to create tables to run the following queries.  
1. **Give me the artist, song title and song's length in the music app history that was heard during session_id = 338, and item_in_session = 4.**
2. **Give me only the following: name of artist, song (sorted by item_in_session) and user (first and last name) for user_id = 10, session_id = 182.**
3. **Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'.**

### Query 1
**Give me the artist, song title and song's length in the music app history that was heard during session_id = 338, and item_in_session = 4.**

To answer this question, let's create a table called `session_library` with the following information:
* session_id
* item_in_session
* artist_name
* song_title
* song_length

Since we have to filter by session_id and item_in_session, we'll use session_id as the PARTITION KEY and item_in_session as a CLUSTERING COLUMN, so both together form the PRIMARY KEY. In this case, we could have also used them both as a composite PARTITION KEY.

In [4]:
# DDL for the Query 1 table: session_id is the partition key and
# item_in_session the clustering column, so each
# (session_id, item_in_session) pair addresses exactly one row.
create_session_library = """
CREATE TABLE IF NOT EXISTS session_library (
    session_id INT,
    item_in_session INT,
    artist_name TEXT,
    song_title TEXT,
    song_length DECIMAL,
    PRIMARY KEY (session_id, item_in_session)
);
"""

try:
    session.execute(create_session_library)
except Exception as ddl_error:
    # Report the driver error and let the notebook continue.
    print(ddl_error)

In [5]:
# Load every row of event_datafile.csv into session_library.
# The statement is prepared once and then bound per row, so the cluster
# parses the CQL a single time instead of once per insert.
insert_session_library = """
INSERT INTO session_library (session_id, item_in_session, artist_name, song_title, song_length)
VALUES (?, ?, ?, ?, ?);
"""
prepared_insert_session_library = session.prepare(insert_session_library)

file = 'event_datafile.csv'

# newline='' is what the csv module expects for files it reads.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        session.execute(prepared_insert_session_library, (
            int(line[8]),      # session_id
            int(line[3]),      # item_in_session
            line[0],           # artist_name
            line[9],           # song_title
            # song_length is a DECIMAL column: building a Decimal from the
            # CSV text keeps the exact value (a float could be bound through
            # its binary approximation and pick up rounding noise).
            Decimal(line[5]),
        ))

Let's verify that the data have been correctly inserted into our table by using the query from the statement.

In [6]:
# Query 1: artist, song title and song length for session_id = 338,
# item_in_session = 4. The filter values are bound as parameters rather than
# pasted into the CQL text.
query = """
SELECT artist_name,
    song_title,
    song_length
FROM session_library
WHERE session_id = %s
AND item_in_session = %s;
"""

# Start from an empty result so a failed query prints an empty table instead
# of raising NameError (or reusing rows left over from another cell).
rows = []
try:
    rows = session.execute(query, (338, 4))
except Exception as e:
    print(e)

table = PrettyTable()
table.field_names = ["Artist", "Song title", "Song length"]

for row in rows:
    table.add_row([row.artist_name, row.song_title, row.song_length])

print(table)

+-----------+---------------------------------+-------------+
|   Artist  |            Song title           | Song length |
+-----------+---------------------------------+-------------+
| Faithless | Music Matters (Mark Knight Dub) |   495.3073  |
+-----------+---------------------------------+-------------+


### Query 2
**Give me only the following: name of artist, song (sorted by item_in_session) and user (first and last name) for user_id = 10, session_id = 182.**

To answer this question, let's create a table called `user_library` with the following information:
* user_id
* session_id
* item_in_session
* user_first_name
* user_last_name
* artist_name
* song_title

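Since we have to filter by user_id and session_id, and we have to sort by item_in_session, we'll use user_id and session_id together as a composite PARTITION KEY and item_in_session as the CLUSTERING COLUMN, so all three together form the PRIMARY KEY. Rows inside a partition are stored sorted by the clustering column, which is what lets the query return the songs ordered by item_in_session.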

In [7]:
# DDL for the Query 2 table: (user_id, session_id) form a composite partition
# key, and item_in_session is the clustering column that orders rows within
# each partition.
create_user_library = """
CREATE TABLE IF NOT EXISTS user_library (
    user_id INT,
    session_id INT,
    item_in_session INT,
    user_first_name TEXT,
    user_last_name TEXT,
    artist_name TEXT,
    song_title TEXT,
    PRIMARY KEY ((user_id, session_id), item_in_session)
);
"""

try:
    session.execute(create_user_library)
except Exception as ddl_error:
    # Report the driver error and let the notebook continue.
    print(ddl_error)

In [8]:
# Load every row of event_datafile.csv into user_library.
# The statement is prepared once and then bound per row, so the cluster
# parses the CQL a single time instead of once per insert.
insert_user_library = """
INSERT INTO user_library (user_id, session_id, item_in_session, user_first_name,
    user_last_name, artist_name, song_title)
VALUES (?, ?, ?, ?, ?, ?, ?);
"""
prepared_insert_user_library = session.prepare(insert_user_library)

file = 'event_datafile.csv'

# newline='' is what the csv module expects for files it reads.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        session.execute(prepared_insert_user_library, (
            int(line[10]),     # user_id
            int(line[8]),      # session_id
            int(line[3]),      # item_in_session
            line[1],           # user_first_name
            line[4],           # user_last_name
            line[0],           # artist_name
            line[9],           # song_title
        ))

Query from the statement:

In [9]:
# Query 2: artist, song and user name for user_id = 10, session_id = 182,
# ordered by item_in_session. The filter values are bound as parameters.
query = """
SELECT artist_name,
    song_title,
    user_first_name,
    user_last_name
FROM user_library
WHERE user_id = %s
AND session_id = %s
ORDER BY item_in_session ASC;
"""

# Start from an empty result: without this, a failed query would iterate the
# stale `rows` left over from Query 1.
rows = []
try:
    rows = session.execute(query, (10, 182))
except Exception as e:
    print(e)

table = PrettyTable()
table.field_names = ["Artist", "Song title", "User first name", "User last name"]

for row in rows:
    table.add_row([row.artist_name, row.song_title, row.user_first_name, row.user_last_name])

print(table)

+-------------------+------------------------------------------------------+-----------------+----------------+
|       Artist      |                      Song title                      | User first name | User last name |
+-------------------+------------------------------------------------------+-----------------+----------------+
|  Down To The Bone |                  Keep On Keepin' On                  |      Sylvie     |      Cruz      |
|    Three Drives   |                     Greece 2000                      |      Sylvie     |      Cruz      |
| Sebastien Tellier |                      Kilometer                       |      Sylvie     |      Cruz      |
|   Lonnie Gordon   | Catch You Baby (Steve Pitron & Max Sanna Radio Edit) |      Sylvie     |      Cruz      |
+-------------------+------------------------------------------------------+-----------------+----------------+


### Query 3
**Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'.**

To answer this question, let's create a table called `song_history_library` with the following information:
* song_title
* user_id
* user_first_name
* user_last_name
* session_id

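Since we filter by song_title, it is our PARTITION KEY. We add user_id as a CLUSTERING COLUMN so that every user who listened to the song gets their own row; with song_title alone as the key, each insert for the same song would overwrite a single row. Because (song_title, user_id) is the PRIMARY KEY, a user who played the song several times still appears only once.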

In [10]:
# DDL for the Query 3 table: song_title is the partition key and user_id the
# clustering column, so there is one row per (song, user) pair.
create_song_history_library = """
CREATE TABLE IF NOT EXISTS song_history_library (
    song_title TEXT,
    user_id INT,
    user_first_name TEXT,
    user_last_name TEXT,
    session_id INT,
    PRIMARY KEY (song_title, user_id)
);
"""

try:
    session.execute(create_song_history_library)
except Exception as ddl_error:
    # Report the driver error and let the notebook continue.
    print(ddl_error)

In [11]:
# Load every row of event_datafile.csv into song_history_library.
# The statement is prepared once and then bound per row, so the cluster
# parses the CQL a single time instead of once per insert.
insert_song_history_library = """
INSERT INTO song_history_library (song_title, user_id, user_first_name, user_last_name, session_id)
VALUES (?, ?, ?, ?, ?);
"""
prepared_insert_song_history_library = session.prepare(insert_song_history_library)

file = 'event_datafile.csv'

# newline='' is what the csv module expects for files it reads.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        session.execute(prepared_insert_song_history_library, (
            line[9],           # song_title
            int(line[10]),     # user_id
            line[1],           # user_first_name
            line[4],           # user_last_name
            int(line[8]),      # session_id
        ))

Query from the statement:

In [12]:
# Query 3: first and last name of every user who listened to
# 'All Hands Against His Own'. The title is bound as a parameter, so the
# driver handles quoting (titles containing apostrophes would break a
# hand-written literal).
query = """
SELECT user_first_name,
    user_last_name
FROM song_history_library
WHERE song_title = %s
"""

# Start from an empty result: without this, a failed query would silently
# print the stale `rows` from Query 2 as if they answered this question.
rows = []
try:
    rows = session.execute(query, ('All Hands Against His Own',))
except Exception as e:
    print(e)

table = PrettyTable()
table.field_names = ["User first name", "User last name"]

for row in rows:
    table.add_row([row.user_first_name, row.user_last_name])

print(table)

+-----------------+----------------+
| User first name | User last name |
+-----------------+----------------+
|    Jacqueline   |     Lynch      |
|      Tegan      |     Levine     |
|       Sara      |    Johnson     |
+-----------------+----------------+


### Drop the tables

In [13]:
# Drop each table independently so that one failure does not stop the
# remaining tables from being dropped. IF EXISTS makes an already-missing
# table a no-op instead of an error.
for drop_statement in (
    "DROP TABLE IF EXISTS session_library;",
    "DROP TABLE IF EXISTS user_library;",
    "DROP TABLE IF EXISTS song_history_library;",
):
    try:
        session.execute(drop_statement)
    except Exception as e:
        print(e)

### Close session and cluster connection

In [14]:
# Release driver resources: first the session, then the cluster it came from.
for connection in (session, cluster):
    connection.shutdown()