# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
# Import necessary Python packages
import pandas as pd
import cassandra
import os
import glob
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# Creating list of filepaths to process original event csv data files
def collect_event_files(base_dir):
    """Return the paths of all regular files under *base_dir*, recursively.

    Files from every directory visited by ``os.walk`` are accumulated (the
    list is not overwritten per directory), and only file entries are
    returned, so every path can be opened directly with ``open``.  Names
    starting with '.' (hidden files such as .DS_Store and hidden folders such
    as .ipynb_checkpoints) are skipped, matching what a ``'*'`` glob skips.

    Parameters
    ----------
    base_dir : str
        Root folder to walk.  A missing folder yields an empty list, because
        ``os.walk`` ignores errors by default.

    Returns
    -------
    list of str
        File paths, sorted by name within each directory.
    """
    paths = []
    for root, dirs, files in os.walk(base_dir):
        # Prune hidden directories in place so os.walk does not descend into them.
        dirs[:] = [d for d in dirs if not d.startswith('.')]
        paths.extend(
            os.path.join(root, name)
            for name in sorted(files)
            if not name.startswith('.')
        )
    return paths


# Get your current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Create a list of files and collect each filepath
file_path_list = collect_event_files(filepath)

#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [3]:
# Processing the files to create the data file csv that will be used for Apache Cassandra tables
# Pool the data rows of every event file (each file's first/header row is dropped).
full_data_rows_list = []
for f in file_path_list:
    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header; the None default avoids an uncaught StopIteration
        # when a file is completely empty.
        next(csvreader, None)
        full_data_rows_list.extend(csvreader)

# Create smaller event data csv file used to insert data into the Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)
output_file = 'event_datafile_new.csv'
with open(output_file, 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length', 'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # csv.reader yields [] for a blank line, on which row[0] would raise
        # IndexError; rows whose first (artist) field is empty are dropped.
        if not row or row[0] == '':
            continue
        # Source columns 0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16 are written, in
        # this order, under the header names written above.
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [4]:
# Verify the number of rows in the generated csv file
with open(output_file, 'r', encoding='utf8') as f:
    print(f"Number of rows in the CSV file: {sum(1 for line in f)}")

12


# Part II: Complete the Apache Cassandra coding portion of your project.


#### Creating a Cluster

In [5]:
# Creating a Cluster
# Connect to a Cassandra instance on your local machine (127.0.0.1)
# NOTE(review): Cluster() is given no contact points, so the driver's default
# contact point is used; the comment above says that is the local node.
from cassandra.cluster import Cluster
cluster = Cluster()

# Establish connection and begin executing queries, initialize a session
# No keyspace is passed to connect(), so statements must either qualify table
# names or a keyspace must be set on the session before tables are used.
session = cluster.connect()

#### Create Keyspace

In [6]:
# Create Keyspace
# SimpleStrategy with replication_factor 1 keeps a single copy of each row;
# IF NOT EXISTS lets this cell be re-run without an error.
keyspace_cql = """
    CREATE KEYSPACE IF NOT EXISTS udacity 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
"""
session.execute(keyspace_cql)

# Make udacity the default keyspace for later statements on this session.
session.set_keyspace('udacity')

<cassandra.cluster.ResultSet at 0x7f77fa57d048>

## Create queries to ask the following three questions of the data

### Question 1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338 and itemInSession = 4

## Answer 1:

## To obtain the artist's name, song title, and song length from the music app history, we need to focus on sessionId and itemInSession as filtering criteria. Given the nature of NoSQL databases, we should first design our query and then create the necessary table.

## The desired output is the "Name of the artist, title of the song, and length of the track" based on "sessionId and itemInSession." This indicates that our SELECT statement should look like this:

## SELECT artist, song, length
## FROM artist_song_for_session
## WHERE sessionId = 338 AND itemInSession = 4

## Now, let's proceed with creating the table. We'll use IF NOT EXISTS so the table only gets created if it doesn't already exist. We'll name the table 'artist_song_for_session', since it stores the artist and song for each item of a session.

## Column Names:
## We require the artist's name, song title, song length, sessionId, and itemInSession for our query. Therefore, the column names will be artist, song, length, sessionId, and itemInSession.

## Primary Key:
## sessionId is the partition key and itemInSession is the clustering column. Together they form the primary key, which matches the query's filtering criteria and uniquely identifies each row.

## Please refer to the Query 1 code below. 
## The code will create the 'artist_song_for_session' table, insert data from the CSV file, execute the query based on sessionId and itemInSession, and display the artist's name, song title, and song length for the specified session and item.


### Question 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
## Answer 2: 
## Question 2 requires us to retrieve the name of the artist, song title (sorted by itemInSession), and the user's first and last names based on a specific userId and sessionId.

## The expected output is: "Name of the artist, song title sorted by itemInSession, and user's first and last name" Based on: "userId and sessionId"

## From the above two points, we know that the query to get the data will be a SELECT statement like:

## SELECT artist, song, firstName, lastName FROM artist_song_user_for_user_session WHERE userId = 10 AND sessionId = 182 ORDER BY itemInSession ASC;

## To facilitate this query, we need to create a table with appropriate columns and primary keys. The primary key should include userId and sessionId to allow filtering on these columns, and include itemInSession as a clustering column to sort the data by this column.

## We can create the table with a statement like:


## CREATE TABLE IF NOT EXISTS artist_song_user_for_user_session (
    userId INT, 
    sessionId INT, 
    itemInSession INT, 
    artist TEXT, 
    song TEXT, 
    firstName TEXT, 
    lastName TEXT, 
    PRIMARY KEY ((userId, sessionId), itemInSession)
);
## Column Names: The columns in the table will be userId, sessionId, itemInSession, artist, song, firstName, and lastName.

