# Part I: ETL pipeline for processing the CSV data files

In [None]:
import csv
import glob
import json
import os
import re
from pathlib import Path

import cassandra
import numpy as np
import pandas as pd

The first step of the ETL process is to compile a list of all the raw CSV files inside the `event_data` directory.

In [None]:
# Collect every raw CSV under event_data (including any nested subdirectory)
# in a deterministic, sorted order. A single rglob replaces the os.walk loop,
# which overwrote dataset_files on each iteration and therefore kept only the
# files of the last directory visited. It also left the name undefined when
# event_data did not exist.
dataset_path = Path.cwd() / 'event_data'
dataset_files = sorted(dataset_path.rglob('*.csv'))

The next step is to process the files listed above to create a single CSV file that will later be used to populate the Apache Cassandra tables.

In [None]:
DATA_CSV_FILE = 'event_datafile_new.csv'

# Header of the merged file and, position by position, the index of the
# matching column in the raw event CSVs.
OUTPUT_COLUMNS = [
    'artist',
    'firstName',
    'gender',
    'itemInSession',
    'lastName',
    'length',
    'level',
    'location',
    'sessionId',
    'song',
    'userId',
]
SOURCE_INDEXES = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

# Read the data rows of every raw file (headers skipped) into one list.
full_data_rows_list = []
for f in dataset_files:
    with open(f, 'r', encoding='utf8', newline='') as csv_file:
        csv_reader = csv.reader(csv_file)
        # Skip the header line; the default avoids a StopIteration crash on an
        # empty file.
        next(csv_reader, None)
        full_data_rows_list.extend(csv_reader)

csv.register_dialect('events', quoting=csv.QUOTE_ALL, skipinitialspace=True)
# Join all valid rows from the separate CSVs into one single CSV file.
with open(DATA_CSV_FILE, 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='events')
    writer.writerow(OUTPUT_COLUMNS)
    for row in full_data_rows_list:
        # A blank line parses as [] (indexing it would raise IndexError).
        # Rows with an empty artist are skipped, as before.
        if row and row[0] != '':
            writer.writerow([row[i] for i in SOURCE_INDEXES])


Counting the lines of the final CSV (the count includes the header line)

In [None]:
# Print how many text lines the merged CSV has, header line included.
with open(DATA_CSV_FILE, 'r', encoding='utf8') as merged_csv:
    line_count = 0
    for _ in merged_csv:
        line_count += 1
    print(line_count)

# Part II: Data modelling using Apache Cassandra
At this point, all CSV files from the `event_data` directory have been processed and joined into a single CSV data file named `event_datafile_new.csv`. That file contains the following columns (the meaning of each one is given in parentheses):
- `artist` (artist name)
- `firstName` (user first name)
- `gender` (user gender)
- `itemInSession` (session item number)
- `lastName` (user last name)
- `length` (song length)
- `level` (user level, i.e. free or paid plan)
- `location` (user location)
- `sessionId` (session ID)
- `song` (song title)
- `userId` (user ID)

The image below is a screenshot of what the data should look like after the CSV processing code is executed:

<img src="images/image_event_datafile_new.jpg">

In [None]:
from cassandra.cluster import Cluster

# Build the driver's Cluster object with no explicit contact points (driver
# defaults), then open the session that every later cell uses.
cluster = Cluster()
session = cluster.connect()

In [6]:
# Keyspace for all Sparkify tables. A single replica with SimpleStrategy only
# suits a single-node development setup.
create_keyspace_query = """
CREATE KEYSPACE IF NOT EXISTS sparkify
    WITH REPLICATION = { 
        'class': 'SimpleStrategy', 
        'replication_factor': 1 
    };
"""
# Left as the cell's final expression so the notebook echoes the ResultSet.
session.execute(create_keyspace_query)

<cassandra.cluster.ResultSet at 0x7f25fb6b4940>

In [7]:
# Make sparkify this session's working keyspace so the CREATE/INSERT/SELECT
# statements below can use bare table names without a keyspace prefix.
session.set_keyspace('sparkify')


## Data model and queries
In the context of Sparkify's songplay events database, here are some of the questions that must be answered by our data model:

1. What were the artist name, song title and song length of the song heard during the session with ID 338 and session item number 4?
2. What is the full name of the user, and which artists and songs (sorted by session item number) were listened to by the user with ID 10 during the session with ID 182?
3. What is the full name of all the users who listened to the song named 'All Hands Against His Own'?

All tables were designed with the above questions in mind. The code that creates those tables in Apache Cassandra can be seen in the following cells.


In [8]:
# One table per query, each partitioned by the column that query filters on.

