# Part I. ETL Pipeline for Pre-Processing the Files

In [1]:
import cassandra
import csv

from pathlib import Path

#### Creating list of filepaths to process original event csv data files

In [2]:
file_path = Path.cwd() / "event_data"
file_path_list = list(file_path.rglob("*.csv"))

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)
keep_fields = ['artist','firstName','gender','itemInSession','lastName','length', 'level','location','sessionId','song','userId']

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline = '') as fhand_out:
    dict_writer = csv.DictWriter(fhand_out, dialect='myDialect', fieldnames=keep_fields)
    dict_writer.writeheader()
    
    for f in file_path_list:
        with open(f, 'r', encoding = 'utf8', newline = '') as fhand_in:
            dict_reader = csv.DictReader(fhand_in)
            for row in dict_reader:
                if row['artist']:
                    dict_writer.writerow({field: row[field] for field in keep_fields})
            

In [4]:
# check the number of rows in csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


# Part II. Using Apache Cassandra to analyze data. 

## The event_datafile_new.csv contains the following columns:: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

A screenshot of the denormalized data is as shown:<br>

<img src="../images/image_event_datafile_new.jpg">

#### Create a Cluster

In [5]:
from cassandra.cluster import Cluster

cluster = Cluster()
session = cluster.connect()

#### Create Keyspace

In [6]:
try:
    session.execute("""create keyspace if not exists music
        with replication = 
        { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
    """)
except Exception as e:
    print(e)

#### Set Keyspace

In [7]:
try:
    session.set_keyspace('music')
except Exception as e:
    print(e)

## Modelling database table based on the queries we want to run. 

In [8]:
file = 'event_datafile_new.csv'

#### Query 1: Return the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession  = 4

In [9]:
# Query 1:
# We know the where conditions use sessionId and itemInSession. We can use sessionId as the partition key and itemInSession as the
# clustering column. We treat these together as the primary key. 

# create table with a primary key chosen to process the query
query = "create table if not exists song_by_sessionid_itemsession "
query = query + "(sessionId int, itemInSession int, artist text, song text, length float, primary key (sessionId, itemInSession))"

try:
    session.execute(query)
except Exception as e:
    print(e)

#insert data into table
query = "insert into song_by_sessionid_itemsession (sessionId, itemInSession, artist, song, length)"
query = query + " values (%s, %s, %s, %s, %s)"

with open(file, encoding = 'utf8') as fhand:
    dict_reader = csv.DictReader(fhand)
    for row in dict_reader:
        session.execute(query, (int(row['sessionId']), int(row['itemInSession']), row['artist'], row['song'], float(row['length'])))

#### We run the query and verify that the data has been inserted into each table.

In [10]:
## TO-DO: Add in the SELECT statement to verify the data was entered into the table
query = """select artist, song, length
from song_by_sessionid_itemsession 
where sessionId = 338 and itemInSession = 4"""

rows = session.execute(query)

for row in rows:
    print(row.artist, row.song, row.length)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


#### Query 2: Return the name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182 

In [11]:
# Query 2: 

# The where conditions use userId and sessionId. We can use userId and sessionId as a composite partition key for better performance.
# We know that song should be sorted by itemInSession and hence, choose it as the next clustering column
# followed by song. The primary key is comprised of userId, sessionId, itemInSession and song.

# create table with a primary key chosen to process the query
query = "create table if not exists artist_by_userid_sessionid "
query = query + """(userId int, sessionId int, itemInSession int, artist text, song text, firstName text, lastName text,
primary key ((userId, sessionId), itemInSession, song))"""

try:
    session.execute(query)
except Exception as e:
    print(e)

# insert data into table
query = "insert into artist_by_userid_sessionid (userId, sessionId, itemInSession, artist, song, firstName, lastName)"
query = query + " values (%s, %s, %s, %s, %s, %s, %s)"

with open(file, encoding = 'utf8') as fhand:
    dict_reader = csv.DictReader(fhand)
    for row in dict_reader:
        session.execute(query, (int(row['userId']), int(row['sessionId']), int(row['itemInSession']), row['artist'], row['song'], row['firstName'], row['lastName']))


#### We run the query and verify that the data has been inserted into each table.

In [12]:
query = "select artist, song, firstName, lastName from artist_by_userid_sessionid where userId = 10 and sessionId = 182"

rows = session.execute(query)

for row in rows:
    print(row.artist, row.song, row.firstname, row.lastname)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


#### Query 3: Return every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [13]:
# Query 3:

# The where condition refers to song and hence, we use it as the partition key. In order to make is unique we need clustering
# column(s). We know that we want to retrieve the first and last names. We can use these as the clustering columns. 

# create table with a primary key chosen to process the query

query = "create table if not exists username_by_song "
query = query + "(song text, userId int, firstName text, lastName text, primary key (song, userId))"

try:
    session.execute(query)
except Exception as e:
    print(e)
    
# insert data into table
query = "insert into username_by_song (song, userId, firstName, lastName)"
query = query + " values (%s, %s, %s, %s)"

with open(file, encoding = 'utf8') as fhand:
    dict_reader = csv.DictReader(fhand)
    for row in dict_reader:
        session.execute(query, (row['song'], int(row['userId']), row['firstName'], row['lastName']))

#### We run the query and verify that the data has been inserted into each table.

In [14]:
query = "select firstname, lastname from username_by_song where song = 'All Hands Against His Own'"

rows = session.execute(query)

for row in rows:
    print(row.firstname, row.lastname)

Jacqueline Lynch
Tegan Levine
Sara Johnson


### Drop all tables

In [15]:
query = "drop table song_by_sessionid_itemsession"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

query = "drop table artist_by_userid_sessionid"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
query = "drop table username_by_song"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

errors={'127.0.0.1:9042': 'Client request timeout. See Session.execute[_async](timeout)'}, last_host=127.0.0.1:9042
errors={'127.0.0.1:9042': 'Client request timeout. See Session.execute[_async](timeout)'}, last_host=127.0.0.1:9042
errors={'127.0.0.1:9042': 'Client request timeout. See Session.execute[_async](timeout)'}, last_host=127.0.0.1:9042


### Close the session and cluster connection¶

In [16]:
session.shutdown()
cluster.shutdown()