# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
# Import Python packages 
import csv
import glob
import os

#### Creating list of filepaths to process original event csv data files

In [2]:
# Check current working directory
print(os.getcwd())

# Get current folder and subfolder event data
filepath = os.path.join(os.getcwd(), 'event_data')

# Collect the CSV files found in the event data folder and all of its
# subfolders. The list is accumulated across every directory visited by
# os.walk instead of being reassigned on each iteration, so it never ends up
# holding only the last directory, and it exists (empty) even when the folder
# is missing.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # Prune hidden folders (e.g. .ipynb_checkpoints) in place so os.walk does
    # not descend into them; sorting keeps the traversal order deterministic.
    dirs[:] = sorted(d for d in dirs if not d.startswith('.'))

    # Only pick up csv files, in a stable order, so directories and stray
    # files are never handed to the csv reader.
    file_path_list.extend(sorted(glob.glob(os.path.join(root, '*.csv'))))
# print(file_path_list)

/Users/tomas.rojo/Dropbox/Documents/learning/udacity/dend/02_data_modeling/05_project_data_modeling_with_apache_cassandra/cassandra_data_modeling


#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [3]:
# Initiate an empty list of rows that will be generated from each file
full_data_rows_list = []

# For every filepath in the file path list
for path in file_path_list:

    # Read csv file
    with open(path, 'r', encoding='utf8', newline='') as csvfile:
        # creating a csv reader object
        csvreader = csv.reader(csvfile)
        # Skip the header row; the None default keeps an empty file from
        # raising StopIteration.
        next(csvreader, None)

        # Extract each remaining data row and append it
        full_data_rows_list.extend(csvreader)

# Get total number of rows
print(len(full_data_rows_list))
# Print first row (only when at least one row was read)
if full_data_rows_list:
    print(full_data_rows_list[0])

# Create a smaller event data csv file called event_datafile_new.csv that will be used to insert data into tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        # Skip blank lines (csv.reader yields [] for them) and rows whose
        # first column -- written out as 'artist' -- is empty.
        if not row or row[0] == '':
            continue
        # Keep only the columns listed in the header above, in the same order
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


8056
['Harmonia', 'Logged In', 'Ryan', 'M', '0', 'Smith', '655.77751', 'free', 'San Jose-Sunnyvale-Santa Clara, CA', 'PUT', 'NextSong', '1.54102E+12', '583', 'Sehr kosmisch', '200', '1.54224E+12', '26']


In [4]:
# Count every line of the generated csv file and print the total
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as datafile:
    line_count = len(datafile.readlines())
print(line_count)

6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster

**Note**: I tested this notebook on a local instance of Apache Cassandra, where
I had set `authenticator: PasswordAuthenticator` and 
`authorizer: CassandraAuthorizer` in `cassandra.yaml` in order to gain access 
to the `cassandra` superuser so that I could create a `student` user. Hence the 
`PlainTextAuthProvider`.

If your local Cassandra instance runs with the default configuration (no
authentication enabled), you can connect to it without passing `auth_provider`.

In [5]:
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

# Connection settings are read from the environment so that real credentials
# never have to be written into the notebook. When the variables are not set,
# the values fall back to the student:student toy user on the local node.
cassandra_host = os.environ.get('CASSANDRA_HOST', '127.0.0.1')
auth = PlainTextAuthProvider(
    username=os.environ.get('CASSANDRA_USERNAME', 'student'),
    password=os.environ.get('CASSANDRA_PASSWORD', 'student'),
)
cluster = Cluster([cassandra_host], auth_provider=auth)

# Start session
session = cluster.connect()

#### Create Keyspace

In [6]:
# Create a Keyspace
# The statement is kept in a named variable so it can be inspected on its own;
# any exception raised while executing it is printed instead of propagated.
create_keyspace_query = """
        CREATE KEYSPACE IF NOT EXISTS sparkifydb
        WITH REPLICATION = {
            'class': 'SimpleStrategy',
            'replication_factor': 1
        }
    """

try:
    session.execute(create_keyspace_query)
except Exception as error:
    print(error)

#### Set Keyspace

In [7]:
# Set keyspace
# Point the session at the sparkifydb keyspace via session.set_keyspace.
# Any exception is printed rather than re-raised, so execution continues past
# this cell even when the call fails.
try:
    session.set_keyspace('sparkifydb')
except Exception as e:
    print(e)

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




### 1. Song information for a given sessionId and itemInSession