# Query 1: partition by session. user_id is part of the clustering key, so rows
# with the same session_item but different users do not overwrite each other.
create_table_songplays_by_session = """
CREATE TABLE IF NOT EXISTS songplay_events_by_session (
    session_id INT,
    session_item INT,
    artist_name TEXT,
    song_title TEXT,
    song_length FLOAT,
    user_id INT,
    user_first_name TEXT,
    user_last_name TEXT,
    user_gender TEXT,
    user_location TEXT,
    user_plan TEXT,
    PRIMARY KEY ((session_id), session_item, user_id)
);
"""

# Query 2: partition by user. Clustering on (session_id, session_item) returns
# a session's songplays ordered by session item.
create_table_songplays_by_user = """
CREATE TABLE IF NOT EXISTS songplay_events_by_user (
    session_id INT,
    session_item INT,
    artist_name TEXT,
    song_title TEXT,
    song_length FLOAT,
    user_id INT,
    user_full_name TEXT,
    user_gender TEXT,
    user_location TEXT,
    user_plan TEXT,
    PRIMARY KEY ((user_id), session_id, session_item)
);
"""

# Query 3: partition by song title, one row per individual play.
create_table_songplays_by_song = """
CREATE TABLE IF NOT EXISTS songplay_events_by_song (
    session_id INT,
    session_item INT,
    artist_name TEXT,
    song_title TEXT,
    song_length FLOAT,
    user_id INT,
    user_full_name TEXT,
    user_gender TEXT,
    user_location TEXT,
    user_plan TEXT,
    PRIMARY KEY ((song_title), user_id, session_id, session_item)
);
"""

session.execute(create_table_songplays_by_session)
session.execute(create_table_songplays_by_user)
session.execute(create_table_songplays_by_song)

<cassandra.cluster.ResultSet at 0x7f25f0426820>

With the tables created, the data previously compiled in the `event_datafile_new.csv` file can be used to populate them.

In [9]:
# Prepared INSERT for table 01, which keeps first and last names separate.
insert_query = """
INSERT INTO songplay_events_by_session (
    session_id, 
    session_item, 
    artist_name, 
    song_title, 
    song_length, 
    user_id, 
    user_first_name, 
    user_last_name, 
    user_gender, 
    user_location, 
    user_plan
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
"""
insert_stmt_songplays_by_session = session.prepare(insert_query)

# Tables 02 and 03 share one column list (a single user_full_name), so one
# template is prepared for each table name.
insert_query = """
INSERT INTO {} (
    session_id, 
    session_item, 
    artist_name, 
    song_title, 
    song_length, 
    user_id, 
    user_full_name, 
    user_gender, 
    user_location, 
    user_plan
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
"""
insert_stmt_songplays_by_user, insert_stmt_songplays_by_song = (
    session.prepare(insert_query.format(table_name))
    for table_name in ('songplay_events_by_user', 'songplay_events_by_song')
)

with open(DATA_CSV_FILE, encoding='utf8') as merged_csv:
    reader = csv.reader(merged_csv)
    next(reader)  # the merged file starts with its header row
    for fields in reader:
        # Merged column order: artist, firstName, gender, itemInSession,
        # lastName, length, level, location, sessionId, song, userId.
        session_id = int(fields[8])
        session_item = int(fields[3])
        song_length = float(fields[5])
        user_id = int(fields[10])
        artist_name, first_name, gender = fields[0], fields[1], fields[2]
        last_name, plan, location, song_title = fields[4], fields[6], fields[7], fields[9]

        session.execute(insert_stmt_songplays_by_session, [
            session_id, session_item, artist_name, song_title, song_length,
            user_id, first_name, last_name, gender, location, plan,
        ])

        full_name = '{} {}'.format(first_name.strip(), last_name.strip())
        shared_values = [
            session_id, session_item, artist_name, song_title, song_length,
            user_id, full_name, gender, location, plan,
        ]
        for prepared in (insert_stmt_songplays_by_user, insert_stmt_songplays_by_song):
            session.execute(prepared, shared_values)

The following cells translate the questions above into Cassandra CQL queries. The number in each variable name matches the number of the corresponding question (e.g. `query_01` below answers question 1, and so on).

In [10]:
# 1. What were the artist name, song title and song length heard during
#    the session with ID 338 and session item number 4?
query_01 = """
SELECT artist_name, song_title, song_length
FROM songplay_events_by_session
WHERE session_id = ? AND session_item = ?;
"""
query_01_stmt = session.prepare(query_01)
result = session.execute(query_01_stmt, [338, 4])
# user_id is also a clustering column of songplay_events_by_session, so one
# (session_id, session_item) pair can match zero, one or several rows.
# ResultSet.one() returns None on no match (the attribute access then crashes)
# and silently drops any extra rows, so every row is handled explicitly.
rows = result.all()
if not rows:
    print('no songplay found for session 338, item 4\n')
for row in rows:
    print('artist: {}\nsong: {}\nlength: {:.2f}\n'.format(row.artist_name, row.song_title, row.song_length))


artist: Faithless
song: Music Matters (Mark Knight Dub)
length: 495.31



