In [1]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv
from cassandra.cluster import Cluster

# Data Modeling with Cassandra
The purpose of this project is to create a database in Apache Cassandra for maintaining songs and user activity. This will allow analysts to generate insights from user listening patterns. ETL is performed to load data from user logs into the database to enable easy querying. 

### Input Data
Input data in `/event_data/` is partitioned by date and contains the fields: 
```javascript
{artist,auth,firstName,lastName,gender,itemInSession,length,level,location,method,page,registration,sessionId,song}
```

### Data Queries
The design of the database is optimized to answer the following queries:
1. Given the sessionId and itemInSession, return the artist's name, song title and song length from the user activity data
2. Given userId and sessionId, return the name of the artist, the song title (sorted by itemInSession) and the user (firstName and lastName)
3. Given the song title, return each user (first and last name) in the music app history who listened to that song

### Database Design
Unlike in a regular RDBMS, tables in Cassandra have to be denormalized and designed specifically around the queries they must answer, since Cassandra does not support joins.

#### Query 1:
    The following fields are required in the table: {session_id, item_in_session, artist, song_title, song_length}. 
    The data can be partitioned by session_id, with item_in_session as a clustering key so that each row is uniquely identified
#### Query 2:
    The following fields are required to model query: {user_id,session_id,item_in_session,artist, song_title, first_name, last_name}.
    The data can be partitioned by the composite key (user_id,session_id) with item_in_session as a clustering key
#### Query 3:
    The following fields are required to model the query: {song_title,user_id,first_name,last_name}.
    The primary key is the composite (song_title, user_id): song_title is the partition key and user_id is the clustering key

#### Table Creation and Drop Queries

In [2]:
# Keyspace that holds every table used in this notebook
keyspace_name = "music_app_history"

# CREATE TABLE statements, keyed by table name (one table per target query)
create_queries = {
    "songs_by_session": """
    create table if not exists  songs_by_session(
        session_id int,item_in_session int,artist text,song_title text,song_length float, 
        primary key ((session_id),item_in_session))""",
    "songs_artist_user": """
    create table if not exists songs_artist_user(
        user_id int,session_id int,item_in_session int,artist text,song_title text,first_name text,last_name text, 
        primary key ((user_id,session_id),item_in_session)) with clustering order by(item_in_session asc)""",
    "user_songs": """
    create table if not exists user_songs(
        song_title text,user_id int,first_name text,last_name text,
        primary key (song_title,user_id))""",
}

# DROP TABLE statements, keyed by the same table names
drop_queries = {
    "songs_by_session": """
    drop table if exists songs_by_session
""",
    "songs_artist_user": """
    drop table if exists songs_artist_user
""",
    "user_songs": """
    drop table if exists user_songs
""",
}

# Stand-alone names for the individual statements
create_songs_by_session = create_queries["songs_by_session"]
create_songs_artist_user = create_queries["songs_artist_user"]
create_user_songs = create_queries["user_songs"]

drop_songs_by_session = drop_queries["songs_by_session"]
drop_songs_artist_user = drop_queries["songs_artist_user"]
drop_user_songs = drop_queries["user_songs"]

#### Insert Queries

In [3]:
# Parameterised INSERT statements, keyed by the table they write to
insert_queries = {
    "songs_by_session": """
        insert into songs_by_session
            (session_id,item_in_session,artist,song_title,song_length)
                values (%s,%s,%s,%s,%s)""",
    "songs_artist_user": """
        insert into songs_artist_user
            (user_id,session_id,item_in_session,artist,song_title,first_name,last_name)
                values(%s,%s,%s,%s,%s,%s,%s)""",
    "user_songs": """
        insert into user_songs
            (song_title,user_id,first_name,last_name)
                values(%s,%s,%s,%s)""",
}

# Stand-alone names for the individual statements
insert_songs_by_session = insert_queries["songs_by_session"]
insert_songs_artist_user = insert_queries["songs_artist_user"]
insert_user_songs = insert_queries["user_songs"]

#### Select Queries

In [4]:
# Query 1: look up a single song play by session and position in the session
select_songs_by_session = """
        select artist,song_title,song_length from songs_by_session where session_id=%s and item_in_session=%s
        """

# Query 2: every song played by one user in one session
select_songs_artist_user = """
        select artist,song_title,first_name,last_name from songs_artist_user where user_id = %s and session_id = %s
        """

# Query 3: every user that listened to a given song title
select_user_songs = """
        select song_title,first_name,last_name from user_songs where song_title=%s
        """

# Lookup of the SELECT statements by table name
select_queries = dict(
    songs_by_session=select_songs_by_session,
    songs_artist_user=select_songs_artist_user,
    user_songs=select_user_songs,
)

#### Loading the data
This section combines the files in the `/event_data` folder into a single CSV file, **event_datafile_new.csv**, and then loads that file into the pandas.DataFrame **events_df**

In [5]:
# Show the current working directory so the relative paths below can be checked
print(os.getcwd())

# Folder that holds the raw event files
filepath = os.getcwd() + '/event_data/'

# Collect the CSV files directly. The previous os.walk() loop only filled the
# list when a walked directory contained at least one subdirectory, and its
# '*' pattern could also return directories, which cannot be opened as files.
# Sorting makes the load order reproducible between runs.
file_path_list = sorted(glob.glob(os.path.join(filepath, '*.csv')))
if not file_path_list:
    print("WARNING: no CSV files found in %s" % filepath)

