# Project 1B - Data Modeling with Cassandra

In [1]:
# Import Python packages 

import csv
import glob
import json
import re
import os

import cassandra
import numpy as np
import pandas as pd

In [2]:
# Constants

# Relative path of the combined event CSV: the ETL step writes it and the Cassandra load cells read it back
COLLECTED_EVENTS_CSV = "event_data_new.csv"

## 1. ETL Pipeline

### 1.1 Collect file paths from event_data folder

In [3]:
# Location of the event_data directory (should be /home/workspace/event_data/ in Project Workspace)
filepath = os.path.join(os.getcwd(), "event_data")

# Gather every *.csv event log found in each directory that os.walk visits under filepath.
# With a single directory of logs the outer iteration is expected to run only once.
file_path_list = [
    csv_path
    for directory, _subdirs, _files in os.walk(filepath)
    for csv_path in glob.glob(os.path.join(directory, "*.csv"))
]

# Should be 30 files in the event_data directory
len(file_path_list)

30

### 1.2 Combine event data from all logs into one file

In [4]:
# Accumulates the data rows (header excluded) of every event log
full_data_rows_list = []

# Process each event log
for log_path in file_path_list:
    # newline="" lets the csv module do its own newline handling
    with open(log_path, "r", encoding="utf8", newline="") as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header; the None default keeps an empty log file from raising StopIteration
        next(csvreader, None)
        full_data_rows_list.extend(csvreader)

csv.register_dialect("myDialect", quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Export collected events into a single CSV file
with open(COLLECTED_EVENTS_CSV, "w", encoding="utf8", newline="") as f:
    writer = csv.writer(f, dialect="myDialect")
    # Header for the kept columns; the order matches the row indices written below
    writer.writerow([
        "artist",
        "firstName",
        "gender",
        "itemInSession",
        "lastName",
        "length",
        "level",
        "location",
        "sessionId",
        "song",
        "userId"
    ])
    for row in full_data_rows_list:
        # csv.reader yields [] for a blank line; skip those as well as rows whose
        # first column (written out as "artist") is empty
        if not row or row[0] == "":
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))

In [5]:
# Sanity check: the combined event data CSV should contain 6,821 lines
with open(COLLECTED_EVENTS_CSV, "r", encoding="utf8") as combined_file:
    line_count = 0
    for _ in combined_file:
        line_count += 1
    print(line_count)

6821


## 2. Apache Cassandra data modeling

### 2.1 CSV Input

The image below shows the columns and a sample of the data from the CSV file we'll be working with.
<img src="images/image_event_datafile_new.jpg">

### 2.2 Initialize Cassandra connection

- Create connection to a Cassandra session
- Create and set keyspace

In [6]:
from cassandra.cluster import Cluster

# Cluster() is built with no arguments, so the driver's default settings are used.
# A session is required before any query can be executed.
cluster = Cluster()
session = cluster.connect()

# CQL that creates the sparkify keyspace (single replica, SimpleStrategy) if it is missing
create_keyspace_cql = """
        CREATE KEYSPACE IF NOT EXISTS sparkify 
        WITH REPLICATION = 
        { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
    """

# Create keyspace; a failure is printed rather than raised
try:
    session.execute(create_keyspace_cql)
except Exception as e:
    print(e)

# Make sparkify the session's keyspace; a failure is printed rather than raised
try:
    session.set_keyspace("sparkify")
except Exception as e:
    print(e)

### 2.3 Question 1

Give me the artist, song title and song's length in the music app history that was heard during 
sessionId = 338, and itemInSession = 4

#### 2.3.1 Table

In order to answer this query, we'll need to set a partition key of `session_id` and add `item_in_session` as a clustering column which will uniquely distinguish each row.

We will also include columns for `artist`, `song`, and `length`, which are required for answering the query in question.

In [7]:
# Dropping first lets this cell be rerun and always start from an empty table
drop_query = "DROP TABLE IF EXISTS song_history__by_session;"

# song_history__by_session: partitioned by session_id, clustered by item_in_session
create_query = """
CREATE TABLE IF NOT EXISTS song_history__by_session (
    session_id int, 
    item_in_session int, 
    artist text, 
    song text, 
    length double, 
    PRIMARY KEY (session_id, item_in_session)
)
"""

# Run the drop, then the create; each failure is printed and does not stop the next statement
for ddl_statement in (drop_query, create_query):
    try:
        session.execute(ddl_statement)
    except Exception as e:
        print(e)

#### 2.3.2 Load data

In [8]:
# Prepared once, then executed for every event row
insert_stmt = session.prepare("""
INSERT INTO song_history__by_session 
(session_id, item_in_session, artist, song, length)
VALUES (?, ?, ?, ?, ?)
""")

# newline="" is what the csv module expects of files it reads, so quoted fields
# containing line breaks are parsed correctly
with open(COLLECTED_EVENTS_CSV, encoding="utf8", newline="") as f:
    for event in csv.DictReader(f):
        # CSV values arrive as strings; cast them to match the table's int/double columns
        data = (
            int(event["sessionId"]),
            int(event["itemInSession"]),
            event["artist"],
            event["song"],
            float(event["length"]),
        )
        session.execute(insert_stmt, data)

#### 2.3.3 Answer to question 1

In [9]:
# Ask only for the three columns the question needs instead of SELECT *
query = "SELECT artist, song, length FROM song_history__by_session WHERE session_id = %s AND item_in_session = %s"
try:
    rows = session.execute(query, (338, 4))
except Exception as e:
    print(e)
else:
    # Iterate only when the query succeeded, so a failure cannot fall through to an
    # undefined `rows` (NameError) or to a stale result left over from another cell
    for row in rows:
        print(row.artist, row.song, row.length)

