## Project: Data Modeling with Apache Cassandra
A startup called Sparkify wants to analyze the data they've been collecting on songs and user activity on their new music streaming app. The analysis team is particularly interested in understanding what songs users are listening to. Currently, there is no easy way to query the data to generate the results, since the data reside in a directory of CSV files on user activity on the app.

They'd like a data engineer to create an Apache Cassandra database that can run queries on song play data to answer these questions, and they wish to bring you onto the project. Your role is to create a database for this analysis. You'll be able to test your database by running queries given to you by Sparkify's analytics team to produce the results.

## Part I. ETL Pipeline for pre-processing the CSV files

In [None]:
#pip install cassandra-driver

#### Import Python packages 

In [None]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

# set a pandas display option to show all float values with 2 decimals
pd.set_option('display.float_format', lambda x: '%.2f' % x)

#### Creating list of filepaths to process csv data files

In [None]:
# checking your current working directory
print(os.getcwd())

# get your current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# walk the event_data folder and collect the filepath of every CSV file
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # extend (rather than overwrite) the list with the CSV files found in this folder
    file_path_list.extend(glob.glob(os.path.join(root, '*.csv')))
    #print(file_path_list)

#### Merging all of the csv files for the Apache Cassandra tables

In [None]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:

    # reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        # skip the header row
        next(csvreader)
        
        # extracting each data row one by one and append it        
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 
            
# uncomment the code below if you would like to get total number of rows 
#print(len(full_data_rows_list))
# uncomment the code below if you would like to check to see what the list of event data rows will look like
#print(full_data_rows_list)

# creating a smaller event data csv file called event_datafile_new.csv that will be used to insert data into the
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        # skip rows that have no artist value
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [None]:
# check the number of lines in your csv file (the header row is included in the count)
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

#### Exploratory Data Analysis

In [None]:
# create dataframe from event_datafile_new.csv
df = pd.read_csv('event_datafile_new.csv')

In [None]:
# show sample of data
df.sample(n=3)

In [None]:
# show any null values and data types
print(df.info())

In [None]:
# show summary statistics for numeric columns
print(df.describe(include='number'))

In [None]:
# show summary statistics for object columns
print(df.describe(include='object'))

In [None]:
# show unique values for each column
for col in df.columns:
    print(col, df[col].nunique())

## Part II. Create Apache Cassandra databases

The csv file titled <font color=red>event_datafile_new.csv</font> should be located within the directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is an example of how the denormalized data should appear in the <font color=red>**event_datafile_new.csv**</font> after the CSV files are merged and processed:<br>

<img src="image_event_datafile_new.jpg">

#### Creating a Cluster

In [None]:
# create a cluster
from cassandra.cluster import Cluster
# establish connection and create a session
try: 
    cluster = Cluster(['127.0.0.1']) #If you have a locally installed Apache Cassandra instance
    session = cluster.connect()
except Exception as e:
    print(e)

#### Create Keyspace

In [None]:
# create keyspace
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS project 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

except Exception as e:
    print(e)

#### Set Keyspace

In [None]:
# connect to keyspace
try:
    session.set_keyspace('project')
except Exception as e:
    print(e)

### Create queries to ask the following three questions of the data

In [None]:
# set variable name for csv file to be inserted into tables
file = 'event_datafile_new.csv'

##### Query 1:  Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4
`SELECT artist, song, length FROM history_1 WHERE sessionId = 338 AND itemInSession = 4;`


In [None]:
# create the table for query 1
query = "CREATE TABLE IF NOT EXISTS history_1 "
query = query + "(sessionId int, itemInSession int, \
                artist text, song text, length float, \
                PRIMARY KEY (sessionId, itemInSession))"
try:
    session.execute(query)
except Exception as e:
    print(e)

In [None]:
with open(file, encoding='utf8') as f:
    # use csv.DictReader to read the CSV file as a dictionary
    csvreader = csv.DictReader(f)
    
    for row in csvreader:
        # assign the INSERT statements into the `query` variable
        query = """INSERT INTO history_1 (sessionId, itemInSession, artist, song, length)
        VALUES (%s, %s, %s, %s, %s)"""
        # assign which column element should be assigned for each column in the INSERT statement
        session.execute(query, (int(row['sessionId']), int(row['itemInSession']),
                                row['artist'], row['song'], float(row['length'])))
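
The loop above sends the INSERT as a plain string on every call, so the server has to parse it each time. One possible alternative, sketched here, prepares the statement once with `session.prepare` and binds the values per row. The helper name `load_history_1` is hypothetical. The sketch assumes that `session` is the connected driver session and that `rows` yields dictionaries like those produced by `csv.DictReader`.

```python
def load_history_1(session, rows):
    """Insert rows into history_1 using one prepared statement (hypothetical helper)."""
    # Prepared statements use `?` placeholders instead of `%s`.
    prepared = session.prepare(
        "INSERT INTO history_1 (sessionId, itemInSession, artist, song, length) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    inserted = 0
    for row in rows:
        # Cast the CSV strings to the column types declared in the table.
        session.execute(prepared, (
            int(row['sessionId']), int(row['itemInSession']),
            row['artist'], row['song'], float(row['length'])
        ))
        inserted += 1
    return inserted
```

With the notebook's variables, a call could look like `load_history_1(session, csv.DictReader(f))` inside the `with open(file, encoding='utf8') as f:` block.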

Rationale: 
- *sessionId* is used as the partition key. This helps distribute data evenly across nodes, and the requested query filters on sessionId.
- *itemInSession* is chosen as the clustering column because the requested query filters on this column. Additionally, using it in the primary key helps to avoid the ALLOW FILTERING clause.
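
Cassandra treats an INSERT with an existing primary key as an overwrite, so it can help to confirm that the chosen key is unique in the source data before loading. The sketch below shows one way to check this in plain Python. `duplicate_keys` is a hypothetical helper, and the sample rows are made up for illustration.

```python
from collections import Counter

def duplicate_keys(rows, key_columns):
    """Return the key tuples that occur more than once in `rows`."""
    counts = Counter(tuple(row[col] for col in key_columns) for row in rows)
    return {key: n for key, n in counts.items() if n > 1}

# Made-up rows: two events in session 338 and one event in session 339.
key_check_rows = [
    {'sessionId': '338', 'itemInSession': '3', 'song': 'Song A'},
    {'sessionId': '338', 'itemInSession': '4', 'song': 'Song B'},
    {'sessionId': '339', 'itemInSession': '4', 'song': 'Song C'},
]

# sessionId alone repeats, so it could not serve as the whole primary key.
print(duplicate_keys(key_check_rows, ['sessionId']))
# Combined with itemInSession, every row in this sample is distinct.
print(duplicate_keys(key_check_rows, ['sessionId', 'itemInSession']))
```

The same function could be run over `csv.DictReader(open(file, encoding='utf8'))` to check the real file.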

In [None]:
## add in the SELECT statement to verify the data was entered into the table
query = "select artist, song, length from history_1 WHERE sessionID = 338 AND itemInSession = 4"
try:
    rows = session.execute(query)
    # iterate inside the try block so a failed query does not leave `rows` undefined
    for row in rows:
        print(row.artist, row.song, row.length)
except Exception as e:
    print(e)

##### Query 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

`SELECT artist, song, firstName, lastName FROM history_2 WHERE userId = 10 AND sessionId = 182 ORDER BY itemInSession;`

In [None]:
# create table for query 2
query = "CREATE TABLE IF NOT EXISTS history_2 "
query = query + "(userId int, sessionId int, itemInSession int, artist text, \
        song text, firstName text, lastName text, \
        PRIMARY KEY ((userId, sessionId), itemInSession))"
try:
    session.execute(query)
except Exception as e:
    print(e)

In [None]:
with open(file, encoding='utf8') as f:
    csvreader = csv.DictReader(f)

    for row in csvreader:
        # assign the INSERT statements into the `query` variable
        query = """INSERT INTO history_2 (userId, sessionId, itemInSession,
                                          artist, song, firstName, lastName)
                   VALUES (%s, %s, %s, %s, %s, %s, %s)"""
        # assign which column element should be assigned for each column in the INSERT statement
        session.execute(query, (int(row['userId']), int(row['sessionId']), \
                                int(row['itemInSession']), row['artist'], \
                                row['song'], row['firstName'], \
                                row['lastName']))

Rationale: 
- A compound partition key consisting of *userId* and *sessionId* is used to distribute the data across nodes. The query filters data based on both userId and sessionId, making them suitable candidates for the partition key.
- *itemInSession* is chosen as the clustering column because the requested query sorts on this column. Additionally, this allows Cassandra to store the rows within each partition in the desired order, making for a more efficient query.
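
The effect of this key can be pictured with a small in-memory model: rows are grouped by the partition key and kept sorted by the clustering column inside each group. This only illustrates the resulting data layout, not how Cassandra is implemented. `build_partitions` and the sample rows are hypothetical.

```python
from collections import defaultdict

def build_partitions(rows):
    """Group rows by (userId, sessionId) and sort each group by itemInSession."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[(row['userId'], row['sessionId'])].append(row)
    for group in partitions.values():
        group.sort(key=lambda r: r['itemInSession'])
    return dict(partitions)

# Made-up rows, deliberately out of order.
session_rows = [
    {'userId': 10, 'sessionId': 182, 'itemInSession': 2, 'song': 'Song C'},
    {'userId': 10, 'sessionId': 182, 'itemInSession': 0, 'song': 'Song A'},
    {'userId': 11, 'sessionId': 500, 'itemInSession': 0, 'song': 'Song X'},
    {'userId': 10, 'sessionId': 182, 'itemInSession': 1, 'song': 'Song B'},
]

partitions = build_partitions(session_rows)
# Reading one partition returns its rows already ordered by itemInSession.
songs_in_order = [r['song'] for r in partitions[(10, 182)]]
print(songs_in_order)
```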

In [None]:
## add in the SELECT statement to verify the data was entered into the table
query = "select artist, song, firstName, lastName from history_2 WHERE userId = 10 AND sessionId = 182"
try:
    rows = session.execute(query)
    # iterate inside the try block so a failed query does not leave `rows` undefined
    for row in rows:
        print(row.artist, row.song, row.firstname, row.lastname)
except Exception as e:
    print(e)

##### Query 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

`SELECT firstName, lastName FROM history_3 WHERE song = 'All Hands Against His Own';`

In [None]:
# create table for query 3
query = "CREATE TABLE IF NOT EXISTS history_3 "
query = query + "(song text, userId int, firstName text, lastName text, \
    PRIMARY KEY (song, userId))"
try:
    session.execute(query)
except Exception as e:
    print(e)

In [None]:
with open(file, encoding='utf8') as f:
    csvreader = csv.DictReader(f)

    for row in csvreader:
        # assign the INSERT statements into the `query` variable
        query = """INSERT INTO history_3 (song, userId, firstName, lastName)
                   VALUES (%s, %s, %s, %s)"""
        # assign which column element should be assigned for each column in the INSERT statement
        session.execute(query, (row['song'], int(row['userId']), \
                                row['firstName'], row['lastName']))


Rationale: 
- *song* is used as the partition key, as the query filters data based on the song's title. Using song as the partition key allows Cassandra to efficiently locate the partition containing the data for a specific song.
- *userId* is chosen as the clustering column so that each listener of a song gets their own row. With *song* alone as the primary key, every insert for the same song would overwrite the previous one, and only a single user would remain. With (*song*, *userId*) as the primary key, repeated plays of a song by the same user collapse into one row, which suits this query because Sparkify only needs each listener's firstName and lastName once.
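
A rough way to see why the clustering column matters is to model a table as a dictionary keyed by its primary key, since a write with an existing key replaces the stored row. The sketch below is a simplification with made-up listeners, and `upsert_all` is a hypothetical helper.

```python
def upsert_all(events, key_columns):
    """Model a table as a dict keyed by primary key; later writes replace earlier ones."""
    table = {}
    for event in events:
        key = tuple(event[col] for col in key_columns)
        table[key] = event
    return table

# Made-up events: two different users play the same song, one of them twice.
play_events = [
    {'song': 'Song A', 'userId': 1, 'firstName': 'Ana', 'lastName': 'Lee'},
    {'song': 'Song A', 'userId': 2, 'firstName': 'Bo', 'lastName': 'Kim'},
    {'song': 'Song A', 'userId': 1, 'firstName': 'Ana', 'lastName': 'Lee'},
]

# PRIMARY KEY (song): every write lands on the same row, so one listener survives.
print(len(upsert_all(play_events, ['song'])))
# PRIMARY KEY (song, userId): one row per listener, repeat plays collapse.
print(len(upsert_all(play_events, ['song', 'userId'])))
```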

In [None]:
# add the SELECT statement to verify the data was entered into the table
query = "SELECT firstName, lastName FROM history_3 WHERE song = 'All Hands Against His Own';"
try:
    rows = session.execute(query)
    # iterate inside the try block so a failed query does not leave `rows` undefined
    for row in rows:
        print(row.firstname, row.lastname)
except Exception as e:
    print(e)

### Drop the tables before closing out the sessions

In [None]:
## drop the table before closing out the sessions
table_names = ['history_1', 'history_2', 'history_3']

for table_name in table_names:
    query = f"DROP TABLE {table_name}"
    try:
        session.execute(query)
        print(f"Table {table_name} dropped.")
    except Exception as e:
        print(f"Error dropping table {table_name}: {e}")


### Close the session and cluster connection

In [None]:
session.shutdown()
cluster.shutdown()