## Primary Key: The composite partition key (userId, sessionId) stores each user's session in its own partition, so the query can filter on both columns. itemInSession is the clustering column, which orders the rows within that partition and so provides the required sort.



### Question 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
## Answer 3: 
## Question 3 requires us to retrieve the first and last names of every user who listened to a specific song titled 'All Hands Against His Own'.

## The expected output is: "User's first and last name"
## Based on: "Song title"

## From the above two points, we know that the query to get the data will be a SELECT statement like:


## SELECT firstName, lastName FROM user_for_song WHERE song = 'All Hands Against His Own';
## To facilitate this query, we need to create a table with appropriate columns and primary keys. The primary key should include the song title to allow filtering on this column, and also include userId to uniquely identify each record.

## We can create the table with a statement like:

## CREATE TABLE IF NOT EXISTS user_for_song (
    song TEXT, 
    userId INT, 
    firstName TEXT, 
    lastName TEXT, 
    PRIMARY KEY (song, userId)
);
## Column Names: The columns in the table will be song, userId, firstName, and lastName.

## Primary Key: song is the partition key, so the query can filter on the song title. userId is the clustering column, so each user appears at most once per song and every record is unique.


In [8]:
# Creating queries to analyze the data
# Create table (artist_song_for_session) for query 1.
# Partition key: sessionId; clustering column: itemInSession. Query 1 fixes
# both, so it addresses at most one row.
# length is stored as double: CQL `float` is a 32-bit type, so song lengths
# would come back with single-precision rounding error (e.g. 495.3072967...).
# NOTE: IF NOT EXISTS keeps an existing table's old schema; the clean-up cell
# drops the table, so a fresh run picks up this definition.
query = "CREATE TABLE IF NOT EXISTS artist_song_for_session "
query += "(sessionId int, itemInSession int, artist text, song text, length double, PRIMARY KEY (sessionId, itemInSession))"
session.execute(query)


<cassandra.cluster.ResultSet at 0x7f77fa578240>

In [9]:
# Insert data into table for query 1.
# The INSERT is prepared once and each CSV row is bound to it, instead of
# rebuilding the statement text and sending it for every row.
query = "INSERT INTO artist_song_for_session (sessionId, itemInSession, artist, song, length) VALUES (?, ?, ?, ?, ?)"
prepared = session.prepare(query)
file = 'event_datafile_new.csv'
# newline='' is what the csv module expects, so quoted fields containing
# newlines are parsed correctly.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip the header row
    for line in csvreader:
        # CSV columns (per the header of event_datafile_new.csv):
        # 8 sessionId, 3 itemInSession, 0 artist, 9 song, 5 length
        session.execute(prepared, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))


#### Do a SELECT to verify that the data have been inserted into each table

