# Part I. ETL Pipeline for Pre-Processing the Files

## First, import the Python packages the pipeline needs

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
print(os.getcwd())

# Directory holding the raw event CSV files.
filepath = os.path.join(os.getcwd(), 'event_data')


def list_event_files(directory):
    """Return the sorted paths of every regular, non-hidden file under *directory*.

    The tree is walked recursively and results from every directory are
    accumulated. Hidden directories (e.g. Jupyter's ``.ipynb_checkpoints``) are
    pruned so checkpoint copies are not ingested twice, and subdirectory
    entries themselves are excluded so they are never opened as CSV files.
    Returns an empty list when *directory* does not exist.
    """
    paths = []
    for root, dirs, _files in os.walk(directory):
        # Pruning ``dirs`` in place stops os.walk from descending into hidden dirs.
        dirs[:] = [d for d in dirs if not d.startswith('.')]
        # glob('*') skips hidden files; keep only regular files, not subdirectories.
        paths.extend(p for p in glob.glob(os.path.join(root, '*')) if os.path.isfile(p))
    # Sort so the combined CSV is built in a deterministic order.
    return sorted(paths)


file_path_list = list_event_files(filepath)


/home/workspace


#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [3]:
# Collect the data rows (header excluded) of every event CSV into one list.
full_data_rows_list = []

for f in file_path_list:
    # newline='' lets the csv module handle quoted fields containing newlines.
    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header row; the default avoids StopIteration on an empty file.
        next(csvreader, None)
        full_data_rows_list.extend(csvreader)

# Create a smaller event data csv file called event_datafile_new.csv that will be
# used to insert data into the Apache Cassandra tables.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Source-file column indices kept, in output order: artist(0), firstName(2),
# gender(3), itemInSession(4), lastName(5), length(6), level(7), location(8),
# sessionId(12), song(13), userId(16).
KEPT_COLUMNS = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # Skip blank lines and rows with an empty artist field
        # (presumably events that are not song plays).
        if not row or row[0] == '':
            continue
        writer.writerow([row[i] for i in KEPT_COLUMNS])


In [4]:
# Count the lines of the output file; the total includes the header row.
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as event_file:
    line_count = sum(1 for _ in event_file)
print(line_count)

6821


# Part II. Apache Cassandra code

## We now have a CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory. The event_datafile_new.csv contains the following columns:
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data look like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## The Apache Cassandra code begins in the cells below

#### Creating a Cluster

In [5]:
from cassandra.cluster import Cluster

# Connect to the Cassandra instance on the local machine (127.0.0.1):
# Cluster() is called without explicit contact points.
cluster = Cluster()
session = cluster.connect()  # the session every later cell executes against

#### Creating Keyspace

