# Project: Data Modeling with Cassandra

## Introduction
Sparkify wants to analyze the song played event data they've been collecting on songs and user activity on their new music streaming app. They are interested in understanding what songs users are listening to. <br>

Team identified 3 queries to analyze the song played event data. <br>

Solution will use Apache Cassandra to support these queries. Solution will require processing event log data in CSV format to be loaded to tables in Apache Cassandra database.


## Solution
Solution has two phases:
 1. Part-I  : ETL pipeline for pre-processing the song played event log files
 2. Part-II : Create Apache Cassandra tables for these queires and load data to event data tables

### Part-I : ETL pipeline for pre-processing the song played event log files

#### Import Python packages 

In [93]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv
import logging


#### Creates list of filepaths to process original event csv data files

In [94]:
# tim.o.> adopted from the original project template
def getFilePaths(filepath      # type: str, file path to obtain filepaths of files in this directory
                ):
    ''' Returns filepaths from the given directory (and all subdirectories under this hierarchy). 
        There is no filter for name of file/directory.
    '''
    assert (filepath != None), "Invalid argument!"
    assert (filepath != ''), "Invalid Value!"
    file_path_list = None
    # Create a for loop to create a list of files and collect each filepath
    for root, dirs, files in os.walk(filepath):    
        # join the file path and roots with the subdirectories using glob
        file_path_list = glob.glob(os.path.join(root,'*'))

    return file_path_list

def printFilePaths(file_path_list     # type: List[str], contains list of filepaths
                  ):
    ''' Prints filepaths in the given list
    '''
    assert (file_path_list != None), "Invalid argument!"
    cnt = 0
    for f in file_path_list:
        print(f)
        cnt += 1
    return cnt


In [95]:
logger = logging.getLogger(__name__)
# checking your current working directory
print('Current working directory is %s ' % os.getcwd())
logger.info('Project-2> START')
logger.info('Project-2> Current working directory is %s ', os.getcwd())

# Get your current folder and subfolder event data
file_path = os.getcwd() + '/event_data'
logger.info('Project-2> Looking for files in %s ', file_path )

# Get filepaths
file_path_list=getFilePaths(file_path)
print('There are %d event files in %s.' % (printFilePaths(file_path_list),file_path))

Current working directory is /home/workspace 
/home/workspace/event_data/2018-11-07-events.csv
/home/workspace/event_data/2018-11-24-events.csv
/home/workspace/event_data/2018-11-04-events.csv
/home/workspace/event_data/2018-11-17-events.csv
/home/workspace/event_data/2018-11-23-events.csv
/home/workspace/event_data/2018-11-21-events.csv
/home/workspace/event_data/2018-11-15-events.csv
/home/workspace/event_data/2018-11-08-events.csv
/home/workspace/event_data/2018-11-11-events.csv
/home/workspace/event_data/2018-11-06-events.csv
/home/workspace/event_data/2018-11-25-events.csv
/home/workspace/event_data/2018-11-02-events.csv
/home/workspace/event_data/2018-11-30-events.csv
/home/workspace/event_data/2018-11-29-events.csv
/home/workspace/event_data/2018-11-05-events.csv
/home/workspace/event_data/2018-11-26-events.csv
/home/workspace/event_data/2018-11-28-events.csv
/home/workspace/event_data/2018-11-14-events.csv
/home/workspace/event_data/2018-11-22-events.csv
/home/workspace/event_d

#### Processing the event log files to create the event data file csv that will be loaded to Apache Casssandra tables

In [96]:
#
# tim.o.> adopted from project's template
#
def writeHeaderToEventDataFile(event_data_fname     # type: str, event data file name
                              ):
    ''' Writes a header to event data file'''
    assert (event_data_fname != None),'Invalid Value: Event Data File Name is None!'
    assert (event_data_fname != ''),'Invalid Value: Event Data File Name is Empty!'
    # Writes header to event data file
    with open(event_data_fname, 'w', encoding = 'utf8', newline='') as f:
        writer = csv.writer(f, dialect='myDialect')
        writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    
    return


