# Project 3 - W205 - Full Pipeline 
## Ernesto Oropeza

## Summary

A complete pipeline is designed to stream data from a game application running in a Docker container. The API server code supports three actions: purchasing weapons, creating guilds and joining existing guilds. As the data streams in, each event is filtered according to its type and written to Hadoop. Hive and Presto are then used to query the data. A script that randomly generates game application requests is used to test the pipeline, and SQL queries run in Presto show that the pipeline works end to end. Several commands run simultaneously in different terminals; the terminal is indicated before each command.

## Files Submitted

1. Ernesto_Oropeza_Project_3.ipynb (This Report)
2. game_p3_api.py
3. docker-compose.yml
4. stream_write_p3.py
5. random_event_generator.sh

## Full Pipeline Description

### 1. Set Up

**Terminal 1**
> docker-compose.yml

### 2. Spin Up the Containers

**Terminal 1**
   > docker-compose up -d
   
Output:

Creating network "project3oropezaev_default" with the default driver\
Creating project3oropezaev_zookeeper_1\
Creating project3oropezaev_presto_1\
Creating project3oropezaev_cloudera_1\
Creating project3oropezaev_mids_1\
Creating project3oropezaev_spark_1\
Creating project3oropezaev_kafka_1

To verify that all containers are up, run:
  > docker-compose ps

Output:

|            Name               |               Command           |        State             |                   Ports                                              | 
|-----------------------------  |-----------------------------    |--------------------------|-----------------------------------------------------                 |
|project3oropezaev_cloudera_1   | /usr/bin/docker-entrypoint ...  | Up                       |     10000/tcp, 50070/tcp, 8020/tcp, 0.0.0.0:8888->8888/tcp, 9083/tcp |
|project3oropezaev_kafka_1      | /etc/confluent/docker/run       | Up                       |     29092/tcp, 9092/tcp                                              |
|project3oropezaev_mids_1       | /bin/bash                       | Up                       |     0.0.0.0:5000->5000/tcp, 8888/tcp                                 |
|project3oropezaev_presto_1     | /usr/bin/docker-entrypoint ...  | Up                       |     8080/tcp                                                         |
|project3oropezaev_spark_1      | docker-entrypoint.sh bash       | Up                       |     8888/tcp                                                         |
|project3oropezaev_zookeeper_1  | /etc/confluent/docker/run       | Up                       |     2181/tcp, 2888/tcp, 32181/tcp, 3888/tcp                          |

### 3. Create Kafka Topic and Monitoring Events

The events produced by the API are published to a Kafka topic named ***events***, which is created as follows:\
**Terminal 1**\
Command:
  > docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181

Output:\
Created topic events.

To monitor the events as they arrive in Kafka, the following command is used:\
Command:
  > docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning

Output (For the very first event produced):\
{"event_type": "purchase", "Accept": "*/*", "User-Agent": "ApacheBench/2.3", "item_purchased": "sword", "Host": "user4.comcast.com", "type": "samurai"}
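
Each message printed by kafkacat is a single JSON object, so the monitoring output can be summarized with a few lines of Python. The following is a minimal sketch, not part of the submitted files: the function name `count_event_types` and the sample lines are assumptions made for illustration, and any line that is not valid JSON (such as a status message) is simply skipped.

```python
import json
from collections import Counter


def count_event_types(lines):
    """Count events per event_type from kafkacat-style output (one JSON object per line)."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            # Not an event (for example a status line); ignore it
            continue
        if isinstance(event, dict):
            counts[event.get("event_type", "unknown")] += 1
    return counts


# Sample input: the first event shown above plus a hypothetical non-JSON line
sample = [
    '{"event_type": "purchase", "Accept": "*/*", "User-Agent": "ApacheBench/2.3", '
    '"item_purchased": "sword", "Host": "user4.comcast.com", "type": "samurai"}',
    "this line is not JSON",
]
print(dict(count_event_types(sample)))
```

The output of the kafkacat command could be piped into such a script to get a quick tally per event type while the pipeline is running.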


### 4. Game API

The code for the game application is included in the file ***game_p3_api.py***.
The user is allowed to create three events:
  * Purchase a Weapon
  * Create a Guild
  * Join an existing Guild
  
The routes are deliberately open-ended so the application can extend the options freely. To purchase a weapon, the name and type of the weapon need to be specified as follows:
  * /purchase/***WEAPON_NAME***/***TYPE_OF_WEAPON***

For example, to purchase a Samurai Sword the option would be as follows: ***http://localhost:5000/purchase/sword/samurai***.
Here the only fixed part is ***purchase***; the weapon and its type are user defined.

To create a guild, the request has the following structure:
  * /create_guild/***NAME OF THE GUILD***
  
For example, to create a guild named Barbarians the option would be: ***http://localhost:5000/create_guild/barbarians***.
Here the only fixed option is ***create_guild***. The name is user defined. The application would have to verify that the name is available.

To join a guild, the request is similar to the one for creating a guild:
  * /join_guild/***NAME OF THE GUILD***

For example, to join the created guild Barbarians the option would be: ***http://localhost:5000/join_guild/barbarians***.
Here the only fixed option is ***join_guild***. Again the name is user defined, and the application would have to verify the guild's availability and the protocol for joining it.

There is a default response for all other possibilities (e.g. when something required, such as the guild name, is missing). In these cases a response is returned but no action is taken.
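
The mapping from request path to logged event can be sketched in plain Python. This is an illustration under stated assumptions, not the code in ***game_p3_api.py***: the function name `build_event` and the `"default"` label for incomplete requests are hypothetical, while the field names `item_purchased`, `type` and `guild_name` follow the event and table schemas shown in this report.

```python
def build_event(path, headers):
    """Map a request path to the event dictionary that would be logged.

    Incomplete or unknown paths fall back to a default event (assumed label).
    """
    parts = [p for p in path.strip("/").split("/") if p]
    event = {"event_type": "default"}  # assumption: label used when no action is taken
    if len(parts) == 3 and parts[0] == "purchase":
        event = {"event_type": "purchase", "item_purchased": parts[1], "type": parts[2]}
    elif len(parts) == 2 and parts[0] in ("create_guild", "join_guild"):
        event = {"event_type": parts[0], "guild_name": parts[1]}
    # The request headers (Host, Accept, User-Agent) are stored with the event
    event.update(headers)
    return event


headers = {"Host": "user4.comcast.com", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"}
print(build_event("/purchase/sword/samurai", headers))
print(build_event("/create_guild/barbarians", headers))
print(build_event("/join_guild", headers))  # guild name missing: default event
```

Merging the headers into the event is what allows the later queries to group by `host`.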


### 5. Running the API in Flask

The code ***game_p3_api.py*** is run using Flask.\
**Terminal 2**\
Command:
  > docker-compose exec mids env FLASK_APP=/w205/project-3-oropezaev/game_p3_api.py flask run --host 0.0.0.0
  
Output (For the very first event produced):\
Starting events
  * Serving Flask app "game_p3_api"
  * Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)

127.0.0.1 - - [11/Apr/2020 23:58:42] "GET /purchase/sword/samurai HTTP/1.0" 200 -  

### 6. Streaming, Filtering and Writing Code

#### Pseudo-code
The code in ***stream_write_p3.py*** (see the .py file for details) reads the events from Kafka, filters the data and writes the output to Hadoop.\
In pseudo-code:

1. Read the events from Kafka in streaming mode.
2. Filter the raw events with the output of ***action*** to create the filtered events: purchase_events, create_events and join_events.
   * Define the function ***action*** to label the raw events.
   * Each event has an event type: purchase, create_guild or join_guild. The function returns the integer 0, 1 or 2 respectively, labeling the event so that it can be handled accordingly.
3. Write the events to HDFS in three different folders:
   * /tmp/purchases
   * /tmp/create
   * /tmp/join
   * There are 2 different schemas: one for purchases and one for guilds (create and join).
4. Wait for termination by the user.
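
The labeling step can be illustrated outside Spark. The sketch below is plain Python rather than the streaming code in ***stream_write_p3.py***: it shows an `action`-style function returning 0, 1 or 2 and how those labels split the raw events into the three filtered groups. The value `-1` for any other event type and the sample events are assumptions made for the example.

```python
import json

PURCHASE, CREATE, JOIN, OTHER = 0, 1, 2, -1  # OTHER is an assumed label for ignored events


def action(event_as_json):
    """Label a raw JSON event: 0 = purchase, 1 = create_guild, 2 = join_guild."""
    event = json.loads(event_as_json)
    labels = {"purchase": PURCHASE, "create_guild": CREATE, "join_guild": JOIN}
    return labels.get(event.get("event_type"), OTHER)


# Hypothetical raw events as they would arrive from the Kafka topic
raw_events = [
    '{"event_type": "purchase", "item_purchased": "sword", "type": "light"}',
    '{"event_type": "create_guild", "guild_name": "spies"}',
    '{"event_type": "join_guild", "guild_name": "spies"}',
    '{"event_type": "default"}',
]

purchase_events = [e for e in raw_events if action(e) == PURCHASE]
create_events = [e for e in raw_events if action(e) == CREATE]
join_events = [e for e in raw_events if action(e) == JOIN]

print(len(purchase_events), len(create_events), len(join_events))
```

In the streaming job the same idea would be applied as a filter on the raw event stream, with each filtered group written to its own HDFS folder.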

#### Running Streaming Code

**Terminal 3**\
Command:\
  > docker-compose exec spark spark-submit /w205/project-3-oropezaev/stream_write_p3.py

Output (Not included - Very Large Display).

#### Checking HDFS
To check the data written to the folders above, the following command can be used:\
**Terminal 7**\
Command:
  > docker-compose exec cloudera hadoop fs -ls /tmp/
  
Output:

Found 9 items\
drwxrwxrwt   - root   supergroup          0 2020-04-11 23:58 /tmp/checkpoints_for_create\
drwxrwxrwt   - root   supergroup          0 2020-04-11 23:58 /tmp/checkpoints_for_join\
drwxrwxrwt   - root   supergroup          0 2020-04-11 23:58 /tmp/checkpoints_for_purchases\
drwxr-xr-x   - root   supergroup          0 2020-04-12 01:52 /tmp/create\
drwxrwxrwt   - mapred mapred              0 2016-04-06 02:26 /tmp/hadoop-yarn\
drwx-wx-wx   - hive   supergroup          0 2020-04-12 01:32 /tmp/hive\
drwxr-xr-x   - root   supergroup          0 2020-04-12 01:52 /tmp/join\
drwxrwxrwt   - mapred hadoop              0 2016-04-06 02:28 /tmp/logs\
drwxr-xr-x   - root   supergroup          0 2020-04-12 01:52 /tmp/purchases

### 7. Creating Testing Events 

The data is simulated by building Apache Bench commands at random from 10 different users, 4 weapons, 4 weapon types and 10 guilds. Incomplete commands are also included to test the default response. The script below, included in the file ***random_event_generator.sh***, details the data generation:

    #!/bin/bash

    # Fragments of the Apache Bench command; the user is sent in the Host header
    var1='docker-compose exec mids ab -n 1 -H "Host: '
    var2='" http://localhost:5000/'
    var3="purchase"
    var4="create_guild"
    var5="join_guild"
    sl="/"

    users=(user1@gmail.com user2@yahoo.com user3@hotmail.com user4@aol.com user5@comcast.com user6@att.com user7@tmobil.com user8@dell.com user9@tmb.com user10@xto.com)
    weapons=(sword spear axe hammer)
    chara=(light heavy large medium)
    guild=(bandits warriors rangers knights farmers assesins spies spartans army sailors)

    while true; do

        #Dummy1: no action
        ru=$(($RANDOM%10))
        eval $var1${users[ru]}$var2

        #Dummy2: purchase without weapon
        ru=$(($RANDOM%10))
        eval $var1${users[ru]}$var2$var3

        #Dummy3: purchase without weapon type
        ru=$(($RANDOM%10))
        rw=$(($RANDOM%4))
        eval $var1${users[ru]}$var2$var3$sl${weapons[rw]}

        #Purchase
        ru=$(($RANDOM%10))
        rw=$(($RANDOM%4))
        rc=$(($RANDOM%4))
        eval $var1${users[ru]}$var2$var3$sl${weapons[rw]}$sl${chara[rc]}

        #Dummy4: create_guild without guild name
        ru=$(($RANDOM%10))
        eval $var1${users[ru]}$var2$var4

        #Create
        ru=$(($RANDOM%10))
        rg=$(($RANDOM%10))
        eval $var1${users[ru]}$var2$var4$sl${guild[rg]}

        #Dummy5: join_guild without guild name
        ru=$(($RANDOM%10))
        eval $var1${users[ru]}$var2$var5

        #Join
        ru=$(($RANDOM%10))
        rg=$(($RANDOM%10))
        eval $var1${users[ru]}$var2$var5$sl${guild[rg]}

        sleep 10
    done

**Terminal 4**\
Command:\
./random_event_generator.sh

Note that only complete requests produce records that are written to the output. They have the following forms:

  - docker-compose exec mids ab -n 1 -H "Host: user4.comcast.com" http://localhost:5000/purchase/sword/samurai
  - docker-compose exec mids ab -n 1 -H "Host: user6.comcast.com" http://localhost:5000/join_guild/Barbarians
  - docker-compose exec mids ab -n 1 -H "Host: user8.comcast.com" http://localhost:5000/create_guild/Cacique

The script keeps producing events of these forms until it is terminated.
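
To make the split between complete and incomplete requests explicit, one pass of the generator loop can be modeled in Python. This is a sketch for illustration only and is not a replacement for ***random_event_generator.sh***: the names `one_round` and `ab_command` are hypothetical, and the lists are copied from the script.

```python
import random

USERS = ["user1@gmail.com", "user2@yahoo.com", "user3@hotmail.com", "user4@aol.com",
         "user5@comcast.com", "user6@att.com", "user7@tmobil.com", "user8@dell.com",
         "user9@tmb.com", "user10@xto.com"]
WEAPONS = ["sword", "spear", "axe", "hammer"]
TYPES = ["light", "heavy", "large", "medium"]
GUILDS = ["bandits", "warriors", "rangers", "knights", "farmers",
          "assesins", "spies", "spartans", "army", "sailors"]


def one_round(rng):
    """Return (path, is_complete) pairs for the eight requests of one loop pass."""
    return [
        ("", False),                                                  # Dummy1
        ("purchase", False),                                          # Dummy2
        ("purchase/" + rng.choice(WEAPONS), False),                   # Dummy3
        ("purchase/" + rng.choice(WEAPONS) + "/" + rng.choice(TYPES), True),
        ("create_guild", False),                                      # Dummy4
        ("create_guild/" + rng.choice(GUILDS), True),
        ("join_guild", False),                                        # Dummy5
        ("join_guild/" + rng.choice(GUILDS), True),
    ]


def ab_command(user, path):
    """Build the Apache Bench command line for one request."""
    return 'docker-compose exec mids ab -n 1 -H "Host: %s" http://localhost:5000/%s' % (user, path)


rng = random.Random(0)  # seeded only to make the example repeatable
for path, is_complete in one_round(rng):
    marker = "written" if is_complete else "default response"
    print(marker, "|", ab_command(rng.choice(USERS), path))
```

Each pass therefore issues eight requests, of which three (one purchase, one create and one join) lead to records in the output folders.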

### 8. Querying Data


#### Hive
To query the data while it is still streaming, Hive is run in the Hadoop container as follows:

**Terminal 5**\
Command:\
  > docker-compose exec cloudera hive
  
Output:\
2020-04-12 01:32:06,598 WARN  [main] mapreduce.TableMapReduceUtil: The hbase-prefix-tree module jar containing PrefixTreeCodec is not present.  Continuing without it.

Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j.properties
WARNING: Hive CLI is deprecated and migration to Beeline is recommended.
hive>

Once Hive is running, three tables are created:

  * purchases_table
  * create_table
  * join_table

These tables contain the respective (filtered) events. The following commands create the tables:

  > create external table if not exists default.purchases_table (timestamp string, Accept string, Host string, User_Agent string, event_type string, item_purchased string, type string) stored as parquet location '/tmp/purchases'  tblproperties ("parquet.compress"="SNAPPY");
  
  > create external table if not exists default.create_table (timestamp string, Accept string, Host string, User_Agent string, event_type string, guild_name string) stored as parquet location '/tmp/create'  tblproperties ("parquet.compress"="SNAPPY");
  
  > create external table if not exists default.join_table (timestamp string, Accept string, Host string, User_Agent string, event_type string, guild_name string) stored as parquet location '/tmp/join'  tblproperties ("parquet.compress"="SNAPPY");
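
The three statements differ only in the table name, the HDFS location and the event-specific columns. The following sketch generates them from a shared column list to make the two schemas explicit; the helper `create_statement` and the `TABLES` dictionary are hypothetical names introduced for illustration, and the statements still have to be run in Hive as shown above.

```python
# Columns shared by every event (timestamp plus request headers and event type)
COMMON = ["timestamp", "Accept", "Host", "User_Agent", "event_type"]

# Table name -> (HDFS location, columns); create and join share the guild schema
TABLES = {
    "purchases_table": ("/tmp/purchases", COMMON + ["item_purchased", "type"]),
    "create_table": ("/tmp/create", COMMON + ["guild_name"]),
    "join_table": ("/tmp/join", COMMON + ["guild_name"]),
}


def create_statement(name, location, columns):
    """Build the Hive DDL for one external Parquet table with string columns."""
    cols = ", ".join("%s string" % c for c in columns)
    return (
        "create external table if not exists default.%s (%s) "
        "stored as parquet location '%s' "
        'tblproperties ("parquet.compress"="SNAPPY");' % (name, cols, location)
    )


for name, (location, columns) in TABLES.items():
    print(create_statement(name, location, columns))
```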
  
#### Presto
The data is queried in Presto. Once the tables are created, Presto is started with:\
**Terminal 6**\
Command:
  > docker-compose exec presto presto --server presto:8080 --catalog hive --schema default

Output:\
presto:default>

Once in Presto, different commands are used to extract the required information. First, a couple of commands are run to list the available tables and check the data schema.

#### Displaying table list
Command:\
presto:default> show tables;

Output:\
  create_table\
  join_table  
  purchase_table\
  (3 rows)

Query 20200411_184234_00005_6aanw, FINISHED, 1 node\
Splits: 2 total, 0 done (0.00%)\
0:00 [0 rows, 0B] [0 rows/s, 0B/s]

#### Table Schema
presto:default> describe purchase_table;

|    Column      |  Type   | Comment |
|----------------|---------|---------|
| timestamp      | varchar |         |
| accept         | varchar |         |
| host           | varchar |         |
| user-agent     | varchar |         |
| event_type     | varchar |         |
| item_purchased | varchar |         |
| type           | varchar |         |

(7 rows)

Query 20200411_184346_00006_6aanw, FINISHED, 1 node\
Splits: 2 total, 0 done (0.00%)\
0:02 [0 rows, 0B] [0 rows/s, 0B/s]

#### Querying Data
#### 1. Checking Data Streaming
The first query in this part shows the empty table, before any data is streamed. The second query repeats the first one later on and returns the 13 purchases made in that interval.


presto:default> ***select *  from purchases_table;***

| timestamp | accept | host | user_agent | event_type | item_purchased | type 
|-----------|--------|------|------------|------------|----------------|------

(0 rows)

Query 20200412_160943_00004_xf4bz, FINISHED, 1 node\
Splits: 1 total, 0 done (0.00%)\
0:04 [0 rows, 0B] [0 rows/s, 0B/s]


presto:default> ***select * from purchases_table;***

|        timestamp        | accept |       host        |   user_agent    | event_type | item_purchased |  type  
|-------------------------|--------|-------------------|-----------------|------------|----------------|--------
 2020-04-12 16:20:24.926 | \*/\*    | user4@aol.com     | ApacheBench/2.3 | purchase   | spear          | light  
 2020-04-12 16:14:24.595 | \*/\*    | user3@hotmail.com | ApacheBench/2.3 | purchase   | spear          | large  
 2020-04-12 16:14:43.235 | \*/\*    | user3@hotmail.com | ApacheBench/2.3 | purchase   | hammer         | large  
 2020-04-12 16:15:00.933 | \*/\*    | user10@xto.com    | ApacheBench/2.3 | purchase   | sword          | medium 
 2020-04-12 16:15:20.934 | \*/\*    | user7@tmobil.com  | ApacheBench/2.3 | purchase   | sword          | light  
 2020-04-12 16:15:37.857 | \*/\*    | user8@dell.com    | ApacheBench/2.3 | purchase   | spear          | large  
 2020-04-12 16:21:56.021 | \*/\*    | user8@dell.com    | ApacheBench/2.3 | purchase   | spear          | light  
 2020-04-12 16:18:39.573 | \*/\*    | user2@yahoo.com   | ApacheBench/2.3 | purchase   | spear          | light  
 2020-04-12 16:17:27.304 | \*/\*    | user2@yahoo.com   | ApacheBench/2.3 | purchase   | sword          | medium 
 2020-04-12 16:16:07.094 | \*/\*    | user4@aol.com     | ApacheBench/2.3 | purchase   | sword          | large  
 2020-04-12 16:16:29.424 | \*/\*    | user1@gmail.com   | ApacheBench/2.3 | purchase   | sword          | large  
 2020-04-12 16:16:47.898 | \*/\*    | user7@tmobil.com  | ApacheBench/2.3 | purchase   | hammer         | heavy  
 2020-04-12 16:23:48.296 | \*/\*    | user5@comcast.com | ApacheBench/2.3 | purchase   | hammer         | light  

(13 rows)

Query 20200412_163948_00011_xf4bz, FINISHED, 1 node\
Splits: 15 total, 10 done (66.67%)\
0:03 [9 rows, 10.7KB] [3 rows/s, 3.98KB/s]

#### 2. Most popular weapons 

presto:default> ***select item_purchased, count(***\****) as amount_purchased from purchases_table group by item_purchased order by amount_purchased desc;***

|item_purchased | amount_purchased 
|----------------|------------------
 hammer         |               23 
 sword          |               22 
 spear          |               19 
 axe            |               17 

(4 rows)

Query 20200412_174128_00008_2uqu9, FINISHED, 1 node\
Splits: 26 total, 20 done (76.92%)\
0:01 [65 rows, 36.8KB] [57 rows/s, 32.5KB/s]

#### 3. Most popular weapons with type 

presto:default> ***select specific_weapon, count(***\****) as amount_purchased from (select concat(item_purchased,'-',type) as specific_weapon from purchases_table) group by specific_weapon order by amount_purchased desc;***
 
| specific_weapon | amount_purchased 
|-----------------|------------------
 axe-light       |               11 
 sword-heavy     |               10 
 hammer-light    |                9 
 axe-large       |                9 
 hammer-heavy    |                9 
 spear-large     |                8 
 spear-light     |                8 
 sword-light     |                8 
 sword-large     |                7 
 sword-medium    |                7 
 spear-heavy     |                7 
 spear-medium    |                6 
 hammer-medium   |                6 
 hammer-large    |                6 
 axe-medium      |                6 
 axe-heavy       |                3 

(16 rows)

Query 20200412_175229_00013_2uqu9, FINISHED, 1 node\
Splits: 37 total, 33 done (89.19%)\
0:02 [109 rows, 60.2KB] [57 rows/s, 31.7KB/s]

#### 4. User and Guild Created 

presto:default> ***select host, count(***\****) as no_guild_created from create_table group by host order by no_guild_created desc;***

|       host        | no_guild_created 
|-------------------|------------------
 user3@hotmail.com |               17 
 user7@tmobil.com  |               16 
 user10@xto.com    |               15 
 user6@att.com     |               14 
 user2@yahoo.com   |               13 
 user1@gmail.com   |               12 
 user9@tmb.com     |               12 
 user5@comcast.com |               12 
 user4@aol.com     |               11 
 user8@dell.com    |                9 

(10 rows)

Query 20200412_175550_00016_2uqu9, FINISHED, 1 node\
Splits: 40 total, 29 done (72.50%)\
0:02 [102 rows, 50.6KB] [46 rows/s, 22.9KB/s]

#### 5. Popular Guild (Users Joined) 

presto:default> ***select guild_name, count(***\****) as no_join_request from join_table group by guild_name order by no_join_request desc;***

| guild_name | no_join_request 
|------------|-----------------
 spies      |              21 
 warriors   |              17 
 assesins   |              17 
 sailors    |              16 
 farmers    |              14 
 army       |              14 
 spartans   |              12 
 bandits    |              10 
 knights    |              10 
 rangers    |              10 

(10 rows)

Query 20200412_175801_00018_2uqu9, FINISHED, 1 node\
Splits: 43 total, 38 done (88.37%)\
0:02 [129 rows, 64KB] [61 rows/s, 30.3KB/s]

#### 6. Final table sizes (before streaming is stopped)

presto:default> ***select count(***\****) as no_purchases from purchases_table;***

| no_purchases 
|--------------
          149 

(1 row)

Query 20200412_180046_00024_2uqu9, FINISHED, 1 node\
Splits: 44 total, 42 done (95.45%)\
0:03 [149 rows, 81.6KB] [46 rows/s, 25.4KB/s]

presto:default> ***select count(***\****) as no_guild_created from create_table;***

|no_guild_created 
|------------------
              152 

(1 row)

Query 20200412_180127_00025_2uqu9, FINISHED, 1 node\
Splits: 45 total, 39 done (86.67%)\
0:02 [138 rows, 68.1KB] [90 rows/s, 44.7KB/s]

presto:default> ***select count(***\****) as no_guild_join_request from join_table;***

| no_guild_join_request 
|-----------------------
                   156 

(1 row)

Query 20200412_180215_00026_2uqu9, FINISHED, 1 node\
Splits: 46 total, 35 done (76.09%)\
0:02 [123 rows, 60.6KB] [54 rows/s, 27KB/s]