In [None]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv
from cql_queries import *
from functions import *
from drop_create_tables import *

# Part I
## ETL Pipeline: Pre-Processing the files (EXTRACT + TRANSFORM)

In [None]:
# Run the pre-processing helper against the directory the notebook runs from.
pwd = os.getcwd()
preprocessing_csv_files(pwd)

# Sanity check: count every line of the combined CSV (a header line, if
# present, is included in the total).
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    merged_rows = 0
    for _ in f:
        merged_rows += 1
    print("Total rows merged:", merged_rows)

# Part II
## ETL Pipeline: Creating and Filling the Apache Cassandra Database (TRANSFORM + LOAD)

### The event_datafile_new.csv contains the following columns: 
- <strong>artist</strong> = artist name
- <strong>firstName</strong> = user's first name
- <strong>gender</strong> = user's gender
- <strong>itemInSession</strong> = item number in session
- <strong>lastName</strong> = user's last name
- <strong>length</strong> = song's duration (in seconds)
- <strong>level</strong> = indicates if the song is paid or free
- <strong>location</strong> = user's location
- <strong>sessionId</strong> = user's session ID
- <strong>song</strong> = song's title
- <strong>userId</strong> = user's ID

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

### Let's start building our database

### The first thing we need to do is to create a Keyspace

#### Create a Cluster and establish connection to it

In [None]:
# Cluster() is called without contact points, so the driver's default
# (the local machine, 127.0.0.1) is used.
from cassandra.cluster import Cluster
cluster = Cluster()

# Every query in the rest of the notebook is executed through this session.
session = cluster.connect()

#### Create and set a Keyspace

##### Drop a Keyspace if it already exists

In [None]:
# Best-effort drop: a failure is printed and the notebook carries on.
try:
    session.execute(drop_keyspace)
except Exception as drop_error:
    print(drop_error)

##### Create a new Keyspace

In [None]:
# Run the keyspace-creation statement; any error is printed, not raised.
try:
    session.execute(create_keyspace)
except Exception as create_error:
    print(create_error)

##### Set a Keyspace

In [None]:
# Make 'sparkifydb' the session's keyspace; any error is printed, not raised.
try:
    session.set_keyspace('sparkifydb')
except Exception as keyspace_error:
    print(keyspace_error)

### When working with NoSQL Databases (Apache Cassandra in our case), we model the tables on the queries we intend to run 

### Let's take a look at the queries we want to run on our database:

#### 1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4


#### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

#### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

### Let's create a separate table for each of the queries above and name them as:

#### 1. session_song_details

#### 2. user_session_songs

#### 3. user_songs

In [None]:
# Drop first, then create. Each step is attempted independently: an error in
# one step is printed and the next step still runs.
for table_step in (drop_tables, create_tables):
    try:
        table_step(session)
    except Exception as step_error:
        print(step_error)

### Let's fill the tables with the data from event_datafile_new.csv by executing corresponding INSERT statements

In [None]:
# Path of the combined CSV that the INSERT cell reads from.
file = "event_datafile_new.csv"

#### WARNING: The next code snippet's execution may take some time to finish
#### That's because it combines INSERT statements for all three tables created

In [None]:
# Load every event row into the three query-specific tables.
# Column positions follow the column order of event_datafile_new.csv:
#   0 artist, 1 firstName, 3 itemInSession, 4 lastName, 5 length,
#   8 sessionId, 9 song, 10 userId
# newline='' is the csv-module recommended way to open a file for csv.reader.
with open(file, encoding = 'utf8', newline = '') as f:
    csvreader = csv.reader(f)
    next(csvreader, None) # skip header; the default avoids StopIteration on an empty file
    for line in csvreader:
        if not line:
            continue # csv.reader yields [] for blank lines; nothing to insert

        # Convert once per row so all three INSERTs share the same typed values.
        artist, first_name, last_name, song = line[0], line[1], line[4], line[9]
        item_in_session = int(line[3])
        length = float(line[5])
        session_id = int(line[8])
        user_id = int(line[10])

        session.execute(first_table_insert, (session_id, item_in_session, artist, song, length))
        session.execute(second_table_insert, (user_id, session_id, item_in_session, first_name, last_name, artist, song))
        session.execute(third_table_insert, (song, user_id, first_name, last_name))

# Part III
## Data Verification

### Let's now execute SELECT statements for each of the tables to ensure the data has been successfully inserted

1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

#### PRIMARY KEY: ((sessionId, itemInSession))
##### PARTITION KEY: sessionId, itemInSession
##### CLUSTERING COLUMN(S): -
#### For the first table, the columns sessionId, itemInSession were used as a Partition Key because the SELECT query needs to filter by these columns.
#### No column(s) were used as Clustering because there can be only one resulting row for each sessionId / itemInSession pair. So there's no need to sort anything.

In [None]:
# Run the first verification query and show its result as a DataFrame.
try:
    rows = session.execute(first_table_select)
except Exception as e:
    print(e)
    # Fall back to an empty result so the cell displays an empty frame instead
    # of failing on an unbound `rows` or reusing one left by an earlier cell.
    rows = []

# Build the frame once from all rows; an empty result gives an empty frame.
df = pd.DataFrame(list(rows))
df

2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

#### PRIMARY KEY: ((userId , sessionId),  itemInSession)
##### PARTITION KEY: userId , sessionId
##### CLUSTERING COLUMN: itemInSession
#### For the second table, the columns userId, sessionId were used as a Partition Key because the SELECT query needs to filter by these columns.
#### Though the two columns above represent a Partition Key, they're not enough to make the PRIMARY KEY unique.
#### To fix that, one more column is included in the PRIMARY KEY - itemInSession. It will be a Clustering Column because the SELECT query requests the data to be sorted by itemInSession.

In [None]:
# Run the second verification query and show its result as a DataFrame.
try:
    rows = session.execute(second_table_select)
except Exception as e:
    print(e)
    # Fall back to an empty result so the cell displays an empty frame instead
    # of failing on an unbound `rows` or reusing one left by an earlier cell.
    rows = []

# Build the frame once from all rows; an empty result gives an empty frame.
df = pd.DataFrame(list(rows))
df

3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

#### PRIMARY KEY: ((song),  userId)
##### PARTITION KEY: song
##### CLUSTERING COLUMN: userId
#### For the third table, the column song was used as a Partition Key because the SELECT query needs to filter by this column.
#### Though this column may represent a Partition Key, it's not enough to make the PRIMARY KEY unique. 
#### To fix that, one more column (Clustering) is included in the PRIMARY KEY - userId. 

In [None]:
# Run the third verification query and show its result as a DataFrame.
try:
    rows = session.execute(third_table_select)
except Exception as e:
    print(e)
    # Fall back to an empty result so the cell displays an empty frame instead
    # of failing on an unbound `rows` or reusing one left by an earlier cell.
    rows = []

# Build the frame once from all rows; an empty result gives an empty frame.
df = pd.DataFrame(list(rows))
df

### Let's drop the tables before closing out the session

In [None]:
# Cleanup before disconnecting; a failure is printed and does not stop the notebook.
try:
    drop_tables(session)
except Exception as cleanup_error:
    print(cleanup_error)

### And now let's close the session and cluster connection

In [None]:
# Shut down the session first, then the cluster it was created from.
for connection in (session, cluster):
    connection.shutdown()