# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
# Import Python packages
import cassandra
import os
import pandas as pd
import glob
import csv
import time
from IPython.display import display

#### Creating list of filepaths to process original event csv data files

In [2]:
# checking your current working directory
current_dir = os.getcwd()
print(f"Current directory: {current_dir}")

# Get your current folder and sub-folder event data
filepath = current_dir + '/event_data'
print(f"Filepath directory: {filepath}")

# Create a for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    
    # join the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))
    #print(file_path_list)

print(f"Found {len(file_path_list)} files in 'Filepath' directory")

Current directory: /Users/jp/projects/udacity-data-engineering-submissions/project_1b_data_modeling_with_cassandra
Filepath directory: /Users/jp/projects/udacity-data-engineering-submissions/project_1b_data_modeling_with_cassandra/event_data
Found 30 files in 'Filepath' directory


#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:

# reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
        # extracting each data row one by one and append it        
        for line in csvreader:
            full_data_rows_list.append(line) 

print(f"Total number of rows collected across all files: {len(full_data_rows_list)}")

csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

staging_csv = current_dir + '/' + 'event_datafile_new.csv'
with open(staging_csv, 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',
                     'level','location','sessionId','song','userId'])

    for row in full_data_rows_list:
        if row[0] == '':
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))
        
print(f"Staging CSV file created: {staging_csv}")

Total number of rows collected across all files: 8056
Staging CSV file created: /Users/jp/projects/udacity-data-engineering-submissions/project_1b_data_modeling_with_cassandra/event_datafile_new.csv


In [4]:
# check the number of rows in your csv file
with open(staging_csv, 'r', encoding = 'utf8') as f:
    print(f"Number of rows in staging CSV file: {sum(1 for line in f)}")

Number of rows in staging CSV file: 6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

#### Creating a Cluster

In [24]:
# This should make a connection to a Cassandra instance your local machine 
# (127.0.0.1)

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

cassandra_hosts = ['127.0.0.1']
cassandra_ap = PlainTextAuthProvider(username='cassandra', password='cassandra')

try:
    cluster = Cluster(cassandra_hosts, auth_provider=cassandra_ap)
    session = cluster.connect()
    print(f"Connected to cassandra cluster {cassandra_hosts}")
except Exception as e:
    print(f"Failed to connect to cassandra cluster {cassandra_hosts}, error: {e}")

Connected to cassandra cluster ['127.0.0.1']


#### Create Keyspace

In [6]:
# Create keyspace, if it does not exist
cassandra_keyspace = 'sparkify'
try:
    session.execute("CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }" % (cassandra_keyspace))
except Exception as e:
    print(f"Failed to create cassandra keyspace '{cassandra_keyspace}', error: {e}")

#### Set Keyspace

In [25]:
try:
    session.set_keyspace(cassandra_keyspace)
except Exception as e:
    print(e)

## Cassandra database ETL

### Create queries to ask the following three questions of the data

### Query 1: Get artist's name, song title and song's length that was heard during sessionId = 338, and itemInSession = 4

### Query 2: Get artist's name, song title, user's first and last name for userid = 10, sessionid = 182, sorted by "item in session"

### Query 3: Get every user's first and last name, who listened to the song 'All Hands Against His Own'

### Query 1

In [8]:
# Create table for query 1
query = """
CREATE TABLE IF NOT EXISTS artist_song_lookup (
    artist_name text,
    song_title text,
    song_length double,
    session_id int,
    item_in_session int,
    PRIMARY KEY (session_id, item_in_session)
)
"""
try:
    session.execute(query)
    print("Table 'artist_song_lookup' has been created or it already existed")
except Exception as e:
    print(f"Failed in creating table 'artist_song_lookup'. Error: {e}")

Table 'artist_song_lookup' has been created or it already existed


In [9]:
# Load data from staging CSV file into table 'artist_song_lookup' for query 1
file = staging_csv

print("Loading data into table 'artist_song_lookup'")
query = """INSERT INTO artist_song_lookup (artist_name, song_title, song_length, session_id, item_in_session)
VALUES (%s, %s, %s, %s, %s)"""

start_time = time.time()
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        data = (line[0], line[9], float(line[5]), int(line[8]), int(line[3]))
        session.execute(query, data)
print(f"Data has been loaded in table 'artist_song_lookup'... time taken {round(time.time() - start_time, 2)} seconds")

Loading data into table 'artist_song_lookup'
Data has been loaded in table 'artist_song_lookup'... time taken 11.45 seconds


