# w205 Project 3 (Maria Auslander)

## Purpose of Project

The purpose of this project is to store and query game events using an API. This consists of the following tasks (taken from project statement):
* Send APU server log events to Kafka
* Create a data pipeline to capture events, using spark streaming to filter event types from Kafka and land them into HDFS and make them available for analysis using presto
* Use Apache Bench to create test data for the pipeline
* Create an analytics report which provides a description of the pipeline and an analysis of the events.

### Docker File

The docker file for this project is below:

```
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
    expose:
      - "2181"
      - "2888"
      - "32181"
      - "3888"
    extra_hosts:
      - "moby:127.0.0.1"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    expose:
      - "9092"
      - "29092"
    extra_hosts:
      - "moby:127.0.0.1"

  cloudera:
    image: midsw205/hadoop:0.0.2
    hostname: cloudera
    expose:
      - "8020" # nn
      - "8888" # hue
      - "9083" # hive thrift
      - "10000" # hive jdbc
      - "50070" # nn http
    ports:
      - "8888"
    extra_hosts:
      - "moby:127.0.0.1"

  spark:
    image: midsw205/spark-python:0.0.6
    stdin_open: true
    tty: true
    volumes:
      - ~/w205:/w205
    expose:
      - "8888"
    ports:
      - "8889:8888" # 8888 conflicts with hue
    depends_on:
      - cloudera
    environment:
      HADOOP_NAMENODE: cloudera
      HIVE_THRIFTSERVER: cloudera:9083
    extra_hosts:
      - "moby:127.0.0.1"
    command: bash

  presto:
    image: midsw205/presto:0.0.1
    hostname: presto
    volumes:
      - ~/w205:/w205
    expose:
      - "8080"
    environment:
      HIVE_THRIFTSERVER: cloudera:9083
    extra_hosts:
      - "moby:127.0.0.1"

  mids:
    image: midsw205/base:0.1.9
    stdin_open: true
    tty: true
    volumes:
      - ~/w205:/w205
    expose:
      - "5000"
    ports:
      - "5000:5000"
    extra_hosts:
      - "moby:127.0.0.1"
```

The process was started by spinning up the docker cluster

``` 
docker-compose up -d
``` 

Afterwards, I ran the app through flask:

```
docker-compose exec mids \
  env FLASK_APP=/w205/project-3-mariaauslander/game_api.py \
  flask run --host 0.0.0.0
```

The code for the game_api is listed below:

```
#!/usr/bin/env python
import json
from kafka import KafkaProducer
from flask import Flask, request
import random

app = Flask(__name__)
producer = KafkaProducer(bootstrap_servers='kafka:29092')


def log_to_kafka(topic, event):
    event.update(request.headers)
    producer.send(topic, json.dumps(event).encode())


@app.route("/")
def default_response():
    default_event = {'event_type': 'default'}
    log_to_kafka('events', default_event)
    return "This is the default response!\n"


@app.route("/purchase_a_sword")
def purchase_a_sword():
    sword_name=random.choice(['Zulfiqar','Joyeuse','Masamune','Curved Saber of San Martin','Durendal','Seven Branched Sword','Legbiter','Excalibur'])
    sword_type={'Zulfiqar':'Double bladed sword','Joyeuse':'Medieval coronation sword','Masamune':'Japanese sword created by Masamune','Curved Saber of San Martin':'Curved saber',
            'Durendal':'Medievel one-handed straight sword','Seven Branched Sword':'Japanese ceremonial sword','Legbiter':'gaddhjat sword from Norway','Excalibur':'sword of King Arthur'}
    purchase_sword_event = {'event_type': 'purchase_sword','item_type':'sword','item_name':sword_name,'item_desc':sword_type[sword_name]}
    log_to_kafka('events', purchase_sword_event)
    return "Sword Purchased!\n"

@app.route("/join_guild")
def join_guild():
    guild_name=random.choice(['corpus naviculariorum','Freemen of the City','Basoche','Za','Masonry','Carpentry','Bakery'])
    guild_type={'corpus naviculariorum':'guild of long-distance shippers in Rome','Freemen of the City':'Medieval guild whose members had freedom from serfdom',
            'Basoche':'French guild of legal clerks','Za':'Trade guild of Japan','Masonry':'Masonry guild','Carpentry':'Organization of carpenters','Bakery':'Organization of bakers'}
    join_guild_event={'event_type':'join_guild','item_type':'guild','item_name':guild_name,'item_desc':guild_type[guild_name]}
    log_to_kafka('events',join_guild_event)
    return "You joined a guild!\n"

@app.route("/choose_nemesis")
def choose_nemesis():
    nemesis_name=random.choice(['Ursula','Darth Vader','Loki','Lord Voldemort','Norman Bates'])
    nemesis_type={'Ursula':'Tormentor of the little mermaid','Darth Vader':'villain and evil ruler of the Star Wars franchise',
            'Loki':'Agitator of the Marvel Universe','Lord Voldemort':'Evil mind of the Harry Potter world','Norman Bates':'Fictional serial killer, a total psycho'}
    choose_nemesis_event={'event_type':'choose_nemesis','item_type':'nemesis','item_name':nemesis_name,'item_desc':nemesis_type[nemesis_name]}
    log_to_kafka('events',choose_nemesis_event)
    return "You have a nemesis!\n"
    
```

