# Apache Cassandra Database Modeling & ETL Pipeline

#### Import Python packages 

In [118]:
# sys libs
import os
import glob
from io import StringIO
# data libs
import pandas as pd
import csv
# database libs
from cassandra.cluster import Cluster

## ETL Phase 1: Pre-Processing CSV Files

#### Create the list of filepaths to process event_data CSV files

In [119]:
# create a list of files in the data folder
def get_file_list(dir):
    # get the current folder and 'dir' subfolder
    filepath = os.path.abspath(f'{os.getcwd()}/{dir}')

    # create a list of files and collect each filepath
    for root, dirs, files in os.walk(filepath):
        # join the file path and roots with the subdirectories using glob
        file_path_list = glob.glob(os.path.join(root, '*'))

    return file_path_list

#### Process the files list to create the data_rows list

In [120]:
# read all csv files and return a list of data rows
def get_data_rows(files):
    # initiating an empty list of rows
    data_rows = []
    # for every filepath in the file path list
    for f in get_file_list(files):
        # reading csv file
        with open(f, 'r', encoding='utf8', newline='') as csvfile:
            # create a csv reader object
            csvreader = csv.reader(csvfile)
            next(csvreader)
            # extracting each data row and append it
            for line in csvreader:
                data_rows.append(line)

    return data_rows

#### Write the single target CSV file to generate insert statements for Cassandra's tables

In [121]:
# create a single event data csv file called 'event_datafile_new.csv'
def write_target_file(file, rows):

    # file used to insert data into the Apache Cassandra tables
    targetfile = os.path.abspath(f'{os.getcwd()}/{file}')

    # configure a csv writer object
    csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)
    # create a csv writer object
    with open(targetfile, 'w', encoding='utf8', newline='') as f:
        # write the header row
        writer = csv.writer(f, dialect='myDialect')
        writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName',
                         'length', 'level', 'location', 'sessionId', 'song', 'userId'])
        # write each data row
        for row in rows:
            # ignore all rows where the artist is empty
            if (row[0] == ''):
                continue
            writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6],
                             row[7], row[8], row[12], row[13], row[16]))
    
    # returns the absolute target file path
    return targetfile

#### Process source files and write the target file

In [122]:
# read all csv files in 'data/event_data' and return a list of data rows 
data_rows = get_data_rows('data/event_data')

# write a single csv file from the data rows
targetfile = write_target_file('event_data_new.csv', data_rows)

# print the number of rows in the source files
print(f'INFO: Rows read from the source CSV: {len(data_rows)}')

# print the number of rows in the target file
with open(targetfile, 'r', encoding = 'utf8') as f:
    print(f'INFO: Rows written in target CSV: {sum(1 for line in f)}')

print('INFO: Source files were filtered to remove rows with empty artist names.')

INFO: Rows read from the source CSV: 8056
INFO: Rows written in target CSV: 6821
INFO: Source files were filtered to remove rows with empty artist names.


## Apache Cassandra Database Modeling 

##### The <font color=blue>event_data_new.csv</font> contains the following columns: 

- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of the denormalized data in <font color=blue>**event_data_new.csv**</font> after the code above is run:<br>

<img src="./images/image_event_data_new.jpg">

#### Creating a Cluster

In [123]:
try:
    # creates and returns a new Session object. 
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
except Exception as e:
    print(e)

#### Create Keyspace

