# MIDS W205 Project 3 Report - Ruby Han (Spring 2021)

## Summary

- In this project, you're a data scientist at a game development company trying to understand user behavior. 
- Your latest mobile game has three events you're interested in tracking: `purchase_a_sword`, `join_guild` and `purchase_an_axe`. Each event carries its own metadata (e.g. axe type, guild name).

## Two Approaches
User behavior was analyzed using two approaches:
- Jupyter Notebook - Elaborated in this notebook
- Command Line Interface (CLI) - Elaborated in `console_approach.md` file

## 1.0 Jupyter Notebook Approach
### Data Pipeline

This section provides a high-level overview of the tools and platforms used to extract, query and process data.

- **Docker:** Provides a contained environment to work in
- **Kafka:** Message broker that ingests data into a topic (**events** in this case)
- **Flask:** Builds the game engine API
- **Apache Bench:** Generates the test data used in the analyses below
- **Spark:** Reads JSON events from Kafka, then extracts and processes them
- **HDFS (Hadoop Distributed File System):** Stores data in a format that other data scientists can retrieve and query to answer business questions

The section below walks through the commands used to spin up the pipeline and process data.

### Linux Command Lines

This section outlines housekeeping and set-up of the Docker containers.

### $\underline{\text{Perform below commands only once:}}$
- Copy in yml file
```
cp ~/w205/course-content/12-Querying-Data-II/docker-compose.yml .
```
- Copy game_api.py file
```
cp ~/w205/course-content/12-Querying-Data-II/game_api.py .
```
- Add events to game_api.py file
```
vi game_api.py
```
- Add the events below

```python
@app.route("/join_guild")
def join_guild():
    join_guild_event = {'event_type': 'join_guild', 'description': 'large guild'}
    log_to_kafka('events', join_guild_event)
    return "Guild Joined!\n"

@app.route("/purchase_an_axe")
def purchase_an_axe():
    purchase_axe_event = {'event_type': 'purchase_axe', 'description': 'large axe'}
    log_to_kafka('events', purchase_axe_event)
    return "Axe Purchased!\n"
```
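The routes above only set `event_type` and `description`, yet the extracted schema later in this report also contains `Accept`, `Host` and `User-Agent` columns. That suggests the `log_to_kafka` helper merges the HTTP request headers into each event before publishing it. The sketch below illustrates that enrichment step in plain Python. `enrich_event` and the sample headers are hypothetical stand-ins for illustration, not the code in `game_api.py`.

```python
import json


def enrich_event(event, headers):
    """Return a copy of `event` with the request headers merged in.

    Hypothetical helper: it mimics what `log_to_kafka` is assumed to do
    before publishing, without needing Flask or Kafka.
    """
    enriched = dict(event)    # copy so the caller's dict is left untouched
    enriched.update(headers)  # header names become top-level keys
    return enriched


# Sample headers shaped like an Apache Bench request (illustrative values).
sample_headers = {
    "Host": "user1.comcast.com",
    "User-Agent": "ApacheBench/2.3",
    "Accept": "*/*",
}

axe_event = {"event_type": "purchase_axe", "description": "large axe"}
message = json.dumps(enrich_event(axe_event, sample_headers), sort_keys=True)
print(message)
```

Because the headers land at the top level of the JSON message, they show up as ordinary columns once Spark unpacks the event.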
- Create a shell script (`ab.sh`) to automate seeding the event stream

```bash
#!/bin/bash
 
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/join_guild
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_guild
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_an_axe
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_an_axe
```
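The script is a cross product of two users and four endpoints, ten requests each. As a rough sketch, the same command list can be generated in Python, which makes it easier to add users or endpoints later. The names `HOSTS`, `ENDPOINTS` and `ab_command` are illustrative, and `join_guild` is used here because it matches the Flask route defined in `game_api.py`.

```python
from itertools import product

BASE_URL = "http://localhost:5000"
HOSTS = ["user1.comcast.com", "user2.att.com"]
# "" is the default route; the rest are the event endpoints.
ENDPOINTS = ["", "purchase_a_sword", "join_guild", "purchase_an_axe"]


def ab_command(host, endpoint, requests=10):
    """Build one Apache Bench invocation run inside the mids container."""
    return (
        f"docker-compose exec mids ab -n {requests} "
        f'-H "Host: {host}" {BASE_URL}/{endpoint}'
    )


# Endpoint is the outer loop and host the inner loop, as in ab.sh.
commands = [ab_command(host, endpoint) for endpoint, host in product(ENDPOINTS, HOSTS)]

for command in commands:
    print(command)
```

The printed lines could be redirected into a shell script or passed to `subprocess.run`; this sketch only builds the strings and does not execute anything.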

### Docker
- Change into project 3 directory
```
cd ~/w205/project-3-ruby-han/
```
- Bring up cluster and check
```
docker-compose up -d
```
- Check stray containers
```
docker-compose ps
```
- Remove stray containers
```
docker rm -f <NAMES>
```
- Do not remove the `proxy-agent` container shown below
```
3a417c3d09e7   gcr.io/inverting-proxy/agent       "/bin/sh -c '/opt/bi…"   4 hours ago         Up 4 hours                                                              proxy-agent
```
- Check containers
```
docker ps -a
```
- The output should resemble the following if the cluster is up
```
CONTAINER ID   IMAGE                              COMMAND                  CREATED             STATUS             PORTS                                                NAMES
51b139c1746f   confluentinc/cp-kafka:latest       "/etc/confluent/dock…"   About an hour ago   Up About an hour   9092/tcp, 29092/tcp                                  project-3-ruby-han_kafka_1
52a4cadf803f   midsw205/spark-python:0.0.6        "docker-entrypoint.s…"   About an hour ago   Up About an hour   0.0.0.0:8888->8888/tcp                               project-3-ruby-han_spark_1
c3ee9c2c584e   confluentinc/cp-zookeeper:latest   "/etc/confluent/dock…"   About an hour ago   Up About an hour   2181/tcp, 2888/tcp, 3888/tcp, 32181/tcp              project-3-ruby-han_zookeeper_1
36afcb5f1f5c   midsw205/hadoop:0.0.2              "/usr/bin/docker-ent…"   About an hour ago   Up About an hour   8020/tcp, 8888/tcp, 9083/tcp, 10000/tcp, 50070/tcp   project-3-ruby-han_cloudera_1
df30d4f2a911   midsw205/base:0.1.9                "/bin/bash"              About an hour ago   Up About an hour   0.0.0.0:5000->5000/tcp, 8888/tcp                     project-3-ruby-han_mids_1
158f599295d7   midsw205/presto:0.0.1              "/usr/bin/docker-ent…"   About an hour ago   Up About an hour   8080/tcp                                             project-3-ruby-han_presto_1
3a417c3d09e7   gcr.io/inverting-proxy/agent       "/bin/sh -c '/opt/bi…"   4 hours ago         Up 4 hours                                                              proxy-agent
```
- To spin down cluster
```
docker-compose down
```
- Verify clusters are down
```
docker-compose ps
docker ps -a
```

### Kafka
- Create a topic named "events"
```
docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
```

### Flask
- Run Flask
```
docker-compose exec mids env FLASK_APP=/w205/project-3-ruby-han/game_api.py flask run --host 0.0.0.0
```

### Apache Bench
- Switch to new terminal - navigate to project-3 directory
```
cd ~/w205/project-3-ruby-han/
```
- Stream events by running shell script
```
sh ab.sh
```

### Apache Spark in Jupyter Notebook
- Check the docker-compose.yml file to ensure that the Spark container has `expose` and `ports` sections as below:
```
expose:
  - "8888"
ports:
  - "8888:8888"
```
- Execute a bash shell into Spark container
```
docker-compose exec spark bash
```
- Create symbolic link from Spark directory to /w205
```
ln -s /w205 w205
```
- Exit container
```
exit
```
- Launch Jupyter Notebook as the PySpark driver and set the IP address to 0.0.0.0
```
docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark
```
- Copy the URL and replace its IP address with the external IP address of the Google Cloud VM, then open Jupyter Notebook in a Google Chrome browser in incognito mode
```
http://35.247.22.116:8888/?token=e8869fcc1b03b41aad26041e2efa208763aa9c1841cb58fa
```
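The host swap can also be scripted. The sketch below assumes Jupyter prints a URL such as `http://0.0.0.0:8888/?token=...` and rewrites only the host part, keeping the port and token. The function name `with_external_host`, the token `abc123` and the address `203.0.113.10` (a documentation-only IP) are placeholders.

```python
from urllib.parse import urlsplit, urlunsplit


def with_external_host(jupyter_url, external_ip):
    """Replace the host in a Jupyter URL, keeping port, path and token."""
    parts = urlsplit(jupyter_url)
    port = f":{parts.port}" if parts.port else ""
    return urlunsplit(parts._replace(netloc=f"{external_ip}{port}"))


# Placeholder URL and IP; substitute the URL Jupyter printed and the VM's external IP.
printed_url = "http://0.0.0.0:8888/?token=abc123"
print(with_external_host(printed_url, "203.0.113.10"))
```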

### Hadoop
- Check Hadoop is running correctly
```
docker-compose exec cloudera hadoop fs -ls /tmp/
```
- The output should resemble the following if Hadoop is running correctly
```
Found 2 items
drwxrwxrwt   - mapred mapred              0 2018-02-06 18:27 /tmp/hadoop-yarn
drwx-wx-wx   - hive   supergroup          0 2021-04-11 03:52 /tmp/hive
```

## Spark Streaming

- Filter select event types from Kafka and land them in HDFS as Parquet files so that they are available for analysis with Presto (the Presto queries are run in the second approach using the console; see `console_approach.md` in the repo)

```python
import json
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf

import pandas as pd
```

## Read from Kafka using Spark

### 1.0 Filter for select "purchase_sword" events

```python
# purchase_sword
@udf('boolean')
def is_purchase(event_as_json):
    """Return True only for purchase_sword events."""
    event = json.loads(event_as_json)
    return event['event_type'] == 'purchase_sword'

def main():
    spark = SparkSession \
        .builder \
        .appName("ExtractEventsJob") \
        .enableHiveSupport() \
        .getOrCreate()

    # batch mode - load raw events
    raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

    # filter for purchase_sword events
    purchase_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_purchase('raw'))

    # lambda function to unnest purchase events table
    extracted_purchase_events = purchase_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()

    # print extracted_purchase_events schema
    extracted_purchase_events.printSchema()

    # show extracted_purchase_events
    extracted_purchase_events.show()

    # register extracted_purchase_events to temp table
    extracted_purchase_events.registerTempTable("extracted_purchase_events")

    # overwrite existing file
    extracted_purchase_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/purchases')


if __name__ == "__main__":
    main()
```

```
root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- timestamp: string (nullable = true)

+------+-----------------+---------------+--------------+--------------------+
|Accept|             Host|     User-Agent|    event_type|           timestamp|
+------+-----------------+---------------+--------------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2021-04-11 03:51:...|
|   *
```
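To make the filter and unnest steps easier to follow without a running cluster, here is a minimal plain-Python sketch of the same logic applied to a couple of hand-written messages. The sample payloads and the second timestamp are made up for illustration, and `unnest` is a hypothetical name standing in for the `Row(...)` lambda used in the Spark job.

```python
import json

# Illustrative (value, timestamp) pairs shaped like the rows read from Kafka.
raw_rows = [
    ('{"event_type": "purchase_sword", "Host": "user1.comcast.com"}',
     "2021-04-11 03:51:31.057"),
    ('{"event_type": "join_guild", "description": "large guild", "Host": "user2.att.com"}',
     "2021-04-11 03:51:32.000"),
]


def is_purchase(event_as_json):
    """Plain-Python version of the UDF: keep only purchase_sword events."""
    return json.loads(event_as_json)["event_type"] == "purchase_sword"


def unnest(raw, timestamp):
    """Merge the Kafka timestamp with the decoded JSON fields into one flat record."""
    return dict(timestamp=timestamp, **json.loads(raw))


extracted = [unnest(raw, ts) for raw, ts in raw_rows if is_purchase(raw)]
print(extracted)
```

One consequence of this flattening is that every JSON key becomes a column, which is why events with a `description` field produce a wider schema than events without one. A payload that itself contained a `timestamp` key would clash with the Kafka timestamp argument and raise a `TypeError`.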

### Query with Spark SQL

```python
# query sword purchases
# (optionally append: where Host = 'user1.comcast.com')
purchases_by_example2 = spark.sql("select * from extracted_purchase_events")

# show table
purchases_by_example2.show()
```

```
+------+-----------------+---------------+--------------+--------------------+
|Accept|             Host|     User-Agent|    event_type|           timestamp|
+------+-----------------+---------------+--------------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_s
```

```python
# convert to pandas DataFrame
df = purchases_by_example2.toPandas()
df
```

|  | Accept | Host | User-Agent | event_type | timestamp |
|---|---|---|---|---|---|
| 0 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2021-04-11 03:51:31.057 |
| 1 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2021-04-11 03:51:31.062 |
| 2 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2021-04-11 03:51:31.063 |
| 3 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2021-04-11 03:51:31.066 |
| 4 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2021-04-11 03:51:31.068 |
| 5 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2021-04-11 03:51:31.069 |
| 6 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2021-04-11 03:51:31.071 |
| 7 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2021-04-11 03:51:31.073 |
| 8 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2021-04-11 03:51:31.074 |
| 9 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2021-04-11 03:51:31.076 |


## Analyses on `purchase_sword` events

```python
# describe df
df.describe()
```

|  | Accept | Host | User-Agent | event_type | timestamp |
|---|---|---|---|---|---|
| count | 20 | 20 | 20 | 20 | 20 |
| unique | 1 | 2 | 1 | 1 | 20 |
| top | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2021-04-11 03:51:31.508 |
| freq | 20 | 10 | 20 | 20 | 1 |


## Business Q1: How many users are there and what are their host names in "purchase_sword" events?

```python
print("Number of Users:", len(df.Host.unique()), "\nUser Host Name(s):", *df.Host.unique())
```

```
Number of Users: 2
User Host Name(s): user1.comcast.com user2.att.com
```


## Business Q2: When did these events occur?

```python
print(*pd.to_datetime(df['timestamp']).dt.date.unique())
```

```
2021-04-11
```


### 2.0 Filter for select "purchase_axe" events

```python
# purchase_axe
@udf('boolean')
def is_purchase(event_as_json):
    """Return True only for purchase_axe events."""
    event = json.loads(event_as_json)
    return event['event_type'] == 'purchase_axe'

def main():
    spark = SparkSession \
        .builder \
        .appName("ExtractEventsJob") \
        .enableHiveSupport() \
        .getOrCreate()

    # batch mode - load raw events
    raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

    # filter for purchase_axe events
    purchase_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_purchase('raw'))

    # lambda function to unnest purchase events table
    extracted_purchase_events = purchase_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()

    # print extracted_purchase_events schema
    extracted_purchase_events.printSchema()

    # show extracted_purchase_events
    extracted_purchase_events.show()

    # register extracted_purchase_events to temp table
    extracted_purchase_events.registerTempTable("extracted_purchase_events_axe")

    # overwrite existing file
    extracted_purchase_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/purchases_axe')


if __name__ == "__main__":
    main()
```

```
root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- description: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- timestamp: string (nullable = true)

+------+-----------------+---------------+-----------+------------+--------------------+
|Accept|             Host|     User-Agent|description|  event_type|           timestamp|
+------+-----------------+---------------+-----------+------------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|  large axe|purchase_axe|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|  large axe|purchase_axe|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|  large axe|purchase_axe|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|  large axe|purchase_axe|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|  large axe|purchase_axe|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.
```

### Query with Spark SQL

```python
# query axe purchases
# (optionally append: where Host = 'user1.comcast.com')
purchases_by_example3 = spark.sql("select * from extracted_purchase_events_axe")

# show table
purchases_by_example3.show()
```

```
+------+-----------------+---------------+-----------+------------+--------------------+
|Accept|             Host|     User-Agent|description|  event_type|           timestamp|
+------+-----------------+---------------+-----------+------------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|  large axe|purchase_axe|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|  large axe|purchase_axe|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|  large axe|purchase_axe|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|  large axe|purchase_axe|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|  large axe|purchase_axe|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|  large axe|purchase_axe|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|  large axe|purchase_axe|2021-04-11 03:51:...|
|   */*|user1.comcast.com|ApacheBench/2.3|  large axe|purchase_axe|2021-04-11 03:51:...|
|   */*|user1.comcast
```

```python
# convert to pandas DataFrame
df2 = purchases_by_example3.toPandas()
df2
```

|  | Accept | Host | User-Agent | description | event_type | timestamp |
|---|---|---|---|---|---|---|
| 0 | `*/*` | user1.comcast.com | ApacheBench/2.3 | large axe | purchase_axe | 2021-04-11 03:51:32.564 |
| 1 | `*/*` | user1.comcast.com | ApacheBench/2.3 | large axe | purchase_axe | 2021-04-11 03:51:32.567 |
| 2 | `*/*` | user1.comcast.com | ApacheBench/2.3 | large axe | purchase_axe | 2021-04-11 03:51:32.57 |
| 3 | `*/*` | user1.comcast.com | ApacheBench/2.3 | large axe | purchase_axe | 2021-04-11 03:51:32.571 |
| 4 | `*/*` | user1.comcast.com | ApacheBench/2.3 | large axe | purchase_axe | 2021-04-11 03:51:32.573 |
| 5 | `*/*` | user1.comcast.com | ApacheBench/2.3 | large axe | purchase_axe | 2021-04-11 03:51:32.575 |
| 6 | `*/*` | user1.comcast.com | ApacheBench/2.3 | large axe | purchase_axe | 2021-04-11 03:51:32.576 |
| 7 | `*/*` | user1.comcast.com | ApacheBench/2.3 | large axe | purchase_axe | 2021-04-11 03:51:32.578 |
| 8 | `*/*` | user1.comcast.com | ApacheBench/2.3 | large axe | purchase_axe | 2021-04-11 03:51:32.58 |
| 9 | `*/*` | user1.comcast.com | ApacheBench/2.3 | large axe | purchase_axe | 2021-04-11 03:51:32.582 |


## Analyses on `purchase_axe` events

```python
# describe df2
df2.describe()
```

|  | Accept | Host | User-Agent | description | event_type | timestamp |
|---|---|---|---|---|---|---|
| count | 20 | 20 | 20 | 20 | 20 | 20 |
| unique | 1 | 2 | 1 | 1 | 1 | 20 |
| top | `*/*` | user1.comcast.com | ApacheBench/2.3 | large axe | purchase_axe | 2021-04-11 03:51:32.945 |
| freq | 20 | 10 | 20 | 20 | 20 | 1 |


## Business Q1: How many users are there and what are their host names in "purchase_axe" events?

```python
print("Number of Users:", len(df2.Host.unique()), "\nUser Host Name(s):", *df2.Host.unique())
```

```
Number of Users: 2
User Host Name(s): user1.comcast.com user2.att.com
```


## Business Q2: What type of axe did users purchase?

```python
print(*df2.description.unique())
```

```
large axe
```


### Other Events

- Similar analyses could be performed for `join_guild` events by filtering them from the Kafka topic in the same way as above
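
Since the sword and axe cells differ only in the event type they match, one way to extend the analysis to `join_guild` is to build the predicate from the event type instead of redefining `is_purchase` each time. The sketch below shows the idea in plain Python; `make_event_filter` is a hypothetical helper, and the sample messages are illustrative.

```python
import json


def make_event_filter(event_type):
    """Build a predicate that is True for JSON events of the given type."""
    def matches(event_as_json):
        return json.loads(event_as_json).get("event_type") == event_type
    return matches


is_join_guild = make_event_filter("join_guild")

# Illustrative messages; the guild payload mirrors the Flask route in this report.
messages = [
    json.dumps({"event_type": "join_guild", "description": "large guild"}),
    json.dumps({"event_type": "purchase_axe", "description": "large axe"}),
]

guild_events = [json.loads(m) for m in messages if is_join_guild(m)]
print(guild_events)
```

In the notebook, the returned function would presumably still need to be wrapped as a boolean Spark UDF before being passed to `.filter(...)`, as is done with the `@udf('boolean')` decorator above.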

#### Verify files were written to HDFS as Parquet by running the commands below in a terminal
```
docker-compose exec cloudera hadoop fs -ls /tmp/
docker-compose exec cloudera hadoop fs -ls /tmp/purchases
docker-compose exec cloudera hadoop fs -ls /tmp/purchases_axe
```

**Output from terminal:**

```
(base) jupyter@test1:~/w205/project-3-ruby-han$ docker-compose exec cloudera hadoop fs -ls /tmp/
Found 4 items
drwxrwxrwt   - mapred mapred              0 2018-02-06 18:27 /tmp/hadoop-yarn
drwx-wx-wx   - root   supergroup          0 2021-04-05 03:31 /tmp/hive
drwxr-xr-x   - root   supergroup          0 2021-04-05 04:16 /tmp/purchases
drwxr-xr-x   - root   supergroup          0 2021-04-05 04:16 /tmp/purchases_axe
(base) jupyter@test1:~/w205/project-3-ruby-han$ docker-compose exec cloudera hadoop fs -ls /tmp/purchases
Found 2 items
-rw-r--r--   1 root supergroup          0 2021-04-05 04:16 /tmp/purchases/_SUCCESS
-rw-r--r--   1 root supergroup       1645 2021-04-05 04:16 /tmp/purchases/part-00000-d81525e8-6a25-43f3-aa23-2b970c997601-c000.snappy.parquet
(base) jupyter@test1:~/w205/project-3-ruby-han$ docker-compose exec cloudera hadoop fs -ls /tmp/purchases_axe
Found 2 items
-rw-r--r--   1 root supergroup          0 2021-04-05 04:16 /tmp/purchases_axe/_SUCCESS
-rw-r--r--   1 root supergroup       1896 2021-04-05 04:16 /tmp/purchases_axe/part-00000-01aa2ce8-8744-4f2f-a2f9-e62af9258d2c-c000.snappy.parquet
```
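
Each directory listing contains a `_SUCCESS` marker, which Hadoop writes when a job commits its output, plus at least one `.parquet` part file. A small sketch of checking for both from captured `hadoop fs -ls` output is shown below. `check_parquet_dir` is a hypothetical helper, and the sample text is copied from the `/tmp/purchases` listing above.

```python
def check_parquet_dir(listing):
    """Return True when an `hadoop fs -ls` listing shows a finished Parquet write."""
    # Entry lines start with a permission string ("-..." for files, "d..." for dirs);
    # the path is the last whitespace-separated field.
    paths = [
        line.split()[-1]
        for line in listing.splitlines()
        if line.startswith(("-", "d"))
    ]
    has_success = any(path.endswith("/_SUCCESS") for path in paths)
    has_parquet = any(path.endswith(".parquet") for path in paths)
    return has_success and has_parquet


sample_listing = """Found 2 items
-rw-r--r--   1 root supergroup          0 2021-04-05 04:16 /tmp/purchases/_SUCCESS
-rw-r--r--   1 root supergroup       1645 2021-04-05 04:16 /tmp/purchases/part-00000-d81525e8-6a25-43f3-aa23-2b970c997601-c000.snappy.parquet
"""

print(check_parquet_dir(sample_listing))
```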