In [1]:
# Import Python packages 
import pandas as pd
import cassandra
from cassandra.cluster import Cluster
import re
import os
import glob
import numpy as np
import json
import csv

# Part I. ETL Pipeline for Pre-Processing the Files

## PRE-PROCESSING CSV FILES

**Goal**: Create a new flat CSV file containing all event information collected from the event_data folder. 

#### Creates list of filepaths to process original event csv data files

In [2]:
def get_source_files():
    """ Retrieve datastore directory and collect each source filepath
     
    Returns: A list of filepaths for each event data file.
    """
    filepath = os.getcwd() + '/event_data'
    file_path_list = []
    for root, dirs, files in os.walk(filepath):
        # join the file path and roots with the subdirectories using glob
        files = glob.glob(os.path.join(root,'*events.csv'))
        for f in files:
            file_path_list.append(os.path.abspath(f))
    
    return file_path_list

#### Processes the files to create the data file csv that will be used for Apache Casssandra tables

Note: Running this function will create a file event_datafile_new.csv on the current workspace.

In [3]:
def generate_csv_dataset():
    """ Creates a merged CSV dataset file called `event_datafile_new.csv` with all the event information
    """

    # retrieve list of sources files
    file_path_list = get_source_files()
    # stores all data rows in a list 
    full_data_rows_list = [] 
    
    # process every CSV filepath retrieved and join data in a list 
    for f in file_path_list:
        with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
            csvreader = csv.reader(csvfile) 
            next(csvreader) # Skip header line
            for line in csvreader:
                full_data_rows_list.append(line) 
            
    # creating a smaller event data csv file called event_datafile_new csv that will be used to insert data into the \
    # Apache Cassandra tables
    csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)
    with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
        writer = csv.writer(f, dialect='myDialect')
        writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                    'level','location','sessionId','song','userId'])
        for row in full_data_rows_list:
            if (row[0] == ''):
                continue
            writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


#### Part I - Main pipeline

Run this code before going to Part II

In [4]:
try:
    generate_csv_dataset()
except Exception as e:
    print(e)

# Part II. Loading data to Apache Cassandra

## These code below requires a file <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Creating Apache Cassandra cluster

#### Creating a Cluster

In [5]:
def create_connection():
    """ This should make a connection to a Cassandra instance on your local machine (127.0.0.1)
    
    Return values:
    - Tuple with cluster and session objects to a cassandra local database
    """ 
    cluster = Cluster()
    session = cluster.connect()
    return (cluster, session)

#### Create and Set Keyspace
**Note**: We will set the session to return the resultset as a pandas dataframe

