# Assemble Pipeline: UNDERSTANDING USER BEHAVIOR
-----------


# A. Description of Data Pipeline:

## I. Instrument API server and send log events to Kafka

### 1. Spin up the pipeline

Start by setting up the environment defined in `docker-compose.yml`:

* Navigate to the project folder where the docker-compose.yml file is stored.
* About the `docker-compose.yml` file: it defines the following containers:
    - zookeeper: runs ZooKeeper; exposes port 32181.
    - kafka: runs a single-broker Kafka cluster (broker ID 1); depends on `zookeeper`, connects to it on port 32181, and exposes port 29092.
    - cloudera: runs Hadoop (image version 0.0.2); exposes port 8888 for Hue and port 9083 for the Hive Thrift server, so that other containers can connect to it.
    - spark: runs Spark with Python; depends on `cloudera`, uses `cloudera` as the HDFS namenode, and connects to the Hive Thrift server at `cloudera:9083`.
    - presto: runs Presto; connects to the Hive Thrift server at `cloudera:9083`.
    - mids: provides Flask, Apache Bench, Python with data science libraries, and other tools.


* Check which containers exist before spinning up the cluster:

```
docker ps
```

* Spin up multiple Docker containers from the docker-compose.yml file, running the containers in the background:

```
docker-compose up -d
```

* Check which containers are running after spinning up the cluster; the output shows container names and port details:

```
docker-compose ps
```

* Follow the Kafka logs to confirm that the broker has started (stop following with `Ctrl-C`):

```
docker-compose logs -f kafka

```

* List the files and folders currently in the HDFS `/tmp/` directory. No data has been streamed yet, so the streaming output directories will not appear at this step:

```
docker-compose exec cloudera hadoop fs -ls /tmp/
```

### 2. Run the API server (web app) with Flask and send event logs to Kafka

In Terminal 1, run the Flask API server, which sends event logs to Kafka. The server is implemented in the Python script `game_api.py`.

* The script does the following:
    - Flask handles `GET` and `POST` requests for the different event types. This project uses the following events:
        * `default_response`: `GET` request
        * `purchase_a_sword`: `GET` request
        * `join_a_guild`: `GET` request
        * `Earn_more_gold`: `GET` request
        * `purchase_a_potion`: `GET` and `POST` requests
    - The API server acts as a Kafka producer: it sends each event log to the Kafka bootstrap server at `kafka:29092` under the topic `events`. Each message is the event log of one `GET` or `POST` request.

* Command to run the Flask app (leave it running while streaming):

```
docker-compose exec mids \
  env FLASK_APP=/Assemble_Pipeline_API_Streaming/game_api.py \
  flask run --host 0.0.0.0
```
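The event-logging part of `game_api.py` is not reproduced in this README. The following is a minimal, hypothetical sketch of the pattern described above: each request becomes a JSON event that includes the request headers (such as `Host`, which the later queries use to identify the player) and is published to the `events` topic. To keep it runnable without Flask or a Kafka client, the producer is passed in as a plain callable; `build_event`, `log_to_kafka`, and the `description` field are assumptions. In a real deployment that callable might wrap something like kafka-python's `KafkaProducer(bootstrap_servers="kafka:29092").send`.

```python
import json
from typing import Callable, Dict, Mapping

# Hypothetical "send bytes to a Kafka topic" function; in practice this could
# wrap a Kafka client's producer.send(topic, value).
SendFn = Callable[[str, bytes], None]


def build_event(event_type: str, headers: Mapping[str, str], **fields: str) -> Dict[str, str]:
    """Combine the event type, event-specific fields, and the request headers."""
    event = {"event_type": event_type, **fields}
    # Copy headers such as Host and User-Agent into the event so that
    # downstream queries can group events by player (Host).
    event.update(headers)
    return event


def log_to_kafka(send: SendFn, topic: str, event: Dict[str, str]) -> bytes:
    """Serialize the event as UTF-8 JSON and hand it to the producer."""
    payload = json.dumps(event).encode("utf-8")
    send(topic, payload)
    return payload


if __name__ == "__main__":
    sent = []  # stand-in for a Kafka producer: record what would be sent
    event = build_event(
        "purchase_sword",
        {"Host": "user1.comcast.com", "Accept": "*/*"},
        description="titanium",
    )
    log_to_kafka(lambda topic, value: sent.append((topic, value)), "events", event)
    print(sent[0][0], json.loads(sent[0][1]))
```

Injecting the send function keeps the serialization logic testable on its own; the Flask route handlers would only need to call `build_event` and `log_to_kafka`.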

### 3. Use a kafkacat consumer to watch Kafka messages:

Open Terminal 2 to watch the messages sent through Kafka:

* Create a Kafka topic named `events` with one partition and a replication factor of 1, registered through ZooKeeper on port 32181. This step is optional, but creating the topic before starting the consumer avoids an error about the `events` topic not existing yet:

```
docker-compose exec kafka \
  kafka-topics \
    --create \
    --topic events \
    --partitions 1 \
    --replication-factor 1 \
    --if-not-exists --zookeeper zookeeper:32181
```

* Use kafkacat as a consumer (`-C`) to read the messages on topic `events` from the beginning (`-o beginning`), connecting to the broker at `kafka:29092`. New messages appear here as data is streamed:

```
docker-compose exec mids \
  kafkacat -C -b kafka:29092 -t events -o beginning
```
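To summarize what has been produced so far instead of watching the stream scroll by, kafkacat's output can be saved and tallied. The sketch below assumes each Kafka message is one JSON object per line with an `event_type` field (as the Hive table schema later in this README suggests); the script name `count_events.py` and the file name `events.jsonl` are hypothetical. kafkacat's `-e` flag makes it exit after the last available message, and `docker-compose exec -T` disables pseudo-TTY allocation so the output can be redirected to a file.

```python
import json
import sys
from collections import Counter
from typing import Iterable


def count_event_types(lines: Iterable[str]) -> Counter:
    """Tally event_type across JSON messages, one message per line."""
    counts: Counter = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            # Skip anything that is not a JSON message (e.g. stray status text).
            continue
        if isinstance(event, dict):
            counts[event.get("event_type", "<missing>")] += 1
    return counts


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("usage: python count_events.py MESSAGES_FILE", file=sys.stderr)
    else:
        with open(sys.argv[1], encoding="utf-8") as fh:
            for event_type, n in count_event_types(fh).most_common():
                print(f"{event_type}\t{n}")
```

For example: `docker-compose exec -T mids kafkacat -C -b kafka:29092 -t events -o beginning -e > events.jsonl`, then `python count_events.py events.jsonl`.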

### 4. Run Spark from a Python script to consume streaming messages and land them in Hadoop

Open Terminal 3 to run Spark:

* Run the Python script `write_stream.py` to consume the Kafka messages, filter selected event types, and land them in HDFS as Parquet files. The script does the following:

    - Builds a Spark session
    - Consumes the streaming messages from Kafka topic `events` with `readStream`, connecting to Kafka at `kafka:29092`
    - Transforms the messages with Spark and selects the events of interest:
        * Specifies the event schema
        * Filters the raw events down to the event types of interest (for example, `purchase_sword` and `join_guild`)
        * Casts the binary message value and the timestamp to strings
        * Returns the event value, the timestamp, and the fields parsed with the event schema
    - Writes the streaming data to Parquet files in HDFS with `writeStream`, using a processing-time trigger of 120 seconds
    - Keeps streaming until terminated with `Ctrl-C`

Command to run the Spark job in `write_stream.py`:

```
docker-compose exec spark \
  spark-submit /Assemble_Pipeline_API_Streaming/write_stream.py
```
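The transformation in `write_stream.py` runs inside Spark and is not shown here. As a rough, framework-free illustration of the same logic, the sketch below takes raw Kafka records (binary value plus timestamp), decodes the value to a string, keeps one event type, and returns rows holding the selected fields, the timestamp, and the raw event. The record shape, the `transform` function, and the field subset are assumptions; in PySpark this would typically be written with `col("value").cast("string")`, `from_json`, and `filter` on the streaming DataFrame.

```python
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Iterable, List, Optional

# Assumed subset of the event fields (compare the Hive table schema below).
# "value" here is the event's own field (e.g. a price), not the Kafka payload.
SCHEMA_FIELDS = ("event_type", "description", "value", "Host")


@dataclass
class KafkaRecord:
    """Hypothetical stand-in for one Kafka row as Spark sees it."""
    value: bytes          # message payload, binary as delivered by Kafka
    timestamp: datetime   # Kafka message timestamp


def transform(records: Iterable[KafkaRecord], wanted_type: str) -> List[Dict[str, Optional[str]]]:
    """Decode, parse, filter by event_type, and project each record to a row."""
    rows: List[Dict[str, Optional[str]]] = []
    for rec in records:
        raw_event = rec.value.decode("utf-8")        # binary -> string
        event = json.loads(raw_event)                # parse the JSON event
        if event.get("event_type") != wanted_type:   # keep only the events of interest
            continue
        row: Dict[str, Optional[str]] = {f: event.get(f) for f in SCHEMA_FIELDS}
        row["timestamp"] = rec.timestamp.isoformat()  # timestamp -> string
        row["raw_event"] = raw_event
        rows.append(row)
    return rows
```

Fields missing from an event come back as `None`, which loosely mirrors how a schema-based parse yields nulls for absent keys.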

### 5. Check the data landed in Hadoop:

Open Terminal 4 to check the data in HDFS:

* List all directories in the HDFS `/tmp/` directory:

```
docker-compose exec cloudera hadoop fs -ls /tmp/
```

* List the `sword_purchases` directory in HDFS with human-readable file sizes (`-h`):

```
docker-compose exec cloudera hadoop fs -ls -h /tmp/sword_purchases
```

* List the `guild_joins` directory in HDFS with human-readable file sizes:

```
docker-compose exec cloudera hadoop fs -ls -h /tmp/guild_joins
```

* List the `gold_earns` directory in HDFS with human-readable file sizes:

```
docker-compose exec cloudera hadoop fs -ls -h /tmp/gold_earns
```


* List the `potion_purchases` directory in HDFS with human-readable file sizes:

```
docker-compose exec cloudera hadoop fs -ls -h /tmp/potion_purchases
```

* List the `all_events` directory in HDFS with human-readable file sizes:

```
docker-compose exec cloudera hadoop fs -ls -h /tmp/all_events
```

## II. Create Batch Events

### 1. Use Apache Bench to generate a batch of test events:

Open Terminal 5 to generate a batch of events with Apache Bench. The events appear in the kafkacat window (Terminal 2) and in the Flask window (Terminal 1).

* Run the Bash script `generate_data.sh`, which issues API requests with Apache Bench:

```
./generate_data.sh
```

  - Example of a `GET` request in `generate_data.sh` (`-n 5` sends 5 requests; `-H` sets the `Host` header, which identifies the player):

```
docker-compose exec mids \
  ab \
    -n 5 \
    -H "Host: user1.comcast.com" \
    http://localhost:5000/purchase_a_sword/titanium
```
  
  - Example of a `POST` request in `generate_data.sh` (`-p` posts the contents of `post.txt` as the request body; `-T` sets its content type):

```
docker-compose exec mids \
  ab \
    -n 3 \
    -H "Host: user2.att.com" \
    -T "application/json" \
    -p /w205/project-3-latuyetmai/post.txt \
    http://localhost:5000/purchase_a_potion
```
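If `generate_data.sh` grows beyond a handful of requests, the `ab` invocations can be generated instead of written out by hand. The sketch below builds argument lists matching the two examples above and runs them with `subprocess.run` only when `--run` is passed. `ab_command` and `REQUESTS` are hypothetical names; the extra `-T` passed to `docker-compose exec` (not in the original commands) disables pseudo-TTY allocation so the commands also work from a script. Running it assumes the working directory contains `docker-compose.yml`.

```python
import shlex
import subprocess
import sys
from typing import List, Optional


def ab_command(host: str, path: str, n: int, post_file: Optional[str] = None) -> List[str]:
    """Build a docker-compose + Apache Bench argument list like those in generate_data.sh."""
    # -T before the service name disables pseudo-TTY allocation in docker-compose exec.
    cmd = ["docker-compose", "exec", "-T", "mids", "ab", "-n", str(n), "-H", f"Host: {host}"]
    if post_file is not None:
        # For ab, -T sets the Content-Type and -p names the file holding the POST body.
        cmd += ["-T", "application/json", "-p", post_file]
    cmd.append(f"http://localhost:5000{path}")
    return cmd


# The two example requests from generate_data.sh shown above.
REQUESTS = [
    ab_command("user1.comcast.com", "/purchase_a_sword/titanium", 5),
    ab_command("user2.att.com", "/purchase_a_potion", 3,
               post_file="/w205/project-3-latuyetmai/post.txt"),
]

if __name__ == "__main__":
    run = "--run" in sys.argv  # without --run, only print the commands (dry run)
    for cmd in REQUESTS:
        print(shlex.join(cmd))
        if run:
            subprocess.run(cmd, check=True)
```

Passing arguments as a list avoids shell quoting issues with the `Host: ...` header value; `shlex.join` is used only to print a copy-pasteable version.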

## III. Query Batch Data with Presto

### 1. Create external tables in the Hive metastore

In Terminal 4, run Hive and then Presto:

* Start Hive:

```
docker-compose exec cloudera hive
```

* Create an external table for the sword purchase events, named `sword_purchases`:

```
create external table if not exists default.sword_purchases (
    event_type string,
    gold_count string,
    description string,
    value string,
    Accept string,
    Host string,
    User_Agent string,
    timestamp string,
    raw_event string
  )
  stored as parquet 
  location '/tmp/sword_purchases'
  tblproperties ("parquet.compress"="SNAPPY");
```

* Create an external table for the guild join events, named `guild_joins`:

```
create external table if not exists default.guild_joins (
    event_type string,
    gold_count string,
    description string,
    value string,
    Accept string,
    Host string,
    User_Agent string,
    timestamp string,
    raw_event string
  )
  stored as parquet 
  location '/tmp/guild_joins'
  tblproperties ("parquet.compress"="SNAPPY");
```

* Create an external table for the gold earn events, named `gold_earns`:

```
create external table if not exists default.gold_earns (
    event_type string,
    gold_count string,
    description string,
    value string,
    Accept string,
    Host string,
    User_Agent string,
    timestamp string,
    raw_event string
  )
  stored as parquet 
  location '/tmp/gold_earns'
  tblproperties ("parquet.compress"="SNAPPY");
```

* Create an external table for the potion purchase events, named `potion_purchases`:

```
create external table if not exists default.potion_purchases (
    event_type string,
    gold_count string,
    description string,
    value string,
    Accept string,
    Host string,
    User_Agent string,
    timestamp string,
    raw_event string
  )
  stored as parquet 
  location '/tmp/potion_purchases'
  tblproperties ("parquet.compress"="SNAPPY");
```

* Create an external table for all events, named `all_events`:

```
create external table if not exists default.all_events (
    event_type string,
    gold_count string,
    description string,
    value string,
    Accept string,
    Host string,
    User_Agent string,
    timestamp string,
    raw_event string
  )
  stored as parquet 
  location '/tmp/all_events'
  tblproperties ("parquet.compress"="SNAPPY");
```

* Close hive with `Ctrl-D`
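The five `create external table` statements above differ only in the table name and HDFS location. As an optional sketch (not part of the original workflow), they can be generated from one shared column list so the schemas cannot drift apart. `ddl_for`, `COLUMNS`, and `TABLES` are hypothetical names; the generated text mirrors the statements above and could be pasted into the Hive shell, or saved to a file and run with `hive -f`.

```python
from typing import List

# Column list shared by every table above (all stored as strings).
COLUMNS = [
    "event_type", "gold_count", "description", "value",
    "Accept", "Host", "User_Agent", "timestamp", "raw_event",
]

# Table names match the HDFS directories under /tmp/ written by the Spark job.
TABLES = ["sword_purchases", "guild_joins", "gold_earns", "potion_purchases", "all_events"]


def ddl_for(table: str, columns: List[str] = COLUMNS) -> str:
    """Build the Hive DDL for one external Parquet table located at /tmp/<table>."""
    cols = ",\n".join(f"    {c} string" for c in columns)
    return (
        f"create external table if not exists default.{table} (\n"
        f"{cols}\n"
        "  )\n"
        "  stored as parquet\n"
        f"  location '/tmp/{table}'\n"
        '  tblproperties ("parquet.compress"="SNAPPY");'
    )


if __name__ == "__main__":
    print("\n\n".join(ddl_for(t) for t in TABLES))
```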

### 2. Query the tables with Presto:

* Start Presto:

    - Presto connects to the Hive metastore (Thrift server) at `cloudera:9083`, so it can see the external tables created in Hive. Command to start Presto:

```
docker-compose exec presto presto --server presto:8080 --catalog hive --schema default

```

* List the Hive tables visible to Presto:

```
show tables;
```

Result:

|      Table       |
|:----------------:|
|    all_events    |
|    gold_earns    |
|   guild_joins    |
| potion_purchases |
| sword_purchases  |

* Show a table's columns and types with `describe`:

```
describe sword_purchases;
```

Result:

|    Column   |   Type  | Comment |
|:-----------:|:-------:|:-------:|
|  event_type | varchar |         |
|  gold_count | varchar |         |
| description | varchar |         |
|    value    | varchar |         |
|    accept   | varchar |         |
|     host    | varchar |         |
|  user_agent | varchar |         |
|  timestamp  | varchar |         |
|  raw_event  | varchar |         |

* Query: show the first 10 rows of each table:

```
select * from all_events limit 10;
select * from gold_earns limit 10;
select * from guild_joins limit 10;
select * from potion_purchases limit 10;
select * from sword_purchases limit 10;

```
Note: press `q` to leave the paged result view.

* Query: How many total events?

```
select count(*) as event_count from all_events;
```

Result:

|   event_count   |
|:---------------:|
|        132      |


* Query: How many gold_earns events?

```
select count(*) as event_count from gold_earns;
```

Result:

|   event_count   |
|:---------------:|
|        70       |

* Query: How many guild_joins events?

```
select count(*) as event_count from guild_joins;
```

Result:

|   event_count   |
|:---------------:|
|        7        |


* Query: How many potion_purchases events?

```
select count(*) as event_count from potion_purchases;
```

Result:

|   event_count   |
|:---------------:|
|        14       |


* Query: How many purchase_sword events?

```
select count(*) as event_count from sword_purchases;
```

Result:

|   event_count   |
|:---------------:|
|        21       |


# B. Streaming & Analyse Data:


### 1. Sending streaming data continuously with Apache Bench:

In Terminal 5, send streaming data:

* Send a round of API requests every 30 seconds, for example by rerunning `generate_data.sh` in a loop until terminated with `Ctrl-C`:

```
while true; do
    ./generate_data.sh
    sleep 30
done
```

* Watch the streaming events arrive in the terminals running Flask, kafkacat, and Spark.

### 2. Analyse streaming data with presto:

While data is streaming, rerun the queries in Terminal 4 (Presto) and watch the counts grow. Allow some time between queries, since Spark writes new data every 120 seconds.

* Query: How many sword purchase events are there in total?

```
select count(*) as event_count from sword_purchases;
```

Result:

|   event_count   |
|:---------------:|
|        210      |

* Query: How many events did each user make, broken down by event type and description (the sword or potion type purchased, or the guild joined)?

```
select host as player, event_type, description,
       count(raw_event) as number_of_event
from all_events
group by host, event_type, description
order by host, event_type, number_of_event desc;
```

Result:

|       player      |    event_type   |       description      | number_of_event |
|:-----------------:|:---------------:|:----------------------:|:---------------:|
| user1.comcast.com |     default     |                        |       120       |
| user1.comcast.com |    earn_gold    |                        |       420       |
| user1.comcast.com |    join_guild   |       holy_ramen       |        24       |
| user1.comcast.com |    join_guild   |     kungfu_chicken     |        24       |
| user1.comcast.com | purchase_potion |        strength        |        36       |
| user1.comcast.com | purchase_potion |         poison         |        13       |
| user1.comcast.com | purchase_potion |           fly          |        12       |
| user1.comcast.com | purchase_potion | Not_enough_gold_to_buy |        11       |
| user1.comcast.com |  purchase_sword | Not_enough_gold_to_buy |       118       |
| user1.comcast.com |  purchase_sword |        titanium        |        49       |
| user1.comcast.com |  purchase_sword |          steel         |        13       |
|   user2.att.com   |     default     |                        |       120       |
|   user2.att.com   |    earn_gold    |                        |       420       |
|   user2.att.com   |    join_guild   |       holy_ramen       |        12       |
|   user2.att.com   |    join_guild   |     kungfu_chicken     |        12       |
|   user2.att.com   |    join_guild   |      clumsy_witch      |        12       |
|   user2.att.com   | purchase_potion |        strength        |        36       |
|   user2.att.com   | purchase_potion | Not_enough_gold_to_buy |        35       |
|   user2.att.com   | purchase_potion |          speed         |        24       |
|   user2.att.com   | purchase_potion |          love          |        1        |
|   user2.att.com   |  purchase_sword | Not_enough_gold_to_buy |        66       |
|   user2.att.com   |  purchase_sword |         silver         |        5        |
|   user2.att.com   |  purchase_sword |          magic         |        1        |
 

* Query: How many guilds are there, and how many members does each guild have?

```
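select description as guild_name,
       max(cast(value as integer)) as number_of_members
from guild_joins
group by description
order by number_of_members desc;
```

Because `value` is stored as a string, it is cast to an integer so that `max` compares numbers rather than text (as text, `'10'` sorts before `'9'`).

Result:

|   guild_name   |   number_of_members  |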
|:--------------:|:--------------------:|
|   holy_ramen   |          3          |
| kungfu_chicken |          2          |
|  clumsy_witch  |          1          |

* Query: How many swords did each player purchase, by sword type and price?

```
select host as player, description as sword_type, value as sword_price,
       count(raw_event) as number_of_purchases
from sword_purchases
group by host, description, value
order by player, number_of_purchases desc;
```

Result:

|       player      |       sword_type       | sword_price | number_of_purchases |
|:-----------------:|:----------------------:|:-----------:|:-------------------:|
| user1.comcast.com | Not_enough_gold_to_buy |      0      |         118         |
| user1.comcast.com |        titanium        |      4      |          49         |
| user1.comcast.com |          steel         |      1      |          13         |
|   user2.att.com   | Not_enough_gold_to_buy |      0      |          66         |
|   user2.att.com   |         silver         |      2      |          5          |
|   user2.att.com   |          magic         |      20     |          1          |

* Query: How many potions did each player purchase, by potion type and price?

```
select host as player, description as potion_type, value as potion_price,
       count(raw_event) as number_of_purchases
from potion_purchases
group by host, description, value
order by player, number_of_purchases desc;
```

Result:

|       player      |       potion_type      | potion_price | number_of_purchases |
|:-----------------:|:----------------------:|:------------:|:-------------------:|
| user1.comcast.com |        strength        |       3      |          36         |
| user1.comcast.com |         poison         |       4      |          13         |
| user1.comcast.com |           fly          |       5      |          12         |
| user1.comcast.com | Not_enough_gold_to_buy |       0      |          11         |
|   user2.att.com   |        strength        |       3      |          36         |
|   user2.att.com   | Not_enough_gold_to_buy |       0      |          35         |
|   user2.att.com   |          speed         |       2      |          24         |
|   user2.att.com   |          love          |       6      |          1          |

### 3. Spin down Docker containers

* Terminal 5: Stop sending streaming data with `Ctrl-C`
* Terminal 4: Exit Presto with `Ctrl-D`
* Terminal 3: Stop Spark with `Ctrl-C`
* Terminal 2: Stop the kafkacat consumer with `Ctrl-C`
* Terminal 1: Stop Flask with `Ctrl-C`
* Spin down docker containers from the docker-compose.yml file:

```
docker-compose down
```

* Check if the containers are down:

```
docker-compose ps
```