# Project: Data Modeling with Cassandra

A startup called Sparkify wants to analyze the data they've been collecting on songs and user activity on their new music streaming app. The analysis team is particularly interested in understanding what songs users are listening to. Currently, they have no easy way to query the data, since it resides in a directory of CSV files of user activity on the app.

They'd like a data engineer to create an Apache Cassandra database that can be queried on song play data to answer these questions, and wish to bring you onto the project. Your role is to create a database for this analysis. You'll be able to test your database by running the queries provided by Sparkify's analytics team.

## Part I: The ETL pipeline

In this part we read the raw event CSV files, keep only the song-play events and the columns we need, and write the result to a new aggregate CSV file.

### Import modules

We must import all the modules on which we rely at the beginning of the notebook.

In [None]:
import glob
import csv
import os
import pandas as pd

from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
from cassandra.query import BatchStatement, BatchType

### Constants

It's good practice to avoid hardcoding magic values throughout the code and to keep them in named constants instead.

In [None]:
# The current working directory.
HERE = os.getcwd()

# The directory where the raw CSV files are stored.
INPUT_DIR = os.path.join(HERE, "event_data")

# The pattern to use for recursive raw CSV file search.
INPUT_DIR_SEARCH_PATTERN = os.path.join(INPUT_DIR, "**", "*.csv")

# The path to the aggregate CSV.
OUTPUT_CSV = os.path.join(HERE, "event_datafile_new.csv")

# The number of rows per chunk when loading the aggregate CSV into Cassandra.
CHUNK_SIZE = 100
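`CHUNK_SIZE` is later passed to `pd.read_csv(..., chunksize=CHUNK_SIZE)`. With `chunksize` set, pandas returns an iterator of DataFrames holding at most that many rows each, so the file is never loaded into memory all at once. A minimal sketch with a small in-memory CSV (the sample data is made up for illustration):

```python
import io

import pandas as pd

# A made-up CSV with 250 data rows, standing in for the aggregate file.
sample_csv = "session_id,item_in_session\n" + "".join(
    f"{i // 10},{i % 10}\n" for i in range(250)
)

# With chunksize set, read_csv yields DataFrames of at most 100 rows each.
chunk_lengths = [
    len(chunk)
    for chunk in pd.read_csv(io.StringIO(sample_csv), chunksize=100)
]

print(chunk_lengths)  # [100, 100, 50]
```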

### Aggregate CSV creation

We merge the raw CSV files into a single aggregate CSV that serves as the master data set, keeping only song-play (`NextSong`) events and the columns we need.

In [None]:
read = 0
written = 0

# Prepare the aggregate CSV for writing.
with open(OUTPUT_CSV, "w", encoding="utf8", newline="") as output_file:

    writer = csv.writer(
        output_file,
        quoting=csv.QUOTE_ALL,
        skipinitialspace=True
    )

    # Write a first row that represents the header.
    writer.writerow((
        "artist_name",
        "user_first_name",
        "item_in_session",
        "user_last_name",
        "song_length",
        "session_id",
        "song_title",
        "user_id"
    ))

    # Walk down through the directory where the raw CSV files are stored.
    for path in glob.glob(INPUT_DIR_SEARCH_PATTERN, recursive=True):

        # Open each raw CSV file for reading.
        with open(path, "r", encoding="utf8", newline="") as input_file:

            reader = csv.reader(input_file)

            # Every raw CSV file has a header that we must skip.
            next(reader)

            # Once skipped, we iterate over the remaining rows.
            for row in reader:

                read += 1

                # Check if the current event is music related.
                if row[10] == "NextSong":

                    # If it is, we write it down to the aggregate CSV file (only the columns we need).
                    writer.writerow((
                        row[0],
                        row[2],
                        row[4],
                        row[5],
                        row[6],
                        row[12],
                        row[13],
                        row[16]
                    ))

                    written += 1

print("Read: {}\nWritten: {}\nDiscarded: {}".format(read, written, read - written))
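The positional indices used above (`row[10]`, `row[0]`, `row[16]`, ...) are the kind of magic values the Constants section warns against. One way to avoid them is `csv.DictReader`, which addresses fields by header name. The sketch below is a hypothetical alternative, not the notebook's method: it **assumes** the raw files share the header shown in the sample (`artist`, `firstName`, ..., `page`, ..., `userId`), and `RAW_TO_AGGREGATE` and `filter_song_plays` are illustrative names.

```python
import csv
import io

# ASSUMED raw header names, mapped to the aggregate CSV column names (same order as above).
RAW_TO_AGGREGATE = {
    "artist": "artist_name",
    "firstName": "user_first_name",
    "itemInSession": "item_in_session",
    "lastName": "user_last_name",
    "length": "song_length",
    "sessionId": "session_id",
    "song": "song_title",
    "userId": "user_id",
}


def filter_song_plays(input_file):
    """Yield only NextSong events, renamed to the aggregate column names (hypothetical helper)."""
    for raw in csv.DictReader(input_file):
        if raw["page"] == "NextSong":
            yield {new: raw[old] for old, new in RAW_TO_AGGREGATE.items()}


# A tiny made-up input: one song play and one non-music event.
sample = io.StringIO(
    "artist,auth,firstName,gender,itemInSession,lastName,length,level,location,"
    "method,page,registration,sessionId,song,status,ts,userId\n"
    "Some Artist,Logged In,Ann,F,0,Lee,200.5,free,Somewhere,PUT,NextSong,1.0,7,Some Song,200,1,42\n"
    ",Logged In,Ann,F,1,Lee,,free,Somewhere,GET,Home,1.0,7,,200,2,42\n"
)

rows = list(filter_song_plays(sample))
print(len(rows))              # 1
print(rows[0]["song_title"])  # Some Song
```

The filtered dictionaries could then be written with `csv.DictWriter(output_file, fieldnames=list(RAW_TO_AGGREGATE.values()), quoting=csv.QUOTE_ALL)`, which also writes the header via `writeheader()`.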

## Part II: Working with Cassandra

In this part we read the aggregate CSV file and push its rows to the Cassandra cluster. Then we query the tables to verify that the import succeeded.

### Initializing the database

At this point, we initialize the database. This consists of three steps:

1. Connect to the Cassandra cluster
2. Create the keyspace if it doesn't exist yet
3. Set the keyspace on the current session

In [None]:
try:

    # Connect with the cluster.
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()

    # Create the keyspace.
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS sparkify
        WITH REPLICATION = {
            'class': 'SimpleStrategy',
            'replication_factor': 1
        }
    """)

    # Set the keyspace.
    session.set_keyspace("sparkify")

except Exception as e:
    print("Error while initializing the database")
    print(e)

### Songs by session

In order to execute queries like this:

> Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

We create the table `songs_by_session`. Its primary key consists of the two fields we later filter on: `session_id` is the partition key and `item_in_session` is the clustering column, so together they identify a single event.

In [None]:
try:
    session.execute("""
        CREATE TABLE IF NOT EXISTS songs_by_session (
            session_id int,
            item_in_session int,
            artist_name text,
            song_title text,
            song_length float,
            PRIMARY KEY (
                session_id,
                item_in_session
            )
        )
    """)
except Exception as e:
    print(e)
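To make the partition/clustering split concrete, here is a toy in-memory model built from plain Python dicts (not the driver, and not how Cassandra stores data on disk). The partition key `session_id` selects a partition, and the clustering column `item_in_session` locates the row inside it. This is why the query has to restrict `session_id` with an equality; filtering on `item_in_session` alone would be rejected unless `ALLOW FILTERING` is added. The names `toy_songs_by_session`, `put_session_row` and `get_session_row` are hypothetical, and the rows are made up.

```python
from collections import defaultdict

# Toy model of songs_by_session: partition key -> {clustering key -> row}.
toy_songs_by_session = defaultdict(dict)


def put_session_row(session_id, item_in_session, artist_name, song_title, song_length):
    # The partition key picks the partition; the clustering key is the slot inside it.
    toy_songs_by_session[session_id][item_in_session] = (artist_name, song_title, song_length)


def get_session_row(session_id, item_in_session):
    # Both keys are needed to reach a single row; .get avoids creating empty partitions.
    return toy_songs_by_session.get(session_id, {}).get(item_in_session)


# Made-up sample rows.
put_session_row(338, 3, "Artist A", "Song A", 180.0)
put_session_row(338, 4, "Artist B", "Song B", 210.5)
put_session_row(339, 4, "Artist C", "Song C", 150.0)

print(get_session_row(338, 4))  # ('Artist B', 'Song B', 210.5)
```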

### Songs by user

Given the following query as an example:

> Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

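We create the table `songs_by_user` with the primary key (`user_id`, `session_id`, `item_in_session`). `user_id` is the partition key, while `session_id` and `item_in_session` are clustering columns: the first two fields appear in the `WHERE` clause, and `item_in_session` determines the order in which the songs of a session are returned.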

In [None]:
try:
    session.execute("""
        CREATE TABLE IF NOT EXISTS songs_by_user (
            user_id int,
            session_id int,
            item_in_session int,
            artist_name text,
            song_title text,
            user_first_name text,
            user_last_name text,
            PRIMARY KEY (
                user_id,
                session_id,
                item_in_session
            )
        )
    """)
except Exception as e:
    print(e)
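Within a partition, Cassandra keeps rows sorted by the clustering columns (ascending by default), so the "sorted by itemInSession" requirement needs no `ORDER BY`: restricting `user_id` and `session_id` returns that session's songs already ordered by `item_in_session`. The toy sketch below imitates this in plain Python, using `bisect.insort` to keep each user's partition sorted by `(session_id, item_in_session)` regardless of insertion order. `toy_songs_by_user`, `put_user_row` and `get_user_songs` are hypothetical names, and the data is made up.

```python
import bisect
from collections import defaultdict

# Toy model of songs_by_user: user_id -> list of ((session_id, item_in_session), row), kept sorted.
toy_songs_by_user = defaultdict(list)


def put_user_row(user_id, session_id, item_in_session, artist_name, song_title):
    # insort keeps the partition ordered by the clustering key tuple.
    bisect.insort(
        toy_songs_by_user[user_id],
        ((session_id, item_in_session), (artist_name, song_title)),
    )


def get_user_songs(user_id, session_id):
    # Equality on the partition key and the first clustering column; results keep clustering order.
    partition = toy_songs_by_user.get(user_id, [])
    return [row for (sid, _), row in partition if sid == session_id]


# Inserted out of order on purpose.
put_user_row(10, 182, 2, "Artist C", "Song C")
put_user_row(10, 182, 0, "Artist A", "Song A")
put_user_row(10, 181, 5, "Artist X", "Song X")
put_user_row(10, 182, 1, "Artist B", "Song B")

print(get_user_songs(10, 182))
# [('Artist A', 'Song A'), ('Artist B', 'Song B'), ('Artist C', 'Song C')]
```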

### Users by song

And last but not least, the table `users_by_song`.

> Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
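Cassandra `INSERT`s are upserts: writing a row whose full primary key already exists replaces that row instead of adding a new one. The toy sketch below uses plain Python dicts (not the driver) and made-up listeners to compare a primary key of `song_title` alone with a primary key of `(song_title, user_id)`.

```python
# Toy tables: primary key -> row. Dict assignment mimics Cassandra's upsert behavior.
by_title_only = {}
by_title_and_user = {}

# Made-up listens of the same song by three different users.
listens = [
    ("All Hands Against His Own", 1, "Ann", "Lee"),
    ("All Hands Against His Own", 2, "Bob", "Kim"),
    ("All Hands Against His Own", 3, "Cy", "Ray"),
]

for song_title, user_id, first_name, last_name in listens:
    # Key = song_title: each write for the same song replaces the previous listener.
    by_title_only[song_title] = (first_name, last_name)
    # Key = (song_title, user_id): one row per listener of the song.
    by_title_and_user[(song_title, user_id)] = (first_name, last_name)

print(len(by_title_only), len(by_title_and_user))  # 1 3
```

With `song_title` alone, only the last listener written survives; adding `user_id` to the key keeps one row per listener.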

This is the simplest table of the three. Its key still needs two fields: since Cassandra writes are upserts, a primary key of `song_title` alone would let every new listener of a song overwrite the previous one. We therefore use `song_title` as the partition key and `user_id` as a clustering column, so the table keeps one row per user who listened to each song.

In [None]:
try:
    session.execute("""
        CREATE TABLE IF NOT EXISTS users_by_song (
            song_title text,
            user_id int,
            user_first_name text,
            user_last_name text,
            PRIMARY KEY (
                song_title,
                user_id
            )
        )
    """)
except Exception as e:
    print(e)

### Batch processing

Because the data set can grow large, we send the inserts to the cluster in batches rather than one by one, which reduces the number of client round trips. The Python driver supports this through `BatchStatement`. Keep in mind, however, that a batch spanning many partitions puts extra load on the coordinator node, so batches should be kept reasonably small.

#### Table songs_by_session

The prepared insert query, along with the aggregate CSV columns that fill its placeholders, in the same order.

In [None]:
# Prepare the insert query.
songs_by_session_insert_query = session.prepare("""
    INSERT INTO songs_by_session (
        session_id,
        item_in_session,
        artist_name,
        song_title,
        song_length
    )
    VALUES (?, ?, ?, ?, ?)
""")

# The columns from the aggregate CSV file we will use in the query.
songs_by_session_columns = [
    "session_id",
    "item_in_session",
    "artist_name",
    "song_title",
    "song_length"
]

#### Table songs_by_user

The prepared insert query, along with the aggregate CSV columns that fill its placeholders, in the same order.

In [None]:
# Prepare the insert query.
songs_by_user_insert_query = session.prepare("""
    INSERT INTO songs_by_user (
        user_id,
        session_id,
        item_in_session,
        artist_name,
        song_title,
        user_first_name,
        user_last_name
    )
    VALUES (?, ?, ?, ?, ?, ?, ?)
""")

# The columns from the aggregate CSV file we will use in the query.
songs_by_user_columns = [
    "user_id",
    "session_id",
    "item_in_session",
    "artist_name",
    "song_title",
    "user_first_name",
    "user_last_name"
]

#### Table users_by_song

The prepared insert query, along with the aggregate CSV columns that fill its placeholders, in the same order.

In [None]:
# Prepare the insert query (user_id is part of the primary key, so it must be set).
users_by_song_insert_query = session.prepare("""
    INSERT INTO users_by_song (
        song_title,
        user_id,
        user_first_name,
        user_last_name
    )
    VALUES (?, ?, ?, ?)
""")

# The columns from the aggregate CSV file we will use in the query.
users_by_song_columns = [
    "song_title",
    "user_id",
    "user_first_name",
    "user_last_name"
]

#### Run the process

We're ready to load the aggregate CSV file into Cassandra.

In [None]:
batches = 0
records = 0

# Create a batch statement to handle bulk inserts.
batch = BatchStatement(
    batch_type=BatchType.UNLOGGED,
    consistency_level=ConsistencyLevel.ALL
)

# Process the aggregate CSV file in chunks.
for df in pd.read_csv(OUTPUT_CSV, chunksize=CHUNK_SIZE):

    # For every event in the current chunk...
    for _, row in df.iterrows():

        # ...we add the table songs_by_session's insert query to the batch
        batch.add(
            songs_by_session_insert_query,
            row[songs_by_session_columns].values.tolist()
        )

        # ...also the one for the table songs_by_user
        batch.add(
            songs_by_user_insert_query,
            row[songs_by_user_columns].values.tolist()
        )

        # ...and lastly the one for the table users_by_song
        batch.add(
            users_by_song_insert_query,
            row[users_by_song_columns].values.tolist()
        )
        
        records += 3

    try:
        # Execute all the queries in the batch.
        session.execute(batch)

    except Exception as e:
        print("Error while processing a batch statement")
        print(e)

    else:
        # Cassandra's batch processing is limited; we must empty it to reuse.
        batch.clear()
        batches += 1

print("Batches: {}\nRecords: {}".format(batches, records))
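A common piece of Cassandra guidance is to use batches mainly for writes that target the same partition. A batch that spans many partitions and tables, like the one above, mostly shifts work onto the coordinator node and can trip the batch-size warning and failure thresholds configured in `cassandra.yaml`. One alternative is to group statements by table and partition key before batching. The sketch below shows only that grouping step and is driver-independent; `group_by_partition`, its `(table, partition_key, params)` tuple layout and the `max_size` cap are hypothetical choices, and sending each group would still go through `BatchStatement`. For independent single-row writes, the driver's `cassandra.concurrent.execute_concurrent_with_args` is another commonly used option.

```python
from collections import defaultdict


def group_by_partition(statements, max_size=50):
    """Split (table, partition_key, params) tuples into single-partition batches.

    Hypothetical helper: each returned batch holds at most max_size parameter
    tuples, all for the same table and partition key.
    """
    groups = defaultdict(list)
    for table, partition_key, params in statements:
        groups[(table, partition_key)].append(params)

    batches = []
    for (table, partition_key), params_list in groups.items():
        # Cap each batch so that a large partition is split into several batches.
        for start in range(0, len(params_list), max_size):
            batches.append((table, partition_key, params_list[start:start + max_size]))
    return batches


# Made-up statements: two partitions of songs_by_session, one of users_by_song.
statements = [
    ("songs_by_session", 338, (338, 0, "A", "Song A", 180.0)),
    ("songs_by_session", 338, (338, 1, "B", "Song B", 200.0)),
    ("songs_by_session", 339, (339, 0, "C", "Song C", 150.0)),
    ("users_by_song", "Song A", ("Song A", 1, "Ann", "Lee")),
]

for table, key, params in group_by_partition(statements):
    print(table, key, len(params))
```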

### The moment of truth

It's time to query Cassandra and verify that the aggregate CSV file was correctly imported into the data model.

#### Test 1

This query tests the following request:

> Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

In [None]:
try:
    rows = session.execute("""
        SELECT artist_name,
               song_title,
               song_length
          FROM songs_by_session
         WHERE session_id = %s
           AND item_in_session = %s
    """, (338, 4))
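except Exception as e:
    print("Error while querying songs_by_session")
    print(e)
else:
    # Print the results only if the query succeeded.
    for row in rows:
        print("{} | {} | {}".format(
            row.artist_name,
            row.song_title,
            row.song_length
        ))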

#### Test 2

This query tests the following request:

> Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

In [None]:
try:
    rows = session.execute("""
        SELECT artist_name,
               song_title,
               user_first_name,
               user_last_name
          FROM songs_by_user
         WHERE user_id = %s
           AND session_id = %s
    """, (10, 182))
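except Exception as e:
    print("Error while querying songs_by_user")
    print(e)
else:
    # Rows come back in clustering order, i.e. sorted by item_in_session.
    for row in rows:
        print("{} | {} | {} {}".format(
            row.artist_name,
            row.song_title,
            row.user_first_name,
            row.user_last_name
        ))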

#### Test 3

This query tests the following request:

> Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [None]:
try:
    rows = session.execute("""
        SELECT user_first_name,
               user_last_name
          FROM users_by_song
         WHERE song_title = %s
    """, ("All Hands Against His Own",))
except Exception as e:
    print("Error while querying users_by_song")
    print(e)
else:
    # Print the results only if the query succeeded.
    for row in rows:
        print("{} {}".format(
            row.user_first_name,
            row.user_last_name
        ))

### Drop the tables

Once the ETL process is tested, we can drop the tables used for the experiment.

In [None]:
try:
    session.execute("DROP TABLE IF EXISTS songs_by_session")
    session.execute("DROP TABLE IF EXISTS songs_by_user")
    session.execute("DROP TABLE IF EXISTS users_by_song")
except Exception as e:
    print("Error while dropping the tables")
    print(e)

### Close the session

And finally, we close the session and release the connection to the Cassandra cluster.

In [None]:
session.shutdown()
cluster.shutdown()