# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd

# Import Python driver of Cassandra 
import cassandra

# A regular expression (or RE) specifies a set of strings that matches it; 
# the functions in this module let you check if a particular string matches a 
# given regular expression (or if a given regular expression matches a particular 
# string, which comes down to the same thing).
import re

# The OS module in Python provides functions for interacting with the operating system. 
# OS comes under Python's standard utility modules. This module provides a portable way of 
# using operating system-dependent functionality
import os

# The glob module finds all the pathnames matching a specified pattern according to 
# the rules used by the Unix shell, although results are returned in arbitrary order. 
# No tilde expansion is done, but *, ?, and character ranges expressed with [] will be correctly matched.
import glob

# NumPy offers comprehensive mathematical functions, random number generators, 
# linear algebra routines, Fourier transforms, and more
import numpy as np

# While the JSON module will convert strings to Python datatypes, 
# normally the JSON functions are used to read and write directly from JSON files.
import json

# The so-called CSV (Comma Separated Values) format is the most common import 
# and export format for spreadsheets and databases.
# The csv module implements classes to read and write tabular data in CSV format.
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# Show the directory this notebook is running from
working_dir = os.getcwd()
print(working_dir)

E:\Repositories\public\ETL-pipeline-and NoSQL-data-modeling-with-Apache-Cassandra


In [3]:
# Build the path of the 'event_data' folder inside the current working directory.
# os.path.join inserts the separator that is correct for the current platform,
# so the result no longer mixes '\' and '/' on Windows.
filepath = os.path.join(os.getcwd(), 'event_data')
print(filepath)

E:\Repositories\public\ETL-pipeline-and NoSQL-data-modeling-with-Apache-Cassandra/event_data


In [4]:
# Collect the path of every CSV file under `filepath`, sub-folders included.
# The list is created once, before the walk, and extended for each directory;
# re-assigning it inside the loop would keep only the last directory visited.
file_path_list = []
for root, dirs, files in os.walk(filepath):

    # Prune hidden folders (e.g. Jupyter's '.ipynb_checkpoints') in place so that
    # os.walk does not descend into them and pick up stale copies of the data
    dirs[:] = [d for d in dirs if not d.startswith('.')]

    # Match only '*.csv' so sub-directories and unrelated files are not collected
    file_path_list.extend(glob.glob(os.path.join(root, '*.csv')))

# glob returns matches in arbitrary order; sort so every run processes
# the files in the same order
file_path_list.sort()

In [5]:
# Report how many file paths were collected in the previous step
csv_file_count = len(file_path_list)
print("Count of CSV files = ", csv_file_count)

Count of CSV files =  30


**Correct! We have 30 CSV files in the folder '/event_data'**

#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [6]:
# Gather the data rows of every event file into one big list of lists
# (one inner list per CSV row). Header rows are left out here; a single
# header is written later in the pipeline.
full_data_rows_list = []

for file_path in file_path_list:

    # newline='' leaves newline handling to the csv module
    with open(file_path, 'r', encoding = 'utf8', newline='') as csvfile:

        csvreader = csv.reader(csvfile)

        # Skip the header row. The None default keeps a completely empty file
        # from raising StopIteration and aborting the whole load.
        next(csvreader, None)

        # Append every remaining row of this file
        full_data_rows_list.extend(csvreader)
    # The current file is closed here, before the next path is opened

# By now, all rows of all CSV files have been inserted into full_data_rows_list

In [7]:
# Total number of data rows gathered from all event files
total_row_count = len(full_data_rows_list)
print(total_row_count)

8056


In [8]:
# See what the event data rows look like. Only the first few rows are shown;
# printing the whole list would dump thousands of rows into the notebook output.
full_data_rows_list[:3]

**A dialect object (or simply a dialect) is a way to group various formatting parameters. 
Once you have created the dialect object, simply pass it to the reader or writer, 
rather than passing each formatting argument separately.**

**To create a new dialect, we use the `csv.register_dialect()` function. It accepts the dialect name as a string and one or more formatting parameters as keyword arguments.**
* `quoting`: controls when quotes should be generated by the writer or recognized by the reader. If you want double quotes around all fields, regardless of whether the quotechar or delimiter appears in the data, set `quoting` to `csv.QUOTE_ALL` (the other options are `csv.QUOTE_MINIMAL`, `csv.QUOTE_NONNUMERIC` and `csv.QUOTE_NONE`).
* `skipinitialspace`: controls how whitespace following the delimiter is interpreted. If `True`, whitespace immediately following the delimiter is ignored.



