# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [25]:
import os

from file_utils import FileUtils
from cassandra_utils import CassandraUtils

#### Configurations

Cassandra configuration:

In [26]:
cassandra_config = {
    'ip' : ['172.17.0.2'],
    'replication_factor' : '1',
    'replication_class' : 'SimpleStrategy',
    'key_space' : 'sparkify',
    'tables' : ['music_sessions', 'music_users', 'music_songs']
}

Event file configuration:

In [27]:
event_directory: str = os.getcwd() + '/event_data'
    
event_file_path = 'event_datafile_new.csv'
   
headers_new_csv = ['artist','firstName','gender','itemInSession','lastName','length', \
                          'level','location','sessionId','song','userId']

pandas_events_schema = {
    headers_new_csv[0]:'str',
    headers_new_csv[1]:'str',
    headers_new_csv[2]:'str',
    headers_new_csv[3]:'int',
    headers_new_csv[4]:'str',
    headers_new_csv[5]:'float',
    headers_new_csv[6]:'str',
    headers_new_csv[7]:'str',
    headers_new_csv[8]:'int32',
    headers_new_csv[9]:'str',
    headers_new_csv[10]:'int32',
}

#### Creating list of filepaths to process original event csv data files

In [28]:
file_path_list = FileUtils.get_directory_files_list(event_directory, ".csv")

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [29]:
df_total = FileUtils.files_to_pd(file_path_list)

df_total_interested = df_total[headers_new_csv]
df_total_interested = df_total_interested.dropna(subset=['artist'])

FileUtils.pd_to_file(event_file_path, df_total_interested)

#### Check event file rows

In [30]:
print(FileUtils.file_num_rows(event_file_path))

6821


#### Read the event file into a pandas dataframe

In [31]:
df = FileUtils.read_file_to_pd(event_file_path, pandas_events_schema)

# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster, create and set Keyspace

In [32]:
cass = CassandraUtils(cassandra_config)
session = cass.connect()


            CREATE KEYSPACE IF NOT EXISTS sparkify WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
        


### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data

### Query 1: Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

#### Primary key
It is adapted to the the filter of the query. Both fields are part of the partition key. 

In [33]:
primary_key1 = "(session_id, item_in_session)"

#### Create table
The fields required are:
- session_id: bigint. It's possible to have a lot of sessions in the future.
- item_in_session: int. There's no many items into a session.
- artist: text
- song_title: text
- song_length: decimal

In [34]:
fields1 = ['session_id bigint', 'item_in_session int', 'artist text', 'song_title text','song_length float']

CassandraUtils.create_table(session, cassandra_config['tables'][0], fields1, primary_key1)                   

#### Populate table

In [35]:
columns_query1 = ['session_id', 'item_in_session', 'artist', 'song_title', 'song_length']
df_query1 = ['sessionId', 'itemInSession', 'artist', 'song', 'length']

df1 = df[df_query1]

CassandraUtils.insert_cassandra_from_df(session, cassandra_config['tables'][0], columns_query1, df1)   

#### SELECT to verify that the data have been inserted well

In [36]:
fields = ['artist', 'song_title', 'song_length']
conditions = "session_id = 338 and item_in_session = 4"
rows = CassandraUtils.select(session, fields, cassandra_config['tables'][0], conditions)
    
for row in rows:
    print(row.artist, row.song_title, row.song_length)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


### Query 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

#### Primary key
It is adapted to the the filter of the query. Either user_id or session_id are part of the PARTITION KEY. I have used item_in_session as CLUSTERing COLUMN because we need the result sortered by that field.

In [37]:
primary_key2 = "((user_id, session_id), item_in_session)"

#### Create table
The fields required are:
- user_id: bigint. It's possible to have a lot of users in a future.
- session_id: bigint. It's possible to have a lot of sessions in a future.
- item_in_session: int. There's no many items into a session.
- artist: text
- song_title: text
- first_name: text
- last_name: text

In [38]:
fields2 = ['user_id bigint', 'session_id bigint', 'item_in_session int', 'artist text', \
                'song_title text', 'first_name text', 'last_name text']

CassandraUtils.create_table(session, cassandra_config['tables'][1], fields2, primary_key2)                          

#### Insert info into table

In [39]:
columns_query2 = ['user_id', 'session_id', 'item_in_session', 'artist', 'song_title', 'first_name', 'last_name']
df_query2 = ['userId', 'sessionId', 'itemInSession', 'artist', 'song', 'firstName', 'lastName']

df2 = df[df_query2]

CassandraUtils.insert_cassandra_from_df(session, cassandra_config['tables'][1], columns_query2, df2)   

#### SELECT to verify that the data have been inserted well

In [40]:
fields = ['artist', 'song_title', 'first_name', 'last_name']
conditions = "user_id = 10 and session_id = 182"
rows = CassandraUtils.select(session, fields, cassandra_config['tables'][1], conditions)
    
for row in rows:
    print (row.artist, row.song_title, row.first_name, row.last_name)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


### Query 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

#### Primary key

It is adapted to the the filter of the query. We need to partition using song but since it's not unique we use user id too.

In [41]:
primary_key3 = "(song, user_id)"

#### Create table
The fields required are:
- song: text
- user_id bigint
- first_name: text
- last_name: text

In [42]:
fields3 = ['song text', 'user_id bigint', 'first_name text', 'last_name text']

CassandraUtils.create_table(session, cassandra_config['tables'][2], fields3, primary_key3)                                              

#### Insert info into table

In [43]:
columns_query3 = ['song', 'user_id', 'first_name', 'last_name']
df_query3 = ['song', 'userId', 'firstName', 'lastName']

df3 = df[df_query3]

CassandraUtils.insert_cassandra_from_df(session, cassandra_config['tables'][2], columns_query3, df3)   

#### SELECT to verify that the data have been inserted well

In [44]:
fields = ['first_name', 'last_name']
conditions = "song = 'All Hands Against His Own'"
rows = CassandraUtils.select(session, fields, cassandra_config['tables'][2], conditions)
    
for row in rows:
    print (row.first_name, row.last_name)

Jacqueline Lynch
Tegan Levine
Sara Johnson


### Drop the tables before closing out the sessions

In [45]:
for table in cassandra_config['tables']:
    CassandraUtils.drop_table(session, table)

### Close the session and cluster connection

In [None]:
cass.disconnect(session)