In [10]:
# Perform query 1: the song played as item 4 of session 338.
# Both primary-key columns are fixed in the WHERE clause, so at most one row matches.
query = "SELECT artist, song, length from artist_song_for_session WHERE sessionId=338 AND itemInSession=4"
rows = session.execute(query)
for artist, song, length in rows:
    print(artist, song, length)


In [11]:

# Create table (artist_song_user_for_user_session) for query 2.
# Partition key: (userId, sessionId), so one user's session is one partition;
# clustering column: itemInSession, which orders the rows inside that partition.
query = "CREATE TABLE IF NOT EXISTS artist_song_user_for_user_session "
query += "(userId int, sessionId int, itemInSession int, artist text, song text, firstName text, lastName text, PRIMARY KEY ((userId, sessionId), itemInSession))"
session.execute(query)


# Insert data into table for query 2.
# The INSERT is prepared once and each CSV row is bound to it, instead of
# rebuilding the statement text and sending it for every row.
query = "INSERT INTO artist_song_user_for_user_session (userId, sessionId, itemInSession, artist, song, firstName, lastName) VALUES (?, ?, ?, ?, ?, ?, ?)"
prepared = session.prepare(query)
file = 'event_datafile_new.csv'
# newline='' is what the csv module expects for correct parsing of quoted fields.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip the header row
    for line in csvreader:
        # CSV columns (per the header of event_datafile_new.csv):
        # 10 userId, 8 sessionId, 3 itemInSession, 0 artist, 9 song, 1 firstName, 4 lastName
        session.execute(prepared, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))


                    

In [12]:
# Perform query 2: artist, song and user name for userId 10, sessionId 182.
# Rows of the partition come back in clustering-column order (itemInSession,
# ascending by default in CQL), which gives the required sort.
# Unquoted CQL identifiers are folded to lower case, so the firstName/lastName
# columns are returned as `firstname`/`lastname`; accessing `row.firstName`
# would raise AttributeError on the driver's named-tuple rows.
query = "SELECT artist, song, firstName, lastName FROM artist_song_user_for_user_session WHERE userId=10 AND sessionId=182"
rows = session.execute(query)
for row in rows:
    print(row.artist, row.song, row.firstname, row.lastname)
               

In [13]:
# Create table (user_for_song) for query 3.
# Partition key: song, so all listeners of one song share a partition;
# clustering column: userId, so repeated plays by the same user upsert into a
# single row instead of creating duplicates.
query = "CREATE TABLE IF NOT EXISTS user_for_song "
query += "(song text, userId int, firstName text, lastName text, PRIMARY KEY (song, userId))"
session.execute(query)


# Insert data into table for query 3.
# The INSERT is prepared once and each CSV row is bound to it, instead of
# rebuilding the statement text and sending it for every row.
query = "INSERT INTO user_for_song (song, userId, firstName, lastName) VALUES (?, ?, ?, ?)"
prepared = session.prepare(query)
file = 'event_datafile_new.csv'
# newline='' is what the csv module expects for correct parsing of quoted fields.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip the header row
    for line in csvreader:
        # CSV columns (per the header of event_datafile_new.csv):
        # 9 song, 10 userId, 1 firstName, 4 lastName
        session.execute(prepared, (line[9], int(line[10]), line[1], line[4]))


In [14]:
# Perform query 3: every user who listened to 'All Hands Against His Own'.
# Unquoted CQL identifiers are folded to lower case, so the firstName/lastName
# columns are returned as `firstname`/`lastname`; accessing `row.firstName`
# would raise AttributeError on the driver's named-tuple rows.
query = "SELECT firstName, lastName FROM user_for_song WHERE song='All Hands Against His Own'"
rows = session.execute(query)
for row in rows:
    print(row.firstname, row.lastname)


### Drop the tables before closing out the sessions

In [15]:
# Cleaning up resources - dropping tables before closing the sessions.
# IF EXISTS keeps this cell re-runnable when a table was already dropped or
# never created (a plain DROP TABLE raises an error in that case).
for table_name in ("artist_song_for_session", "artist_song_user_for_user_session", "user_for_song"):
    session.execute(f"DROP TABLE IF EXISTS {table_name}")


<cassandra.cluster.ResultSet at 0x7f77fa572278>

### Close the session and cluster connection

In [16]:
# Shut down the session, then the cluster it was created from, and report it.
for resource in (session, cluster):
    resource.shutdown()
print("Session and cluster connections have been closed.")