In [9]:
# Register a reusable set of CSV formatting options under the name 'myDialect'
dialect_options = {'quoting': csv.QUOTE_ALL, 'skipinitialspace': True}
csv.register_dialect('myDialect', **dialect_options)

In [10]:
# Write the smaller event file 'event_datafile_new.csv', which is the data source
# for the Apache Cassandra tables.

# Header of the new file, in output order
header = ['artist','firstName','gender','itemInSession','lastName','length',
          'level','location','sessionId','song','userId']

# Positions of those columns inside each collected row, in the same order as `header`
kept_columns = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as event_datafile_new:

    # The writer uses the formatting options registered as 'myDialect'
    writer = csv.writer(event_datafile_new, dialect='myDialect')

    # The rows in full_data_rows_list were collected without their headers,
    # so the column names are written once, before any data row
    writer.writerow(header)

    for row in full_data_rows_list:
        # A row whose first field is an empty string is left out;
        # every other row is written with the selected columns only
        if row[0] != '':
            writer.writerow(tuple(row[index] for index in kept_columns))

In [11]:
# Check the number of rows in the new csv file (the header row is included).
# Reading through csv.reader with newline='' counts CSV records rather than
# physical lines, so a quoted field containing a line break is counted once.
with open('event_datafile_new.csv', 'r', encoding = 'utf8', newline='') as f:
    print(sum(1 for _ in csv.reader(f)))

6821


# Part II. The Apache Cassandra coding portion of the project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>.  The event_datafile_new.csv contains the following columns: 
- 1. artist 
- 2. firstName of user
- 3. gender of user
- 4. item number in session
- 5. last name of user
- 6. length of the song
- 7. level (paid or free song)
- 8. location of the user
- 9. sessionId
- 10. song title
- 11. userId

<img src="images/image_event_datafile_new.jpg">

#### Creating a Cluster

In [12]:
from cassandra.cluster import Cluster

# Create a connection to the database.
# The local IP address is used because Apache Cassandra is installed on this machine.
cluster = Cluster(contact_points=['127.0.0.1'])

In [13]:
# Open a session on the cluster; the queries in the cells below are executed through it
session = cluster.connect()

#### Create Keyspace

In [14]:
# A keyspace is the top-level database object: it controls the replication of
# the objects it contains (tables, materialized views, user-defined types,
# functions and aggregates) at each datacenter in the cluster.
# Typically, a cluster has one keyspace per application.

# Statement that creates the 'udacity' keyspace unless it already exists
create_keyspace_cql = """
    CREATE KEYSPACE IF NOT EXISTS udacity
    WITH REPLICATION = 
        {'class' : 'SimpleStrategy', 'replication_factor' : 1}"""

session.execute(create_keyspace_cql)

<cassandra.cluster.ResultSet at 0x22268393bb0>

#### Set Keyspace

In [15]:
# Switch the session to the 'udacity' keyspace created above;
# the statements in the cells below use unqualified table names
session.set_keyspace('udacity')

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4
`SELECT artist_name, song_title, song_length FROM session_songs WHERE session_id=338 AND item_in_session=4`

### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
`SELECT artist_name, song_title, user_first_name, user_last_name FROM user_sessions WHERE user_id=10 AND session_id=182 ORDER BY item_in_session`    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
`SELECT user_first_name, user_last_name FROM song_fans WHERE song_title='All Hands Against His Own'`

<img src="images/keyspace-diagram.png">

## Important notes regarding the Keyspace design:
* In Cassandra, a row is identified by its `PRIMARY KEY`, so two rows with the same primary key cannot coexist.
* That's why I added the `user_id` column to the primary key of the `song_fans` table: a user's first and last name together with the song title can easily repeat when two users share a name and listen to the same song.
* In that case, Cassandra would overwrite the old row with the new row.
* By adding the `user_id` field, we ensure that every user who listened to a given song is kept in that table.

### Table `session_songs`

`SELECT artist_name, song_title, song_length FROM session_songs WHERE session_id=338 AND item_in_session=4`

> For the `PRIMARY KEY`, since we need to filter the results by `session_id`, I will choose that column to be the `Partition Key`. We also want to filter the results by `item_in_session`, so I will choose this to be a `Clustering Column`.