In [11]:
# 2. Full name of the user, plus the artist and song of each songplay (in
#    session item order) for user ID 10 during session ID 182.
#    The clustering key (session_id, session_item) supplies the ordering.
query_02 = """
SELECT artist_name, song_title, user_full_name
FROM songplay_events_by_user
WHERE user_id = ? AND session_id = ?;
"""
query_02_stmt = session.prepare(query_02)
rows = session.execute(query_02_stmt, [10, 182]).all()
for songplay in rows:
    print('artist: {}\nsong: {}\nuser: {}\n'.format(
        songplay.artist_name, songplay.song_title, songplay.user_full_name))

artist: Down To The Bone
song: Keep On Keepin' On
user: Sylvie Cruz

artist: Three Drives
song: Greece 2000
user: Sylvie Cruz

artist: Sebastien Tellier
song: Kilometer
user: Sylvie Cruz

artist: Lonnie Gordon
song: Catch You Baby (Steve Pitron & Max Sanna Radio Edit)
user: Sylvie Cruz



In [12]:
# 3. What is the full name of all the users who listened to the song named 'All Hands Against His Own'?
query_03 = """
SELECT user_id, user_full_name
FROM songplay_events_by_song
WHERE song_title = ?;
"""
query_03_stmt = session.prepare(query_03)
result = session.execute(query_03_stmt, ['All Hands Against His Own'])
rows = result.all()
# The table stores one row per play (clustered by user_id, session_id,
# session_item), so a user who played the song several times would be listed
# several times. Keep the first row per user_id; dicts preserve insertion
# order, so the clustering order is kept.
users_by_id = {}
for row in rows:
    users_by_id.setdefault(row.user_id, row.user_full_name)
for full_name in users_by_id.values():
    print('{}\n'.format(full_name))

Jacqueline Lynch

Tegan Levine

Sara Johnson



The code below is an alternative implementation of `query_03` above that uses an Apache Cassandra materialized view instead of a dedicated table.

In [13]:
# Instead of writing songplay_events_by_song ourselves, let Cassandra maintain
# a copy of songplay_events_by_user re-keyed by song title. The view's primary
# key contains every primary-key column of the base table (user_id,
# session_id, session_item) plus song_title, and each key column is filtered
# with IS NOT NULL.
# NOTE(review): recent Cassandra releases treat materialized views as
# experimental and may ship with them disabled in cassandra.yaml. Confirm
# they are enabled on the target cluster before relying on this cell.
create_view = """
CREATE MATERIALIZED VIEW IF NOT EXISTS songplay_events_by_song_mv AS 
    SELECT song_title, artist_name, user_id, user_full_name, session_id, session_item 
    FROM songplay_events_by_user 
    WHERE song_title IS NOT NULL AND session_id IS NOT NULL AND session_item IS NOT NULL AND user_id IS NOT NULL
    PRIMARY KEY ((song_title), user_id, session_id, session_item);
"""
# Left as the cell's final expression so the notebook echoes the ResultSet.
session.execute(create_view)

<cassandra.cluster.ResultSet at 0x7f25f145a910>

In [14]:
# 3. What is the full name of all the users who listened to the song named 'All Hands Against His Own'?
#    Same question as query_03, answered from the materialized view.
query_03_alt = """
SELECT user_id, user_full_name
FROM songplay_events_by_song_mv
WHERE song_title = ?;
"""
query_03_stmt = session.prepare(query_03_alt)
result = session.execute(query_03_stmt, ['All Hands Against His Own'])
rows = result.all()
# The view has one row per play (clustered by user_id, session_id,
# session_item), so repeat listeners would appear more than once. Keep the
# first row per user_id while preserving the clustering order.
users_by_id = {}
for row in rows:
    users_by_id.setdefault(row.user_id, row.user_full_name)
for full_name in users_by_id.values():
    print('{}\n'.format(full_name))

Jacqueline Lynch

Tegan Levine

Sara Johnson



Once the ETL process and the queries are complete, the tables, the materialized view and the keyspace can be dropped from Cassandra.

In [15]:
# Tear down everything created above, in order. The materialized view is
# dropped before its base table songplay_events_by_user (Cassandra refuses to
# drop a table that still has views on it). The keyspace goes last.
drop_stmt = [
    'DROP TABLE IF EXISTS songplay_events_by_session;',
    'DROP TABLE IF EXISTS songplay_events_by_song;',
    'DROP MATERIALIZED VIEW IF EXISTS songplay_events_by_song_mv;',
    'DROP TABLE IF EXISTS songplay_events_by_user;',
    'DROP KEYSPACE IF EXISTS sparkify;'
]
for cql in drop_stmt:
    session.execute(cql)

In [16]:
# Release driver resources: first the session, then the cluster that created it.
for driver_resource in (session, cluster):
    driver_resource.shutdown()