def writeToEventDataFile(full_data_rows_list, # type: List[str], list of lines from one event log
                         event_data_fname     # type: str, event data file name
                        ):
    ''' Extracts the fields from each row of event log to write to event data file 
    '''
    assert (event_data_fname != None),'Invalid Value: Event Data File Name is None!'
    assert (event_data_fname != ''),'Invalid Value: Event Data File Name is Empty!'
    assert (full_data_rows_list != None),'Invalid Value: List of lines is None!'
    
    with open(event_data_fname, 'a', encoding = 'utf8', newline='') as f:
        writer = csv.writer(f, dialect='myDialect')
        # For each row, extract the fields needed 
        for row in full_data_rows_list:
            if (row[0] != ''):
                writer.writerow((row[0], row[2], row[3], row[4], 
                                 row[5], row[6], row[7], row[8], 
                                 row[12], row[13], row[16]))
    return


def processALogFile(file_name  # type: str, file name of event log csv file
                   ):
    '''
    Reads event log (file_name) file and collects each line into a list
    '''
    assert (file_name != None),'Invalid Value: Event Log File Name is None!'
    assert (file_name != ''),'Invalid Value: Event Log File Name is Empty!'
    
    # initiating an empty list of rows that will be generated from each file
    full_data_rows_list = [] 
    
    # reading a csv file, f 
    with open(file_name, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)        
        # extracting each data row one by one and append it        
        for line in csvreader:
            #print(line), if (line != '')
            full_data_rows_list.append(line) 

    return full_data_rows_list

    
def processLogFiles(file_path_list,   # type: List[str], list of event log file names
                    event_data_fname  # type: str, event data file name
                   ):
    ''' 
    Extracts data from event log files (file_path_list), and collects them into event data file.
    '''
    assert (file_path_list != None),'Invalid Value: List of event log file names is None!'
    assert (event_data_fname != None),'Invalid Value: Event Data File Name is None!'
    assert (event_data_fname != ''),'Invalid Value: Event Data File Name is Empty!'

    # for every filepath in the file path list 
    for f in file_path_list:
        full_data_rows_list=processALogFile(f)
        #if( full_data_rows_list != None and len(full_data_rows_list) > 0):
        writeToEventDataFile(full_data_rows_list,event_data_fname)

    return


In [97]:
#
# tim.o.> continue to follow the project template
#

# Extract event logs to event data file
evt_data_file_name = 'event_datafile_new.csv'

# write header to event data file
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)
writeHeaderToEventDataFile(evt_data_file_name)

processLogFiles(file_path_list,evt_data_file_name)

logging.info('Project-2> Completed transfering event log files in %s to %s ', file_path,evt_data_file_name )


In [98]:
#
# tim.o.> from project template
#
# check the number of rows in event data csv file
with open(evt_data_file_name, 'r', encoding = 'utf8') as f:
    print('There are %d rows in \"%s\".'%(sum(1 for line in f),evt_data_file_name))

There are 6821 rows in "event_datafile_new.csv".


### Part-II : Create Apache Cassandra tables for the following queries and load data to event data tables


The CSV file titled <font color=red>**event_datafile_new.csv**</font>, located within the Workspace directory, contains all song played event logs. 

The <font color=red>**event_datafile_new.csv**</font> contains the following columns: <br>

| Index | Column                     |
|-------|----------------------------|
|  0    | artist                     |
|  1    | firstName of user          |
|  2    | gender of user             |
|  3    | item number in session     |
|  4    | last name of user          |
|  5    | length of the song         |
|  6    | level (paid or free song)  |
|  7    | location of the user       |
|  8    | sessionId                  |
|  9    | song title                 |
| 10    | userId                     |

The image below is a screenshot of the denormalized data in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

### Connecting to Apache Cassandra 
First, it will connect to <i>cluster</i> and get a <i>session</i> object. <br>
Second, it will define a <i>sparkify</i> keyspace. <br>
Third, it will set the keyspace to <i>spakify</i>.<br>

#### Connecting to a Cluster