In [16]:
# CQL for the session_songs table: session_id is the partition key and
# item_in_session the clustering column
query = (
    "CREATE TABLE IF NOT EXISTS session_songs "
    "(session_id INT, item_in_session INT, artist_name VARCHAR, "
    "song_title VARCHAR, song_length DECIMAL, "
    "PRIMARY KEY (session_id, item_in_session))"
)

# Create the table
session.execute(query)

<cassandra.cluster.ResultSet at 0x222683ad940>

In [17]:
# Load the data from 'event_datafile_new.csv' into the session_songs table

# INSERT statement; the %s placeholders are filled from the tuple passed to execute()
query = (
    "INSERT INTO session_songs "
    "(session_id, item_in_session, artist_name, song_title, song_length) "
    "VALUES (%s, %s, %s, %s, %s)"
)

# Set file name
file = 'event_datafile_new.csv'

# newline='' is how the csv module expects its input files to be opened
with open(file, encoding = 'utf8', newline='') as f:

    # DictReader consumes the header row and exposes every field by its column
    # name, which replaces hard-to-audit positional indexes such as row[8]
    csvreader = csv.DictReader(f)

    # Loop over the data rows, row by row
    for row in csvreader:

        # CAREFUL: every CSV field is read as a string, so the numeric fields
        # are converted before they are sent to the table
        session.execute(query, (
            int(row['sessionId']),      # session_id
            int(row['itemInSession']),  # item_in_session
            row['artist'],              # artist_name
            row['song'],                # song_title
            float(row['length']),       # song_length
        ))

#### Do a SELECT to verify that the data have been inserted into each table

In [18]:
# Query 1: artist, song title and song length heard during
# sessionId = 338, itemInSession = 4
query = (
    "SELECT artist_name, song_title, song_length FROM session_songs "
    "WHERE session_id=338 AND item_in_session=4"
)

# Run the query
rows = session.execute(query)

# Print every returned row
for row in rows:
    print(row)

Row(artist_name='Faithless', song_title='Music Matters (Mark Knight Dub)', song_length=Decimal('495.3073'))


### Table `user_sessions`

`SELECT artist_name, song_title, user_first_name, user_last_name FROM user_sessions WHERE user_id=10 AND session_id=182 ORDER BY item_in_session` 

> For the `PRIMARY KEY`, since we need to filter the results by `user_id`, I will choose that column to be the `Partition Key`. We also want to filter the results by `session_id`, so I will choose this to be a `Clustering Column`. Finally, we want to sort the results by `item_in_session`, so we set this as a `Clustering Column` as well.

In [19]:
# CQL for the user_sessions table: user_id is the partition key;
# session_id and item_in_session are the clustering columns
query = (
    "CREATE TABLE IF NOT EXISTS user_sessions "
    "(user_id INT, session_id INT, item_in_session INT, "
    "user_first_name VARCHAR, user_last_name VARCHAR, "
    "artist_name VARCHAR, song_title VARCHAR, "
    "PRIMARY KEY (user_id, session_id, item_in_session))"
)

# Create the table
session.execute(query)

<cassandra.cluster.ResultSet at 0x222684056d0>

In [20]:
# Load the data from 'event_datafile_new.csv' into the user_sessions table

# INSERT statement; the %s placeholders are filled from the tuple passed to execute()
query = (
    "INSERT INTO user_sessions "
    "(user_id, session_id, item_in_session, user_first_name, "
    "user_last_name, artist_name, song_title)"
    "VALUES (%s, %s, %s, %s, %s, %s, %s)"
)

# Set file name
file = 'event_datafile_new.csv'

# newline='' is how the csv module expects its input files to be opened
with open(file, encoding = 'utf8', newline='') as f:

    # DictReader consumes the header row and exposes every field by its column
    # name, which replaces hard-to-audit positional indexes such as row[10]
    csvreader = csv.DictReader(f)

    # Loop over the data rows, row by row
    for row in csvreader:

        # CAREFUL: every CSV field is read as a string, so the numeric fields
        # are converted before they are sent to the table
        session.execute(query, (
            int(row['userId']),         # user_id
            int(row['sessionId']),      # session_id
            int(row['itemInSession']),  # item_in_session
            row['firstName'],           # user_first_name
            row['lastName'],            # user_last_name
            row['artist'],              # artist_name
            row['song'],                # song_title
        ))

#### Do a SELECT to verify that the data have been inserted into each table

