# Part I. ETL Pipeline for Pre-Processing the Files

## Dependencies

In [1]:
import re
import json
import numpy as np
import pandas as pd
from glob import glob

from cassandra.cluster import Cluster

## Creating list of filepaths to process original event csv data files

In [2]:
# Collect every event CSV under event_data/ and order the paths alphabetically
# so the files are always processed in the same order.
file_path_list = glob('event_data/*.csv')
file_path_list.sort()

Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [3]:
# Build the denormalized events frame:
#   1. read every CSV listed in file_path_list and stack them,
#   2. drop the `auth` column,
#   3. keep only rows that have an `artist` value,
#   4. cast `userId` to int.
data = (
    pd.concat(map(pd.read_csv, file_path_list))
    .drop(columns=['auth'])
    .dropna(subset=['artist'])
    .astype({'userId': int})
)

# Persist the cleaned frame; the DataFrame index is not written to the file.
data.to_csv('event_datafile_new.csv', index=False)

Shape of stored data

In [4]:
# (rows, columns) of the cleaned frame that was written to event_datafile_new.csv.
data.shape

(6820, 16)

# Part II. Apache Cassandra code

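The `event_datafile_new.csv` includes the following columns (it also carries `method`, `page`, `registration`, `status` and `ts`, as shown in the preview below):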
- `artist`: artist name
- `firstName`: first Name of user
- `gender`: gender of user
- `itemInSession`: sequence number of the item within the session
- `lastName`: last name of user
- `length`: length of the song
- `level`: paid or free song
- `location`: location of the user
- `sessionId`: session identifier
- `song`: song title
- `userId`: user identifier

Below is what the denormalized data looks like:

In [5]:
# Preview the first rows of the cleaned events frame (head() with its default row count).
data.head()

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userId
2,Des'ree,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540340000000.0,139,You Gotta Be,200,1541110000000.0,8
4,Mr Oizo,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540340000000.0,139,Flat 55,200,1541110000000.0,8
5,Tamba Trio,Kaylee,F,4,Summers,177.18812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540340000000.0,139,Quem Quiser Encontrar O Amor,200,1541110000000.0,8
6,The Mars Volta,Kaylee,F,5,Summers,380.42077,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540340000000.0,139,Eriatarka,200,1541110000000.0,8
7,Infected Mushroom,Kaylee,F,6,Summers,440.2673,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540340000000.0,139,Becoming Insane,200,1541110000000.0,8


## Apache Cassandra

#### Creating a Cluster

In [6]:
# Create the driver's Cluster object with no arguments, i.e. with its default
# contact points (presumably a Cassandra node on localhost — confirm for your setup).
cluster = Cluster()
# Open a session on that cluster; the keyspace is selected in a later cell.
session = cluster.connect()

#### Create Keyspace

In [7]:
# Create the `udacity` keyspace on first run only (IF NOT EXISTS), using
# SimpleStrategy with a replication factor of 1.
create_keyspace_cql = """
    CREATE KEYSPACE IF NOT EXISTS udacity 
    WITH REPLICATION = {
        'class': 'SimpleStrategy',
        'replication_factor': 1
    }
    """
session.execute(create_keyspace_cql)

<cassandra.cluster.ResultSet at 0x7f8d7c61bf40>

#### Set Keyspace

In [8]:
# Make `udacity` the session's keyspace so the statements below can use
# unqualified table names.
session.set_keyspace('udacity')

## Queries

The tables below are modeled around the following queries:

### 1. Artist, song title and song's length in the music app history that was heard during a specific session and item in session

For table `music_heard_in_session`, `sessionId` is the partition key and `itemInSession` is a clustering column; together they form the primary key, and the query filters on both columns.

In [9]:
# Table for query 1: rows are partitioned by sessionId and ordered within a
# partition by itemInSession. Created only if it does not exist yet.
create_music_heard_cql = """
    CREATE TABLE IF NOT EXISTS music_heard_in_session
    (
        sessionId int,
        itemInSession int,
        song text,
        artist text,
        length decimal,
        PRIMARY KEY (sessionId, itemInSession)
    )
    """
session.execute(create_music_heard_cql)

<cassandra.cluster.ResultSet at 0x7f8dac405520>

In [10]:
# Load music_heard_in_session: one INSERT per event row, with the row's values
# bound to the %s placeholders in the same order as the column list.
insert_music_heard_cql = """
        INSERT INTO music_heard_in_session(sessionId, itemInSession, song, artist, length)
        VALUES (%s, %s, %s, %s, %s)
        """
music_heard_columns = ['sessionId', 'itemInSession', 'song', 'artist', 'length']

for _, event in data.iterrows():
    session.execute(
        insert_music_heard_cql,
        [event[column] for column in music_heard_columns]
    )

Verify that the data have been inserted into each table

In [11]:
# Query 1 check: artist, song and length for sessionId 338, itemInSession 4.
select_music_heard_cql = """
    SELECT 
        artist, song, length
    FROM 
        music_heard_in_session 
    WHERE 
        sessionId = 338 AND itemInSession = 4
    """
results = session.execute(select_music_heard_cql)

# Print every returned row, one per line.
for row in results:
    print(row)

Row(artist='Faithless', song='Music Matters (Mark Knight Dub)', length=Decimal('495.3073'))


### 2. Name of artist, song (sorted by `itemInSession`) and user (first and last name) for `userId = 10` and `sessionId = 182`

For table `user_music_session`, the columns `userId` and `sessionId` were used as a composite partition key because the query filters on these columns. `itemInSession` is used as a clustering column to sort the data within each partition.

In [12]:
# Table for query 2: composite partition key (userId, sessionId) with
# itemInSession as the clustering column. Created only if it does not exist yet.
create_user_music_session_cql = """
    CREATE TABLE IF NOT EXISTS user_music_session
    (
        userId int,
        sessionId int,
        itemInSession int,
        artist text,
        song text,
        firstName text,
        lastName text,
        PRIMARY KEY ((userId, sessionId), itemInSession)
    )
    """
session.execute(create_user_music_session_cql)

<cassandra.cluster.ResultSet at 0x7f8d7b52b490>

In [13]:
# Load user_music_session: one INSERT per event row, with the row's values
# bound to the %s placeholders in the same order as the column list.
insert_user_music_session_cql = """
        INSERT INTO user_music_session(userId, sessionId, itemInSession, artist, song, firstName, lastName)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        """
user_music_session_columns = [
    'userId', 'sessionId', 'itemInSession', 'artist', 'song', 'firstName', 'lastName'
]

for _, event in data.iterrows():
    session.execute(
        insert_user_music_session_cql,
        [event[column] for column in user_music_session_columns]
    )

Verify that the data have been inserted into each table

In [14]:
# Query 2 check: artist, song and user name for userId 10 in sessionId 182.
select_user_music_session_cql = """
    SELECT 
        artist, song, firstName, lastName
    FROM 
        user_music_session 
    WHERE 
        userId = 10 AND sessionId = 182
    """
results = session.execute(select_user_music_session_cql)

# Print every returned row, one per line.
for row in results:
    print(row)

Row(artist='Down To The Bone', song="Keep On Keepin' On", firstname='Sylvie', lastname='Cruz')
Row(artist='Three Drives', song='Greece 2000', firstname='Sylvie', lastname='Cruz')
Row(artist='Sebastien Tellier', song='Kilometer', firstname='Sylvie', lastname='Cruz')
Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', firstname='Sylvie', lastname='Cruz')


### 3. Every user name (first and last) in the music app history who listened to the song 'All Hands Against His Own' 

For table `song_listening_activity`, `song` is the partition key because the query filters on it, and `userId` is a clustering column that keeps one row per user for each song.

In [15]:
# Table for query 3: partitioned by song, with userId as the clustering column.
# Created only if it does not exist yet.
create_song_listening_cql = """
    CREATE TABLE IF NOT EXISTS song_listening_activity
    (
        song text, 
        userId int,
        firstName text,
        lastName text,
        PRIMARY KEY (song, userId)
    )
    """
session.execute(create_song_listening_cql)

<cassandra.cluster.ResultSet at 0x7f8d7a43b820>

In [16]:
# Load song_listening_activity: one INSERT per event row, with the row's values
# bound to the %s placeholders in the same order as the column list.
insert_song_listening_cql = """
        INSERT INTO song_listening_activity(song, userId, firstName, lastName)
        VALUES (%s, %s, %s, %s)
        """
song_listening_columns = ['song', 'userId', 'firstName', 'lastName']

for _, event in data.iterrows():
    session.execute(
        insert_song_listening_cql,
        [event[column] for column in song_listening_columns]
    )

Verify that the data have been inserted into each table

In [17]:
# Query 3 check: first and last name of every user stored for the song
# 'All Hands Against His Own'.
select_song_listening_cql = """
    SELECT 
        firstName, lastName
    FROM 
        song_listening_activity 
    WHERE 
        song = 'All Hands Against His Own'
    """
results = session.execute(select_song_listening_cql)

# Print every returned row, one per line.
for row in results:
    print(row)

Row(firstname='Jacqueline', lastname='Lynch')
Row(firstname='Tegan', lastname='Levine')
Row(firstname='Sara', lastname='Johnson')


### Drop the tables before closing out the sessions

In [18]:
# Remove the three query tables created above.
# IF EXISTS keeps this cell safe to re-run (or to run after an earlier cell
# failed before creating a table), mirroring the IF NOT EXISTS used on creation;
# a plain DROP TABLE raises when the table is missing.
session.execute('DROP TABLE IF EXISTS music_heard_in_session')
session.execute('DROP TABLE IF EXISTS user_music_session')
session.execute('DROP TABLE IF EXISTS song_listening_activity')

<cassandra.cluster.ResultSet at 0x7f8d7c605520>

### Close the session and cluster connection

In [19]:
# Release the connection resources: close the session first, then the cluster
# it was created from.
session.shutdown()
cluster.shutdown()