In [6]:
# Create the project keyspace if needed. SimpleStrategy with replication_factor 1
# keeps a single replica of each row.
keyspace_cql = """
    CREATE KEYSPACE IF NOT EXISTS udacity 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""

try:
    session.execute(keyspace_cql)
except Exception as err:
    print(err)

#### Set Keyspace

In [7]:
# Make 'udacity' the default keyspace so later statements can use bare table names.
keyspace_name = 'udacity'
try:
    session.set_keyspace(keyspace_name)
except Exception as err:
    print(err)

### Now create the tables. Run the code below to create them.

## Below are the tasks for this project:

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




In [8]:
# Table for Query 1: session_id is the partition key and item_session the
# clustering column, so "WHERE session_id = ... AND item_session = ..." addresses
# a single row. length_song is DOUBLE because CQL FLOAT is 32-bit and would
# round the song lengths read from the CSV to single precision.
query = "CREATE TABLE IF NOT EXISTS music_item_sessions"
query = query + "(session_id int, item_session int, user_id int, artist text, length_song double, song_title text, PRIMARY KEY ((session_id), item_session))"
try:
    session.execute(query)
except Exception as e:
    print(e)

### Creating the table for the first question. Since the query filters on sessionId and itemInSession, sessionId is the partition key and itemInSession is the clustering column. Data is partitioned by sessionId, and within each partition rows are ordered by itemInSession.

In [None]:
query = "CREATE TABLE IF NOT EXISTS music_userid_sessions"
query = query + "(session_id int,user_id int, item_session int, artist text ,song_title text, PRIMARY KEY ((session_id, user_id),item_session))"
try:
    session.execute(query)
except Exception as e:
    print(e)

### Creating table for second task. Since we need user id and session id, we will make it primary keys. Partioning will be done in session id and user id. Also song need to be sorted by item in session, then item in session will be clustering key, so row will be ordered by item in session.

In [None]:
query = "CREATE TABLE IF NOT EXISTS music_sessions_users"
query = query + "(song_title text, user_id int, first_name text, last_name text, PRIMARY KEY (song_title, user_id))"
try:
    session.execute(query)
except Exception as e:
    print(e)       

### Creating table with third task. Since we need filter by song and also it will be better to have sorted by user id, then song title and user id will be primary key. Partioning will be done in session id and user id.

In [9]:
## File name where contain all records for datas
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:

        query_items = "INSERT INTO music_item_sessions (session_id, item_session, user_id, artist, length_song, song_title)"
        query_items = query_items + "VALUES (%s, %s, %s, %s, %s, %s)"

        query_users = "INSERT INTO music_userid_sessions (session_id, user_id , item_session , artist ,song_title)"
        query_users = query_users + "VALUES (%s, %s, %s, %s, %s)"
        
        query_songs = "INSERT INTO music_sessions_users (song_title, user_id,first_name, last_name)"
        query_songs = query_songs + "VALUES (%s, %s, %s, %s)"

        ## This is query insert to the various tables 

        session.execute(query_items, (int(line[8]), int(line[3]),int(line[10]), str(line[0]), float(line[5]), str(line[9])))
        session.execute(query_users, (int(line[8]), int(line[10]), int(line[3]), str(line[0]), str(line[9])))
        session.execute(query_songs, (str(line[9]),int(line[10]), str(line[1]), str(line[4])))

        ## This is insert queries from readline cvs into tables

### The code above reads the CSV file combined from the event data folder and inserts each row into all three tables.

### The code below verifies that the tables contain data. Because the full results are too large to show in Jupyter, it uses COUNT to show how many rows were inserted into each table for the keys queried later.

In [None]:
def print_row_count(query, description):
    """Execute a COUNT query and print its value after *description*.

    Errors are printed instead of raised, like the other cells in this notebook;
    in that case no count is printed, so a stale result is never shown.
    """
    try:
        rows = session.execute(query)
    except Exception as e:
        print(e)
        return
    # A COUNT query returns one row whose only column is the count.
    print(f'{description}: {rows.one()[0]}')


print_row_count("SELECT count(*) FROM music_item_sessions WHERE session_id = 338 AND item_session = 4",
                "Rows in music_item_sessions with session_id = 338 and item_session = 4")
print_row_count("SELECT count(*) FROM music_userid_sessions WHERE session_id = 182 AND user_id = 10",
                "Rows in music_userid_sessions with session_id = 182 and user_id = 10")
print_row_count("SELECT count(*) FROM music_sessions_users WHERE song_title = 'All Hands Against His Own'",
                'Users who listened to "All Hands Against His Own" in music_sessions_users')

### Below are the 3 queries that answer the questions above

####  Query 1:  Give the artist, song title and song's length in the music app history that was heard during
#### sessionId = 338, and itemInSession = 4

In [None]:

# Query 1: artist, song title and song length for sessionId = 338, itemInSession = 4.
# Both restricted columns form the primary key of music_item_sessions.
query = "select artist, song_title, length_song from music_item_sessions WHERE session_id = 338 and item_session = 4"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Iterate only on success; otherwise `rows` would be undefined or left over
    # from an earlier cell.
    for row in rows:
        print(row.artist, row.song_title, row.length_song)

#### Query 2: Give only the following: name of artist, song (sorted by itemInSession) and user (first and last name)
#### for userid = 10, sessionid = 182

In [None]:

# Query 2: artist, song and user name for userId = 10, sessionId = 182.
# Rows are returned in clustering (item_session) order, which gives the required
# sort by itemInSession.
query = "select artist, song_title, first_name, last_name from music_userid_sessions WHERE session_id = 182 AND user_id = 10"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Iterate only on success; otherwise `rows` would be undefined or stale.
    for row in rows:
        print(row.artist, row.song_title, row.first_name, row.last_name)

#### Query 3: Give every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [18]:
# Query 3: first and last name of every user who listened to the given song;
# song_title is the partition key of music_sessions_users.
query = "select first_name, last_name from music_sessions_users WHERE song_title='All Hands Against His Own'"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Iterate only on success; otherwise `rows` would be undefined or stale.
    for row in rows:
        print(row.first_name, row.last_name)

Row(first_name='Jacqueline', last_name='Lynch')
Row(first_name='Tegan', last_name='Levine')
Row(first_name='Sara', last_name='Johnson')


## Finally, clean up the tables and close the session

### Drop the tables before closing out the sessions

In [13]:
# Drop the three tables created above. IF EXISTS keeps this cell re-runnable
# when a table has already been dropped; other errors are printed as before.
for table_name in ('music_sessions_users', 'music_item_sessions', 'music_userid_sessions'):
    try:
        session.execute(f"DROP TABLE IF EXISTS {table_name}")
    except Exception as e:
        print(e)

## Delete all tables


### Close the session and cluster connection

In [35]:
# Release driver resources: shut down the session first, then the cluster it
# was created from.
for connection in (session, cluster):
    connection.shutdown()