# Part I. ETL Pipeline for Pre-Processing the Files

## RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [96]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [97]:
# Get your current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

for root, dirs, files in os.walk(filepath):
    file_path_list = glob.glob(os.path.join(root,'*'))

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [98]:
# create a single csv file from all data
full_data_rows_list = [] 
    
for f in file_path_list:
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        for line in csvreader:
            full_data_rows_list.append(line) 

csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [99]:
# check the number of rows in your csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


# Part II. Data Modeling with Apache Cassandra

## The CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory, contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Set Apache Cassandra configurations

#### Creating a Cluster

In [100]:
# Make connection to a cassandra cluster
from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1'])
session = cluster.connect()

#### Create Keyspace

In [101]:
# Create a Keyspace
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS udacity 
    WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
    """)
except Expection as e:
    print(e)

#### Set Keyspace

In [102]:
# Setting a Keyspace
try:
    session.set_keyspace('udacity')
except Exception as e:
    print(e)

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




### Query 1
#### Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

The table we are going to make for this query is `music_library_by_sessions`. We will cluster our column by session_id and item_in_session, and add other fields i.e artist, song, and length for querying purpose.<br>
For above query we should get all records whose <b>sessionId = 338</b> and <b>itemInSession = 4</b>.

#### create music_library_by_sessions table

In [103]:
# Create query for a session_history
try:
    query = """
    CREATE TABLE IF NOT EXISTS music_library_by_sessions (
            session_id int,
            item_in_session int,
            artist text,
            song text,
            length float,
            PRIMARY KEY (session_id, item_in_session)
    );
    """
    session.execute(query)
except Exception as e:
    print(e)


#### insert data into music_library_by_sessions table

In [104]:
# Extract the data from event_datafile_new.csv and insert that into a session_history table
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        query = """
        INSERT INTO music_library_by_sessions (
            session_id,
            item_in_session,
            artist,
            song,
            length
        )
        """
        query = query + "VALUES (%s, %s, %s, %s, %s)"
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))
        

#### Do a SELECT to verify that the data have been inserted into table

In [105]:
# Verify the data was entered into the table
try:
    query = """
        SELECT artist, song, length
        FROM music_library_by_sessions
        WHERE session_id=338 AND item_in_session=4
    """
    rows = session.execute(query)
except Exception as e:
    print(e)

results = []
for row in rows:
    results.append([row.artist, row.song, row.length])

# change data to pandas dataframe
df = pd.DataFrame.from_records(results)
df.columns = ["artist", "song", "lenght"]
df


Unnamed: 0,artist,song,lenght
0,Faithless,Music Matters (Mark Knight Dub),495.307312


### Query 2
#### Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

The table we are going to make for this query is `music_library_by_users`. We will cluster our column by (user_id, session_id) and item_in_session, and add other fields i.e song, artist, first_name, and last_name for querying purpose. We choose our Partition Key (user_id, session_id) to optimize read performance.<br>
For above query we should get all records whose <b>user_id = 10</b> and <b>session_id = 182</b>.

#### create music_library_by_users table

In [106]:
# Create query for a music_library_by_users table
try:
    query = """
    CREATE TABLE IF NOT EXISTS music_library_by_users ( 
            user_id int,
            session_id int,
            item_in_session int,
            song text,
            artist text,
            first_name text,
            last_name text,
            PRIMARY KEY ((user_id, session_id), item_in_session)
    );
    """
    session.execute(query)
except Exception as e:
    print(e)


#### insert data into music_library_by_users table

In [107]:
# Extract the data from event_datafile_new.csv and insert that into a music_library_by_users table
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        query = """
        INSERT INTO music_library_by_users (
            user_id,
            session_id,
            item_in_session,
            song,
            artist,
            first_name,
            last_name
        )
        """
        query = query + "VALUES (%s, %s, %s, %s, %s, %s, %s)"
        session.execute(query, (int(line[10]), int(line[8]), int(line[3]), line[9], line[0], line[1], line[4]))
        

#### Do a SELECT to verify that the data have been inserted into table

In [108]:
## Verify the data was entered into the table
try:
    query = """
        SELECT artist, song, first_name, last_name
        FROM music_library_by_users
        WHERE user_id=10 AND session_id=182
    """
    rows = session.execute(query)
except Exception as e:
    print(e)

results = []
for row in rows:
    results.append([row.artist, row.song, row.first_name, row.last_name])

df = pd.DataFrame.from_records(results)
df.columns = ["artist", "song", "firstName", "lastName"]
df


Unnamed: 0,artist,song,firstName,lastName
0,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
1,Three Drives,Greece 2000,Sylvie,Cruz
2,Sebastien Tellier,Kilometer,Sylvie,Cruz
3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


### Query 3
#### Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

The table we are going to make for this query is `music_library_by_songs`. We will cluster our column by song and user_id, and add other fields i.e first_name, and last_name for querying purpose.<br>
For above query we should get all users who listen <b>All Hands Against His Own</b> song

#### create music_library_by_songs table

In [109]:
# create query for a music_library_by_songs table
try:
    query = """
    CREATE TABLE IF NOT EXISTS music_library_by_songs ( 
            song text,
            user_id int,
            first_name text,
            last_name text,
            PRIMARY KEY (song, user_id)
    );
    """
    session.execute(query)
except Exception as e:
    print(e)


#### insert data into song_user_history table

In [110]:
# Extract the data from an event_datafile_new.csv and insert that into a music_library_by_songs table

file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        query = """
        INSERT INTO music_library_by_songs (
            song,
            user_id,
            first_name,
            last_name
        )
        """
        query = query + "VALUES (%s, %s, %s, %s);"
        session.execute(query, (line[9], int(line[10]), line[1], line[4]))


#### Do a SELECT to verify that the data have been inserted into table

In [111]:
# Verify the data was entered into the table
try:
    query = """
        SELECT first_name, last_name
        FROM music_library_by_songs
        WHERE song='All Hands Against His Own'
    """
    rows = session.execute(query)
except Exception as e:
    print(e)

results = []
for row in rows:
    results.append([row.first_name, row.last_name])

# Change results to pandas dataframe
df = pd.DataFrame.from_records(results)
df.columns = ["firstName", "lastName"]
df


Unnamed: 0,firstName,lastName
0,Jacqueline,Lynch
1,Tegan,Levine
2,Sara,Johnson


### Drop the tables before closing out the sessions

In [112]:
# Drop the table before closing out the sessions
try:
    session.execute('DROP TABLE music_library_by_sessions')
except:
    print('Unable to drop music_library_by_sessions')

try:
    session.execute('DROP TABLE music_library_by_users')
except:
    print('Unable to drop music_library_by_users')

try:
    session.execute('DROP TABLE music_library_by_songs')
except:
    print('Unable to drop music_library_by_songs')


### Close the session and cluster connection¶

In [113]:
session.shutdown()
cluster.shutdown()