In [6]:
def init_keyspace(conn):
    """ Create and set the keyspace sparkify from a open session to a Cassandra database.
    
    Parameters:
    - conn: connection to a cassandra cluster
    """
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS sparkify
        WITH REPLICATION = 
        { 'class': 'SimpleStrategy', 'replication_factor': 1}
    """)
    
    session.row_factory = lambda colnames, rows: pd.DataFrame(rows, columns=colnames)
    
    session.set_keyspace('sparkify')

In [7]:
def pandas_factory(colnames, rows):
    """ Converts cassandra result set to pandas dataframe. Apply it to session.row_factory before running a query.
    """
    return pd.DataFrame(rows, columns=colnames)

#### Create cluster and session globals for a cassandra keyspace

In [8]:
try:
    cluster,session = create_connection()
    init_keyspace(session)
except Exception as e:
    print(e)

## Create queries to ask the following three questions of the data

### 1. Give me the `artist`, `song` title and song's `length` in the music app history that was heard during  `sessionId = 338`, and `itemInSession  = 4`

#### ETL Routine to load data to cassandra table:

In [9]:
def load_song_by_session(sess):
    """ ETL routine to create cassandra table to answer 1st query relating song from session_id and item_in_session.
    
    Parameters:
    - cassandra session to create table in the keyspace set
    """
    
    query = """
        CREATE TABLE IF NOT EXISTS song_by_session (
            session_id INT
            , item_in_session INT
            , artist TEXT
            , song TEXT
            , length FLOAT
            , PRIMARY KEY(
                (session_id),
                item_in_session
            )
        )
    """
    session.execute(query)

    # CSV dataset to extract data
    file = 'event_datafile_new.csv'

    with open(file, encoding = 'utf8') as f:
        csvreader = csv.reader(f)
        next(csvreader)
        for line in csvreader:
            query = """
                INSERT INTO song_by_session (
                    session_id
                    , item_in_session
                    , artist
                    , song
                    , length
                ) VALUES(%s, %s, %s, %s, %s);
            """
            # Index of the matching attributes in the CSV line.
            row = (int(line[8]), int(line[3]), line[0], line[9], float(line[5]))
            session.execute(query, row)        

#### Query function to search `artist`, `song` and `length` by `session_id` and `item_in_session`

In [10]:
def query_song_by_session(sess, session_id, item_in_session):
    """ Query song, artist and length streamed in the app by the given session id and itemInSession.
    
    Example:
    - Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4
    
    Parameters:
    - cassandra session to query a keyspace
    - session_id used as partition key value to query the table
    - item_in_session used as clustering column to query the table

    Returns:
    - Resultset as a pandas dataframe
    """
    query = """
        SELECT
            artist
            , song
            , length
        FROM song_by_session
        WHERE
            session_id = {}
            AND item_in_session = {};
    """.format(session_id, item_in_session)

    session.row_factory = pandas_factory
    result_df = session.execute(query)._current_rows
    return result_df

#### Main pipeline for question 1

Execute proposed query statement to answer business question

In [11]:
try:
    load_song_by_session(session)
    df = query_song_by_session(session,338,4)
except Exception as e:
    print(e)
    
df

Unnamed: 0,artist,song,length
0,Faithless,Music Matters (Mark Knight Dub),495.307312


### 2. Give me only the following: name of `artist`, `song` (sorted by `item_in_session`) and `user` (first and last name) for `user_id = 10`, `session_id = 182`

#### ETL Routine to load data to cassandra table:

In [12]:
def load_song_by_user(sess):
    """ ETL routine to create cassandra table to answer 2nd query relating song from user_id and session_id.
    
    Parameters:
    - cassandra session to create table in the keyspace set
    """
    
    query = """
        CREATE TABLE IF NOT EXISTS song_by_user (
            user_id INT
            , session_id INT
            , item_in_session INT
            , artist TEXT
            , song TEXT
            , user TEXT
            , PRIMARY KEY(
                (user_id, session_id),
                item_in_session
            )
        )
    """
    session.execute(query)

    # CSV dataset to extract data
    file = 'event_datafile_new.csv'

    with open(file, encoding = 'utf8') as f:
        csvreader = csv.reader(f)
        next(csvreader)
        for line in csvreader:
            query = """
                INSERT INTO song_by_user (
                    user_id
                    , session_id
                    , item_in_session
                    , artist
                    , song
                    , user
                ) VALUES(%s, %s, %s, %s, %s, %s);
            """
            # Index of the matching attributes in the CSV line.
            row = (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1] + ' ' + line[4])
            session.execute(query, row)

#### Query function to search song by user_id and session_id

In [13]:
def query_song_by_user(sess, user_id, session_id):
    """ Query song, artist and user streamed in the app by the given user and session id ordered by item_in_session.
    
    Example:
    - Give me only the following: name of artist, song (sorted by item_in_session) and user (first and last name) for user_id = 10, session_id = 182
    
    Parameters:
    - cassandra session to query a keyspace
    - user_id used as partition key value to query the table
    - session_id used as clustering column to query the table
    
    Returns:
    - Resultset as a pandas dataframe
    """
    query = """
        SELECT
            artist
            , song
            , user
        FROM song_by_user
        WHERE
            user_id = {}
            AND session_id = {}
        ORDER BY item_in_session ASC;
    """.format(user_id, session_id)

    session.row_factory = pandas_factory
    result_df = session.execute(query)._current_rows
    return result_df

#### Main pipeline for question 2

Execute proposed query statement to answer business question

In [14]:
try:
    load_song_by_user(session)
    df = query_song_by_user(session,10,182)
except Exception as e:
    print(e)

df

Unnamed: 0,artist,song,user
0,Down To The Bone,Keep On Keepin' On,Sylvie Cruz
1,Three Drives,Greece 2000,Sylvie Cruz
2,Sebastien Tellier,Kilometer,Sylvie Cruz
3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie Cruz


### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

#### ETL Routine to load data to cassandra table:

In [15]:
def load_user_by_song(sess):
    """ ETL routine to create cassandra table to answer 3nd query relating user from song.
    
    Parameters:
    - cassandra session to create table in the keyspace set
    """
    
    query = """
        CREATE TABLE IF NOT EXISTS user_by_song (
            song TEXT
            , user TEXT
            , PRIMARY KEY((song), user)
        )
    """
    session.execute(query)

    # CSV dataset to extract data
    file = 'event_datafile_new.csv'

    with open(file, encoding = 'utf8') as f:
        csvreader = csv.reader(f)
        next(csvreader)
        for line in csvreader:
            query = """
                INSERT INTO user_by_song (
                    song
                    , user
                ) VALUES(%s, %s);
            """
            # Index of the matching attributes in the CSV line.
            row = (line[9], line[1] + ' ' + line[4])
            session.execute(query, row)

#### Query function to search username by song

In [16]:
def query_user_by_song(sess, song):
    """ Query username (first name and last name) by song in the event dataset.
    
    Example:
    - Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
    
    Parameters:
    - cassandra session to query a keyspace
    - song as the name of the song to be queried
    
    Returns:
    - Resultset as a pandas dataframe
    """
    query = """
        SELECT user
        FROM user_by_song
        WHERE song = '{}'
    """.format(song)

    session.row_factory = pandas_factory
    result_df = session.execute(query)._current_rows
    return result_df

#### Main pipeline for question 3

Execute proposed query statement to answer business question

In [17]:
try:
    load_user_by_song(session)
    df = query_user_by_song(session,'All Hands Against His Own')
except Exception as e:
    print(e)
    
df

Unnamed: 0,user
0,Jacqueline Lynch
1,Sara Johnson
2,Tegan Levine


### Drop the tables before closing out the sessions

In [18]:
try:
    session.execute("DROP TABLE IF EXISTS song_by_session;")
    session.execute("DROP TABLE IF EXISTS song_by_user;")
    session.execute("DROP TABLE IF EXISTS user_by_song;")
except Exception as e:
    print(e)

### Close the session and cluster connection¶

In [19]:
session.shutdown()
cluster.shutdown()