In [8]:
# Create a table which will return song information by sessionId and itemInSession
#
# The primary key is a composite partition key made of both sessionId and
# itemInSession (note the double parentheses); there are no clustering columns.
# NOTE(review): length is declared as FLOAT; the value read back later in this
# notebook prints as 495.30731201171875, which looks like single-precision
# rounding -- consider DOUBLE if exact values matter; confirm.

create_song_by_session_and_item_table = """
    CREATE TABLE IF NOT EXISTS song_by_session_and_item (
        sessionId            INT,
        itemInSession        INT,
        artist               TEXT,
        song                 TEXT,
        length               FLOAT,
        PRIMARY KEY((sessionId, itemInSession))
    );
"""      

# Left as the cell's last expression, so the returned ResultSet is displayed
session.execute(create_song_by_session_and_item_table)

<cassandra.cluster.ResultSet at 0x7f961201b160>

In [9]:
# Parse the datafile csv and insert rows into new table
file = 'event_datafile_new.csv'

# Prepare the insert once, outside the loop, so the statement is parsed a
# single time instead of being rebuilt and sent as text for every row.
# Prepared statements use ? as the bind marker.
insert_song_by_session_and_item = session.prepare("""
    INSERT INTO song_by_session_and_item (sessionId, itemInSession, artist, song, length)
    VALUES (?, ?, ?, ?, ?)
""")

# newline='' is what the csv module expects for files handed to csv.reader
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        # Select relevant items from row and insert record. Column positions
        # in the csv: 8 = sessionId, 3 = itemInSession, 0 = artist, 9 = song,
        # 5 = length.
        session.execute(
            insert_song_by_session_and_item,
            (int(line[8]), int(line[3]), line[0], line[9], float(line[5])),
        )

For this first query we want to find some information (artist, song, length)
about a song played in a particular `sessionId` and in position `itemInSession`.
Both fields used to filter are therefore part of the table's primary key. 

We chose a composite partition key under the assumption that sessions can vary
greatly in length: partitioning on the combination of `sessionId` and
`itemInSession` distributes the data more evenly across the nodes. That said,
the difference in session length might not be that relevant, and we could just
as well have chosen a compound primary key with `sessionId` as the partition
key and `itemInSession` as the clustering key.

In [10]:
## Look up the song played at a given position of a given session

select_song_by_session_and_item = """
    SELECT artist, song, length
    FROM song_by_session_and_item
    WHERE sessionId=(%s) AND itemInSession=(%s);
"""

# Bind sessionId = 338 and itemInSession = 4, then print every returned row
lookup_key = (338, 4)
for record in session.execute(select_song_by_session_and_item, lookup_key):
    print(record)

Row(artist='Faithless', song='Music Matters (Mark Knight Dub)', length=495.30731201171875)


### 2. Songs streamed by a given user in a given session, ordered by itemInSession

In [11]:
# Create a table which will return song information for a given user and session, order by item
#
# Primary key layout: (userId, sessionId) is the composite partition key and
# itemInSession is the clustering column.

create_songs_by_user_and_session_table = """
    CREATE TABLE IF NOT EXISTS songs_by_user_and_session (
        userId                 INT,
        sessionId              INT,
        itemInSession          INT,
        artist                 TEXT,
        song                   TEXT,
        first_name             TEXT,
        last_name              TEXT,
        PRIMARY KEY((userId, sessionId), itemInSession)
    );
"""

# Left as the cell's last expression, so the returned ResultSet is displayed
session.execute(create_songs_by_user_and_session_table)

<cassandra.cluster.ResultSet at 0x7f96136e8f40>

In [12]:
# Parse the datafile csv and insert rows into new table
file = 'event_datafile_new.csv'

# Prepare the insert once, outside the loop, so the statement is parsed a
# single time instead of being rebuilt and sent as text for every row.
# Prepared statements use ? as the bind marker.
insert_songs_by_user_and_session = session.prepare("""
    INSERT INTO songs_by_user_and_session (userId, sessionId, itemInSession, artist, song, first_name, last_name)
    VALUES (?, ?, ?, ?, ?, ?, ?)
""")

# newline='' is what the csv module expects for files handed to csv.reader
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        # Select relevant items from row and insert record. Column positions
        # in the csv: 10 = userId, 8 = sessionId, 3 = itemInSession,
        # 0 = artist, 9 = song, 1 = firstName, 4 = lastName.
        session.execute(
            insert_songs_by_user_and_session,
            (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]),
        )

The second query requires us to return all songs that a particular user streamed
in a given session, in the order they were played (captured in `itemInSession`).