In [99]:
from cassandra.cluster import Cluster

In [100]:
#
# tim.o.> adopted from project template
#
def connectToCassandraCluster(list_of_nodes   # type: List[str], list of ip addresses
                             ):
    '''
    Connects to Cassandra Cluster, returns cluster and session objects
    '''
    assert (list_of_nodes != None), 'Invalid Value: List of ip addresses is None!'
    assert (len(list_of_nodes) > 0), 'Invalid Value: List of ip addresses is Empty!'
    
    cluster = None
    session = None
    try:
        cluster = Cluster(list_of_nodes)
        logger.debug('Got cluster')
        
        # To establish connection and begin executing queries, need a session
        session = cluster.connect()
        logger.debug('Connected, got session!')
        
    except Exception as e:
        print(e)
        logger.exception(e)

    return cluster, session

In [101]:
# Conect to Cassandra instance on local machine
list_of_nodes= ['127.0.0.1']
cluster, session = connectToCassandraCluster(list_of_nodes)

#### Create Keyspace

In [102]:
# Create a Keyspace for Sparkify
try:
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS sparkify 
        WITH REPLICATION = 
        { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
    )
    logger.debug('CREATE KEYSPACE sparkify, OK!')
except Exception as e:
    print(e)
    logger.exception(e)

#### Set Keyspace

In [103]:
# Set KEYSPACE to 'sparkify'
try:
    session.set_keyspace('sparkify')
    logger.debug('SET KEYSPACE TO sparkify, OK!')
except Exception as e:
    print(e)
    logger.exception(e)


## Task: Create queries to ask the following three questions of the data

#### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


#### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

#### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




 
Now we need to create tables to run these queries. <br>
We will design the database tables for these queries with Apache Cassandra.

In [104]:
#
# tim.o.> adapted from project template
#
def insertToDB(dbSession,       # type: , database session handle
               file,            # type: str, name of event data csv file
               istmt,           # type: str, CQL Insert Statement 
               funcLine         # type: func_ptr, Pointer to a function: line -> fields in istmt
              ):
    ''' 
    Inserts fields from event data file to database table
    query string (of INSERT CQL) 
    A function (funcLine) to extract fields from an event data file line (type: List[str])
    '''
    assert (dbSession != None), 'Invalid Value: database session is None!'
    assert (file != None), 'Invalid Value: Name of event data csv file is None!'
    assert (file != ''), 'Invalid Value: Name of event data csv file is Empty!'
    assert (istmt != None), 'Invalid Value: CQL INSERT statement is None!'
    assert (istmt != ''), 'Invalid Value: CQL INSERT statement is Empty!'
    assert (funcLine != None), 'Invalid Value: Function: line->fields in INSETRT statement is None!'
    
    with open(file, encoding = 'utf8') as f:
        csvreader = csv.reader(f)
        next(csvreader) # skip header
        # for each line
        for line in csvreader:
            dbSession.execute(istmt, funcLine(line))
            
    return

### Query 1
**Query**:  Give me the artist, song's title and song's length in the music app history 
that was heard during sessionId = 338, and itemInSession = 4

Expected output columns are;
 1. artist's name
 1. song's title
 1. song's length

The required query uses <i>sessionId</i> and <i>itemInSession</i> in WHERE clause.

**PRIMARY KEY (PK) Rationale**<br>
 - Table for this query should use sessionId as <b>Partition Key</b> and itemInSession as <b>Clustering Column</b>. <br>
 - The value of sessionId will help to route the given query to one node. Assuming all rows of a session can fit in one node. 

**Data Type Selection Rationale** <br>

