# Data Modeling with Apache Cassandra

#### Import Python packages 

In [46]:
import os

import pandas as pd

from file_mgr import FileMgr
from cassandra_mgr import CassandraMgr

#### Configurations

Cassandra configuration:

In [47]:
# Settings handed to CassandraMgr. The entries of 'tables' are looked up by
# position further down: [0] -> Query 1, [1] -> Query 2, [2] -> Query 3.
cassandra_config = dict(
    ip=['127.0.0.1'],
    replication_factor='1',
    replication_class='SimpleStrategy',
    key_space='sparkify',
    tables=['music_sessions', 'music_users', 'music_songs'],
)

Event file configuration:

In [48]:
# Directory that holds the raw event CSV files. os.path.join picks the right
# separator for the platform instead of hard-coding '/'.
event_directory: str = os.path.join(os.getcwd(), 'event_data')

# Consolidated CSV: written from the raw event files and read back afterwards.
event_file_path = 'event_datafile_new.csv'

# Type name for every column kept from the raw events, keyed by column name
# so a column and its type cannot drift apart (the previous version paired
# them by list position). Insertion order is the column projection order.
pandas_events_schema = {
    'artist': 'str',
    'firstName': 'str',
    'gender': 'str',
    'itemInSession': 'int',
    'lastName': 'str',
    'length': 'float',
    'level': 'str',
    'location': 'str',
    'sessionId': 'int32',
    'song': 'str',
    'userId': 'int32',
}

# Columns of interest, in schema order.
csv_columns_interested = list(pandas_events_schema)

#### Creating list of filepaths to process original event csv data files

In [49]:
# Paths of the original event CSV files to process, gathered from event_directory.
file_path_list = FileMgr.get_directory_files_list(event_directory)

#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [50]:
# Load every listed event file through FileMgr.
df_total = FileMgr.files_to_pd(file_path_list)

# Keep only the columns of interest and drop the rows that have no artist.
df_total_interested = df_total[csv_columns_interested].dropna(subset=['artist'])

# Persist the filtered events as the consolidated CSV.
FileMgr.pd_to_file(event_file_path, df_total_interested)

#### Check event file rows

In [51]:
# Sanity check: print the row count FileMgr reports for the consolidated CSV.
print(FileMgr.file_num_rows(event_file_path))

6821


#### Read the event file into a pandas dataframe

In [52]:
# Read the consolidated CSV back into a pandas DataFrame.
# pandas_events_schema is passed along, presumably as the per-column dtypes (confirm in file_mgr).
df = FileMgr.read_file_to_pd(event_file_path, pandas_events_schema)


### Now the csv file <font color=red>event_datafile_new.csv</font> is located within the Workspace directory. 

The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should look like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

#### Creating a Cluster, create and set Keyspace

In [53]:
# Build the Cassandra manager from the configuration above.
cass = CassandraMgr(cassandra_config)
# The session returned here is passed to every CassandraMgr call below.
session = cass.connect()

### Now we need to create tables to run the following queries.

Remember, with Apache Cassandra you model the database tables on the queries you want to run.

### Query 1: Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

#### Primary key
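It is adapted to the filter of the query: both fields are part of the primary key. With the definition below, session_id is the partition key and item_in_session is the clustering column (assuming the string is placed verbatim after PRIMARY KEY by CassandraMgr.create_table).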

In [54]:
# Primary-key definition for the Query 1 table; passed to CassandraMgr.create_table below.
primary_key1 = "(session_id, item_in_session)"

#### Create table
The fields required are:
- session_id: bigint. There may be a very large number of sessions in the future.
- item_in_session: int. A session does not contain many items.
- artist: text
- song_title: text
- song_length: float

In [55]:
# Column definitions ("name type") for the Query 1 table.
fields1 = [
    'session_id bigint',
    'item_in_session int',
    'artist text',
    'song_title text',
    'song_length float',
]

CassandraMgr.create_table(session, cassandra_config['tables'][0], fields1, primary_key1)

#### Insert info into table

In [56]:
# Event-file column -> table column, in insertion order.
query1_column_map = {
    'sessionId': 'session_id',
    'itemInSession': 'item_in_session',
    'artist': 'artist',
    'song': 'song_title',
    'length': 'song_length',
}
projection_query1 = list(query1_column_map)
columns_query1 = list(query1_column_map.values())

# Project the events onto the columns this table needs, then insert them.
df1 = df[projection_query1]

CassandraMgr.insert_cassandra_from_df(session, cassandra_config['tables'][0], columns_query1, df1)

#### SELECT to verify that the data have been inserted well

In [57]:
# Query 1: artist, song title and song length for session 338, item 4.
fields = ['artist', 'song_title', 'song_length']
filters = "session_id = 338 and item_in_session = 4"
rows = CassandraMgr.select(session, fields, cassandra_config['tables'][0], filters)

for record in rows:
    print(record.artist, record.song_title, record.song_length)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


### Query 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

#### Primary key
It is adapted to the filter of the query. Both user_id and session_id are part of the PARTITION KEY. I have used item_in_session as the CLUSTERING COLUMN because we need the result sorted by that field.

In [58]:
# Primary-key definition for the Query 2 table; passed to CassandraMgr.create_table below.
primary_key2 = "((user_id, session_id), item_in_session)"

#### Create table
The fields required are:
- user_id: bigint. There may be a very large number of users in the future.
- session_id: bigint. There may be a very large number of sessions in the future.
- item_in_session: int. A session does not contain many items.
- artist: text
- song_title: text
- first_name: text
- last_name: text

In [59]:
# Column definitions ("name type") for the Query 2 table.
fields2 = [
    'user_id bigint',
    'session_id bigint',
    'item_in_session int',
    'artist text',
    'song_title text',
    'first_name text',
    'last_name text',
]

CassandraMgr.create_table(session, cassandra_config['tables'][1], fields2, primary_key2)

#### Insert info into table

In [60]:
# Event-file column -> table column, in insertion order.
query2_column_map = {
    'userId': 'user_id',
    'sessionId': 'session_id',
    'itemInSession': 'item_in_session',
    'artist': 'artist',
    'song': 'song_title',
    'firstName': 'first_name',
    'lastName': 'last_name',
}
projection_query2 = list(query2_column_map)
columns_query2 = list(query2_column_map.values())

# Project the events onto the columns this table needs, then insert them.
df2 = df[projection_query2]

CassandraMgr.insert_cassandra_from_df(session, cassandra_config['tables'][1], columns_query2, df2)

#### SELECT to verify that the data have been inserted well

In [61]:
# Query 2: artist, song title and user name for user 10 in session 182.
fields = ['artist', 'song_title', 'first_name', 'last_name']
filters = "user_id = 10 and session_id = 182"
rows = CassandraMgr.select(session, fields, cassandra_config['tables'][1], filters)

for record in rows:
    print(record.artist, record.song_title, record.first_name, record.last_name)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


### Query 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

#### Primary key

It is adapted to the filter of the query. Song is the PARTITION KEY, but it is not unique, so we need another column to make a unique primary key. We use user_id as the CLUSTERING COLUMN.

In [62]:
# Primary-key definition for the Query 3 table; passed to CassandraMgr.create_table below.
primary_key3 = "((song), user_id)"

#### Create table
The fields required are:
- song: text
- user_id bigint
- first_name: text
- last_name: text

In [63]:
# Column definitions ("name type") for the Query 3 table.
fields3 = [
    'song text',
    'user_id bigint',
    'first_name text',
    'last_name text',
]

CassandraMgr.create_table(session, cassandra_config['tables'][2], fields3, primary_key3)

#### Insert info into table

In [64]:
# Event-file column -> table column, in insertion order.
query3_column_map = {
    'song': 'song',
    'userId': 'user_id',
    'firstName': 'first_name',
    'lastName': 'last_name',
}
projection_query3 = list(query3_column_map)
columns_query3 = list(query3_column_map.values())

# Project the events onto the columns this table needs, then insert them.
df3 = df[projection_query3]

CassandraMgr.insert_cassandra_from_df(session, cassandra_config['tables'][2], columns_query3, df3)

#### SELECT to verify that the data have been inserted well

In [65]:
# Query 3: first and last name of every user who listened to the given song.
fields = ['first_name', 'last_name']
filters = "song= 'All Hands Against His Own'"
rows = CassandraMgr.select(session, fields, cassandra_config['tables'][2], filters)

for record in rows:
    print(record.first_name, record.last_name)

Jacqueline Lynch
Tegan Levine
Sara Johnson


### Drop the tables before closing out the sessions

In [66]:
# Drop the Query 1 table (first entry of cassandra_config['tables']).
CassandraMgr.drop_table(session, cassandra_config['tables'][0])

In [67]:
# Drop the Query 2 table (second entry of cassandra_config['tables']).
CassandraMgr.drop_table(session, cassandra_config['tables'][1])

In [68]:
# Drop the Query 3 table (third entry of cassandra_config['tables']).
CassandraMgr.drop_table(session, cassandra_config['tables'][2])

### Close the session and cluster connection

In [69]:
# Hand the session back to the manager so it can close the connection.
cass.disconnect(session)