# PART I. ETL Pipeline for Pre-processing Files

## Import Packages

In [None]:
import os
import glob
import pandas as pd

## Process all CSVs found in the `event_data` folder and combine them into one file.

In [None]:
# Glob pattern for CSV files under ./event_data; the "**" segment is meant
# to be expanded by glob's recursive mode.
event_csv_wildcard = os.path.join(os.curdir, "event_data", "**", "*.csv")

In [None]:
# glob returns matches in arbitrary, filesystem-dependent order; sorting makes
# the row order of the combined frame (and the exported CSV) reproducible.
event_files = sorted(glob.glob(event_csv_wildcard, recursive=True))

In [None]:
# Columns read from each event CSV; every other column is skipped at read time.
cols_to_use = [
    "artist",
    "firstName",
    "gender",
    "itemInSession",
    "lastName",
    "length",
    "level",
    "location",
    "sessionId",
    "song",
    "userId",
]

In [None]:
# Read only the needed columns from every event file and stack them into a
# single frame.
if not event_files:
    # pd.concat fails with an opaque "No objects to concatenate" on empty
    # input; report which pattern matched nothing instead.
    raise ValueError(f"No CSV files matched {event_csv_wildcard!r}")

events_df = pd.concat(
    (pd.read_csv(f, usecols=cols_to_use) for f in event_files),
    # Each file carries its own 0..n index; renumber so labels are unique.
    ignore_index=True,
)

In [None]:
# Keep only rows with no missing values, renumber the index from zero and
# store userId as an integer.
events_df = (
    events_df
    .dropna(how="any")
    .reset_index(drop=True)
    .astype({"userId": "int"})
)

# Persist the cleaned, combined events without the index column.
events_df.to_csv("./event_data_new.csv", index=False)

In [None]:
# Add a display name in "lastName, firstName" form. Element-wise string
# concatenation replaces the row-by-row ', '.join aggregation, which made one
# Python call per row.
events_df['name'] = events_df['lastName'] + ', ' + events_df['firstName']

# PART II. SETTING UP THE DATABASE

## Create Cluster

In [None]:
from cassandra.cluster import Cluster

# Open a connection using the driver's default contact points and create the
# session that every later cell uses.
try:
    cluster = Cluster()
    session = cluster.connect()
except Exception as e:
    print(e)
    # Without a session nothing below can work; stop here instead of letting
    # later cells fail with a misleading "name 'session' is not defined".
    raise

## Create & Connect to Keyspace

In [None]:
# Create the `udacity` keyspace if it is missing; any error is printed rather
# than raised.
try:
    session.execute(
        """
            CREATE KEYSPACE IF NOT EXISTS udacity
            WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }
        """
    )
except Exception as err:
    print(err)
else:
    print("Success: Keyspace created successfully.")

In [None]:
# Make `udacity` the session's keyspace so later statements can use
# unqualified table names; any error is printed rather than raised.
try:
    session.set_keyspace("udacity")
except Exception as err:
    print(err)

# PART III. QUERIES

## 1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

### QUERY

        SELECT artist, song_title, song_length FROM session_item_library WHERE session_id=338 AND item_session=4

### PRIMARY KEY

        PARTITION KEY: session_id
        CLUSTERING KEY: item_session

Create table for Session Item database

In [None]:
# Table for query 1: partitioned by session_id and clustered by item_session,
# matching the WHERE clause of the lookup. IF NOT EXISTS keeps the cell
# re-runnable when the table is left over from an earlier, interrupted run.
query1 = """
    CREATE TABLE IF NOT EXISTS session_item_library (
        artist text,
        song_title text,
        song_length float,
        session_id int,
        item_session int,
        PRIMARY KEY (session_id, item_session)
    )
"""

try:
    session.execute(query1)
    print("Success: Table created successfully.")
except Exception as e:
    print(e)

In [None]:
# Independent copy of the columns needed for session_item_library, listed in
# the same order as the columns of the INSERT statement.
session_item_df = events_df.loc[
    :, ['artist', 'song', 'length', 'sessionId', 'itemInSession']
].copy()

Insert data into database

In [None]:
# Parameterized INSERT; the placeholders follow the column order of
# session_item_df.
session_item_insert = """
    INSERT INTO session_item_library (artist, song_title, song_length, session_id, item_session) VALUES (%s, %s, %s, %s, %s)
"""

# itertuples(index=False, name=None) yields each row as a plain tuple, which
# avoids building a Series per row as iterrows() does.
for record in session_item_df.itertuples(index=False, name=None):
    session.execute(session_item_insert, record)

Test that the database creation & data insertion worked

expected output:

        Faithless Music Matters (Mark Knight Dub) 495.30731201171875

In [None]:
# Validation query for table 1.
session_item_select = "SELECT artist, song_title, song_length FROM session_item_library WHERE session_id=338 AND item_session=4"

try:
    rows = session.execute(session_item_select)
except Exception as e:
    print(e)
else:
    # Print only when the query succeeded; otherwise `rows` would be
    # undefined (or left over from an earlier cell).
    for row in rows:
        print(row.artist, row.song_title, row.song_length)