|Field| Data Type (Cassandra,[CQL](https://docs.datastax.com/en/cql/3.3/cql/cql_reference/cql_data_types_c.html))| Why? |
|-----|----------|-----|
| sessionId (K) | bigint | There will be many sessions. (<i>partition **K**ey</i>) |
| itemInSession (C)| int | Number of items in a session.<br> A range of [0,max(int)] should be sufficient. (<i>**C**lustering column</i>) |
| artistName | text | String data type for artist's name. (<i>query return</i>) |
| songTitle  | text | String data type for title of played song. (<i>query return</i>) |
| songDuration | float | Duration of played song. <br> A range of [0,max(float)] should be sufficient.  (<i>query return</i>) |

<p>
Combined columns (sessionId and itemInSession) uniquely identify a song played event. 

In [105]:
##
## Create table for query 1
## Contains fields that need to be retrieved and primary key (partition key,clustering column)
##

table_q1_name = 'songplayed_By_Session_And_ItemInSession'

# CQL CREATE TABLE statement
stmt = "CREATE TABLE IF NOT EXISTS " + table_q1_name
stmt = stmt + " (sessionId bigint, itemInSession int, artistName text, songTitle text, songDuration double, \
                PRIMARY KEY(sessionId, itemInSession));"
logger.debug('CREATE TABLE FOR Q1: [%s]', stmt)

try:
    session.execute(stmt)
except Exception as e:
    print(e)
    logger.exception(e)

In [106]:
def funcLine1(line    # type: List[str], a row of event data csv file
             ):
    ''' Returns extracted column values from given line, a row of event data file 
    '''
    assert (line != None), 'Invalid Value: A line of event data file is None!'
    assert (len(line) > 9), 'Invalid Value: A line of event data file is Empty!'
    return (int(line[8]),int(line[3]),line[0],line[9],float(line[5]))


# Prepare CQL INSERT Statement
istmt = "INSERT INTO " + table_q1_name
istmt = istmt + " (sessionId, itemInSession, artistName, songTitle, songDuration)"
istmt = istmt + " VALUES (%s, %s, %s, %s, %s);"

# Extract and load rows from event data csv file to a table in Cassandra 
insertToDB(session,evt_data_file_name,istmt,funcLine1)

#### Do a SELECT to verify that the data have been inserted into each table

In [107]:
#
# tim.o.> adopted from project template
#
def queryBySessionIdItemInSession(dbSession,
                                  table_name,
                                  sessionId,
                                  itemInSession
                                 ):
    ''' Retrieves artist's name, title of song, and duration song for given session and item in session.
    '''
    assert (dbSession != None), 'Invalid Value: Database Session is None!'
    assert (table_name != None), 'Invalid Value: Table Name is None!'
    assert (table_name != ''), 'Invalid Value: Table Name is Empty!'
    # Return
    rows = None
    # CQL SELECT statement
    query = "SELECT artistName, songTitle, songDuration FROM "+ table_name;
    query = query + " WHERE sessionId = " + str(sessionId);
    query = query + " and itemInSession = "+ str(itemInSession) +";"
    logger.debug(' Query [%s]', query)
    
    try:
        rows = dbSession.execute(query)
    except Exception as e:
        print(e)
        logger.exception(e)

    return rows

In [108]:
# query parameters
sessionId = 338
itemInSession = 4

# do query
rows = queryBySessionIdItemInSession(session,table_q1_name,sessionId,itemInSession)

# prepare data frame from results set
result_set = []
if(rows != None):
    for row in rows:
        result_set.append([row.artistname,row.songtitle,row.songduration])

q1_text = 'Give me the artist, song\'s title and song\'s length in the music app history ';
q1_text = q1_text +'that was heard during'; 
print( '\n\n%s sessionId = %d and itemInSession = %d \n\n' % (q1_text,sessionId,itemInSession))

res_df = pd.DataFrame(result_set,index=None,columns=['artistName','songTitle','songDuration'])
res_df



Give me the artist, song's title and song's length in the music app history that was heard during sessionId = 338 and itemInSession = 4 




Unnamed: 0,artistName,songTitle,songDuration
0,Faithless,Music Matters (Mark Knight Dub),495.3073


### Query 2

**Query**: Give me only the following: name of artist, song (sorted by itemInSession) and 
        user (first and last name) for userid = 10, sessionid = 182

Expected output columns are;
 1. artist's name
 1. song's title
 1. user's first name
 1. user's last name

The required query uses userId, sessionId and itemInSession in WHERE clause.

**PRIMARY KEY (PK) Rationale** <br>
 - Table for this query should use <i>userId</i> as <b>Partition Key</b> because the objective of query is to return data about the songs played by a user for a given session. 
<br>
 - Table for this query should use <i>sessionId</i> and <i>itemInSession</i> as <b>Clustering Column</b>. The inclusion of <i>itemInSession</i> will support the requirement of: result set to be sorted by itemInSession field.<br>

**Assumptions**
 - We can assume that all sessions of any user will fit.

**Data Type Selection Rationale** <br>

|Field| Data Type (Cassandra,[CQL](https://docs.datastax.com/en/cql/3.3/cql/cql_reference/cql_data_types_c.html))| Why? |
|-----|----------|-----|
| userId (K)| bigint | There will be many users. (<i>partition key</i>) |
| sessionId (C)| bigint |  [0,max(bigint)] should be sufficient because there will be many sessions. (<i>clustering column</i>) |
| itemInSession (C)| int | Number of items in a session should within [0,max(int)]. (<i>clustering column</i>) |
| artistName | text | String data type for artist. (<i>query return</i>) |
| songTitle  | text | String data type for title of played song. (<i>query return</i>) |
| userFirstName  | text | First name of user.  (<i>query return</i>) |
| userLastName   | text | Last name of user.  (<i>query return</i>) |

Naming fields <br>
 - Used <i>artist</i> prefix in the field name for artist's name to indicate it is an attribute of artist. <br>
 - Used <i>song</i> prefix in the field name for song's title to indicate it is an attribute of song. <br>
 - Used <i>user</i> prefix in field names for user's id, first name, and last name to indicate that it is attribute of user.<br>


In [109]:
##
## Create Table for Query 2
##
table_q2_name = 'query_By_UserId_SessionId'

# CQL CREATE TABLE statement
stmt = "CREATE TABLE IF NOT EXISTS "+ table_q2_name +" "
stmt = stmt + "(userId int, sessionId bigint, itemInSession int, \
                artistName text, songTitle text, firstName text, lastName text, \
                PRIMARY KEY(userId, sessionId, itemInSession));"
logger.debug('CREATE TABLE FOR Q2: [%s]', stmt)

try:
    session.execute(stmt)
except Exception as e:
    print(e)
    logger.exception(e)

In [110]:
##
## (ETL)
## Load data from event_datafile_new.csv to songsplay_q2
##
def funcLine2(line      # type: List[str], a row of event data csv file
             ):
    ''' Extracts userId, sessionId, itemInSession, artist, song, firstName, lastName
    '''
    assert (line != None), 'Invalid Value: A line of event data file is None!'
    assert (len(line) > 9), 'Invalid Value: A line of event data file is Empty!'
    return (int(line[10]),int(line[8]),int(line[3]),line[0],line[9],line[1],line[4])


# INSERT statement
istmt = "INSERT INTO "+ table_q2_name
istmt = istmt + " (userId,sessionId,itemInSession,artistName,songTitle,firstName,lastName)"
istmt = istmt + " VALUES (%s, %s, %s, %s, %s, %s, %s);"

# extract and load song played event logs to a table
insertToDB(session,evt_data_file_name,istmt,funcLine2)

In [111]:
#
# Query 2: Give me only the following: 
#   name of artist, song (sorted by itemInSession) and user (first and last name)\
#   for userid = 10, sessionid = 182
#
def queryByUserIdSessionId(dbSession,   # type:  , database session
                           table_name,  # type: str, table name
                           userId,      # type: int, userId (query term)
                           sessionId    # type: bigint, sessionId (query term)  
                          ):  
    ''' Retrieves artist, title of song, first and last name of user by using userId and sessionId
    '''
    assert (dbSession != None), 'Invalid Vale: Database Session is None!'
    assert (table_name != None), 'Invalid Vale: Table name is None!'
    assert (table_name != ''), 'Invalid Vale: Table name is Empty!'
    
    rows = None
    # session.prepare()
    query = "SELECT artistName, songTitle, firstName, lastName "
    query = query + "FROM " + table_name + " " 
    query = query + "WHERE userId = " + str(userId) + " and sessionId = " + str(sessionId) + ";"  
    logger.debug('Query: [%s]',query)
    try:
        rows = dbSession.execute(query)
    except Exception as e:
        print(e)
        logger.exception(e)

    return rows


In [112]:
# query parameters
userId = 10
sessionId = 182 

# query
rows = queryByUserIdSessionId(session, table_q2_name, userId, sessionId)

# process results set
result_set = []
if rows != None:
    for row in rows:
        result_set.append((row.artistname,row.songtitle,row.firstname,row.lastname))

# display results
q2_text = "Give me only the following: name of artist, song (sorted by itemInSession) "
q2_text = q2_text + "and user (first and last name) for"
print('\n%s for userid = %d, sessionid = %d \n\n' % (q2_text, userId, sessionId))

res_df = pd.DataFrame(result_set,index=None,columns=['artistName','songTitle','firstName','lastName'])
res_df


Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for for userid = 10, sessionid = 182 




Unnamed: 0,artistName,songTitle,firstName,lastName
0,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
1,Three Drives,Greece 2000,Sylvie,Cruz
2,Sebastien Tellier,Kilometer,Sylvie,Cruz
3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


## Query 3
**Query**: Give me every user name (first and last) in my music app history 
        who listened to the song 'All Hands Against His Own'   

Expected output columns are;
 1. firstName
 1. lastName

The required query uses songTitle in WHERE clause.

**PRIMARY KEY (PK) Rationale** <br>
 - Table for this query should use <i>songTitle</i> as <b>Partition Key</b> because the objective of query is to return first name and last name of users who played the given song.<br>
 - Table for this query should use <i>userFirstName</i> and <i>userLastName</i> as <b>Clustering Column</b> to eliminate repeated play entries of the same song by the same user. 

**Assumptions**
 - The combination of song's title, user's first name, and user's last name will be unique for the query's objective.
 - Hash value of song's title should produce good distribution while we should expect to observe that
 there will be more song play events for favored songs. 
 <br>
 Notes
 - According to query, there is no need to return/store multiple play events of the same song by the same user in the same/different sessions. (Requires inclusion of sessionId and itemInSession fields. When aggregate count() is needed or the datetime of song played event becomes important.)
 - The song's title is unique (i.e., one artist - one song title. The cases of reusing the song title by the same artist(1) or different artists(2) are not handled. Requires inclusion of artist identifier(2), release date/duration attributes(1) assuming that the same artist will not release two songs with the same title in the same year/with exactly the same duration. If not, one needs to look for attributes that can distinguish these two songs.) 
 
**Data Type Selection Rationale** <br>

|Field| Data Type (Cassandra,[CQL](https://docs.datastax.com/en/cql/3.3/cql/cql_reference/cql_data_types_c.html))| Why? |
|-----|----------|-----|
| songTitle (K)  | text | String data type for title of played song. (<i>partition key</i>) |
| userFirstName (C) | text | First name of user.  (<i>clustering column</i>, <i>query return</i>) |
| userLastName (C)  | text | Last name of user.  (<i>clustering column</i>, <i>query return</i>) |

**Naming fields** <br>
 - Used <i>song</i> prefix in the field name for song's title to indicate it is an attribute of a song. <br>
 - Used <i>user</i> prefix in field names for user's first name, and last name to indicate that it is an attribute of a user.<br>


In [113]:
##
## Create Table for Query 3
##
table_q3_name = "query_By_SongTitle"

# CQL CREATE TABLE statement
stmt = "CREATE TABLE IF NOT EXISTS " + table_q3_name + " "
stmt = stmt + "(songTitle text, userId bigint, userFirstName text, userLastName text, \
                PRIMARY KEY(songTitle, userId));"

logger.debug('CREATE TABLE FOR Q3: [%s]', stmt)

try:
    session.execute(stmt)
except Exception as e:
    print(e)
    logger.exception(e)

In [114]:
##
## (ETL)
## Load data from event_datafile_new.csv to songsplay_q3
##
def funcLine3(line       # type: List[str], list of columns in a line from event data
             ):
    '''Picks song, userId, firstName, and lastName
    '''
    assert (line != None), 'Invalid Value: A line of event data file is None!'
    assert (len(line) > 9), 'Invalid Value: A line of event data file is Empty!'
    return (line[9],int(line[10]),line[1],line[4])


# INSERT statement
istmt3 = "INSERT INTO " + table_q3_name + " (songTitle, userId, userFirstName, userLastName)"
istmt3 = istmt3 + " VALUES (%s, %s, %s, %s);"

# extract and load song played event logs to a table
insertToDB(session,evt_data_file_name,istmt3,funcLine3)


In [115]:
##
## Query: Give me every user name (first and last) in my music app history 
## who listened to the song 'All Hands Against His Own'
##
def queryUserFirstLastNameBySongTitle(dbSession,  # type: , database session
                                      table_name, # type: str, table name
                                      songTitle   # type: str, song title [query term]
                                     ):
    ''' Retrives first and last name of users who listened to given song
    '''
    assert (dbSession != None), 'Invalid Value: Database session is None!'
    assert (table_name != None), 'Invalid Value: Table name is None!'
    assert (table_name != ''), 'Invalid Value: Table name is Empty!'
    assert (songTitle != None), 'Invalid Value: Song title is None!'
    assert (songTitle != ''), 'Invalid Value: Song title is Empty!'
    
    rows = None      # return
    # CQL SELECT Statement
    query = "SELECT userFirstName, userLastName"
    query = query + " FROM "+table_name
    query = query + " WHERE songTitle=\'"+songTitle+"\';"
    logger.debug('Query: [%s]', query)
    try:
        rows = dbSession.execute(query)
    except Exception as e:
        print(e)
        logger.exception(e)
        
    return rows

In [116]:
# query parameters
songTitle = 'All Hands Against His Own'

# query
rows = queryUserFirstLastNameBySongTitle(session,table_q3_name,songTitle)

# process result set
result_set = []
if rows != None:
    for row in rows:
        result_set.append((row.userfirstname,row.userlastname))

# display results
q3_text = 'Give me every user name (first and last) in my music app history who listened to the song'
print('\n\n%s \'%s\'.\n' % (q3_text, songTitle))

res_df = pd.DataFrame(result_set,index=None,columns=['userFirstName','userLastName'])
res_df



Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'.



Unnamed: 0,userFirstName,userLastName
0,Jacqueline,Lynch
1,Tegan,Levine
2,Sara,Johnson


### Drop the tables before closing out the sessions

dropTable() drops one table (if exists), given by table name. <br>
dropTables() drops tables (if exists), given by array of table names. <br>

In [117]:
def dropTable(dbSession,   # type: , database session
              table_name   # type: str, table name
             ):
    ''' Drops the given table if exists
    '''
    assert (dbSession != None), 'Invalid Value: Database Session is None!'
    assert (table_name != None), 'Invalid Value: Table name is None!'
    assert (table_name != ''), 'Invalid Value: Table name is Empty!'
    # CQL DROP Statement
    stmt="DROP TABLE IF EXISTS "+table_name + ";";

    try:
        rows = dbSession.execute(stmt)
    except Exception as e:
        print(e)     
        logger.exception(e)
    
    return 

In [118]:
def dropTables(dbSession,        # type: , database session
               table_names       # type: List[str], list of table names to be dropped
              ):
    ''' Drops tables in the list 
    '''
    assert (dbSession != None), 'Invalid Value: Database Session is None!'
    assert (table_names != None), 'Invalid Value: List of table names is None!'
    assert (len(table_names) > 0), 'Invalid Value: List of table names is Empty!'

    for f in table_names:
        dropTable(dbSession, f)
        
    return

In [119]:
# Drop tables
#
tables=[table_q1_name, table_q2_name, table_q3_name]
dropTables(session, tables)

### Close the session and cluster connection¶

In [120]:
session.shutdown()
cluster.shutdown()

logger.info('Project-2> END')