Our primary key is made up of a composite partition key based on `userId` and
`sessionId` and a clustering key based on `itemInSession` to ensure the songs
are displayed in the correct order.

The reason we chose a composite partition key is that the number of sessions per
user varies greatly. Since we want to distribute our data evenly across the
cluster, we want to avoid very large partitions containing all sessions for a
given 'super-user'.

In [13]:
# Fetch artist, song and user name for one session of one user

select_songs_by_user_and_session = """
    SELECT artist, song, first_name, last_name
    FROM songs_by_user_and_session
    WHERE userId=(%s) AND sessionId=(%s)
"""

# Bind userId = 10 and sessionId = 182, then print every returned row
user_session_key = (10, 182)
for record in session.execute(select_songs_by_user_and_session, user_session_key):
    print(record)

Row(artist='Down To The Bone', song="Keep On Keepin' On", first_name='Sylvie', last_name='Cruz')
Row(artist='Three Drives', song='Greece 2000', first_name='Sylvie', last_name='Cruz')
Row(artist='Sebastien Tellier', song='Kilometer', first_name='Sylvie', last_name='Cruz')
Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', first_name='Sylvie', last_name='Cruz')


### 3. List all users that listened to a specific song

In [14]:
# Create a table which will return the users that listened to a specific song
#
# Primary key layout: song is the partition key and userId is the clustering
# column, so there is one row per (song, userId) pair.
# NOTE(review): sessionId and itemInSession are not part of the key, so for a
# user who played the song several times they presumably hold the values of
# only one of those plays -- confirm whether these columns are needed at all.

create_users_by_song_table = """
    CREATE TABLE IF NOT EXISTS users_by_song (
        song            TEXT,
        userId          INT,
        sessionId       INT,
        itemInSession   INT,
        first_name      TEXT,
        last_name       TEXT,
        PRIMARY KEY(song, userId)
    )
"""

# Left as the cell's last expression, so the returned ResultSet is displayed
session.execute(create_users_by_song_table)

<cassandra.cluster.ResultSet at 0x7f96136d0190>

In [15]:
# Parse the datafile csv and insert rows into new table
file = 'event_datafile_new.csv'

# Prepare the insert once, outside the loop, so the statement is parsed a
# single time instead of being rebuilt and sent as text for every row.
# Prepared statements use ? as the bind marker.
insert_users_by_song = session.prepare("""
    INSERT INTO users_by_song (song, userId, sessionId, itemInSession, first_name, last_name)
    VALUES (?, ?, ?, ?, ?, ?)
""")

# newline='' is what the csv module expects for files handed to csv.reader
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        # Select relevant items from row and insert record. Column positions
        # in the csv: 9 = song, 10 = userId, 8 = sessionId, 3 = itemInSession,
        # 1 = firstName, 4 = lastName.
        session.execute(
            insert_users_by_song,
            (line[9], int(line[10]), int(line[8]), int(line[3]), line[1], line[4]),
        )

For this particular query we are going to utilise the fact that in Apache
Cassandra all inserts are actually insert/update operations, which means that if
a row with the same primary key already exists it gets overwritten. This comes
in quite handy: we only care about whether a user has listened to a given song
or not, we don't need to know how many times they have listened to it. We
therefore use a composite primary key with `song` as the partition key and
`userId` as the clustering key, so each user appears at most once per song.

In [16]:
# Select all users that have listened to a specific song
#
# The song title is passed as a bound parameter (like the other two SELECTs in
# this notebook) instead of being written into the CQL text, so the driver
# takes care of quoting and the query can be reused for any title.

select_users_by_song = """
    SELECT first_name, last_name
    FROM users_by_song
    WHERE song=(%s);
"""

# Single-element tuple: the trailing comma is required
rows = session.execute(select_users_by_song, ('All Hands Against His Own',))

for row in rows:
    print(row)

Row(first_name='Jacqueline', last_name='Lynch')
Row(first_name='Tegan', last_name='Levine')
Row(first_name='Sara', last_name='Johnson')


### Drop the tables before closing out the sessions

In [18]:
# Drop all tables
# IF EXISTS keeps this cleanup cell re-runnable: a table that was never created
# or has already been dropped no longer aborts the loop with an error.
for table in ['song_by_session_and_item', 'songs_by_user_and_session', 'users_by_song']:
    session.execute(f'DROP TABLE IF EXISTS {table};')

### Close the session and cluster connection

In [19]:
# Shutdown session and cluster
# The session is shut down first, then the cluster object it was created from.
session.shutdown()
cluster.shutdown()