# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
print(os.getcwd())
filepath = os.path.join(os.getcwd(), 'event_data')


def collect_csv_files(base_dir):
    """Return a sorted list of every ``*.csv`` file found under *base_dir*.

    Matches from every directory visited by ``os.walk`` are accumulated into
    one list. Hidden sub-directories (names starting with ``.``, e.g.
    ``.ipynb_checkpoints``) are pruned so checkpoint copies are not read as
    extra event data. A non-existent *base_dir* yields an empty list because
    ``os.walk`` ignores the listing error by default.
    """
    csv_paths = []
    for root, dirs, _files in os.walk(base_dir):
        # Modify dirs in place so os.walk does not descend into hidden dirs.
        dirs[:] = [d for d in dirs if not d.startswith('.')]
        csv_paths.extend(glob.glob(os.path.join(root, '*.csv')))
    # Sort so the processing order (and row order of the output CSV) is stable.
    return sorted(csv_paths)


file_path_list = collect_csv_files(filepath)

/home/workspace


#### Processing the files to create the data file CSV that will be used for the Apache Cassandra tables

In [3]:
# Read every event CSV into one list of rows, skipping each file's header row.
full_data_rows_list = []
for event_file in file_path_list:
    with open(event_file, 'r', encoding = 'utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header; the default avoids StopIteration on an empty file.
        next(csvreader, None)
        full_data_rows_list.extend(csvreader)

print('Total Rows: ', len(full_data_rows_list))

# Write the smaller, denormalized event_datafile_new.csv that is used to insert
# data into the Apache Cassandra tables. Every field is quoted (QUOTE_ALL).
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Source-row indices copied to the output, in output-column order:
# artist, firstName, gender, itemInSession, lastName, length, level,
# location, sessionId, song, userId.
KEPT_COLUMNS = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # Skip blank lines (csv.reader yields []) and rows with an empty artist
        # field (presumably non-song events — confirm against the raw data).
        if not row or not row[0]:
            continue
        writer.writerow([row[i] for i in KEPT_COLUMNS])

In [4]:
# Count the lines of the generated CSV; the total includes the header row.
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as csv_out:
    line_count = 0
    for _ in csv_out:
        line_count += 1
    print('Rows in file: ', line_count)

Rows in file:  6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should look like in <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster and Keyspace - with exception handling!

In [5]:
from cassandra.cluster import Cluster

# Connect to the local Cassandra node. Nothing below can work without a
# session, so report the error and re-raise rather than letting every later
# cell fail with a confusing NameError on `session`.
try:
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
except Exception as e:
    print('Error connecting to cluster')
    print(e)
    raise

# Create the keyspace; IF NOT EXISTS makes re-running this cell harmless.
# replication_factor 1 keeps a single copy of each row (local setup only).
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS sparkify
    WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}
    """)
except Exception as e:
    print('Error creating KEYSPACE')
    print(e)

## Set the Keyspace
try:
    session.set_keyspace('sparkify')
except Exception as e:
    print('Error setting KEYSPACE')
    print(e)

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data

1. Give me the *artist*, *song title* and *song's length* in the music app history that was heard during *sessionId = 338* and *itemInSession = 4*
2. Give me only the following: name of *artist*, *song (sorted by itemInSession)* and *user (first and last name)* for *userid = 10*, *sessionid = 182*
3. Give me every *user name (first and last)* in my music app history who listened to the *song 'All Hands Against His Own'*

---

## Query 1 Notes

* <mark>**Query**: Give me the _artist_, _song title_ and _song's length_ in the music app history that was heard during `session_id` = 338, and `item_in_session` = 4</mark>
* To get a unique `PRIMARY KEY` it is necessary to combine the `session_id` with `item_in_session`. The `session_id` alone may have multiple entries if more than one song was played in the session.
* The `session_id` can be used as the `PARTITION KEY` while `item_in_session` can be set as a `CLUSTERING` column.
* The query requires a `WHERE` on both `session_id` (the partition key) and `item_in_session` (the clustering column).

In [8]:
## CREATE TABLE
# Modeled on Query 1: session_id is the partition key and item_in_session the
# clustering column, so (session_id, item_in_session) identifies one row.
query = "CREATE TABLE IF NOT EXISTS songs_by_session "
query += """(artist text,
            item_in_session int,
            song_length float,
            session_id int,
            song_title text,
            PRIMARY KEY (session_id, item_in_session));"""
try:
    session.execute(query)
except Exception as e:
    print('Error creating TABLE')
    print(e)

## INSERT ROWS
# Prepare the INSERT once instead of rebuilding the query string for every
# CSV row; each row's values are then bound to the prepared statement.
insert_query = session.prepare(
    "INSERT INTO songs_by_session (artist, item_in_session, song_length, session_id, song_title) "
    "VALUES (?, ?, ?, ?, ?)"
)
file = 'event_datafile_new.csv'
# newline='' is what the csv module expects for files it reads.
with open(file, encoding = 'utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        # CSV columns used: 0 artist, 3 itemInSession, 5 length, 8 sessionId, 9 song
        session.execute(insert_query, (line[0], int(line[3]), float(line[5]), int(line[8]), line[9]))

In [9]:
## VERIFY DATA
# Print up to 5 rows to sanity-check the load. Printing is in the `else`
# branch so a failed SELECT does not fall through to a stale or undefined
# `rows` left over from an earlier cell.
query = "SELECT * FROM songs_by_session LIMIT 5;"
try:
    rows = session.execute(query)
except Exception as e:
    print('Error reading from table')
    print(e)
else:
    print('{:<20}  {:<8}  {:<7}  {:<8}  {:<20}'.format('Artist', 'Item', 'Length', 'Session', 'SongTitle'))
    print('-'*60)
    for row in rows:
        # Width 7 matches the 'Length' header column even for short values.
        print('{:<20}  {:<8}  {:<7.3f}  {:<8}  {:<20}'.format(row.artist, row.item_in_session, row.song_length, row.session_id, row.song_title))

Artist                Item      Length   Session   SongTitle           
------------------------------------------------------------
Regina Spektor        0         191.085  23        The Calculation (Album Version)
Octopus Project       1         250.958  23        All Of The Champs That Ever Lived
Tegan And Sara        2         180.062  23        So Jealous          
Dragonette            3         153.391  23        Okay Dolores        
Lil Wayne / Eminem    4         229.590  23        Drop The World      


In [10]:
## RUN QUERY
# Query 1: both primary-key columns are fixed, so at most one row matches.
query = """SELECT artist, song_title, song_length
        FROM songs_by_session WHERE session_id=%s
        AND item_in_session=%s;"""
try:
    rows = session.execute(query, (338, 4))
except Exception as e:
    print('Error running query')
    print(e)
else:
    # Only print when the query succeeded (avoids a stale `rows`).
    print('{:<10} {:<35} {:<7}'.format('Artist', 'SongTitle', 'SongLength'))
    print('-'*54)
    for row in rows:
        print('{:<10} {:<35} {:3.3f}'.format(row.artist, row.song_title, row.song_length))

Artist     SongTitle                           SongLength
------------------------------------------------------
Faithless  Music Matters (Mark Knight Dub)     495.307


---

## Query 2 Notes

* <mark>**Query**: Give me only the following: name of *artist*, *song (sorted by itemInSession)* and *user (first and last name)* for `userid` = 10, `sessionid = 182`</mark>
* To get a unique `PRIMARY KEY` for each `item_in_session`, the `user_id`,`session_id` and `item_in_session` keys are combined.
* The `user_id` and `session_id` can be combined as the `PARTITION KEY` - this gives more partitions than using only `user_id`.
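* The query requires a `WHERE` on `user_id` and `session_id`. Results come back sorted by `item_in_session` because it is the clustering column (`CLUSTERING ORDER BY (item_in_session ASC)`), so no explicit sort is needed.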

In [11]:
## CREATE TABLE
# Modeled on Query 2: (user_id, session_id) is the composite partition key and
# item_in_session the clustering column, stored in ascending order.
query = "CREATE TABLE IF NOT EXISTS songs_by_user_session "
query += """(artist text,
            user_first text,
            item_in_session int,
            user_last text,
            session_id int,
            song_title text,
            user_id int,
            PRIMARY KEY ((user_id, session_id), item_in_session)) """
query += "WITH CLUSTERING ORDER BY (item_in_session ASC);"
try:
    session.execute(query)
except Exception as e:
    print('Error creating TABLE')
    print(e)

## INSERT ROWS
# Prepare the INSERT once and bind each CSV row to it.
insert_query = session.prepare(
    "INSERT INTO songs_by_user_session (artist, user_first, item_in_session, user_last, session_id, song_title, user_id) "
    "VALUES (?, ?, ?, ?, ?, ?, ?)"
)
file = 'event_datafile_new.csv'
# newline='' is what the csv module expects for files it reads.
with open(file, encoding = 'utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        # CSV columns used: 0 artist, 1 firstName, 3 itemInSession,
        # 4 lastName, 8 sessionId, 9 song, 10 userId
        session.execute(insert_query, (line[0], line[1], int(line[3]), line[4], int(line[8]), line[9], int(line[10])))

In [12]:
## VERIFY DATA
# Print up to 5 rows; printing lives in `else` so a failed SELECT does not
# reuse a stale or undefined `rows`.
query = "SELECT * FROM songs_by_user_session LIMIT 5;"
try:
    rows = session.execute(query)
except Exception as e:
    print('Error reading from table')
    print(e)
else:
    print('{:<8} {:<15} {:<15} {:<8} {:<8} {:<25} {:<20}'.format('UserId', 'UserFirst', 'UserLast', 'Session', 'Item', 'Artist', 'SongTitle'))
    print('-'*110)
    for row in rows:
        print('{:<8} {:<15} {:<15} {:<8} {:<8} {:<25} {:<20}'.format(row.user_id, row.user_first, row.user_last, row.session_id, row.item_in_session, row.artist, row.song_title))

UserId   UserFirst       UserLast        Session  Item     Artist                    SongTitle           
--------------------------------------------------------------------------------------------------------------
58       Emily           Benson          768      0        System of a Down          Sad Statue          
58       Emily           Benson          768      1        Ghostland Observatory     Stranger Lover      
58       Emily           Benson          768      2        Evergreen Terrace         Zero                
85       Kinsley         Young           776      2        Deftones                  Head Up (LP Version)
85       Kinsley         Young           776      3        The Notorious B.I.G.      Playa Hater (Amended Version)


In [13]:
## RUN QUERY
# Query 2: the whole partition (user_id, session_id) is read; rows arrive in
# ascending item_in_session order thanks to the table's clustering order.
query = """SELECT artist, song_title, user_first, user_last
        FROM songs_by_user_session
        WHERE user_id=%s AND session_id=%s;"""
try:
    rows = session.execute(query, (10, 182))
except Exception as e:
    print('Error running query')
    print(e)
else:
    # Only print when the query succeeded (avoids a stale `rows`).
    print('{:<20}  {:<20}  {:<20}  {:<20}'.format('Artist', 'Song Title', 'User First Name', 'User Last Name'))
    print('-'*80)
    for row in rows:
        print('{:<20}  {:<20}  {:<20}  {:<20}'.format(row.artist, row.song_title, row.user_first, row.user_last))

Artist                Song Title            User First Name       User Last Name      
--------------------------------------------------------------------------------
Down To The Bone      Keep On Keepin' On    Sylvie                Cruz                
Three Drives          Greece 2000           Sylvie                Cruz                
Sebastien Tellier     Kilometer             Sylvie                Cruz                
Lonnie Gordon         Catch You Baby (Steve Pitron & Max Sanna Radio Edit)  Sylvie                Cruz                


---

## Query 3 Notes

* <mark>**Query**: Give me every *user name (first and last)* in my music app history who listened to the *song 'All Hands Against His Own'*</mark>
* To get a unique `PRIMARY KEY`, the `song_title` and `user_id` keys are combined. As a result, each user appears at most once per song, even if they listened to it several times.
* The `song_title` is set as the `PARTITION KEY` - this gives lots of partitions (one for each song).
* The query requires a `WHERE` on `song_title`.

In [14]:
## CREATE TABLE
# Modeled on Query 3: song_title is the partition key and user_id the
# clustering column, so repeated plays by one user collapse into one row.
query = "CREATE TABLE IF NOT EXISTS user_by_song "
query += """(song_title text,
            user_id int,
            user_first text,
            user_last text,
            PRIMARY KEY (song_title, user_id));"""
try:
    session.execute(query)
except Exception as e:
    print('Error creating TABLE')
    print(e)

## INSERT ROWS
# Prepare the INSERT once and bind each CSV row to it.
insert_query = session.prepare(
    "INSERT INTO user_by_song (user_first, user_last, song_title, user_id) "
    "VALUES (?, ?, ?, ?)"
)
file = 'event_datafile_new.csv'

# newline='' is what the csv module expects for files it reads.
with open(file, encoding = 'utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        # CSV columns used: 1 firstName, 4 lastName, 9 song, 10 userId
        session.execute(insert_query, (line[1], line[4], line[9], int(line[10])))

In [15]:
## VERIFY DATA
# Print up to 5 rows; printing lives in `else` so a failed SELECT does not
# reuse a stale or undefined `rows`.
query = "SELECT * FROM user_by_song LIMIT 5;"
try:
    rows = session.execute(query)
except Exception as e:
    print('Error reading from table')
    print(e)
else:
    print('{:<8} {:<15} {:<15} {:<25}'.format('UserId', 'UserFirst', 'UserLast', 'SongTitle'))
    print('-'*110)
    for row in rows:
        print('{:<8} {:<15} {:<15} {:<25}'.format(row.user_id, row.user_first, row.user_last, row.song_title))

UserId   UserFirst       UserLast        SongTitle                
--------------------------------------------------------------------------------------------------------------
49       Chloe           Cuevas          Wonder What's Next       
49       Chloe           Cuevas          In The Dragon's Den      
44       Aleena          Kirby           Too Tough (1994 Digital Remaster)
49       Chloe           Cuevas          Rio De Janeiro Blue (Album Version)
15       Lily            Koch            My Place                 


In [16]:
## RUN QUERY
# Query 3: read the single partition for the requested song title.
query = """SELECT user_first, user_last
        FROM user_by_song
        WHERE song_title=%s;"""
try:
    rows = session.execute(query, ('All Hands Against His Own',))
except Exception as e:
    print('Error running query')
    print(e)
else:
    # Only print when the query succeeded (avoids a stale `rows`).
    print('{:<20} {:<20}'.format('UserFirst', 'UserLast'))
    print('-'*40)
    for row in rows:
        print('{:<20} {:<20}'.format(row.user_first, row.user_last))

UserFirst            UserLast            
----------------------------------------
Jacqueline           Lynch               
Tegan                Levine              
Sara                 Johnson             


### Drop the tables before closing out the sessions

In [17]:
# Drop each table on its own so a failure on one does not skip the others;
# IF EXISTS makes re-running this cell harmless.
# (The previous `except Exception as (e):` is a SyntaxError in Python 3.)
for table_name in ('songs_by_session', 'songs_by_user_session', 'user_by_song'):
    try:
        session.execute('DROP TABLE IF EXISTS ' + table_name)
    except Exception as e:
        print('Error dropping tables.')
        print(e)

<cassandra.cluster.ResultSet at 0x7faef9d1add8>

### Close the session and cluster connection

In [18]:
# Release driver resources: the session first, then the cluster it came from.
for connection in (session, cluster):
    connection.shutdown()