In [21]:
# Query 2: artist, song and user name for userId = 10 and sessionId = 182,
# ordered by item_in_session
query = (
    "SELECT item_in_session, artist_name, song_title, user_first_name, user_last_name "
    "FROM user_sessions "
    "WHERE user_id=10 AND session_id=182 "
    "ORDER BY session_id, item_in_session"
)

# Run the query
rows = session.execute(query)

# Print every returned row, followed by a blank line
for row in rows:
    print(row, end='\n\n')

Row(item_in_session=0, artist_name='Down To The Bone', song_title="Keep On Keepin' On", user_first_name='Sylvie', user_last_name='Cruz')

Row(item_in_session=1, artist_name='Three Drives', song_title='Greece 2000', user_first_name='Sylvie', user_last_name='Cruz')

Row(item_in_session=2, artist_name='Sebastien Tellier', song_title='Kilometer', user_first_name='Sylvie', user_last_name='Cruz')

Row(item_in_session=3, artist_name='Lonnie Gordon', song_title='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', user_first_name='Sylvie', user_last_name='Cruz')



### Table `song_fans`

`SELECT user_first_name, user_last_name FROM song_fans WHERE song_title='All Hands Against His Own'`

> for the `PRIMARY KEY`, since we need to filter the results by `song_title`, I will choose that column to be the `Partition Key`. But, the song_title is not unique to that table! Multiple users can listen to the same song, and multiple users can have the same first and last name, so, I choose `user_id` to be a `Clustering Column`.

In [22]:
# CQL for the song_fans table: song_title is the partition key and
# user_id the clustering column
query = (
    "CREATE TABLE IF NOT EXISTS song_fans "
    "(user_id INT, song_title VARCHAR, user_first_name VARCHAR, user_last_name VARCHAR, "
    "PRIMARY KEY (song_title, user_id))"
)

# Create the table
session.execute(query)

<cassandra.cluster.ResultSet at 0x2226838d6a0>

In [23]:
# Load the data from 'event_datafile_new.csv' into the song_fans table

# INSERT statement; the %s placeholders are filled from the tuple passed to execute()
query = (
    "INSERT INTO song_fans "
    "(user_id, song_title, user_first_name, user_last_name)"
    "VALUES (%s, %s, %s, %s)"
)

# Set file name
file = 'event_datafile_new.csv'

# newline='' is how the csv module expects its input files to be opened
with open(file, encoding = 'utf8', newline='') as f:

    # DictReader consumes the header row and exposes every field by its column
    # name, which replaces hard-to-audit positional indexes such as row[10]
    csvreader = csv.DictReader(f)

    # Loop over the data rows, row by row
    for row in csvreader:

        # CAREFUL: every CSV field is read as a string, so the numeric field
        # is converted before it is sent to the table
        session.execute(query, (
            int(row['userId']),  # user_id
            row['song'],         # song_title
            row['firstName'],    # user_first_name
            row['lastName'],     # user_last_name
        ))

#### Do a SELECT to verify that the data have been inserted into each table

In [24]:
# Query 3: first and last name of every user who listened to the song
# 'All Hands Against His Own'
query = (
    "SELECT user_first_name, user_last_name "
    "FROM song_fans WHERE song_title='All Hands Against His Own'"
)

# Run the query
rows = session.execute(query)

# Print every returned row, followed by a blank line
for row in rows:
    print(row, end='\n\n')

Row(user_first_name='Jacqueline', user_last_name='Lynch')

Row(user_first_name='Tegan', user_last_name='Levine')

Row(user_first_name='Sara', user_last_name='Johnson')



### Drop the tables

In [25]:
# Drop the three tables created in this notebook, in creation order
drop_statements = [
    "DROP TABLE IF EXISTS session_songs",
    "DROP TABLE IF EXISTS user_sessions",
    "DROP TABLE IF EXISTS song_fans",
]

for query in drop_statements:
    result = session.execute(query)

# Display the result of the last DROP statement
result

<cassandra.cluster.ResultSet at 0x222683adb50>

### Drop the Keyspace

In [26]:
# Drop the 'udacity' keyspace; IF EXISTS makes the statement safe to run
# when the keyspace is already gone
query = "DROP KEYSPACE IF EXISTS udacity"
session.execute(query)

<cassandra.cluster.ResultSet at 0x222684015e0>

### Close the session and cluster connection

In [27]:
# Shut down the session first, then the cluster it was opened from
for connection in (session, cluster):
    connection.shutdown()