# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [2]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [3]:
# checking your current working directory
print(os.getcwd())

# Get your current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Create a for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    
# join the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))
    print(file_path_list)

/home/workspace/Repo/udacity_project2
['/home/workspace/Repo/udacity_project2/event_data/2018-11-18-events.csv', '/home/workspace/Repo/udacity_project2/event_data/2018-11-02-events.csv', '/home/workspace/Repo/udacity_project2/event_data/2018-11-03-events.csv', '/home/workspace/Repo/udacity_project2/event_data/2018-11-13-events.csv', '/home/workspace/Repo/udacity_project2/event_data/2018-11-14-events.csv', '/home/workspace/Repo/udacity_project2/event_data/2018-11-10-events.csv', '/home/workspace/Repo/udacity_project2/event_data/2018-11-22-events.csv', '/home/workspace/Repo/udacity_project2/event_data/2018-11-30-events.csv', '/home/workspace/Repo/udacity_project2/event_data/2018-11-17-events.csv', '/home/workspace/Repo/udacity_project2/event_data/2018-11-05-events.csv', '/home/workspace/Repo/udacity_project2/event_data/2018-11-23-events.csv', '/home/workspace/Repo/udacity_project2/event_data/2018-11-12-events.csv', '/home/workspace/Repo/udacity_project2/event_data/2018-11-29-events.csv',

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [4]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:

# reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
 # extracting each data row one by one and append it        
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 
            
# uncomment the code below if you would like to get total number of rows 
#print(len(full_data_rows_list))
# uncomment the code below if you would like to check to see what the list of event data rows will look like
#print(full_data_rows_list)

# creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [5]:
# check the number of rows in your csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


In [145]:
df = pd.read_csv('./event_datafile_new.csv', encoding = 'utf8')
print(df.shape[0])
df.head()

6820


Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
0,Rokia TraorÃÂ©,Stefany,F,0,White,274.88608,free,"Lubbock, TX",693,Zen,83
1,Camila,Tucker,M,1,Garrison,230.81751,free,"Oxnard-Thousand Oaks-Ventura, CA",555,Abrazame (Version Acustica),40
2,Carl Thomas,Tucker,M,0,Garrison,196.67546,free,"Oxnard-Thousand Oaks-Ventura, CA",698,You Ain't Right (Album Version),40
3,N.E.R.D.,James,M,0,Martin,242.99057,free,"Dallas-Fort Worth-Arlington, TX",78,Provider (Remix Radio Edit),79
4,Lil Jon / The East Side Boyz / DJ Flexx,Jacqueline,F,3,Lynch,285.30893,paid,"Atlanta-Sandy Springs-Roswell, GA",589,Aww Skeet Skeet,29


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster

In [7]:
# Make a connection to a Cassandra instance your local machine 
# (127.0.0.1)

from cassandra.cluster import Cluster
try:
    cluster = Cluster(['127.0.0.1'])
    # To establish connection and begin executing queries, n
    session = cluster.connect()
except Exceptions as e:
    print(e)

#### Create Keyspace

In [8]:
try:
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS udacity
        WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':1}
    """)
except Exception as e:
    print(e)

#### Set Keyspace

In [9]:
try:
    session.set_keyspace('udacity')
except Exception as e:
    print(e)

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

In [234]:
class BasicTableManager:
    '''
    This class helps you to create tables for custom queries.
    You should inherit from it, set table name and specify fields in a dictionary.
    Example table used in current class.
    Simple use-case for this class:
    table_manager = BasicTableManager()
    table_manager.drop_table(session)
    table_manager.create_table(session)
    table_manager.insert_table(df_data = df, session = session)
    table_manager.print_table_description()
    table_manager.select_from_table()
    
    Attributes:
        table_name (str): name of table
        fields_dict (dict): dictionary with table columns. Keys of dict becomes column names. In dictionary items you set type of column, 
                            source column from data frame, is column become primarykey or cluster column.
    
    '''
    def __init__(self):
        self.table_name = 'example'
        self.fields_dict = {
            'example_field':{'type':'text', 'ext_col_name':'example_field', 'is_primary': True, 'is_clustering':False}
        }
    
    def get_external_col_names(self):
        '''
        Returns list with columns from external data-frame required for insert
        '''
        return [field_parameters['ext_col_name'] for field, field_parameters in self.fields_dict.items()]
    
     
    def get_query_string_create_table(self):
        '''
        Generate statement to create table from self.table_name and self.fields_dict
        
        Returns:
            str: string-statement to create table
        '''
        query = "CREATE TABLE IF NOT EXISTS " + self.table_name + " ("
        for field, field_parameters in self.fields_dict.items():
            query  = query + field +' ' + field_parameters['type'] + ','
        query = query + " PRIMARY KEY ("
        # primary keys begin
        query = query + "("
        is_add_sep = False
        for field, field_parameters in self.fields_dict.items():
            if field_parameters['is_primary']:
                if is_add_sep:
                    query  = query + ', '
                else:
                    is_add_sep = True
                query  = query + field 
        query = query + ")"
        # primary keys end
        # clustering columns begin
        is_add_sep = False
        for field, field_parameters in self.fields_dict.items():
            if field_parameters['is_clustering']:
                if is_add_sep:
                    query  = query + ', '
                else:
                    is_add_sep = True
                query  =  query + ', ' + field 
        query = query + ")"
        # clustering columns end
        query = query + ")"
        return query
    
    def get_query_string_drop_table(self):
        '''
        Generate statement to drop table
        
        Returns:
            str: string-statement to drop table
        '''
        query = "DROP TABLE IF EXISTS " + self.table_name
        return query
    
    def get_query_string_select(self, input_pars):
        '''
        Generate statement to select from table
        
        Args:
            input_pars (dict): input parameters used in where statements (if required)
        
        Returns:
            str: string-statement to select
        '''
        query_template = "SELECT * FROM {table_name};"
        query_string = query_template.format(
            table_name = self.table_name
        )
        return query_string    
    
    def get_insert_template(self): 
        '''
        Generate template-statement to insert table
        
        Returns:
            str: template-statement to insert
        '''
        str_fields_sequence = ""
        str_template_sequence = ""
        is_add_sep = False
        for field, field_parameters in self.fields_dict.items():
            if is_add_sep:
                str_fields_sequence  = str_fields_sequence + ', '
                str_template_sequence = str_template_sequence + ', '
            else:
                is_add_sep = True
            str_fields_sequence  = str_fields_sequence + field 
            str_template_sequence = str_template_sequence + '%s'
        
        insert_template = "INSERT INTO " + self.table_name + " ("
        insert_template = insert_template + str_fields_sequence
        insert_template = insert_template + ") VALUES ("
        insert_template = insert_template + str_template_sequence
        insert_template = insert_template +  ")"
        return insert_template
    
    def print_table_description(self):
        '''
        Print all string query-statements created by class functions
        '''
        print('\n')
        print('Query to create table:')
        print(self.get_query_string_create_table())
        print('\n')
        print('Query to drop table:')
        print(self.get_query_string_drop_table())
        print('\n')
        print('Query to insert new records:')
        print(self.get_insert_template())
        print('\n')
        print('Default select example for table:')
        print(self.get_query_string_select())
        print('\n')
        
    def create_table(self, session):
        '''
        Create table using session object
        
        Args:
            session: cassandra session object
        '''
        session.execute(self.get_query_string_create_table())
        
    def drop_table(self, session):
        '''
        Drop table using session object
        
        Args:
            session: cassandra session object
        '''
        session.execute(self.get_query_string_drop_table())
        
    def select_from_table(self, session, input_pars):
        '''
        Select from table using session object and input parameters for where conditions
        
        Args:
            session: cassandra session object
        
        Returns:
             pd.DataFrame: query results 
        '''
        rows = session.execute(self.get_query_string_select(input_pars))
        df_result = pd.DataFrame([row for row in rows])
        return df_result
        
    def insert_one_row(self, session, row_data_list):
        '''
        Insert one record using session object
        
        Args:
            session: cassandra session object
            row_data_list (list): list with values to insert
        '''
        session.execute(self.get_insert_template(), row_data_list)
    
    def insert_table(self, session, df_data):
        '''
        Insert multiple records from data-frame
        
        Args:
            session: cassandra session object
            df_data (pd.DataFrame): data-frame with values to insert
        '''
        for i, row in df_data[self.get_external_col_names()].iterrows():
            self.insert_one_row(session=session, row_data_list= list(row.values))
            


class MultipleTableManager:
    '''
    Class to manage multiple tables. Contains helper functions to call multiple methods from BasicTableManager.
    Can hold multiple TableManager configured for different queries
    '''
    def __init__(self):
        self.table_managers_dict = {}
    
    def add_table_manager(self, table_manager_name: str, table_manager_object: BasicTableManager):
        self.table_managers_dict[table_manager_name] = table_manager_object
    
    def list_table_manager_names(self):
        return self.table_managers_dict.keys()
    
    def create_and_fill_table_by_name(self, table_manager_name, session, df):
        table_manager = self.table_managers_dict[table_manager_name]
        print('drop table for ' + table_manager_name)
        table_manager.drop_table(session)
        print('create table for ' + table_manager_name)
        table_manager.create_table(session)
        print('insert data to table for ' + table_manager_name)
        table_manager.insert_table(df_data = df, session = session)
        print('completed for ' + table_manager_name)
        
    def drop_all_tables(self, session):
        for table_manager_name, table_manager in self.table_managers_dict.items():
            print('drop table for ' + table_manager_name)
            table_manager.drop_table(session)
            print('completed for ' + table_manager_name)

## Create queries to ask the following three questions of the data

In [235]:
multiple_table_manager = MultipleTableManager()

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

In [236]:
class TableManagerQuery1(BasicTableManager):
    def __init__(self):
        self.table_name = 'song_artist_app'
        self.fields_dict = {
            'sessionId':{'type':'int', 'ext_col_name':'sessionId', 'is_primary': True, 'is_clustering':False},
            'itemInSession':{'type':'int', 'ext_col_name':'itemInSession', 'is_primary': True, 'is_clustering':False},
            'artist':{'type':'text', 'ext_col_name':'artist', 'is_primary': False, 'is_clustering':False},
            'song':{'type':'text', 'ext_col_name':'song', 'is_primary': False, 'is_clustering':False},
            'length':{'type':'float', 'ext_col_name':'length', 'is_primary': False, 'is_clustering':False}
        }
        
    def get_query_string_select(self, input_pars = {'sessionId':338,'itemInSession': 4} ):
        query_template = "SELECT artist, song, length FROM {table_name} where sessionId = {sessionId} and itemInSession = {itemInSession};"
        query_string = query_template.format(
            table_name = self.table_name, 
            sessionId = input_pars['sessionId'],  
            itemInSession = input_pars['itemInSession']
        )
        return query_string

multiple_table_manager.add_table_manager(
    table_manager_name = 'query1', 
    table_manager_object = TableManagerQuery1()
)

In [237]:
multiple_table_manager.create_and_fill_table_by_name(table_manager_name = 'query1', session = session, df = df)

drop table for query1
create table for query1
insert data to table for query1
completed for query1


In [238]:
multiple_table_manager.table_managers_dict['query1'].print_table_description()



Query to create table:
CREATE TABLE IF NOT EXISTS song_artist_app (sessionId int,itemInSession int,artist text,song text,length float, PRIMARY KEY ((sessionId, itemInSession)))


Query to drop table:
DROP TABLE IF EXISTS song_artist_app


Query to insert new records:
INSERT INTO song_artist_app (sessionId, itemInSession, artist, song, length) VALUES (%s, %s, %s, %s, %s)


Default select example for table:
SELECT artist, song, length FROM song_artist_app where sessionId = 338 and itemInSession = 4;




In [239]:
multiple_table_manager.table_managers_dict['query1'].select_from_table(
    session = session, 
    input_pars={
    'sessionId':338,
    'itemInSession': 4
    }
)

Unnamed: 0,artist,song,length
0,Faithless,Music Matters (Mark Knight Dub),495.307312


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

In [240]:
class TableManagerQuery2(BasicTableManager):
    def __init__(self):
        self.table_name = 'song_user_app'
        self.fields_dict = {
            'userId':{'type':'int', 'ext_col_name':'userId', 'is_primary': True, 'is_clustering':False},
            'sessionId':{'type':'int', 'ext_col_name':'sessionId', 'is_primary': True, 'is_clustering':False},
            'itemInSession':{'type':'int', 'ext_col_name':'itemInSession', 'is_primary': False, 'is_clustering':True},
            'artist':{'type':'text', 'ext_col_name':'artist', 'is_primary': False, 'is_clustering':False},
            'song':{'type':'text', 'ext_col_name':'song', 'is_primary': False, 'is_clustering':False},
            'firstName':{'type':'text', 'ext_col_name':'firstName', 'is_primary': False, 'is_clustering':False},
            'lastName':{'type':'text', 'ext_col_name':'lastName', 'is_primary': False, 'is_clustering':False}
        }
        
    def get_query_string_select(self, input_pars = {'userId':10, 'sessionId': 182} ):
        query_template = "SELECT itemInSession, artist, song, firstName, lastName FROM {table_name} where userId = {userId} and sessionId = {sessionId} ORDER BY itemInSession;"
        query_string = query_template.format(
            table_name = self.table_name, 
            userId = input_pars['userId'],  
            sessionId = input_pars['sessionId']
        )
        return query_string

multiple_table_manager.add_table_manager(
    table_manager_name = 'query2', 
    table_manager_object = TableManagerQuery2()
)

In [241]:
multiple_table_manager.create_and_fill_table_by_name(table_manager_name = 'query2', session = session, df = df)

drop table for query2
create table for query2
insert data to table for query2
completed for query2


In [242]:
multiple_table_manager.table_managers_dict['query2'].print_table_description()



Query to create table:
CREATE TABLE IF NOT EXISTS song_user_app (userId int,sessionId int,itemInSession int,artist text,song text,firstName text,lastName text, PRIMARY KEY ((userId, sessionId), itemInSession))


Query to drop table:
DROP TABLE IF EXISTS song_user_app


Query to insert new records:
INSERT INTO song_user_app (userId, sessionId, itemInSession, artist, song, firstName, lastName) VALUES (%s, %s, %s, %s, %s, %s, %s)


Default select example for table:
SELECT itemInSession, artist, song, firstName, lastName FROM song_user_app where userId = 10 and sessionId = 182 ORDER BY itemInSession;




In [243]:
multiple_table_manager.table_managers_dict['query2'].select_from_table(
    session = session, 
    input_pars={'userId':10, 'sessionId': 182} 
)

Unnamed: 0,iteminsession,artist,song,firstname,lastname
0,0,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
1,1,Three Drives,Greece 2000,Sylvie,Cruz
2,2,Sebastien Tellier,Kilometer,Sylvie,Cruz
3,3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [244]:
class TableManagerQuery3(BasicTableManager):
    def __init__(self):
        self.table_name = 'song_user'
        self.fields_dict = {
            'song':{'type':'text', 'ext_col_name':'song', 'is_primary': True, 'is_clustering':False},
            'firstName':{'type':'text', 'ext_col_name':'firstName', 'is_primary': False, 'is_clustering':False},
            'lastName':{'type':'text', 'ext_col_name':'lastName', 'is_primary': False, 'is_clustering':False}
        }
        
    def get_query_string_select(self, input_pars = {'song':'All Hands Against His Own'} ):
        query_template = "SELECT firstName, lastName FROM {table_name} WHERE song = '{song}';"
        query_string = query_template.format(
            table_name = self.table_name, 
            song = input_pars['song']
        )
        return query_string
    
multiple_table_manager.add_table_manager(
    table_manager_name = 'query3', 
    table_manager_object = TableManagerQuery3()
)

In [245]:
multiple_table_manager.create_and_fill_table_by_name(table_manager_name = 'query3', session = session, df = df)

drop table for query3
create table for query3
insert data to table for query3
completed for query3


In [246]:
multiple_table_manager.table_managers_dict['query3'].print_table_description()



Query to create table:
CREATE TABLE IF NOT EXISTS song_user (song text,firstName text,lastName text, PRIMARY KEY ((song)))


Query to drop table:
DROP TABLE IF EXISTS song_user


Query to insert new records:
INSERT INTO song_user (song, firstName, lastName) VALUES (%s, %s, %s)


Default select example for table:
SELECT firstName, lastName FROM song_user WHERE song = 'All Hands Against His Own';




In [247]:
multiple_table_manager.table_managers_dict['query3'].select_from_table(
    session = session, 
    input_pars = {'song':'All Hands Against His Own'}
)

Unnamed: 0,firstname,lastname
0,Tegan,Levine


## List of tables

In [248]:
rows = session.execute("SELECT keyspace_name, table_name FROM system_schema.tables WHERE keyspace_name = 'udacity';")
df_result = pd.DataFrame([row for row in rows])
df_result.head()

Unnamed: 0,keyspace_name,table_name
0,udacity,song_artist_app
1,udacity,song_user
2,udacity,song_user_app


### Drop the tables before closing out the sessions

In [250]:
multiple_table_manager.drop_all_tables(session)

drop table for query1
completed for query1
drop table for query2
completed for query2
drop table for query3
completed for query3


### Close the session and cluster connection¶

In [None]:
session.shutdown()
cluster.shutdown()