In [10]:
# Check the data, by running the desired query
query = """
SELECT
    artist_name,
    song_title,
    song_length
FROM
    artist_song_lookup
WHERE
    session_id = 338
AND item_in_session = 4
"""

rows = []
try:
    rows = session.execute(query)
    df_result = pd.DataFrame(list(rows))
    display(df_result)
except Exception as e:
    print(f"Select query has failed. Error: {e}")

Unnamed: 0,artist_name,song_title,song_length
0,Faithless,Music Matters (Mark Knight Dub),495.3073


### Query 2

In [13]:
# Query 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name)
# for userid = 10, sessionid = 182
query = """
CREATE TABLE IF NOT EXISTS user_activity (
    artist_name text,
    song_title text,
    user_firstn text,
    user_lastn text,
    item_in_session int,
    user_id int,
    session_id int,
    PRIMARY KEY ((session_id, user_id), item_in_session)
) WITH CLUSTERING ORDER BY (item_in_session ASC)
"""

try:
    session.execute(query)
    print("Table 'user_activity' has been created or it already existed")
except Exception as e:
    print(f"Failed in creating table 'user_activity'. Error: {e}")

Table 'user_activity' has been created or it already existed


In [14]:
# Load data from staging CSV file into table 'user_activity' for query 2
file = staging_csv

print("Loading data into table 'user_activity'")
query = """INSERT INTO user_activity (artist_name, song_title, user_firstn, user_lastn, item_in_session, user_id, session_id)
VALUES (%s, %s, %s, %s, %s, %s, %s)"""
    
start_time = time.time()
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        data = (line[0], line[9], line[1], line[4], int(line[3]), int(line[10]), int(line[8]))
        session.execute(query, data)
print(f"Data has been loaded in table 'user_activity'... time taken {round(time.time() - start_time, 2)} seconds")

Loading data into table 'user_activity'
Data has been loaded in table 'user_activity'... time taken 11.05 seconds


In [17]:
# Check the data, by running the desired query
query = """
SELECT artist_name, song_title, user_firstn, user_lastn
FROM user_activity
WHERE user_id = 10
  AND session_id = 182;
"""

rows = []
try:
    rows = session.execute(query)
    df_result = pd.DataFrame(list(rows))
    display(df_result)
except Exception as e:
    print(f"Select query has failed. Error: {e}")

Unnamed: 0,artist_name,song_title,user_firstn,user_lastn
0,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
1,Three Drives,Greece 2000,Sylvie,Cruz
2,Sebastien Tellier,Kilometer,Sylvie,Cruz
3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


### Query 3

In [18]:
# Query 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
query = """
CREATE TABLE IF NOT EXISTS user_songs (
    song_title text,
    user_firstn text,
    user_lastn text,
    user_id int,
    PRIMARY KEY (song_title, user_id)
)
"""

try:
    session.execute(query)
    print("Table 'user_songs' has been created or it already existed")
except Exception as e:
    print(f"Failed in creating table 'user_songs'. Error: {e}")                    

Table 'user_songs' has been created or it already existed


In [19]:
# Load data from staging CSV file into table 'user_songs' for query 3
file = staging_csv

print("Loading data into table 'user_songs'")
query = """INSERT INTO user_songs (song_title, user_firstn, user_lastn, user_id)
VALUES (%s, %s, %s, %s)"""
    
start_time = time.time()
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        data = (line[9], line[1], line[3], int(line[10]))
        session.execute(query, data)
print(f"Data has been loaded in table 'user_songs'... time taken {round(time.time() - start_time, 2)} seconds")

Loading data into table 'user_songs'
Data has been loaded in table 'user_songs'... time taken 12.06 seconds


In [20]:
# Check the data, by running the desired query
query = """
SELECT user_firstn, user_lastn
FROM user_songs
WHERE song_title = 'All Hands Against His Own'
"""

rows = []
try:
    rows = session.execute(query)
    df_result = pd.DataFrame(list(rows))
    display(df_result)
except Exception as e:
    print(f"Select query has failed. Error: {e}")

Unnamed: 0,user_firstn,user_lastn
0,Jacqueline,50
1,Tegan,25
2,Sara,31


### Drop the tables before closing out the sessions

In [26]:
drop_queries = [
    "DROP TABLE IF EXISTS artist_song_lookup;",
    "DROP TABLE IF EXISTS user_activity;",
    "DROP TABLE IF EXISTS user_songs;",
]
try:
    for query in drop_queries:
        session.execute(query)
except Exception as e:
    print(e)

### Close the session and cluster connection¶

In [27]:
session.shutdown()
cluster.shutdown()