Faithless Music Matters (Mark Knight Dub) 495.3073


### 2.4 Question 2

Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

#### 2.4.1 Table

In order to answer this query, we'll need to set a partition key of `user_id`. We'll also need clustering columns for `session_id` and `item_in_session` to distinguish unique rows.

Within a partition, Cassandra returns rows sorted by the clustering columns in ascending order by default, so for a given `session_id` the songs come back ordered by `item_in_session`, which meets the sorting requirement.

We will also include columns for `first_name`, `last_name`, `artist`, and `song` which are required for answering the query in question.

In [10]:
# Dropping first lets this cell be rerun and always start from an empty table
drop_query = "DROP TABLE IF EXISTS song_history__by_user_session;"

# song_history__by_user_session: partitioned by user_id, clustered by session_id then item_in_session
create_query = """
CREATE TABLE IF NOT EXISTS song_history__by_user_session (
    user_id int,
    session_id int, 
    item_in_session int, 
    first_name text,
    last_name text,    
    artist text, 
    song text,
    PRIMARY KEY (user_id, session_id, item_in_session)
)
"""

# Run the drop, then the create; each failure is printed and does not stop the next statement
for ddl_statement in (drop_query, create_query):
    try:
        session.execute(ddl_statement)
    except Exception as e:
        print(e)

#### 2.4.2 Load data

In [11]:
# Prepared once, then executed for every event row
insert_stmt = session.prepare("""
INSERT INTO song_history__by_user_session 
(user_id, session_id, item_in_session, first_name, last_name, artist, song)
VALUES (?, ?, ?, ?, ?, ?, ?)
""")

# newline="" is what the csv module expects of files it reads, so quoted fields
# containing line breaks are parsed correctly
with open(COLLECTED_EVENTS_CSV, encoding="utf8", newline="") as f:
    for event in csv.DictReader(f):
        # CSV values arrive as strings; cast the id/position fields to match the int columns
        data = (
            int(event["userId"]),
            int(event["sessionId"]),
            int(event["itemInSession"]),
            event["firstName"],
            event["lastName"],
            event["artist"],
            event["song"],
        )
        session.execute(insert_stmt, data)

#### 2.4.3 Answer to question 2

In [12]:
# Artist, song and user name for one user's session
query = """
SELECT artist, song, first_name, last_name 
FROM song_history__by_user_session 
WHERE user_id = %s AND session_id = %s
"""

try:
    rows = session.execute(query, (10, 182))
except Exception as e:
    print(e)
else:
    # Iterate only when the query succeeded, so a failure cannot fall through to an
    # undefined `rows` (NameError) or to a stale result left over from another cell
    for row in rows:
        print(row.artist, row.song, row.first_name, row.last_name)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


### 2.5 Question 3

Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

#### 2.5.1 Table

In order to answer this query, we'll need to set a partition key by `song` and add a clustering column by `user_id`.

It makes sense to partition by song since we're filtering data by a song.

Without the clustering column on `user_id`, the table would keep only one row per song, because `song` alone cannot uniquely identify a row and each insert for the same song would overwrite the previous one. Using `first_name`/`last_name` as clustering columns wouldn't work either, since multiple users may have the same first and last name.

We do not need `session_id` or `item_in_session` since we care about unique users and not how many times a user listened to a song.

In [13]:
# Dropping first lets this cell be rerun and always start from an empty table
drop_query = "DROP TABLE IF EXISTS song_history__by_song_user;"

# song_history__by_song_user: partitioned by song, clustered by user_id
create_query = """
CREATE TABLE IF NOT EXISTS song_history__by_song_user (
    song text,
    user_id int,
    first_name text,
    last_name text,    
    PRIMARY KEY (song, user_id)
)
"""

# Run the drop, then the create; each failure is printed and does not stop the next statement
for ddl_statement in (drop_query, create_query):
    try:
        session.execute(ddl_statement)
    except Exception as e:
        print(e)

#### 2.5.2 Load data

In [14]:
# Prepared once, then executed for every event row
insert_stmt = session.prepare("""
INSERT INTO song_history__by_song_user (song, user_id, first_name, last_name)
VALUES (?, ?, ?, ?)
""")

# newline="" is what the csv module expects of files it reads, so quoted fields
# containing line breaks are parsed correctly
with open(COLLECTED_EVENTS_CSV, encoding="utf8", newline="") as f:
    for event in csv.DictReader(f):
        # CSV values arrive as strings; cast userId to match the int column
        data = (
            event["song"],
            int(event["userId"]),
            event["firstName"],
            event["lastName"],
        )
        session.execute(insert_stmt, data)

#### 2.5.3 Answer to question 3

In [15]:
# First and last name of every user stored for the given song
query = "SELECT first_name, last_name FROM song_history__by_song_user WHERE song = %s"
try:
    rows = session.execute(query, ("All Hands Against His Own",))
except Exception as e:
    print(e)
else:
    # Iterate only when the query succeeded, so a failure cannot fall through to an
    # undefined `rows` (NameError) or to a stale result left over from another cell
    for row in rows:
        print(row.first_name, row.last_name)

Jacqueline Lynch
Tegan Levine
Sara Johnson


## 3. Clean up

In [16]:
# Drop the three query tables; a failure on one is printed and does not stop the others
for drop_statement in (
    "DROP TABLE IF EXISTS song_history__by_session;",
    "DROP TABLE IF EXISTS song_history__by_user_session;",
    "DROP TABLE IF EXISTS song_history__by_song_user;",
):
    try:
        session.execute(drop_statement)
    except Exception as e:
        print(e)

In [17]:
# Close the session first, then the cluster object it was created from
session.shutdown()
cluster.shutdown()