# Project 3 - MIDS 205 - Oscar Casas

## The APP

This app provides a user interface for choosing a fighter and triggering events such as joining a guild or buying a sword. The interface sends the user data through a full streaming data pipeline, which stores it in Cloudera Hadoop as compressed Parquet files. I use Hive to create the tables, which I then query with Presto to generate business knowledge.

Users log in to the game, can buy different types of swords, and can join different types of guilds. They can also log in and out of the game as needed.

## Files Submitted

1) docker-compose.yml: the config file for docker-compose. It deploys, combines, and configures multiple Docker containers at the same time.

2) game_api.py: code for the game application, which lets users create events such as purchasing a sword and joining a guild.

3) Oscar_Casas_Project_3.ipynb: this notebook is the report on the project and explains the process of building a full data pipeline from start to finish.

4) write_stream.py & filter_writes.py: read events from Kafka, filter the data, and write the output to Hadoop.

5) generate.sh: generates random events against the API endpoints listed in the file.

## Full Pipeline Process

### First Steps

cd into the correct directory in gcp terminal
> cd w205
> cd project-3-oscarcasas

spin up docker-compose
> docker-compose up -d

verify containers are running
> docker-compose ps

### docker-compose.yml
```
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
    expose:
      - "2181"
      - "2888"
      - "32181"
      - "3888"
    extra_hosts:
      - "moby:127.0.0.1"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    expose:
      - "9092"
      - "29092"
    extra_hosts:
      - "moby:127.0.0.1"

  cloudera:
    image: midsw205/hadoop:0.0.2
    hostname: cloudera
    expose:
      - "8020" # nn
      - "8888" # hue
      - "9083" # hive thrift
      - "10000" # hive jdbc
      - "50070" # nn http
    ports:
      - "8888:8888"
    extra_hosts:
      - "moby:127.0.0.1"

  spark:
    image: midsw205/spark-python:0.0.6
    stdin_open: true
    tty: true
    volumes:
      - ~/w205:/w205
    expose:
      - "8888"
    #ports:
    #  - "8888:8888"
    depends_on:
      - cloudera
    environment:
      HADOOP_NAMENODE: cloudera
      HIVE_THRIFTSERVER: cloudera:9083
    extra_hosts:
      - "moby:127.0.0.1"
    command: bash

  presto:
    image: midsw205/presto:0.0.1
    hostname: presto
    volumes:
      - ~/w205:/w205
    expose:
      - "8080"
    environment:
      HIVE_THRIFTSERVER: cloudera:9083
    extra_hosts:
      - "moby:127.0.0.1"

  mids:
    image: midsw205/base:0.1.9
    stdin_open: true
    tty: true
    volumes:
      - ~/w205:/w205
    expose:
      - "5000"
    ports:
      - "5000:5000"
    extra_hosts:
      - "moby:127.0.0.1"
```

### Create Kafka Topic and Run Flask

Create the events topic in Kafka to receive the produced events
> docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server kafka:29092

Run Flask on game_api.py
> docker-compose exec mids env FLASK_APP=/w205/project-3-oscarcasas21/game_api.py flask run --host 0.0.0.0

Open a new terminal and monitor the events
> docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning
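
The same topic can also be inspected from Python. The following is a minimal sketch, assuming the kafka-python package (the one game_api.py imports as `kafka`) is installed wherever it runs; `decode_event` and `print_events` are hypothetical helper names, not part of the submitted files.

```python
import json


def decode_event(raw):
    """Turn one Kafka message value (UTF-8 JSON bytes) into a dict."""
    return json.loads(raw.decode("utf-8"))


def print_events(topic="events", servers="kafka:29092"):
    """Print the events in the topic, stopping after 5 s without new messages."""
    # Imported lazily so decode_event can be used without Kafka installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        auto_offset_reset="earliest",  # read from the start, like `-o beginning`
        consumer_timeout_ms=5000,      # stop iterating after 5 s of silence
    )
    for message in consumer:
        print(decode_event(message.value))
```

Calling `print_events()` from a container on the same Docker network should print one dict per logged event.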

## game_api.py

```
#!/usr/bin/env python
import json
from kafka import KafkaProducer
from flask import Flask, request

app = Flask(__name__)
producer = KafkaProducer(bootstrap_servers='kafka:29092')


def log_to_kafka(topic, event):
    event.update(request.headers)
    producer.send(topic, json.dumps(event).encode())


@app.route("/")
def default_response():
    default_event = {'event_type': 'default'}
    log_to_kafka('events', default_event)
    return "This is the default response!\n"

@app.route("/purchase_sword/<type>")
def purchase_sword(type):
    purchase_sword_event = {'event_type': 'purchase_sword','sword_type':type}
    log_to_kafka('events', purchase_sword_event)
    return "Sword Purchased! "+ type +"\n"

@app.route("/join_guild/<type>")
def join_guild(type):
    join_guild_event = {'event_type': 'join_guild','guild_name':type}
    log_to_kafka('events', join_guild_event)
    return "Guild Joined! "+ type +"\n"

@app.route("/login", methods=['POST'])
def login():
    id = request.args.get('id',default=0,type=int)
    login_event = {'event_type': 'login','id':id}
    log_to_kafka('events', login_event)
    return "User logged in = "+str(id)+"\n"

@app.route("/logout", methods=['POST'])
def logout():
    id = request.args.get('id',default=0,type=int)
    logout_event = {'event_type': 'logout','id':id}
    log_to_kafka('events', logout_event)
    return "User logged out = "+str(id)+"\n"
```
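
To see what ends up in Kafka, the payload built by log_to_kafka can be reproduced without Flask or a broker. This is a sketch only: `build_payload` is a hypothetical helper, the headers are made-up values similar to what curl sends, and unlike the original it copies the event instead of updating it in place.

```python
import json


def build_payload(event, headers):
    """Merge request headers into the event and JSON-encode it as bytes."""
    merged = dict(event)    # copy so the caller's dict is left unchanged
    merged.update(headers)  # each header name becomes a top-level key
    return json.dumps(merged).encode()


# Hypothetical headers, standing in for flask.request.headers
sample_headers = {
    "Host": "localhost:5000",
    "User-Agent": "curl/7.47.0",
    "Accept": "*/*",
}
payload = build_payload(
    {"event_type": "purchase_sword", "sword_type": "katana"},
    sample_headers,
)
print(payload.decode())
```

Because the headers are merged into the event, they later appear as the host, user-agent, and accept columns in the tables.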

### Apache Bench Create Events

#### Apache Bench Simple

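The data consists of four event types: sword purchases, guild joins, logins, and logouts, plus a default event for the root path. The Apache Bench commands below show how to generate events in bulk: each command sends 10 requests with a Host header that identifies the user. A purchase request names the item type in the URL path, and a join request names the guild. The path must match a route defined in game_api.py (for example /purchase_sword/katana); a request to an undefined route returns a 404 and logs no event.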

- docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/

- docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchases/sword/katana

- docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchases/armour/iron
        
- docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/join_guild/vikings
        
- docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/

- docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchases/sword/longsword
        
- docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchases/armour/iron
            
- docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_guild/vikings
        

Create a random event generator with generate.sh below, which loops until the request count reaches the GENERATEREQS limit.

> ls -l generate.sh

> chmod +x generate.sh

> bash ./generate.sh

## generate.sh

```
#!/bin/bash
NOOFUSERS=100
ENDPOINTS=4
NOOFBTYPE=3
NOOFCTYPE=9

GENERATEREQS=1000

REQS=0
until [ $REQS -ge $GENERATEREQS ]; do
    ID=$(( ( RANDOM % $NOOFUSERS )  + 1 ))
    EP=$(( ( RANDOM % $ENDPOINTS )  + 1 ))
    BTYPE=$(( ( RANDOM % $NOOFBTYPE )  + 1 ))
    CTYPE=$(( ( RANDOM % $NOOFCTYPE )  + 1 ))
#    echo $ID $EP
    case $EP in
	1)
#    echo "sword"
	    case $BTYPE in
		1)
		    docker-compose exec mids curl "http://localhost:5000/purchase_sword/katana"
		    ;;
		2)
		    docker-compose exec mids curl "http://localhost:5000/purchase_sword/long-sword"
		    ;;
		3)
		    docker-compose exec mids curl "http://localhost:5000/purchase_sword/saber"
		    ;;
	    esac
	    ;;

	2)
#   	    echo "affiliation"
	    case $BTYPE in
		1)
		    docker-compose exec mids curl "http://localhost:5000/join_guild/nights"
		    ;;
		2)
		    docker-compose exec mids curl "http://localhost:5000/join_guild/vikings"
		    ;;
		3)
		    docker-compose exec mids curl "http://localhost:5000/join_guild/samurai"
		    ;;
	    esac
	    ;;
	3)
	    docker-compose exec mids curl -X POST "http://localhost:5000/login?id="$ID
	    ;;
	4)
	    docker-compose exec mids curl -X POST "http://localhost:5000/logout?id="$ID
	    ;;

    esac
    let REQS=REQS+1
done
```
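
The selection logic of generate.sh can be sketched in Python as well. This version only builds the method and URL for each request instead of calling curl, so it can be run anywhere; `random_request` and the fixed seed are assumptions made for illustration.

```python
import random

SWORDS = ["katana", "long-sword", "saber"]
GUILDS = ["nights", "vikings", "samurai"]
BASE_URL = "http://localhost:5000"


def random_request(rng, n_users=100):
    """Return (method, url) for one randomly chosen event, mirroring generate.sh."""
    endpoint = rng.randint(1, 4)       # 1=sword, 2=guild, 3=login, 4=logout
    user_id = rng.randint(1, n_users)  # only used by login and logout
    if endpoint == 1:
        return "GET", f"{BASE_URL}/purchase_sword/{rng.choice(SWORDS)}"
    if endpoint == 2:
        return "GET", f"{BASE_URL}/join_guild/{rng.choice(GUILDS)}"
    if endpoint == 3:
        return "POST", f"{BASE_URL}/login?id={user_id}"
    return "POST", f"{BASE_URL}/logout?id={user_id}"


rng = random.Random(205)  # fixed seed so the sample is repeatable
requests_to_send = [random_request(rng) for _ in range(10)]
for method, url in requests_to_send:
    print(method, url)
```

Each pair could then be sent with curl or an HTTP client from inside the mids container.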

### Streaming Code

write_stream.py then reads the events from Kafka, filters them down to the specified event type, and writes the result to Hadoop. A filter function checks the event_type of each raw event and keeps only the matching events. The code writes the events to two different folders:

/tmp/purchase_sword

/tmp/join_guild

> docker-compose exec spark spark-submit /w205/project-3-oscarcasas21/write_stream.py

> docker-compose exec spark spark-submit /w205/project-3-oscarcasas21/filter_writes.py

> docker-compose exec cloudera hadoop fs -ls /tmp/

##### Output
```
Found 5 items
drwxrwxrwt   - mapred mapred              0 2016-04-06 02:26 /tmp/hadoop-yarn
drwx-wx-wx   - hive   supergroup          0 2021-12-10 02:57 /tmp/hive
drwxr-xr-x   - root   supergroup          0 2021-12-10 02:58 /tmp/join_guild
drwxrwxrwt   - mapred hadoop              0 2016-04-06 02:28 /tmp/logs
drwxr-xr-x   - root   supergroup          0 2021-12-10 02:58 /tmp/purchase_sword
```

Peek into the actual folder in Hadoop

> docker-compose exec cloudera hadoop fs -ls /tmp/purchase_sword/

#### Output

```
Found 2 items
-rw-r--r--   1 root supergroup          0 2021-12-10 02:58 /tmp/purchase_sword/_SUCCESS
-rw-r--r--   1 root supergroup       2834 2021-12-10 02:58 /tmp/purchase_sword/part-00000-419d3986-138c-4c78-ae01-426b7aae0fbc-c000.snappy.parquet
```

## write_stream.py

```
#!/usr/bin/env python
"""Extract events from kafka and write them to hdfs
"""
import json
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf


@udf('boolean')
def is_join(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'join_guild':
        return True
    return False

def main():
    """main
    """
    spark = SparkSession \
        .builder \
        .appName("ExtractEventsJob") \
        .enableHiveSupport() \
        .getOrCreate()

    raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

    join_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_join('raw'))
    
    extracted_join_events = join_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()
    extracted_join_events.printSchema()
    extracted_join_events.show()

    extracted_join_events.registerTempTable("extracted_join_events")
    
    spark.sql("""
        create external table enrollment_join
        stored as parquet
        location '/tmp/join_guild'
        as
        select * from extracted_join_events
    """)

if __name__ == "__main__":
    main()
```
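
The filter and flatten steps above can be tried without Spark. The sketch below applies the same idea to a small list of (raw JSON, timestamp) pairs; `is_event` and `flatten` are hypothetical names, and the sample messages are illustrative rather than captured from the topic.

```python
import json


def is_event(event_as_json, event_type):
    """Plain-Python version of the UDF: keep only events of one type."""
    return json.loads(event_as_json).get("event_type") == event_type


def flatten(raw, timestamp):
    """Combine the Kafka timestamp with the decoded event fields."""
    return {"timestamp": timestamp, **json.loads(raw)}


# Illustrative messages: (value cast to string, timestamp cast to string)
raw_messages = [
    ('{"event_type": "join_guild", "guild_name": "vikings", "Host": "localhost:5000"}',
     "2021-12-10 02:52:42.829"),
    ('{"event_type": "purchase_sword", "sword_type": "katana", "Host": "localhost:5000"}',
     "2021-12-10 02:52:46.652"),
    ('{"event_type": "login", "id": 7, "Host": "localhost:5000"}',
     "2021-12-10 02:52:48.000"),
]

join_events = [
    flatten(raw, ts) for raw, ts in raw_messages if is_event(raw, "join_guild")
]
purchase_events = [
    flatten(raw, ts) for raw, ts in raw_messages if is_event(raw, "purchase_sword")
]
print(join_events)
print(purchase_events)
```

Filtering before flattening keeps the fields uniform within each group, since join, purchase, and login events carry different keys.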

### Prepping Querying Data

Start pyspark in the spark container to load the data from HDFS for querying

> docker-compose exec spark pyspark



The following steps aren't necessary, since the write_stream files already create the tables, but they are useful to know.

#### Swords
Create an external table stored as Snappy-compressed Parquet

> df = spark.read.parquet('/tmp/purchase_sword')

> df.registerTempTable('swords')

> query = "create external table purchase_swords stored as parquet location '/tmp/purchase_swords' as select * from swords"

> spark.sql(query)

#### Join Guilds
Create an external table stored as Snappy-compressed Parquet

> df = spark.read.parquet('/tmp/join_guild')

> df.registerTempTable('guilds')

> query = "create external table join_guild stored as parquet location '/tmp/join_guilds' as select * from guilds"

> spark.sql(query)


### Presto

The tables are then queried using presto.

> docker-compose exec presto presto --server presto:8080 --catalog hive --schema default

> show tables;

#### Output

```
presto:default> show tables;
        Table        
---------------------
 enrollment_join     
 enrollment_purchase 
(2 rows)
```

> describe enrollment_join;

#### Output

```
presto:default> describe enrollment_join;
   Column   |  Type   | Comment 
------------+---------+---------
 accept     | varchar |         
 host       | varchar |         
 user-agent | varchar |         
 event_type | varchar |         
 guild_name | varchar |         
 timestamp  | varchar |         
(6 rows)
```

> describe enrollment_purchase;

#### Output

```
presto:default> describe enrollment_purchase;
   Column   |  Type   | Comment 
------------+---------+---------
 accept     | varchar |         
 host       | varchar |         
 user-agent | varchar |         
 event_type | varchar |         
 sword_type | varchar |         
 timestamp  | varchar |         
(6 rows)
```


### Queries

#### Select All

> select * from enrollment_purchase;

#### Output Sword
```
presto:default> select * from enrollment_purchase;
 accept |      host      | user-agent  |   event_type   | sword_type |        timestamp        
--------+----------------+-------------+----------------+------------+-------------------------
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | long-sword | 2021-12-10 02:52:46.225 
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | katana     | 2021-12-10 02:52:46.652 
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | katana     | 2021-12-10 02:52:47.073 
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | katana     | 2021-12-10 02:52:49.614 
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | saber      | 2021-12-10 02:52:50.469 
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | long-sword | 2021-12-10 02:52:51.321 
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | katana     | 2021-12-10 02:52:52.607 
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | katana     | 2021-12-10 02:52:53.034 
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | katana     | 2021-12-10 02:52:53.897 
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | saber      | 2021-12-10 02:52:56.014 
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | long-sword | 2021-12-10 02:52:57.707 
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | katana     | 2021-12-10 02:52:58.141 
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | long-sword | 2021-12-10 02:52:59.881 
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | long-sword | 2021-12-10 02:53:02.417 
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | long-sword | 2021-12-10 02:53:05.909 
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | katana     | 2021-12-10 02:53:07.639 
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | long-sword | 2021-12-10 02:53:12.27  
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | saber      | 2021-12-10 02:53:13.97  
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | long-sword | 2021-12-10 02:53:15.685 
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | katana     | 2021-12-10 02:53:19.07  
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | katana     | 2021-12-10 02:53:22.426 
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | long-sword | 2021-12-10 02:53:24.122 
 */*    | localhost:5000 | curl/7.47.0 | purchase_sword | saber      | 2021-12-10 02:53:24.559 

```
> select * from enrollment_join;

#### Output Guild
```
presto:default> select * from enrollment_join;
 accept |      host      | user-agent  | event_type | guild_name |        timestamp        
--------+----------------+-------------+------------+------------+-------------------------
 */*    | localhost:5000 | curl/7.47.0 | join_guild | samurai    | 2021-12-10 02:52:42.405 
 */*    | localhost:5000 | curl/7.47.0 | join_guild | vikings    | 2021-12-10 02:52:42.829 
 */*    | localhost:5000 | curl/7.47.0 | join_guild | nights     | 2021-12-10 02:52:44.098 
 */*    | localhost:5000 | curl/7.47.0 | join_guild | samurai    | 2021-12-10 02:52:44.522 
 */*    | localhost:5000 | curl/7.47.0 | join_guild | vikings    | 2021-12-10 02:52:47.486 
 */*    | localhost:5000 | curl/7.47.0 | join_guild | vikings    | 2021-12-10 02:52:54.312 
 */*    | localhost:5000 | curl/7.47.0 | join_guild | nights     | 2021-12-10 02:52:56.426 
 */*    | localhost:5000 | curl/7.47.0 | join_guild | vikings    | 2021-12-10 02:52:57.265 
 */*    | localhost:5000 | curl/7.47.0 | join_guild | samurai    | 2021-12-10 02:52:59.013 
 */*    | localhost:5000 | curl/7.47.0 | join_guild | nights     | 2021-12-10 02:53:00.298 
 */*    | localhost:5000 | curl/7.47.0 | join_guild | nights     | 2021-12-10 02:53:01.579 
 */*    | localhost:5000 | curl/7.47.0 | join_guild | samurai    | 2021-12-10 02:53:04.131 
 */*    | localhost:5000 | curl/7.47.0 | join_guild | nights     | 2021-12-10 02:53:04.573 
 */*    | localhost:5000 | curl/7.47.0 | join_guild | samurai    | 2021-12-10 02:53:05.474 
 */*    | localhost:5000 | curl/7.47.0 | join_guild | nights     | 2021-12-10 02:53:08.457 
 */*    | localhost:5000 | curl/7.47.0 | join_guild | nights     | 2021-12-10 02:53:11.003 
 */*    | localhost:5000 | curl/7.47.0 | join_guild | samurai    | 2021-12-10 02:53:11.858 
 */*    | localhost:5000 | curl/7.47.0 | join_guild | samurai    | 2021-12-10 02:53:13.553 

```

#### Select count of sword purchase events

> select count(sword_type) from enrollment_purchase;


##### Output: 115

#### Select count of join guild events

> select count(guild_name) from enrollment_join;

##### Output: 110

#### Select sword purchase counts by type

> select sword_type, count(sword_type) from enrollment_purchase group by sword_type;

#### Output

```
presto:default> select count(guild_name) from enrollment_join group by guild_name;
 _col0 
-------
    34 
    36 
    40 
(3 rows)
```

#### Select guild type count

> select guild_name, count(guild_name) from enrollment_join group by guild_name;

#### Output

```
presto:default> select guild_name,count(guild_name) from enrollment_join group by guild_name;
 guild_name | _col1 
------------+-------
 samurai    |    34 
 vikings    |    36 
 nights     |    40 
(3 rows)

```

### Pandas usage

Using the data in pandas is fairly straightforward: open a Python notebook and load the query results into a DataFrame.
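
As a minimal sketch of what that can look like, the example below builds a DataFrame from a few rows copied from the enrollment_purchase output shown earlier and repeats the group-by count in pandas. How the rows are fetched is an assumption; the comment shows one option that may work from pyspark.

```python
import pandas as pd

# Sample rows copied from the enrollment_purchase output above.
# In the pipeline they would come from a query result instead, for example
# spark.sql("select * from enrollment_purchase").toPandas() inside pyspark
# (an assumption about the setup, not taken from the project notebook).
rows = [
    {"sword_type": "long-sword", "timestamp": "2021-12-10 02:52:46.225"},
    {"sword_type": "katana", "timestamp": "2021-12-10 02:52:46.652"},
    {"sword_type": "katana", "timestamp": "2021-12-10 02:52:47.073"},
    {"sword_type": "katana", "timestamp": "2021-12-10 02:52:49.614"},
    {"sword_type": "saber", "timestamp": "2021-12-10 02:52:50.469"},
]

df = pd.DataFrame(rows)
# The table stores timestamps as varchar, so parse them explicitly.
df["timestamp"] = pd.to_datetime(df["timestamp"])

# Equivalent of: select sword_type, count(sword_type) ... group by sword_type
counts = df.groupby("sword_type").size()
print(counts)
```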

## Business Intelligence Analysis

After building the entire pipeline, creating an event generator, and analyzing the data, we have some initial results that are interesting from a business perspective. Because the game logs users' purchases and affiliations, it was interesting to track how many events happened and of what kind. The data shows that events are split roughly evenly between affiliations and purchases, with 110 and 115 events respectively. The event types showed similar results: no single type accounted for a disproportionate share of the events. For the game this is good, because we want even teams with diverse abilities and players. Overall, the business intelligence analysis may be simple, but it is pivotal in understanding the strengths and weaknesses of the game.
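
The evenness claim can be checked with a little arithmetic on the counts reported by the queries above. The `shares` helper is a hypothetical name used only for this sketch.

```python
def shares(counts):
    """Return each category's share of the total, in percent (one decimal)."""
    total = sum(counts.values())
    return {name: round(100 * n / total, 1) for name, n in counts.items()}


# Counts taken from the Presto query outputs above
guild_counts = {"samurai": 34, "vikings": 36, "nights": 40}
event_counts = {"purchase_sword": 115, "join_guild": 110}

print(shares(guild_counts))
print(shares(event_counts))
```

Each guild holds roughly a third of the joins, and purchases and joins each account for about half of the 225 events.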

### Shutting Down

> docker-compose down