# Project Overview
#### This project has two parts. In Part I, we process 30 CSV log files and combine their data into a single data file. In Part II, that file is loaded into several Apache Cassandra tables, each designed around the query it must serve.
# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# checking current working directory
print(os.getcwd())

# Get current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Walk the event_data folder and collect the path of every csv file
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # join each visited directory with the csv pattern and keep all matches
    file_path_list.extend(glob.glob(os.path.join(root,'*.csv')))
print(file_path_list)

print("We collected {0} files".format(len(file_path_list)))


/home/workspace
['/home/workspace/event_data/2018-11-27-events.csv', '/home/workspace/event_data/2018-11-04-events.csv', '/home/workspace/event_data/2018-11-07-events.csv', '/home/workspace/event_data/2018-11-09-events.csv', '/home/workspace/event_data/2018-11-19-events.csv', '/home/workspace/event_data/2018-11-05-events.csv', '/home/workspace/event_data/2018-11-22-events.csv', '/home/workspace/event_data/2018-11-16-events.csv', '/home/workspace/event_data/2018-11-26-events.csv', '/home/workspace/event_data/2018-11-24-events.csv', '/home/workspace/event_data/2018-11-29-events.csv', '/home/workspace/event_data/2018-11-15-events.csv', '/home/workspace/event_data/2018-11-20-events.csv', '/home/workspace/event_data/2018-11-06-events.csv', '/home/workspace/event_data/2018-11-18-events.csv', '/home/workspace/event_data/2018-11-21-events.csv', '/home/workspace/event_data/2018-11-10-events.csv', '/home/workspace/event_data/2018-11-23-events.csv', '/home/workspace/event_data/2018-11-02-events.c
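
The file-collection step can also be wrapped in a reusable function. The following is a minimal sketch, not the notebook's own code: the name `collect_csv_paths` is hypothetical, and it assumes that a sorted list of paths is acceptable, since the order in which files are read does not affect the tables built later.

```python
import glob
import os


def collect_csv_paths(base_dir):
    """Return every *.csv path under base_dir (subfolders included), sorted."""
    pattern = os.path.join(base_dir, '**', '*.csv')
    # recursive=True lets '**' match zero or more nested directories
    return sorted(glob.glob(pattern, recursive=True))
```

It could be called as `collect_csv_paths(os.path.join(os.getcwd(), 'event_data'))`.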

#### Processing the files listed above to create the data csv file that will be used for Apache Cassandra tables

In [3]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list
print("Processing {0} files".format(len(file_path_list)))
for f in file_path_list:
    print("Processing:", f)
    # reading csv file
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)  # skip the header row

        # extracting each data row one by one and appending it
        for line in csvreader:
            full_data_rows_list.append(line) 
            
print("Unfiltered Number of Rows:",len(full_data_rows_list))
#print(full_data_rows_list)

# creating a smaller event data csv file called event_datafile_new.csv that will be used to insert data into the
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):  #skip rows that don't include info on artist
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


Processing 30 files
Processing: /home/workspace/event_data/2018-11-27-events.csv
Processing: /home/workspace/event_data/2018-11-04-events.csv
Processing: /home/workspace/event_data/2018-11-07-events.csv
Processing: /home/workspace/event_data/2018-11-09-events.csv
Processing: /home/workspace/event_data/2018-11-19-events.csv
Processing: /home/workspace/event_data/2018-11-05-events.csv
Processing: /home/workspace/event_data/2018-11-22-events.csv
Processing: /home/workspace/event_data/2018-11-16-events.csv
Processing: /home/workspace/event_data/2018-11-26-events.csv
Processing: /home/workspace/event_data/2018-11-24-events.csv
Processing: /home/workspace/event_data/2018-11-29-events.csv
Processing: /home/workspace/event_data/2018-11-15-events.csv
Processing: /home/workspace/event_data/2018-11-20-events.csv
Processing: /home/workspace/event_data/2018-11-06-events.csv
Processing: /home/workspace/event_data/2018-11-18-events.csv
Processing: /home/workspace/event_data/2018-11-21-events.csv
Proc
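
The filtering and column selection performed in the cell above can also be expressed as a small generator, which makes the step easy to check on a couple of hand-written rows. This is a sketch only: `KEPT_COLUMNS` and `project_rows` are hypothetical names, and the positions are copied from the `writer.writerow(...)` call in that cell.

```python
# Positions of the kept columns in the raw event files, copied from the
# writer.writerow(...) call in the notebook cell.
KEPT_COLUMNS = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)


def project_rows(raw_rows):
    """Yield the 11 kept fields of each row whose artist field is not empty."""
    for row in raw_rows:
        if row[0] == '':
            continue  # rows without an artist are skipped, as in the notebook
        yield tuple(row[i] for i in KEPT_COLUMNS)
```

With this helper, the write loop would reduce to `writer.writerows(project_rows(full_data_rows_list))`.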

In [4]:
# Checking the number of rows after filtering out the rows that didn't refer to artists
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print("Final number of rows: ", sum(1 for line in f))

Final number of rows:  6821
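
Because the cell above counts physical lines, its total includes the header line, and it would over-count if a quoted field ever contained a line break. A minimal sketch of a record-based count follows; `count_data_rows` is a hypothetical helper, not part of the notebook.

```python
import csv


def count_data_rows(csv_path):
    """Count the CSV records in csv_path, excluding the header row."""
    with open(csv_path, 'r', encoding='utf8', newline='') as f:
        reader = csv.reader(f)
        next(reader, None)  # skip the header if the file is not empty
        return sum(1 for _ in reader)
```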


# Part II. Apache Cassandra coding portion of the project. 

#### Part I produced a CSV file titled <font color=red>event_datafile_new.csv</font>, located in the workspace directory. It holds the data that feeds our database tables and contains the following columns: 

- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId
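
The insert cells in Part II address these columns by position (for example `line[8]` for sessionId). As an alternative, the sketch below reads the same layout with `csv.DictReader`, so fields are looked up by header name. The function name `read_events` and the sample row are made up for illustration and are not taken from the real file.

```python
import csv
import io


def read_events(fileobj):
    """Return the rows of an event CSV as dicts keyed by header name."""
    return list(csv.DictReader(fileobj))


# One made-up row in the layout of event_datafile_new.csv (not real data)
sample = io.StringIO(
    '"artist","firstName","gender","itemInSession","lastName","length",'
    '"level","location","sessionId","song","userId"\n'
    '"Some Artist","Ada","F","0","Lovelace","210.5","free","London","1",'
    '"Some Song","7"\n'
)

for record in read_events(sample):
    # Values arrive as strings, so numeric fields still need converting
    print(record['artist'], record['song'], int(record['sessionId']))
```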

The image below is a screenshot of what the denormalized data looks like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

Now that we generated the event_datafile_new.csv file, we can start running our Cassandra commands to feed data into the database.

#### Creating a Cluster to our local Cassandra instance

In [5]:
from cassandra.cluster import Cluster
try: 
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
except Exception as e:
    print(e)


#### Creating a simple Cassandra keyspace named jfvanreu. No need to replicate the data for this small application.

In [6]:
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS jfvanreu 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

except Exception as e:
    print(e)

#### Creating a session to the keyspace defined above.

In [7]:
try:
    session.set_keyspace('jfvanreu')
except Exception as e:
    print(e)

#### Now that the session points at our new keyspace (database), we can create tables. In Cassandra, tables are modeled around the queries they must serve, so here are the three queries that drive our table design:

##### 1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4


##### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

##### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




### Create table musicapp_history based on Query 1 requirements.
#### *Query 1:*  Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4
#### We use *sessionId* and *itemInSession* as the primary key, so we can query the database based on those attributes. Together they also identify each record uniquely. *sessionId* is the partition key, which Cassandra uses to distribute the data among nodes, and *itemInSession* is a clustering column, which orders the rows within each partition.
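
With `PRIMARY KEY (sessionId, itemInSession)`, the first column acts as the partition key and the second as a clustering column. The toy model below illustrates the resulting two-step lookup with nested dictionaries. It is only an analogy under simplifying assumptions, not how Cassandra stores data internally, and the names and sample values are made up.

```python
# Toy in-memory model: partition key -> clustering key -> row
history = {}


def insert_history(session_id, item_in_session, row):
    # Rows sharing a sessionId land in the same partition
    history.setdefault(session_id, {})[item_in_session] = row


def lookup_history(session_id, item_in_session):
    # First pick the partition, then the row inside it
    return history.get(session_id, {}).get(item_in_session)


insert_history(1, 0, ('Some Artist', 'Some Song', 210.5))
insert_history(1, 1, ('Other Artist', 'Other Song', 180.0))
print(lookup_history(1, 1))
```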

In [8]:
query = "CREATE TABLE IF NOT EXISTS musicapp_history"

query = query + "(sessionId int, itemInSession int, artist_name text, song_title text, song_length float, user_firstname text, user_lastname text, PRIMARY KEY (sessionId, itemInSession))"
try:
    session.execute(query)
except Exception as e:
    print(e)

#### Inserting data from event_datafile_new.csv file into the newly created table

In [10]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        # insert relevant data from event_datafile_csv into musicapp_history table.
        query = "INSERT INTO musicapp_history (sessionId, itemInSession, artist_name, song_title, song_length, user_firstname, user_lastname)"
        query = query + " VALUES (%s, %s, %s, %s, %s, %s, %s)"
        #align csv data as expected by insert query.
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5]), line[1], line[4]))
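
Since the same INSERT runs once per CSV row, a prepared statement is a natural fit: the statement is parsed once and only the values change. The sketch below assumes a `session` object exposing `prepare` and `execute` as the DataStax Python driver's session does; `load_history` and `INSERT_HISTORY` are hypothetical names. Prepared statements use `?` placeholders rather than `%s`.

```python
import csv

INSERT_HISTORY = (
    "INSERT INTO musicapp_history "
    "(sessionId, itemInSession, artist_name, song_title, song_length, "
    "user_firstname, user_lastname) VALUES (?, ?, ?, ?, ?, ?, ?)"
)


def load_history(session, csv_path):
    """Insert every data row of csv_path into musicapp_history; return the count."""
    prepared = session.prepare(INSERT_HISTORY)  # parsed once, reused per row
    count = 0
    with open(csv_path, encoding='utf8', newline='') as f:
        reader = csv.reader(f)
        next(reader)  # skip header
        for line in reader:
            # same column alignment as the notebook's insert cell
            session.execute(prepared, (int(line[8]), int(line[3]), line[0],
                                       line[9], float(line[5]), line[1], line[4]))
            count += 1
    return count
```

Because the function only relies on `prepare` and `execute`, it can be exercised with a stand-in session object before pointing it at a real cluster.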

#### Performing a SELECT using Query 1 to verify that the data was properly inserted into the musicapp_history table.

In [11]:
query = "select artist_name, song_title, song_length from musicapp_history WHERE sessionId=338 and itemInSession=4"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # only print results when the query succeeded
    print("Query 1 results:")
    for row in rows:
        print ("\n\tArtist Name: {0}; Song Title: {1}; Song length: {2}".format(row.artist_name, row.song_title, row.song_length))

Query 1 results:

	Artist Name: Faithless; Song Title: Music Matters (Mark Knight Dub); Song length: 495.30731201171875


##### Great news! Query 1 results are in line with the data included in the event_datafile_new.csv file
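
The long tail of digits in the song length is consistent with the column being declared as CQL `float`, a 32-bit type, and then widened to a Python float on the way out. The sketch below reproduces the effect with the standard `struct` module. It assumes the CSV stores the value as 495.3073, which is not shown in this notebook; `as_float32` is a hypothetical helper.

```python
import struct


def as_float32(value):
    """Round a Python float to the nearest 32-bit IEEE-754 value."""
    return struct.unpack('<f', struct.pack('<f', value))[0]


# 495.3073 is an assumed CSV value, used here for illustration only
print(as_float32(495.3073))  # prints 495.30731201171875
```

If the extra digits matter, declaring the column as `double` would be one option to consider.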

### Create table songs_by_user_session based on Query 2 requirements.
#### Query 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
#### We use *userId*, *sessionId* and *itemInSession* as the primary key, so we can query the database based on those attributes. *userId* is the partition key; *sessionId* and *itemInSession* are clustering columns, so the songs of a session come back sorted by *itemInSession*.
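
Clustering columns determine the order of rows inside a partition, in declaration order and ascending by default. The sketch below imitates that ordering with a plain Python sort over made-up rows for a single user; it is an analogy, not Cassandra's storage code.

```python
# Made-up rows for one userId partition: (sessionId, itemInSession, song)
partition = [
    (182, 2, 'third'),
    (90, 0, 'earlier session'),
    (182, 0, 'first'),
    (182, 1, 'second'),
]

# Order by the clustering columns: sessionId first, then itemInSession
ordered = sorted(partition, key=lambda r: (r[0], r[1]))

# Restricting on sessionId = 182 then reads one contiguous, ordered slice
session_182 = [song for sid, item, song in ordered if sid == 182]
print(session_182)  # prints ['first', 'second', 'third']
```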

In [12]:
query = "CREATE TABLE IF NOT EXISTS songs_by_user_session"
query = query + "(userId int, sessionId int, itemInSession int, user_firstname text, user_lastname text, song text, artist_name text, PRIMARY KEY (userId, sessionId, itemInSession))"
try:
    session.execute(query)
except Exception as e:
    print(e)

#### Inserting data from event_datafile_new.csv file into the newly created table

In [14]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        # insert relevant data from event_datafile_csv into songs_by_user_session table.
        query = "INSERT INTO songs_by_user_session (userId, sessionId, itemInSession, user_firstName, user_lastName, song, artist_name)"
        query = query + " VALUES (%s, %s, %s, %s, %s, %s, %s)"
        #align csv data as expected by insert query.
        session.execute(query, (int(line[10]), int(line[8]), int(line[3]), line[1], line[4], line[9], line[0]))

#### Performing a SELECT using Query 2 to verify that the data was properly inserted into the songs_by_user_session table.

In [15]:
query = "select artist_name, song, user_firstname, user_lastname from songs_by_user_session WHERE userId=10 and sessionId=182"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # only print results when the query succeeded
    print("Query 2 results:")
    for row in rows:
        print ("\n\tArtist Name: {0}; Song Title: {1}; User First Name: {2}, User Last Name: {3}".format(row.artist_name, row.song, row.user_firstname, row.user_lastname))

Query 2 results:

	Artist Name: Down To The Bone; Song Title: Keep On Keepin' On; User First Name: Sylvie, User Last Name: Cruz

	Artist Name: Three Drives; Song Title: Greece 2000; User First Name: Sylvie, User Last Name: Cruz

	Artist Name: Sebastien Tellier; Song Title: Kilometer; User First Name: Sylvie, User Last Name: Cruz

	Artist Name: Lonnie Gordon; Song Title: Catch You Baby (Steve Pitron & Max Sanna Radio Edit); User First Name: Sylvie, User Last Name: Cruz


##### Great news! Query 2 results are in line with the data included in the event_datafile_new.csv file

### Create table users_by_song based on Query 3 requirements.
#### Query 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
#### We use *song* and *userId* as the primary key. *song* is the partition key, so we can filter on it, and *userId* is the clustering column, so different users who listened to the same song are stored as separate rows.
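
An INSERT in Cassandra behaves as an upsert: writing a row whose primary key already exists replaces the earlier row. With `PRIMARY KEY (song, userId)`, a user who played the same song several times therefore ends up with a single row. The toy model below shows that behavior with a dictionary keyed by the primary key; all names and values are made up.

```python
# Toy model of users_by_song: rows keyed by the full primary key
users_by_song = {}


def insert_listener(song, user_id, first, last):
    # Same (song, userId) key: the earlier row is replaced, not duplicated
    users_by_song[(song, user_id)] = (first, last)


insert_listener('Some Song', 1, 'Ada', 'Lovelace')
insert_listener('Some Song', 1, 'Ada', 'Lovelace')  # second listen, same user
insert_listener('Some Song', 2, 'Alan', 'Turing')

listeners = [name for (song, _), name in sorted(users_by_song.items())
             if song == 'Some Song']
print(listeners)  # prints [('Ada', 'Lovelace'), ('Alan', 'Turing')]
```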

In [16]:
query = "CREATE TABLE IF NOT EXISTS users_by_song"
query = query + "(song text, userId int, user_firstname text, user_lastname text, PRIMARY KEY (song, userId))"
try:
    session.execute(query)
except Exception as e:
    print(e)

#### Inserting data from event_datafile_new.csv file into the newly created table

In [19]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        # insert relevant data from event_datafile_csv into users_by_song table.
        query = "INSERT INTO users_by_song (song, userId, user_firstName, user_lastName)"
        query = query + " VALUES (%s, %s, %s, %s)"
        #align csv data as expected by insert query.
        session.execute(query, (line[9], int(line[10]), line[1], line[4]))

#### Performing a SELECT using Query 3 to verify that the data was properly inserted into the users_by_song table.

In [20]:
query = "select user_firstname, user_lastname from users_by_song WHERE song='All Hands Against His Own'"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # only print results when the query succeeded
    print("Query 3 results:")
    for row in rows:
        print ("\n\tUser\'s name: {0} {1}".format(row.user_firstname, row.user_lastname))

Query 3 results:

	User's name: Jacqueline Lynch

	User's name: Tegan Levine

	User's name: Sara Johnson


##### Great news! Query 3 results are in line with the data included in the event_datafile_new.csv file

### Dropping the tables before closing the session

In [21]:
query = "drop table musicapp_history"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

In [22]:
query = "drop table songs_by_user_session"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

In [23]:
query = "drop table users_by_song"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
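
The three drop cells follow the same pattern, so they can be folded into one loop. This is a sketch under the assumption that `session` exposes `execute` as in the cells above; `drop_tables` and `TABLES` are hypothetical names.

```python
TABLES = ('musicapp_history', 'songs_by_user_session', 'users_by_song')


def drop_tables(session, table_names=TABLES):
    """Drop each table, continuing past failures; return the names that failed."""
    failed = []
    for name in table_names:
        try:
            # IF EXISTS makes the statement a no-op when the table is already gone
            session.execute("DROP TABLE IF EXISTS " + name)
        except Exception as e:
            print(e)
            failed.append(name)
    return failed
```

Returning the failed names keeps the notebook's behavior of printing errors while letting the caller see whether any table was left behind.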

### Closing the session and cluster connection

In [24]:
session.shutdown()
cluster.shutdown()

##### In this project, we created a database (a keyspace, in Cassandra terms). We transferred log data from 30 CSV files into a single file and used that file to insert data into our 3 database tables. Those 3 tables were designed and named according to the queries they serve. The test queries return the expected results, as verified against the .csv file. 