In [1]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

In [2]:
filepath = os.getcwd() + '/event_data'

In [3]:
for root, dirs, files in os.walk(filepath):
    file_path_list = glob.glob(os.path.join(root,'*'))
#     print(file_path_list)

Create single csv data file from event data to populate Apache Casssandra tables.

In [4]:
full_data_rows_list = [] 
    
for f in file_path_list:

    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
        for line in csvreader:
            full_data_rows_list.append(line) 
            

csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))

In [5]:
#checking the number of rows of the created csv file

with open('event_datafile_new.csv', 'r', encoding='utf8') as f:
    print(sum(1 for line in f))


6821


In [6]:
from cassandra.cluster import Cluster
cluster = Cluster()

session = cluster.connect()

In [7]:
#Create Keyspace¶
session.execute("""
create keyspace if not exists musicomp with replication = {'class': 'SimpleStrategy', 'replication_factor':1}
""")

<cassandra.cluster.ResultSet at 0x7f9f8e608c50>

In [8]:
#set the keyspace

session.set_keyspace('musicomp')

Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.
Create queries to ask the following three questions of the data
1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4
2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [9]:
# Query 1: 
session.execute("""
CREATE TABLE IF NOT EXISTS session_songs(sessionId int, itemInSession int, artist text, song_title text, song_length float, PRIMARY KEY(sessionId, itemInSession))
""")

<cassandra.cluster.ResultSet at 0x7f9f92a79390>

In [10]:
file = 'event_datafile_new.csv'

with open(file, encoding='utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        query = "INSERT INTO session_songs (sessionId, itemInSession, artist, song_title, song_length)"
        query = query + " VALUES (%s, %s, %s, %s, %s)"
        artist_name, user_name, gender, itemInSession, user_last_name, length, level, location, sessionId, song, userId = line
        session.execute(query, (int(sessionId), int(itemInSession), artist_name, song, float(length)))

In [11]:
rows = session.execute("""SELECT artist, song_title, song_length FROM session_songs WHERE sessionId = 338 AND itemInSession = 4""")

for row in rows:
    print(row.artist, row.song_title, row.song_length)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


In [12]:
# Query 2: 
session.execute("""      

        create table if not exists event_log
            
            (artist text, 
            song text, 
            first_name text, 
            last_name text,
            user_id int,
            session_id int,
            item_in_session int, 
            primary key ((user_id, session_id), item_in_session))
""")

<cassandra.cluster.ResultSet at 0x7f9f91fb28d0>

In [13]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) 
    for line in csvreader:
        query = "INSERT INTO event_log (artist, song, first_name, last_name, user_id, session_id, item_in_session)"
        query = query + " VALUES (%s, %s, %s, %s, %s, %s, %s)"
        session.execute(query, (line[0], line[9], line[1], line[4], int(line[10]), int(line[8]), int(line[3])))

In [14]:
# Query 2:
rows = session.execute("""
                            select artist , song , first_name , last_name
                            from event_log
                            where user_id = 10 and session_id = 182
                            order by item_in_session 

""")
for row in rows:
    print (row.artist , row.song , row.first_name , row.last_name)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


In [15]:
# Query 3: 
session.execute("""
                    create table if not exists song_users (
                    song text, user_id int, first_name text, last_name text, 
                    primary key (song, user_id))
""")

<cassandra.cluster.ResultSet at 0x7f9f8aa77b50>

In [16]:
file = 'event_datafile_new.csv'

df = pd.read_csv(file, usecols=[1, 4, 9, 10])
df.drop_duplicates(inplace=True)

for ix, row in df.iterrows():
    query = "INSERT INTO song_users (song, user_id, first_name, last_name)"
    query = query + " VALUES (%s, %s, %s, %s)"
    session.execute(query, (row['song'], row['userId'], row['firstName'], row['lastName']))

In [17]:
rows = session.execute("""SELECT first_name, last_name FROM song_users WHERE song = 'All Hands Against His Own'""")

for row in rows:
    print( row.first_name, row.last_name )

Jacqueline Lynch
Tegan Levine
Sara Johnson


In [18]:
session.execute("drop table session_songs")
session.execute("drop table event_log")
session.execute("drop table song_users")

<cassandra.cluster.ResultSet at 0x7f9f8aa19bd0>

In [19]:
session.shutdown()
cluster.shutdown()