# Gather the data rows of every input file (header lines are skipped); these
# rows feed the combined CSV that is used to populate the Apache Cassandra tables
full_data_rows_list = []

for event_file in file_path_list:
    with open(event_file, 'r', encoding = 'utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header row; the default keeps an empty file from raising
        next(csvreader, None)
        full_data_rows_list.extend(csvreader)

csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Header of the combined file and, in the same order, the position of each of
# those columns within a raw input row
output_header = ['artist','firstName','gender','itemInSession','lastName','length',
                 'level','location','sessionId','song','userId']
source_positions = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as out_file:
    writer = csv.writer(out_file, dialect='myDialect')
    writer.writerow(output_header)
    for row in full_data_rows_list:
        # Skip blank lines and rows whose first column (artist) is empty
        if not row or row[0] == '':
            continue
        writer.writerow(tuple(row[pos] for pos in source_positions))

events_df = pd.read_csv("event_datafile_new.csv")
print(len(events_df.index))

/home/workspace
6820


### Processing the _events_ data frame

#### Creating a cluster and connecting to the database

In [6]:
def connect(keyspace_name):
    """Open a Cassandra session bound to ``keyspace_name``.

    A ``Cluster`` is created with its default arguments, the keyspace is
    created if it does not exist yet (SimpleStrategy, replication factor 1)
    and the returned session is switched to that keyspace.

    Parameters
    ----------
    keyspace_name : str
        Unquoted keyspace identifier: a letter followed by letters, digits or
        underscores, at most 48 characters in total.

    Returns
    -------
    tuple
        ``(session, cluster)``; the caller is responsible for shutting both down.

    Raises
    ------
    ValueError
        If ``keyspace_name`` is not a valid unquoted identifier.
    """
    # The keyspace name is concatenated into the CQL text below, so reject
    # anything that is not a plain identifier before it reaches the statement.
    if not isinstance(keyspace_name, str) or \
            re.fullmatch(r"[A-Za-z][A-Za-z0-9_]{0,47}", keyspace_name) is None:
        raise ValueError("invalid keyspace name: %r" % (keyspace_name,))

    cluster = Cluster()
    try:
        session = cluster.connect()

        create_key_space_q = (
            "CREATE  KEYSPACE IF NOT EXISTS " + keyspace_name
            + " WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1 }"
        )
        session.execute(create_key_space_q)

        session.set_keyspace(keyspace_name)
    except Exception:
        # Do not leave the cluster connection open when setup fails
        cluster.shutdown()
        raise

    return session,cluster

#### Creating the database

In [7]:
# Open the session/cluster pair used by the remaining cells
session,cluster = connect(keyspace_name)

# Start from a clean slate: drop every table first, then create them again
for statement in list(drop_queries.values()) + list(create_queries.values()):
    session.execute(statement)

#### Populating the database

In [8]:
# Columns of events_df that feed each table, in the order the matching
# insert statement expects its values
insert_columns = {
    "songs_by_session": ("sessionId", "itemInSession", "artist", "song", "length"),
    "songs_artist_user": ("userId", "sessionId", "itemInSession", "artist", "song", "firstName", "lastName"),
    "user_songs": ("song", "userId", "firstName", "lastName"),
}

# Write every event into all three tables
for _, event in events_df.iterrows():
    for table_name, columns in insert_columns.items():
        values = tuple(event[column] for column in columns)
        session.execute(insert_queries[table_name], values)

### Testing the database

#### Query 1: 
Given the **sessionId** and **itemInSession**, return the artist's name, song title and song length from the user activity data

In [9]:
# sessionId and itemInSession to look up
session_and_item = (293, 94)
results = session.execute(select_queries["songs_by_session"], session_and_item)
for song_row in results:
    print(song_row)

Row(artist='Kanye West', song_title='Celebration', song_length=198.4779052734375)


#### Query 2: 
Given **userId** and **sessionId**, return the name of the artist, the song title (sorted by itemInSession) and the user (firstName and lastName)

In [10]:
# userId and sessionId to look up
user_and_session = (10, 182)
results = session.execute(select_queries["songs_artist_user"], user_and_session)
for play_row in results:
    print(play_row)

Row(artist='Down To The Bone', song_title="Keep On Keepin' On", first_name='Sylvie', last_name='Cruz')
Row(artist='Three Drives', song_title='Greece 2000', first_name='Sylvie', last_name='Cruz')
Row(artist='Sebastien Tellier', song_title='Kilometer', first_name='Sylvie', last_name='Cruz')
Row(artist='Lonnie Gordon', song_title='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', first_name='Sylvie', last_name='Cruz')


#### Query 3: 
Given the **song title**, return each user (first and last name) in the music app history who listened to that song

In [11]:
# Song title to look up (a one-element tuple of query parameters)
song_title_param = ('All Hands Against His Own',)
results = session.execute(select_queries["user_songs"], song_title_param)
for listener_row in results:
    print(listener_row)

Row(song_title='All Hands Against His Own', first_name='Jacqueline', last_name='Lynch')
Row(song_title='All Hands Against His Own', first_name='Tegan', last_name='Levine')
Row(song_title='All Hands Against His Own', first_name='Sara', last_name='Johnson')


### Clean Up Time!!

In [12]:
try:
    # Clear the database
    for drop_statement in drop_queries.values():
        session.execute(drop_statement)
finally:
    # Release the connections even if one of the drop statements failed
    session.shutdown()
    cluster.shutdown()