## 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

### QUERY

        SELECT artist, song_title, name, item_session FROM user_session_library WHERE user_id=10 AND session_id=182

### PRIMARY KEY

        PARTITION KEY: user_id
        CLUSTERING KEY: session_id & item_session

Create table for User Session database

In [None]:
# Table for query 2: partitioned by user_id and clustered by session_id then
# item_session. IF NOT EXISTS keeps the cell re-runnable when the table is
# left over from an earlier, interrupted run.
query2 = """
    CREATE TABLE IF NOT EXISTS user_session_library (
        artist text,
        song_title text,
        item_session int,
        user_id int,
        name text,
        session_id int,
        PRIMARY KEY (user_id, session_id, item_session)
    )
"""

try:
    session.execute(query2)
    # Same confirmation message as the first table's creation cell.
    print("Success: Table created successfully.")
except Exception as e:
    print(e)

In [None]:
# Independent copy of the columns needed for user_session_library, listed in
# the same order as the columns of the INSERT statement.
user_session_df = events_df.loc[
    :, ['artist', 'song', 'itemInSession', 'userId', 'name', 'sessionId']
].copy()

Insert data into database

In [None]:
# Parameterized INSERT; the placeholders follow the column order of
# user_session_df.
user_session_insert = """
    INSERT INTO user_session_library (
        artist, song_title, item_session,
        user_id, name, session_id
    ) VALUES (%s, %s, %s, %s, %s, %s)
"""

# itertuples(index=False, name=None) yields each row as a plain tuple, which
# avoids building a Series per row as iterrows() does.
for record in user_session_df.itertuples(index=False, name=None):
    session.execute(user_session_insert, record)

Test that the database creation & data insertion worked

expected output:

        Down To The Bone Keep On Keepin' On Cruz, Sylvie 0
        Three Drives Greece 2000 Cruz, Sylvie 1
        Sebastien Tellier Kilometer Cruz, Sylvie 2
        Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Cruz, Sylvie 3

*Note: ordered by item_session*

In [None]:
# Validation query for table 2.
user_session_select = "SELECT artist, song_title, name, item_session FROM user_session_library WHERE user_id=10 AND session_id=182"

try:
    rows = session.execute(user_session_select)
except Exception as e:
    print(e)
else:
    # Print only when the query succeeded; otherwise the loop would iterate
    # a stale `rows` left over from an earlier cell.
    for row in rows:
        print(row.artist, row.song_title, row.name, row.item_session)

## 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

### QUERY

        SELECT name FROM user_song_library WHERE song_title='All Hands Against His Own'

### PRIMARY KEY

        PARTITION KEY: song_title
        CLUSTERING KEY: user_id

Create table for User Song database

In [None]:
# Table for query 3: partitioned by song_title and clustered by user_id.
# IF NOT EXISTS keeps the cell re-runnable when the table is left over from
# an earlier, interrupted run.
query3 = """
    CREATE TABLE IF NOT EXISTS user_song_library (
        song_title text,
        user_id int,
        name text,
        PRIMARY KEY (song_title, user_id)
    )
"""

try:
    session.execute(query3)
    # Same confirmation message as the first table's creation cell.
    print("Success: Table created successfully.")
except Exception as e:
    print(e)

In [None]:
# Independent copy of the columns needed for user_song_library, listed in the
# same order as the columns of the INSERT statement.
user_song_df = events_df.loc[:, ['song', 'userId', 'name']].copy()

Insert data into database

In [None]:
# Parameterized INSERT; the placeholders follow the column order of
# user_song_df.
user_song_insert = """
    INSERT INTO user_song_library (
        song_title, user_id, name
    ) VALUES (%s, %s, %s)
"""

# itertuples(index=False, name=None) yields each row as a plain tuple, which
# avoids building a Series per row as iterrows() does.
for record in user_song_df.itertuples(index=False, name=None):
    session.execute(user_song_insert, record)

Test that the database creation & data insertion worked

expected output:

        Lynch, Jacqueline
        Levine, Tegan
        Johnson, Sara

*Note: rows are returned ordered by user_id, the table's clustering column*

In [None]:
# Validation query for table 3.
user_song_select = "SELECT name FROM user_song_library WHERE song_title='All Hands Against His Own'"

try:
    rows = session.execute(user_song_select)
except Exception as e:
    print(e)
else:
    # Print only when the query succeeded; otherwise the loop would iterate
    # a stale `rows` left over from an earlier cell.
    for row in rows:
        print(row.name)

## DROP TABLES

In [None]:
# Tables created by this notebook, removed here so a fresh run starts clean.
tables_to_drop = ['session_item_library', 'user_session_library', 'user_song_library']

for table in tables_to_drop:
    try:
        # IF EXISTS makes the cleanup safe when a table was never created
        # (for example after a partial run).
        session.execute(f"DROP TABLE IF EXISTS {table}")
    except Exception as e:
        print(e)

## CLOSE SESSION

In [None]:
# Release driver resources: shut down the session, then the cluster object
# it was created from.
session.shutdown()
cluster.shutdown()