In [124]:
try:
    # create a keyspace if it doesn't exist
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS udacity 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
    """)
except Exception as e:
    print(e)

#### Set Keyspace

In [125]:
try:
    # set the current session keyspace
    session.set_keyspace('udacity')
except Exception as e:
    print(e)

### Create queries to ask the following three questions:

##### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

In [126]:
select_songplay_by_session = """
SELECT artist, song, length
FROM songplayBySession
WHERE sessionId = 338 AND itemInSession = 4
"""

##### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

In [127]:
select_songplay_by_user = """
SELECT artist, song, firstName, lastName
FROM songplayByUser
WHERE userId = 10 AND sessionId = 182
ORDER BY itemInSession ASC
"""

##### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [128]:
select_songplay_by_song = """
SELECT firstName, lastName
FROM songplayBySong
WHERE song = 'All Hands Against His Own'
"""

#### Create Pandas DataFrame to Test Queries

In [129]:
event_data = pd.read_csv(targetfile)
print(f'The data set has {event_data.shape[1]} columns and {event_data.shape[0]} rows')
event_data.head(2)

The data set has 11 columns and 6820 rows


Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
0,The Killers,Jayden,M,32,Graves,246.80444,paid,"Marinette, WI-MI",594,Read My Mind,25
1,Tamia,Jayden,M,33,Graves,243.09506,paid,"Marinette, WI-MI",594,Officially Missing You (Radio Version),25


In [130]:
# query 1
select_columns = ['artist', 'song', 'length']
where_conditions = '(sessionId == 338) & (itemInSession == 4)'
event_data.query(where_conditions)[select_columns]

Unnamed: 0,artist,song,length
366,Faithless,Music Matters (Mark Knight Dub),495.3073


In [131]:
# query 2
select_columns = ['artist', 'song', 'firstName', 'lastName']
where_conditions = '(userId == 10) and (sessionId == 182)'
event_data.query(where_conditions).sort_values('itemInSession', ascending=True)[select_columns]

Unnamed: 0,artist,song,firstName,lastName
2663,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
2664,Three Drives,Greece 2000,Sylvie,Cruz
2665,Sebastien Tellier,Kilometer,Sylvie,Cruz
2666,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


In [132]:
# query 3
select_columns = ['firstName', 'lastName']
where_conditions = "song == 'All Hands Against His Own'"
event_data.query(where_conditions)[select_columns]

Unnamed: 0,firstName,lastName
693,Sara,Johnson
1249,Jacqueline,Lynch
5838,Tegan,Levine


#### Find Unique Keys with Pandas DataFrame

In [133]:
# create a set of key combinations for each table
key_combinations = [
    ['songplayBySession', ['sessionId'], 0, ''],
    ['songplayBySession', ['sessionId', 'itemInSession'], 0, ''],
    ['songplayByUser', ['userId'], 0, ''],
    ['songplayByUser', ['userId', 'sessionId'], 0, ''],
    ['songplayByUser', ['userId', 'sessionId', 'itemInSession'], 0, ''],
    ['songplayBySong', ['song'], 0, ''],
    ['songplayBySong', ['song', 'firstName'], 0, ''],
    ['songplayBySong', ['song', 'firstName', 'lastName'], 0, ''],
    ['songplayBySong', ['song', 'sessionId', 'itemInSession'], 0, '']
]

# loop through the key combinations and test for primary key candidates
for key_combination in key_combinations:
    key_combination[2] = event_data.duplicated(key_combination[1]).sum()
    key_combination[3] = 'Yes' if key_combination[2] == 0 else 'No'

# show the results
key_cadidates = pd.DataFrame(key_combinations,
                             columns=['Table', 'Columns', 'Number of Duplicates', 'Is Unique']).set_index('Table')
key_cadidates

Unnamed: 0_level_0,Columns,Number of Duplicates,Is Unique
Table,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
songplayBySession,[sessionId],6044,No
songplayBySession,"[sessionId, itemInSession]",0,Yes
songplayByUser,[userId],6724,No
songplayByUser,"[userId, sessionId]",6044,No
songplayByUser,"[userId, sessionId, itemInSession]",0,Yes
songplayBySong,[song],1630,No
songplayBySong,"[song, firstName]",218,No
songplayBySong,"[song, firstName, lastName]",202,No
songplayBySong,"[song, sessionId, itemInSession]",0,Yes


#### Create Table Statements

**IMPORTANT:** The tables' primary keys were defined according to each **query requirement** and the **key candidates** above. All information presented in the Udacity course material, in addition to the information found on [this course page](https://www.baeldung.com/cassandra-keys), was used as a reference to design the tables.

In [134]:
# table for 1. select_songplay_by_session
query = """
CREATE TABLE IF NOT EXISTS songplayBySession
(artist text, song text, length float,
sessionId int, itemInSession int,
PRIMARY KEY ((sessionId, itemInSession)))
"""
try:
    session.execute(query)
except Exception as e:
    print(e)

In [135]:
# table for 2. select_songplay_by_user
query = """
CREATE TABLE IF NOT EXISTS songplayByUser
(artist text, song text, firstName text, lastName text,
userId int, sessionId int, itemInSession int,
PRIMARY KEY ((userId, sessionId), itemInSession))
"""
try:
    session.execute(query)
except Exception as e:
    print(e)

In [136]:
# table for 3. select_songplay_by_song
query = """
CREATE TABLE IF NOT EXISTS songplayBySong
(song text, sessionId int, itemInSession int,
firstName text, lastName text,
PRIMARY KEY (song, sessionId, itemInSession))
"""
try:
    session.execute(query)
except Exception as e:
    print(e)

#### Create a table insert statement builder

In [137]:
class InsertBuilder:
    """
    This class defines an insert statement builder
    that maps a data input into a CQL statement.
    """

    def __init__(self, table, columns, mapper):
        """
        The constructor instantiates an insert builder with the following arguments:

        Args:

        `table`: The name of the table to generate the insert statement.
        `columns`: The list of the table columns.
        `mapper`: A function that maps the input values to the table columns' values.
        """
        self.table = table
        self.columns = columns
        self.mapper = mapper

    def to_sql(self, values):
        """
        Maps the data input to the table insert statement.

        Args:

        `values`: The data to be mapped into insert values.
        """
        query = f"""
        INSERT INTO {self.table} ({', '.join(self.columns)})
        VALUES ({', '.join(self.mapper(values, self.columns))});\n
        """
        return query

#### Create a batch insert processing mananger

In [138]:
class InsertBatch:
    """
    This class defines a batch processing object that can be
    used to insert data into the Apache Cassandra tables.

    See Also:
    `https://docs.datastax.com/en/cql-oss/3.3/cql/cql_using/useBatchGoodExample.html`: DataStax documentation on batch statement.
    """

    def __init__(self, session, inserts, max_size=250):
        """
        The constructor instantiates a batch object with the following arguments:

        Args:

        `session`: Cassandra session object.
        `inserts`: List of insert statement builders per table.
        `max_size`: The maximum number of rows to be inserted by each commit.
        """
        self.session = session

        self.buffers = [None] * len(inserts)
        self.inserts = inserts

        self.max_size = max_size
        self.size = 0

    def __begin_batch(self):
        """
        Initializes the StringIO buffer for each table batch object
        and writes the BEGIN BATCH statement.
        """
        for index, buffer in enumerate(self.buffers):
            self.buffers[index] = StringIO()
            self.buffers[index].write("BEGIN BATCH\n")

    def __commit_batch(self):
        """
        Commits each table batch following the steps below:

        - Writes the APPLY BATCH statement.
        - Sets the StringIO buffer cursor to the beginning of the buffer.
        - Executes the batch statements using Cassandra's session method.
        - Closes the StringIO buffer.
        - Clears the batches array.
        - Resets the batch size counter.
        """
        for buffer in self.buffers:
            try:
                buffer.write('APPLY BATCH;\n')
                buffer.seek(0)
                self.session.execute(buffer.read())
                buffer.close()
            except Exception as e:
                print(e)

        self.buffers = [None] * len(self.inserts)
        self.size = 0

    def __write(self, values):
        """
        Maps the data input for each table insert statement and
        writes to the corresponding StringIO buffer.

        Args:

        `values`: The data input to write as a SQL insert.
        """
        for index, buffer in enumerate(self.buffers):
            buffer.write(self.inserts[index].to_sql(values))

    def commit(self, close=False):
        """
        This method must be called at the end of the processing
        to commit the last batch of insert instructions.
        Once the batch has not reached the maximum size
        to be automatically committed by the insertion method.

        Args:

        `close`: If True, closes the session.
        """
        self.__commit_batch()
        if close:
            self.session.shutdown()
            self.session.close()
            self.session = None

    def write(self, values):
        """
        Writes the data input into the the batch buffer as an insert statement.
        If the batch size is equal to the maximum size, the batch is committed
        and the buffer is cleared.

        Args:

        `values`: The data input to write as a SQL insert.
        """
        if self.size == 0:
            self.__begin_batch()

        if self.size < self.max_size:
            self.__write(values)
            self.size += 1
        else:
            self.__write(values)
            self.__commit_batch()

#### Create the CSV to CQL values mapper function 

In [139]:
def csv_mapper(values, columns):
    """
    Maps the csv data line in event_data to the table columns' values.

    Args:

    `values`: The csv data line in the target file.
    """
    # replace the single quote char by the corresponding unicode char 
    # otherwise the insert statement will fail
    values[0] = values[0].replace("'", "\u2027")
    values[9] = values[9].replace("'", "\u2027")
    

    map = {
        'artist': f"'{values[0]}'",
        'firstName': f"'{values[1]}'",
        'gender': f"'{values[2]}'",
        'itemInSession': values[3],
        'lastName': f"'{values[4]}'",
        'length': values[5],
        'level': f"'{values[6]}'",
        'location': f"'{values[7]}'",
        'sessionId': values[8],
        'song': f"'{values[9]}'",
        'userId': values[10]
    }

    return [map[column] for column in columns]

#### Create the insert statement builder for each table

In [140]:
# create a list of insert statement builders with all the tables
inserts = [
    InsertBuilder(
        'songplayBySession',
        ['artist', 'song', 'length', 'sessionId', 'itemInSession'],
        csv_mapper  
    ),
    InsertBuilder(
        'songplayByUser',
        ['artist', 'song', 'firstName', 'lastName', 'userId', 'sessionId', 'itemInSession'],
        csv_mapper
    ),
    InsertBuilder(
        'songplayBySong',
        ['song', 'sessionId', 'itemInSession', 'firstName', 'lastName'],
        csv_mapper
    )
]

#### Execute the batch insert process for all tables

In [141]:
# create a batch processing for all table inserts
# set the maximum size of each batch
batch = InsertBatch(session, inserts, 250)

with open(targetfile, encoding='utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    
    # write the csv data line into the batch buffer
    # automatically commit if the buffer size is equal to the maximum size
    for line in csvreader:
        batch.write(line)
    
    # commit the last batch
    batch.commit()

#### Do a SELECT to verify that the data have been inserted into each table

In [142]:
# select for query 1
try:
    response = session.execute(select_songplay_by_session)
except Exception as e:
    print(e)

pd.DataFrame(response.all())        

Unnamed: 0,artist,song,length
0,Faithless,Music Matters (Mark Knight Dub),495.307312


In [143]:
# select for query 2
try:
    response = session.execute(select_songplay_by_user)
except Exception as e:
    print(e)

pd.DataFrame(response.all())

Unnamed: 0,artist,song,firstname,lastname
0,Down To The Bone,Keep On Keepin‧ On,Sylvie,Cruz
1,Three Drives,Greece 2000,Sylvie,Cruz
2,Sebastien Tellier,Kilometer,Sylvie,Cruz
3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


In [144]:
# select for query 3
try:
    response = session.execute(select_songplay_by_song)
except Exception as e:
    print(e)

pd.DataFrame(response.all())                          

Unnamed: 0,firstname,lastname
0,Sara,Johnson
1,Jacqueline,Lynch
2,Tegan,Levine


### Drop the tables before closing out the sessions

In [145]:
query = "DROP TABLE IF EXISTS songplayBySession"
try:
    session.execute(query)
except Exception as e:
    print(e)

query = "DROP TABLE IF EXISTS songplayByUser"
try:
    session.execute(query)
except Exception as e:
    print(e)

query = "DROP TABLE IF EXISTS songplayBySong"
try:
    session.execute(query)
except Exception as e:
    print(e)

### Close the session and cluster connection

In [146]:
session.shutdown()
cluster.shutdown()