# Part I. ETL Pipeline for Pre-Processing the Files
### Import Python packages 

In [1]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

import etl
from queries import create_keyspace_stmt, create_tables_stmts, queries

### Creating list of filepaths to process original event csv data files

In [2]:
# Show the working directory so the relative data path below is easy to verify.
print(os.getcwd())

# Root folder that holds the raw event CSV files.
filepath = os.getcwd() + '/event_data'

# Start from an empty list so the name exists even when the folder is missing,
# and accumulate across every directory os.walk visits. Re-assigning the list
# inside the loop would keep only the files of the last directory walked.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # Prune hidden folders (e.g. .ipynb_checkpoints) in place so os.walk does
    # not descend into them and pick up stale copies of the data.
    dirs[:] = [d for d in dirs if not d.startswith('.')]
    # Restrict the match to CSV files: a bare '*' also returns sub-directories,
    # which the processing step cannot open as files. Sorting keeps the order
    # of the collected paths deterministic between runs.
    file_path_list.extend(sorted(glob.glob(os.path.join(root, '*.csv'))))

/home/workspace


### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [3]:
# Gather every data row (headers excluded) from all of the event files.
full_data_rows_list = []
for path in file_path_list:
    # newline='' leaves line-ending handling to the csv module.
    with open(path, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header row; the None default keeps an empty file from
        # raising StopIteration and aborting the whole run.
        next(csvreader, None)
        full_data_rows_list.extend(csvreader)

# Quote every field and ignore whitespace that directly follows a delimiter.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Write the reduced, consolidated file that the Cassandra tables are loaded from.
with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # csv.reader yields [] for a blank line, so test for an empty row
        # before indexing; rows whose first column (artist) is empty are
        # dropped as well.
        if not row or row[0] == '':
            continue
        # Keep only the source columns that match the header written above.
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7],
                         row[8], row[12], row[13], row[16]))


### Check the number of rows in the csv file

In [4]:
# Report how many lines the consolidated file contains (every line is counted).
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as datafile:
    line_total = 0
    for _ in datafile:
        line_total += 1
    print(line_total)

6821


# Part II. Complete the Apache Cassandra coding portion of the project. 
### Create the keyspace

In [3]:
# Cluster is the driver class used below to reach the Cassandra instance.
from cassandra.cluster import Cluster
# No contact points are passed, so the driver's default is used
# (presumably a node on the local machine — confirm against the driver docs).
cluster = Cluster()

# Open a session on the cluster; no keyspace is selected at this point.
session = cluster.connect()

In [7]:
# CQL for the project keyspace: single replica, created only when absent.
keyspace_cql = """CREATE KEYSPACE IF NOT EXISTS sparkify
    WITH REPLICATION = {
        'class': 'SimpleStrategy',
        'replication_factor': 1
    }"""

# Best effort: a failure is printed and the notebook carries on.
try:
    session.execute(keyspace_cql)
except Exception as err:
    print(err)

### Set Keyspace

In [8]:
# Select the 'sparkify' keyspace for this session; the statements that follow
# refer to their tables without a keyspace prefix.
session.set_keyspace('sparkify')

### Create the tables.
Use the queries defined in ```queries.py``` to create the project tables.

In [9]:
# Execute each table-creation statement imported from queries.py.
# A failing statement is printed and the loop moves on to the next one.
for create_stmt in create_tables_stmts:
    try:
        session.execute(create_stmt)
    except Exception as err:
        print(err)

## Create queries to ask the following three questions of the data

1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4


2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

### Call the function ```insert_data``` imported from ```etl.py``` to insert the data.


In [10]:
# Delegate the inserts to insert_data from etl.py (not shown here), handing it
# the open session.
etl.insert_data(session)

### Simple ```SELECT``` statements to verify that the data has been inserted into each table

In [11]:
# One row-count query per table, run in this order.
count_queries = (
    "select count(*) from session_library",
    "select count(*) from user_library",
    "select count(*) from song_library",
)

# The whole loop sits inside a single try so the first failure is printed and
# the remaining counts are skipped.
try:
    for count_query in count_queries:
        for count_row in session.execute(count_query):
            print(count_row)
except Exception as err:
    print(err)

Row(count=6680)
Row(count=6680)
Row(count=6485)


### Implement the ```SELECT``` statements to answer the project requirements. 
1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4
- This query selects the *artist_name, song_title, song_length* columns from the ```session_library``` table and filters by *session_id* and *item_in_session*. The filter keys were used to define a composite ```PRIMARY KEY``` for the table.

In [13]:
# Query 1: run the stored SELECT for session_library and print each result row.
try:
    session_matches = session.execute(queries['session_library']['select'])
    for match in session_matches:
        print(match)
except Exception as err:
    # Any failure (lookup or execution) is reported rather than raised.
    print(err)

Row(artist_name='Faithless', song_title='Music Matters (Mark Knight Dub)', song_length=495.30731201171875)


2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
- This query selects the *artist_name, song_title, user_first_name, user_last_name* columns from the ```user_library``` table and filters by *user_id* and *session_id*. The filter keys were used to define a composite ```PRIMARY KEY``` for the table.

In [14]:
# Query 2: run the stored SELECT for user_library and print each result row.
try:
    user_matches = session.execute(queries['user_library']['select'])
    for match in user_matches:
        print(match)
except Exception as err:
    # Any failure (lookup or execution) is reported rather than raised.
    print(err)

Row(artist_name='Down To The Bone', song_title="Keep On Keepin' On", user_first_name='Sylvie', user_last_name='Cruz')
Row(artist_name='Three Drives', song_title='Greece 2000', user_first_name='Sylvie', user_last_name='Cruz')
Row(artist_name='Sebastien Tellier', song_title='Kilometer', user_first_name='Sylvie', user_last_name='Cruz')
Row(artist_name='Lonnie Gordon', song_title='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', user_first_name='Sylvie', user_last_name='Cruz')


3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
- This query selects the *user_first_name, user_last_name* columns from the ```song_library``` table and filters by *song_title*. The filter key and *user_id* were used to define a composite ```PRIMARY KEY``` for the table.

In [15]:
# Query 3: run the stored SELECT for song_library and print each result row.
try:
    song_matches = session.execute(queries['song_library']['select'])
    for match in song_matches:
        print(match)
except Exception as err:
    # Any failure (lookup or execution) is reported rather than raised.
    print(err)

Row(user_first_name='Sara', user_last_name='Johnson')
Row(user_first_name='Tegan', user_last_name='Levine')


### Drop the tables before closing out the sessions

In [16]:
# Tables to remove, in this order.
tables_to_drop = ['session_library', 'song_library', 'user_library']

# Each drop is attempted on its own so one failure does not stop the others.
for table_name in tables_to_drop:
    try:
        session.execute(f'DROP TABLE IF EXISTS {table_name}')
    except Exception as err:
        print(err)

### Close the session and cluster connection

In [17]:
# Release the connection resources: the session first, then the cluster it
# was created from.
session.shutdown()
cluster.shutdown()