In a separate terminal window, I set up to watch the kafka topic "events":
```
docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning
```

Afterwards, in separate terminal windows, I set up streaming for three different event types:

<u>Sword Purchases</u>
```
docker-compose exec spark spark-submit /w205/project-3-mariaauslander/sword_stream.py
```

<u>Guilds Joined</u>
```
docker-compose exec spark spark-submit /w205/project-3-mariaauslander/guild_stream.py
```

<u>Nemeses Chosen</u>
```
docker-compose exec spark spark-submit /w205/project-3-mariaauslander/nemesis_stream.py
```

Afterwards, I made sure the folders wrote to hadoop:
```
docker-compose exec cloudera hadoop fs -ls /tmp/
```

```
Found 9 items
drwxrwxrwt   - root   supergroup          0 2020-04-10 02:16 /tmp/checkpoints_for_guilds_joined
drwxrwxrwt   - root   supergroup          0 2020-04-10 02:17 /tmp/checkpoints_for_nemeses_chosen
drwxrwxrwt   - root   supergroup          0 2020-04-10 02:16 /tmp/checkpoints_for_sword_purchases
drwxr-xr-x   - root   supergroup          0 2020-04-10 02:16 /tmp/guilds_joined
drwxrwxrwt   - mapred mapred              0 2016-04-06 02:26 /tmp/hadoop-yarn
drwx-wx-wx   - hive   supergroup          0 2020-04-10 02:10 /tmp/hive
drwxrwxrwt   - mapred hadoop              0 2016-04-06 02:28 /tmp/logs
drwxr-xr-x   - root   supergroup          0 2020-04-10 02:17 /tmp/nemeses_chosen
drwxr-xr-x   - root   supergroup          0 2020-04-10 02:16 /tmp/sword_purchases
```

Afterwards, I created the tables through hive
```
docker-compose exec cloudera hive
```

```
create external table if not exists default.sword_purchases (
    raw_event string,
    timestamp string,
    separator string,
    Host string,
    User_Agent string,
    event_type string,
    item_type string,
    item_name string,
    item_desc string
  )
  stored as parquet 
  location '/tmp/sword_purchases'
  tblproperties ("parquet.compress"="SNAPPY");
```

```
create external table if not exists default.nemeses_chosen (
    raw_event string,
    timestamp string,
    separator string,
    Host string,
    User_Agent string,
    event_type string,
    item_type string,
    item_name string,
    item_desc string
  )
  stored as parquet 
  location '/tmp/nemeses_chosen'
  tblproperties ("parquet.compress"="SNAPPY");
```

```
create external table if not exists default.guilds_joined (
    raw_event string,
    timestamp string,
    separator string,
    Host string,
    User_Agent string,
    event_type string,
    item_type string,
    item_name string,
    item_desc string
  )
  stored as parquet 
  location '/tmp/guilds_joined'
  tblproperties ("parquet.compress"="SNAPPY");
```

Once the streaming and tables were set up through hive, I queried them through presto

```
docker-compose exec presto presto --server presto:8080 --catalog hive --schema default
```

```
show tables;

      Table      
-----------------
 guilds_joined   
 nemeses_chosen  
 sword_purchases 
```

### Apache Bench to Generate Data

With the tables set up, I added data using Apache Bench

