# Data modeling with Cassandra

A startup called Sparkify wants to analyze the data they've been collecting on songs and user activity on their new music streaming app. The analysis team is particularly interested in understanding what songs users are listening to. Currently, there is no easy way to query the data to generate the results, since the data reside in a directory of CSV files on user activity on the app.

In this project, I create an Apache Cassandra database whose tables are modeled around the queries on song play data that the analysis team wants to run.

## Datasets

The dataset `event_data` is stored in the directory [event_data](event_data/), which contains CSV files partitioned by date. Here are examples of filepaths to two files in the dataset:

```
event_data/2018-11-08-events.csv
event_data/2018-11-09-events.csv
```

## ETL pipeline

![pipeline](images/pipeline.svg)

The figure above presents the ETL pipeline of the project. It consists of three steps:

1. **E**xtract: Process CSV files in the directory [event_data](event_data/) to create a new CSV file.
2. **T**ransform: Create an Apache Cassandra database and transform data from the new CSV file into tables.
3. **L**oad: Run queries on the database.

## 1. Process CSV files

### Import Python packages 

In [1]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

### Process CSV files

#### Create list of filepaths to process original event csv data files

In [2]:
# Check the current working directory
print("current working directory: %s" % os.getcwd())

# Path to the event_data folder inside the current working directory
filepath = os.getcwd() + '/event_data'

# Walk the folder and collect the path of every CSV file.
# The list is extended on each iteration so that paths found in
# one directory are not overwritten by the next one.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    file_path_list.extend(glob.glob(os.path.join(root, '*.csv')))

current working directory: /mnt/d/GitHub/data-modeling-with-cassandra


#### Process the files to create the single CSV data file that will be used for the Apache Cassandra tables

In [3]:
# set filename to the new csv file
new_datafile = "event_datafile_new.csv"

In [4]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:
    # reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        # extracting each data row one by one and append it        
        for line in csvreader:
            full_data_rows_list.append(line) 

# check total rows
print("total rows: %s" % len(full_data_rows_list))

# create a smaller event data CSV file called event_datafile_new.csv
# that will be used to insert data into the Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open(new_datafile, 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))

total rows: 8056


In [5]:
# check the number of rows in your csv file
print("number of rows in csv file:", end=" ")
with open(new_datafile, 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

number of rows in csv file: 6821
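
The output file has fewer lines than the number of rows read because the write loop skips every row whose first field (the artist) is empty; the line count above also includes the header line. A minimal sketch of that filter on a tiny in-memory sample follows. The sample rows are made up for illustration and only three columns are shown.

```python
import csv
import io

# Hypothetical sample rows (artist, firstName, song); not taken from event_data.
sample_rows = [
    ["Artist A", "Ava", "Song A"],
    ["", "Ben", ""],              # event without a song play: artist is empty
    ["Artist B", "Cy", "Song B"],
]

buffer = io.StringIO()
writer = csv.writer(buffer, quoting=csv.QUOTE_ALL)
writer.writerow(["artist", "firstName", "song"])  # header line
for row in sample_rows:
    if row[0] == "":   # same test as in the write loop above
        continue
    writer.writerow(row)

# Count the lines the same way the check cell does: header included.
line_count = len(buffer.getvalue().splitlines())
print("input rows:", len(sample_rows), "lines written:", line_count)
```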


## 2. Transform data into a Cassandra database

The image below is a screenshot of what the denormalized data should look like in `event_datafile_new.csv` after the CSV files have been processed.

<img src="images/image_event_datafile_new.jpg">

The file `event_datafile_new.csv` contains the following columns, listed with their zero-based index:

0. artist
1. first name of the user
2. gender of the user
3. item number in session
4. last name of the user
5. length of the song
6. level (paid or free)
7. location of the user
8. sessionId
9. song title
10. userId
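
The insert cells later in this document address these columns by position (for example `line[9]` for the song title). As a small sketch, the positions can also be looked up by name. The `COLUMNS` list mirrors the header written to `event_datafile_new.csv`; the names `INDEX` and `col` are hypothetical helpers introduced only for this example.

```python
# Header of event_datafile_new.csv, in the order it is written.
COLUMNS = ["artist", "firstName", "gender", "itemInSession", "lastName",
           "length", "level", "location", "sessionId", "song", "userId"]

# Map each column name to its position in a CSV row.
INDEX = {name: position for position, name in enumerate(COLUMNS)}

def col(row, name):
    """Return the field called `name` from a row produced by csv.reader."""
    return row[INDEX[name]]

print(INDEX["song"], INDEX["sessionId"], INDEX["userId"])  # prints: 9 8 10
```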

### 2.1. Initialize cluster and session

#### Create a Cluster

In [6]:
# This connects to a Cassandra instance on your local machine
# (127.0.0.1)
from cassandra.cluster import Cluster
cluster = Cluster()

# A session is needed to establish the connection and execute queries
session = cluster.connect()

#### Create Keyspace

In [7]:
try:
    session.execute("CREATE KEYSPACE IF NOT EXISTS sparkify \
        WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}"
    )
except Exception as e:
    print(e)

#### Set Keyspace

In [8]:
try:
    session.set_keyspace('sparkify')
except Exception as e:
    print(e)

### 2.2. Create tables and insert data

In this step, we create one table for each of the following three questions about the data:
- Question 1: Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4
- Question 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
- Question 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

#### Question 1: Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

To answer this question, we will create the table `question_1` which consists of 5 columns:
- `artist`: stores the artist name, data type: text
- `song`: stores the song title, data type: text
- `length`: stores the song's length, data type: double
- `session_id`: stores the session id, data type: int
- `item_in_session`: stores the item number in session, data type: int

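The partition key is `session_id`, and the clustering column is `item_in_session`. Together they form the primary key, so each (session, item) pair identifies exactly one row.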
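
To make the two key parts concrete, here is a rough in-memory analogy in plain Python (nested dictionaries, not a description of how Cassandra stores data): the partition key picks a group of rows, and the remaining key column (the clustering column, in Cassandra terms) identifies one row within that group. The function names are hypothetical; the sample row is the one returned by the query in section 3.1.

```python
# Toy model of question_1: {partition key: {clustering column: row}}.
table = {}

def insert(session_id, item_in_session, artist, song, length):
    # Rows that share a session_id land in the same partition.
    partition = table.setdefault(session_id, {})
    partition[item_in_session] = (artist, song, length)

def select(session_id, item_in_session):
    # Both key parts are given, so at most one row can match.
    return table[session_id][item_in_session]

insert(338, 4, "Faithless", "Music Matters (Mark Knight Dub)", 495.3073)
print(select(338, 4))
```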

##### Create table

In [9]:
query = "CREATE TABLE IF NOT EXISTS question_1 "
query += "(artist text, song text, length double, session_id int, item_in_session int, \
PRIMARY KEY (session_id, item_in_session))"
try:
    session.execute(query)
except Exception as e:
    print(e)    

##### Insert data

In [10]:
with open(new_datafile, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO question_1 (artist, song, length, session_id, item_in_session)"
        query = query + " VALUES (%s, %s, %s, %s, %s)"
        session.execute(query, (line[0], line[9], float(line[5]), int(line[8]), int(line[3])))
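
The loop above rebuilds the same query string for every row. One possible alternative, sketched here on the assumption that the DataStax Python driver's `Session.prepare` is available, is to prepare the statement once and bind the values per row; prepared statements use `?` placeholders instead of `%s`. The helper name `insert_question_1` is hypothetical and not part of the original notebook.

```python
def insert_question_1(session, rows):
    """Insert CSV rows into question_1 using a statement prepared once."""
    prepared = session.prepare(
        "INSERT INTO question_1 "
        "(artist, song, length, session_id, item_in_session) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    count = 0
    for line in rows:
        # Positions follow the header of event_datafile_new.csv.
        values = (line[0], line[9], float(line[5]), int(line[8]), int(line[3]))
        session.execute(prepared, values)
        count += 1
    return count  # number of rows sent to the database
```

It would be called as `insert_question_1(session, csvreader)` after the header line has been skipped.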

#### Question 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

To answer this question, we will create the table `question_2` which consists of 7 columns:

- `artist`: stores the artist name, data type: text
- `song`: stores the song title, data type: text
- `session_id`: stores the session id, data type: int
- `item_in_session`: stores the item number in session, data type: int
- `user_id`: stores the user id, data type: int
- `first_name`: stores the first name of user, data type: text
- `last_name`: stores the last name of the user, data type: text

The partition key is `user_id`, and the clustering columns are `session_id` and `item_in_session`. Because the songs must be sorted by item in session, `item_in_session` is included as a clustering column.
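
Within a partition, Cassandra keeps rows ordered by the key columns that follow the partition key (the clustering columns), ascending by default, so no `ORDER BY` is needed for this question. As a rough analogy in plain Python (not a description of Cassandra internals), sorting by the tuple `(session_id, item_in_session)` produces the same order. The sample keys below are made up for illustration.

```python
# Hypothetical (session_id, item_in_session) pairs for one user_id partition.
keys = [(182, 2), (9, 1), (182, 0), (182, 3), (9, 0), (182, 1)]

# Clustering order: first by session_id, then by item_in_session.
ordered = sorted(keys)

# Restricting session_id keeps a contiguous slice that is already sorted.
session_182 = [key for key in ordered if key[0] == 182]
print(session_182)  # prints: [(182, 0), (182, 1), (182, 2), (182, 3)]
```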

##### Create table

In [11]:
query = "CREATE TABLE IF NOT EXISTS question_2 "
query += "(artist text, song text, session_id int, item_in_session int, user_id int, first_name text, last_name text, \
PRIMARY KEY (user_id, session_id, item_in_session))"
try:
    session.execute(query)
except Exception as e:
    print(e)

##### Insert data

In [12]:
with open(new_datafile, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO question_2 (artist, song, session_id, item_in_session, user_id, first_name, last_name)"
        query = query + " VALUES (%s, %s, %s, %s, %s, %s, %s)"
        session.execute(query, (line[0], line[9], int(line[8]), int(line[3]), int(line[10]), line[1], line[4]))  

#### Question 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

To answer this question, we will create the table `question_3` which consists of 4 columns:

- `song`: stores the song title, data type: text
- `user_id`: stores the user id, data type: int
- `first_name`: stores the first name of user, data type: text
- `last_name`: stores the last name of the user, data type: text

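The partition key is `song`, and the clustering column is `user_id`. We add `user_id` to the primary key so that each (song, user) pair is a distinct row; with `song` alone as the key, every listener of the same song would overwrite the previous one.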
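
The reason uniqueness matters here is that a Cassandra `INSERT` whose primary key already exists overwrites the stored row instead of adding a second one. The sketch below mimics that behaviour with a dictionary keyed by the primary key; it is only an analogy, and the `load` helper is a hypothetical name. The three listeners are the ones returned by the query in section 3.3.

```python
listeners = [
    ("All Hands Against His Own", 29, "Jacqueline", "Lynch"),
    ("All Hands Against His Own", 80, "Tegan", "Levine"),
    ("All Hands Against His Own", 95, "Sara", "Johnson"),
]

def load(rows, key):
    """Store rows in a dict; a repeated key overwrites, like an upsert."""
    table = {}
    for row in rows:
        table[key(row)] = row
    return table

# Analogy for PRIMARY KEY (song): later listeners replace earlier ones.
by_song = load(listeners, key=lambda r: r[0])
# Analogy for PRIMARY KEY (song, user_id): one row per listener.
by_song_user = load(listeners, key=lambda r: (r[0], r[1]))

print(len(by_song), len(by_song_user))  # prints: 1 3
```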

##### Create table

In [13]:
query = "CREATE TABLE IF NOT EXISTS question_3 "
query += "(song text, user_id int, first_name text, last_name text, PRIMARY KEY (song, user_id))"
try:
    session.execute(query)
except Exception as e:
    print(e)

##### Insert data

In [14]:
with open(new_datafile, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO question_3 (song, user_id, first_name, last_name)"
        query = query + " VALUES (%s, %s, %s, %s)"
        session.execute(query, (line[9], int(line[10]), line[1], line[4]))

## 3. Do queries on database

### 3.1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

In [15]:
query = "SELECT * FROM question_1 WHERE session_id=338 and item_in_session=4"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print("artist: {}, song: {}, length: {}".format(row.artist, row.song, row.length))

artist: Faithless, song: Music Matters (Mark Knight Dub), length: 495.3073
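
Since the question asks only for the artist, song title and length, the query could name those columns instead of using `SELECT *`. The following is a sketch, assuming the same `session` object and the `%s` placeholder style already used in the insert cells; `fetch_song` is a hypothetical helper name.

```python
def fetch_song(session, session_id, item_in_session):
    """Return (artist, song, length) tuples for one play in question_1."""
    query = (
        "SELECT artist, song, length FROM question_1 "
        "WHERE session_id = %s AND item_in_session = %s"
    )
    # Values are passed separately so the driver handles the quoting.
    rows = session.execute(query, (session_id, item_in_session))
    return [(row.artist, row.song, row.length) for row in rows]
```

Calling `fetch_song(session, 338, 4)` would then return the requested columns for the row printed above.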


### 3.2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

In [16]:
query = "SELECT * FROM question_2 WHERE user_id=10 and session_id=182"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print("artist: {}, song: {} (item in session: {}), user: {} {}".format(
            row.artist, row.song, row.item_in_session, row.first_name, row.last_name)
         )

artist: Down To The Bone, song: Keep On Keepin' On (item in session: 0), user: Sylvie Cruz
artist: Three Drives, song: Greece 2000 (item in session: 1), user: Sylvie Cruz
artist: Sebastien Tellier, song: Kilometer (item in session: 2), user: Sylvie Cruz
artist: Lonnie Gordon, song: Catch You Baby (Steve Pitron & Max Sanna Radio Edit) (item in session: 3), user: Sylvie Cruz


### 3.3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [17]:
query = "SELECT * FROM question_3 WHERE song='All Hands Against His Own'"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print("user: {} {}, user_id: {}".format(row.first_name, row.last_name, row.user_id))

user: Jacqueline Lynch, user_id: 29
user: Tegan Levine, user_id: 80
user: Sara Johnson, user_id: 95


## 4. Drop the tables

In [18]:
tables = ["question_1", "question_2", "question_3"]
for table in tables:
    query = "drop table " + table
    try:
        session.execute(query)
    except Exception as e:
        print(e)

## 5. Close the session and cluster connection

In [19]:
session.shutdown()
cluster.shutdown()