# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

My working directory:

In [2]:
print(os.getcwd())

/home/workspace


In [3]:
filepath = os.getcwd() + '/event_data'

for root, dirs, files in os.walk(filepath):
    file_path_list = glob.glob(os.path.join(root,'*'))
    # print(file_path_list) uncomment to check all the event files in the filepath

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [4]:
full_data_rows_list = [] 
    
for f in file_path_list:
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
        for line in csvreader:
            full_data_rows_list.append(line) 
            
print('There are',len(full_data_rows_list), 'rows in the original event csv datafiles.')

There are 8056 rows in the original event csv datafiles.


#### Creating a smaller event data csv file called event_datafile_full.csv that will be used to insert data into the Apache Cassandra tables in Part II

In [5]:
event_datafile = 'event_datafile_new.csv'

csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open(event_datafile, 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))
        
with open(event_datafile, 'r', encoding = 'utf8') as f:
    print('The file', event_datafile, 'has', sum(1 for line in f), 'rows, \
          \ncontaning only records from the original files where the column `artist` is not null.')

The file event_datafile_new.csv has 6821 rows,           
contaning only records from the original files where the column `artist` is not null.


# Part II. Complete the Apache Cassandra coding portion of your project. 

Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster
Connecting to Apache Cassandra instance on 127.0.0.1

In [6]:
from cassandra.cluster import Cluster
try:
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
except Exception as e:
    print(e)

#### Create Keyspace

In [7]:
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS udacity_laura
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)
except Exception as e:
    print(e)

#### Set Keyspace

In [8]:
try:
    session.set_keyspace('udacity_laura')
except Exception as e:
    print(e)

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data

#### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


#### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

#### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

#### The file <font color=red>**event_datafile_new.csv**</font> will be used to inserto data into the three tables below

In [9]:
print(event_datafile)

event_datafile_new.csv


## Modeling data for question 1

We want to create a table to run the following query:

```sql
query = """
        SELECT artist_name, song_title, song_length
        FROM songs_length_library
        WHERE session_id = 338 AND item_in_session = 4
        """
```

In this case, the PRIMARY KEY for the table `songs_length_library` can be easily identified from the WHERE clause:
* Partition key: `session_id`
* Clustering columns: `item_in_session`   

Notice that `session_id` is not unique, but the combination of (`session_id`, `item_in_session`) makes the PRIMARY KEY unique.

#### 1. Creating table and inserting data into it

In [10]:
query = "CREATE TABLE IF NOT EXISTS songs_length_library"
query = query + "(session_id int, item_in_session int, artist_name text, song_title text, song_length float, \
                PRIMARY KEY (session_id, item_in_session))"
try:
    session.execute(query)
except Exception as e:
    print(e)

In [12]:
with open(event_datafile, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        query = "INSERT INTO songs_length_library (session_id, item_in_session, artist_name, song_title, song_length)"
        query = query + "VALUES (%s, %s, %s, %s, %s)"
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))

#### 2. SELECT query to verify that the data have been inserted into each table

In [13]:
query = """
        SELECT artist_name, song_title, song_length
        FROM songs_length_library
        WHERE session_id = 338 AND item_in_session = 4
        """
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.artist_name, row.song_title, row.song_length)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


## Modeling data for question 2

We want to create a table to run the following query:

```sql
query = """
        SELECT artist_name, song_title, first_name, last_name
        FROM songs_user_library
        WHERE user_id = 10 AND session_id = 182
        """
```

In this case, the PRIMARY KEY for the table `songs_user_library` should be composed of:

* Partition keys: (`user_id`, `session_id`), since we are interested in accessing information byy user and session, and defining both as partition keys will make the query scan data more efficiently
* Clustering columns: `item_in_session`, since we want the songs to be ordered in ascending order according to this variable.

Notice that `user_id` is not unique, but the combination of ((`user_id`, `session_id`), `item_in_session`) makes the PRIMARY KEY unique.

#### 1. Creating table and inserting data into it

In [14]:
query = "CREATE TABLE IF NOT EXISTS songs_user_library"
query = query + "(user_id int, session_id int, item_in_session int, artist_name text, song_title text, \
                first_name text, last_name text, PRIMARY KEY ((user_id, session_id), item_in_session))"
try:
    session.execute(query)
except Exception as e:
    print(e)

In [15]:
with open(event_datafile, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) 
    for line in csvreader:
        query = "INSERT INTO songs_user_library (user_id, session_id, item_in_session, artist_name, song_title, \
                                                first_name, last_name)"
        query = query + "VALUES (%s, %s, %s, %s, %s, %s, %s)"
        session.execute(query, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9],line[1], line[4]))

#### 2. SELECT query to verify that the data have been inserted into each table

In [16]:
query = """
        SELECT artist_name, song_title, first_name, last_name
        FROM songs_user_library
        WHERE user_id = 10 AND session_id = 182
        """
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.artist_name, row.song_title, row.first_name, row.last_name)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


## Modeling data for question 3

We want to create a table to run the following query:

```sql
query = """
        SELECT first_name, last_name
        FROM songs_search_library
        WHERE song_title = 'All Hands Against His Own'
        """
```

In this case, based on the WHERE clause and ordering requirements, the PRIMARY KEY for the table `songs_search_library` could be simple, composed only by the Partition key `song_title`. However, it wouldn't satisfy the first requirements of primary keys: they must be unique. Therefore, we can add `user_id` as clustering column to satisfy the unique constraint.

* Partition key: `song_title`
* Clustering columns: `user_id`

Note: It can be that the same user will listen to the same songs in more than one session, or even in the same session. So it's important to know that, when trying to insert a primary key that already exists into a Cassandra table, the existing row will be updated. But it is not a problem for the use case of this table, since we just want to retrieve the names of all the users that have ever listened to a given song, no matter how many times.

#### 1. Creating table and inserting data into it

In [17]:
query = "CREATE TABLE IF NOT EXISTS songs_search_library"
query = query + "(song_title text, user_id int, first_name text, last_name text, \
                 PRIMARY KEY (song_title, user_id))"
try:
    session.execute(query)
except Exception as e:
    print(e)

In [21]:
with open(event_datafile, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        query = "INSERT INTO songs_search_library (song_title, user_id, first_name, last_name)"
        query = query + "VALUES (%s, %s, %s, %s)"
        session.execute(query, (line[9], int(line[10]), line[1], line[4]))

#### 2. SELECT query to verify that the data have been inserted into each table

In [22]:
query = """
        SELECT first_name, last_name
        FROM songs_search_library
        WHERE song_title = 'All Hands Against His Own'
        """
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.first_name, row.last_name)

Jacqueline Lynch
Tegan Levine
Sara Johnson


### Dropping the tables before closing out the sessions

In [23]:
tables_to_drop = ['songs_length_library', 'songs_user_library', 'songs_search_library']

for table in tables_to_drop:
    query = f"DROP TABLE IF EXISTS {table}"
    try:
        rows = session.execute(query)
    except Exception as e:
        print(e)

### Closing the session and cluster connection¶

In [24]:
session.shutdown()
cluster.shutdown()