The following commands were run to generate events as part of the data pipeline

<u>Default:</u>
```
docker-compose exec mids \
  ab \
    -n 10 \
    -H "Host: user1.comcast.com" \
    http://localhost:5000/
```

<u>Sword:</u>
```
docker-compose exec mids \
  ab \
    -n 10 \
    -H "Host: user1.comcast.com" \
    http://localhost:5000/purchase_a_sword
```

<u>Guild:</u>
```
docker-compose exec mids \
  ab \
    -n 10 \
    -H "Host: user1.comcast.com" \
    http://localhost:5000/join_guild
```

<u>Nemesis:</u>
```
docker-compose exec mids \
  ab \
    -n 10 \
    -H "Host: user1.comcast.com" \
    http://localhost:5000/choose_nemesis
```

To make sure the tables populated with ten rows each based on the commands above, I queried the tables using presto:

```
presto:default> select count(*) from sword_purchases;
 _col0 
-------
    10 
(1 row)


presto:default> select count(*) from guilds_joined;
 _col0 
-------
    10 
(1 row)


presto:default> select count(*) from nemeses_chosen;
 _col0 
-------
    10 
(1 row)

```

Afterwards, I fed the stream more data using the commands below:
```
while true; do
  docker-compose exec mids \
    ab -n 10 -H "Host: user1.comcast.com" \
      http://localhost:5000/purchase_a_sword
  sleep 10
done
```

```
while true; do
  docker-compose exec mids \
    ab -n 10 -H "Host: user1.comcast.com" \
      http://localhost:5000/join_guild
  sleep 5
done
```

```
while true; do
  docker-compose exec mids \
    ab -n 10 -H "Host: user1.comcast.com" \
      http://localhost:5000/choose_nemesis
  sleep 5
done
```

Once more data was streamed into the tables, I ran some queries to analyze the data within the tables.

#### Sword Purchase Events
```
presto:default> select item_type, count(*) from sword_purchases group by item_type;
 item_type | _col1 
-----------+-------
 sword     |  3080 
(1 row)

presto:default> select item_name,item_desc, count(*) from sword_purchases group by item_name,item_desc;
         item_name          |             item_desc              | _col2 
----------------------------+------------------------------------+-------
 Legbiter                   | gaddhjat sword from Norway         |   419 
 Zulfiqar                   | Double bladed sword                |   379 
 Curved Saber of San Martin | Curved saber                       |   366 
 Excalibur                  | sword of King Arthur               |   364 
 Durendal                   | Medievel one-handed straight sword |   386 
 Joyeuse                    | Medieval coronation sword          |   379 
 Seven Branched Sword       | Japanese ceremonial sword          |   380 
 Masamune                   | Japanese sword created by Masamune |   407 
 ```

#### Guild Events
```
presto:default> select item_type, count(*) from guilds_joined group by item_type;
 item_type | _col1 
-----------+-------
 guild     |  3580 
(1 row)


presto:default> select item_name,item_desc, count(*) from guilds_joined group by item_name,item_desc;
       item_name       |                       item_desc                       | _col2 
-----------------------+-------------------------------------------------------+-------
 corpus naviculariorum | guild of long-distance shippers in Rome               |   519 
 Masonry               | Masonry guild                                         |   531 
 Freemen of the City   | Medieval guild whose members had freedom from serfdom |   540 
 Carpentry             | Organization of carpenters                            |   535 
 Basoche               | French guild of legal clerks                          |   542 
 Za                    | Trade guild of Japan                                  |   545 
 Bakery                | Organization of bakers                                |   508 
(7 rows)
 ```

#### Nemesis Events
```
presto:default> select item_type, count(*) from nemeses_chosen group by item_type;
 item_type | _col1 
-----------+-------
 nemesis   |  4360 
(1 row)


presto:default> select item_name,item_desc, count(*) from nemeses_chosen group by item_name,item_desc;
   item_name    |                     item_desc                     | _col2 
----------------+---------------------------------------------------+-------
 Ursula         | Tormentor of the little mermaid                   |   908 
 Lord Voldemort | Evil mind of the Harry Potter world               |   862 
 Loki           | Agitator of the Marvel Universe                   |   876 
 Norman Bates   | Fictional serial killer, a total psycho           |   879 
 Darth Vader    | villain and evil ruler of the Star Wars franchise |   885 
 ```