#  Understand User Behavior
## Project 3 - Final Report
### 07/30/2020 - Shyamkarthik Rameshbabu

## Introduction

- You're a data scientist at a game development company  

- Your latest mobile game has two events you're interested in tracking: `purchase_sword` and `join_a_guild`

- Each has metadata characteristic of such events (e.g., sword type, guild name)
  
  
Our goal here is to simulate user interactions with our "mobile game" while tracking and processing the resulting events through the entire pipeline, end to end. Each step of the pipeline uses a different tool, as detailed below.

## Commands Walkthrough

### Spin up the pipeline.

First we run our favorite command to get all our containers up and running. Please refer to the ```docker-compose.yml``` file for descriptions of each container.

```bat

docker-compose up -d

```

### Kafka Consumer.

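Here we use kafkacat in consumer mode (```-C```) to read messages from the ```game_events``` topic, starting from the beginning (```-o beginning```). If the topic does not already exist, running this command twice will create it. We run kafkacat without the ```-e``` option so that it keeps running and prints new messages as they arrive, instead of exiting when it reaches the end of the topic.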

```bat

docker-compose exec mids kafkacat -C -b kafka:29092 -t game_events -o beginning

```

### Run Flask Server.

Here we start our Flask server inside the ```mids``` container. The ```FLASK_APP``` environment variable tells Flask which file to load, and ```--host 0.0.0.0``` makes the server listen on all network interfaces of the container. Flask serves on port 5000 by default, which matches the port listed in the YML file, so the app handles every request that arrives on port 5000.

```bat

docker-compose exec mids env FLASK_APP=/w205/project-3-karthikrbabu/game_api.py flask run --host 0.0.0.0

```

### Test the Flask Server


Here we run a variety of commands that make requests to our Flask server in various ways.


Individual **cURL request.** cURL is for transferring data using various network protocols. The name stands for "Client URL", and here we use HTTP to fire a request to our localhost.
```bat 
docker-compose exec mids curl 'http://localhost:5000/join_a_guild?region=cali'

docker-compose exec mids curl 'http://localhost:5000/purchase_sword?metal=copper&power_level=100&magical=True'
```
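
The query string in these requests carries the event metadata. As a minimal sketch (the helper name ```build_url``` is hypothetical and not part of the project), the same URLs can be assembled with Python's standard library, which also escapes any special characters in the values:

```python
from urllib.parse import urlencode

BASE_URL = "http://localhost:5000"  # same host and port as the cURL calls above


def build_url(route, **params):
    """Return the request URL for a route, appending params as a query string."""
    url = f"{BASE_URL}/{route}"
    if params:
        # urlencode converts each value with str() and escapes special characters
        url += "?" + urlencode(params)
    return url


print(build_url("join_a_guild", region="cali"))
print(build_url("purchase_sword", metal="copper", power_level=100, magical=True))
```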

______________________________________________________________________

This request returns **404** because the route is not defined in our API.
```bat 
docker-compose exec mids curl 'http://localhost:5000/purchase_a_dummy_sword'
```


______________________________________________________________________

Here we use **Apache Bench** (```ab```) to fire a batch of requests with the specified options:
* -n : the number of requests to send
* -H : adds a custom header to each request; here it sets the ```Host``` header, which we use to identify the simulated user
```bat 
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" 'http://localhost:5000/purchase_sword'
```

### Test the Flask Server (part 2)

Here we use BASH scripts that consolidate the commands listed previously to submit multiple requests to our Flask server in a pseudo-automated fashion. I have created the files listed below; please look into them for detailed descriptions.

* ```basic_ab.sh```
* ```complex_ab_limit.sh```
* ```complex_ab_infinite.sh```    <em>(this will be used later on)</em>

Run the basic script to generate events into Kafka. It is essentially a sequence of Apache Bench commands.
```bat 
sh scripts/basic_ab.sh
```

______________________________________________________________________

Run the limited complex script to generate events into Kafka. The script varies its requests based on a counter and on whether the current loop iteration is even or odd. The loop runs as many times as we specify in the command line argument, in this case 10.
```bat 
sh scripts/complex_ab_limit.sh 10 
```
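
The script itself is not shown in this report. Purely as an illustration of the even/odd branching described above, the sketch below builds the list of Apache Bench commands that such a loop might issue. The function name, the choice of hosts and parameters per branch, and the use of the counter as the request count are all assumptions, not the contents of ```complex_ab_limit.sh```.

```python
AB_TEMPLATE = (
    'docker-compose exec mids ab -n {n} -H "Host: {host}" '
    "'http://localhost:5000/{path}'"
)


def plan_requests(iterations):
    """Return the ab commands an even/odd loop of this length could issue."""
    commands = []
    for i in range(1, iterations + 1):
        if i % 2 == 0:
            # even iteration: hypothetical sword purchase
            host, path = "user1.comcast.com", "purchase_sword?metal=copper"
        else:
            # odd iteration: hypothetical guild join
            host, path = "user2.comcast.com", "join_a_guild?region=cali"
        commands.append(AB_TEMPLATE.format(n=i, host=host, path=path))
    return commands


for command in plan_requests(4):
    print(command)
```

Only the command strings are built here; nothing is executed, so the sketch can be inspected without the containers running.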


### Start Spark stream processing.

Using ```spark-submit``` we kick off a Spark streaming job. It listens for, decorates, and processes events as they land in Kafka, in micro-batches every 15 seconds (the threshold at which we call this streaming!). From there it stores the events in HDFS as Parquet files.

You will find more details in the file ```write_events_stream.py```


```bat

docker-compose exec spark spark-submit /w205/project-3-karthikrbabu/write_events_stream.py

```
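
The actual job lives in ```write_events_stream.py``` and runs on Spark. As a plain-Python illustration of the per-batch logic only (this is not Spark code, and the function name and sample messages are made up for the example), the sketch below parses each raw JSON message, keeps the original string alongside the parsed fields, and separates events by type:

```python
import json
from collections import defaultdict


def split_by_event_type(raw_messages):
    """Group decoded events by their event_type field."""
    batches = defaultdict(list)
    for raw in raw_messages:
        event = json.loads(raw)
        event["raw_event"] = raw  # keep the original message next to the parsed fields
        batches[event["event_type"]].append(event)
    return dict(batches)


micro_batch = [
    '{"event_type": "purchase_sword", "metal": "gold"}',
    '{"event_type": "join_a_guild", "region": "ny"}',
    '{"event_type": "purchase_sword", "metal": "copper"}',
]
for event_type, events in split_by_event_type(micro_batch).items():
    print(event_type, len(events))
```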

### Generate real time events.

Now that Spark is up and listening for events, we can start real-time event generation with the ```complex_ab_infinite.sh``` BASH script. Running this script fires requests at our Flask server, which in turn pumps events into Kafka. The script is an infinite loop, so we can leave it running. To end the process, press ```Ctrl + C```.


You will find more details in the file ```scripts/complex_ab_infinite.sh```


```bat

sh scripts/complex_ab_infinite.sh

```

### Create Hive Metastore - a.k.a. our HDFS phonebook.

Here we create a "phonebook" that lets Presto read from HDFS: Hive acts as a metastore that records where each table's data lives in HDFS and what its schema is. The command below uses Spark SQL to create external tables, which serve as pointers and schema definitions for querying the files in HDFS.


You will find more details in the file ```write_hive_table.py```


```bat

docker-compose exec spark spark-submit /w205/project-3-karthikrbabu/write_hive_table.py

```
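
To make the idea of an external table concrete, here is a small sketch that builds the kind of DDL statement such a script could pass to Spark SQL. It is not taken from ```write_hive_table.py```: the helper name, the ```STRING``` column types, and the ```/tmp/guild_joins``` location are assumptions for illustration, while the column names follow the ```guild_joins``` table queried later in this report.

```python
def external_table_ddl(table, columns, location):
    """Build a CREATE EXTERNAL TABLE statement for Parquet files at `location`."""
    column_sql = ",\n    ".join(f"`{name}` {sql_type}" for name, sql_type in columns)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} (\n"
        f"    {column_sql}\n"
        f")\n"
        f"STORED AS PARQUET\n"
        f"LOCATION '{location}'"
    )


guild_columns = [
    (name, "STRING")  # assumed type; the real schema may differ
    for name in ["raw_event", "timestamp", "Accept", "Host",
                 "User_Agent", "event_type", "region", "remote_addr"]
]
ddl = external_table_ddl("guild_joins", guild_columns, "/tmp/guild_joins")
print(ddl)
```

Because the table is external, dropping it removes only the metastore entry and leaves the Parquet files in place.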

### Query with Presto.

Data is now flowing through Spark and being stored in HDFS. Because we have set up the Hive metastore, we can use Presto, a query engine, to query the tables stored in HDFS. Instead of running Presto from the command line, for convenience I have installed the Presto driver via PyHive below so that we can query directly from this notebook.

In [2]:
#Install PyHive which is a python interface into Hive

import sys
!{sys.executable} -m pip install PyHive

Collecting PyHive
  Downloading https://files.pythonhosted.org/packages/a8/ea/30c7edbd18101f65846bb5a3ffb94335350e2faf3d89cb14ab3a720a4a46/PyHive-0.6.2.tar.gz (42kB)
Collecting future (from PyHive)
  Downloading https://files.pythonhosted.org/packages/45/0b/38b06fd9b92dc2b68d58b75f900e97884c45bedd2ff83203d933cf5851c9/future-0.18.2.tar.gz (829kB)
Building wheels for collected packages: PyHive, future
  Running setup.py bdist_wheel for PyHive ... done
  Stored in directory: /root/.cache/pip/wheels/b5/40/44/a8772f31ef706bdd1add700dbe7b1c8bbf1f527f8c1d1912b6
  Running setup.py bdist_wheel for future ... done
  Stored in directory: /root/.cache/pip/wheels/8b/99/a0/81daf51dcd359a9377b110a8a886b3895921802d2fc1b2397e
Successfully built PyHive future
Installing collected packages: future, PyHive
Successfully installed PyHi

In [3]:
#From pyhive import the presto driver that lets us run presto queries
from pyhive import presto
import pandas as pd

In [4]:
# Setup our connection for Presto using Hive as the "catalog"
# Port 8080 works as indicated in the YML file from earlier
cursor = presto.connect(host="presto", port=8080, username='presto', catalog='hive', schema='default').cursor()

#### From here on we can run some very simple queries via Presto to verify what is in HDFS, and see what kind of interactions users have had with our "mobile app". 

In [5]:
#Run some commands! 
cursor.execute('show tables')
print(cursor.fetchall())

[('guild_joins',), ('sword_purchases',)]


#### Here we run the same commands twice consecutively to see the counts of requests growing in HDFS as the stream continues to populate events.

In [6]:
cursor.execute('select count(*) from guild_joins')
print("Requests to join a guild:")
print(cursor.fetchall())


cursor.execute('select count(*) from sword_purchases')
print("Requests to purchase a sword:")
print(cursor.fetchall())

Requests to join a guild:
[(70,)]
Requests to purchase a sword:
[(70,)]


#### Moments later....

In [7]:
cursor.execute('select count(*) from guild_joins')
print("Requests to join a guild:")
print(cursor.fetchall())


cursor.execute('select count(*) from sword_purchases')
print("Requests to purchase a sword:")
print(cursor.fetchall())

Requests to join a guild:
[(100,)]
Requests to purchase a sword:
[(100,)]
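
The repeated count queries could be wrapped in a small helper. The sketch below is an illustration rather than part of the notebook: ```count_rows``` is a hypothetical name, and an in-memory SQLite database stands in for Presto so that the example runs without the pipeline. It relies only on ```execute``` and ```fetchall```, the same two cursor methods used in the cells above.

```python
import sqlite3


def count_rows(cursor, table):
    """Return the number of rows in `table` using any DB-API style cursor."""
    # The table name is interpolated directly, so pass only trusted names.
    cursor.execute(f"select count(*) from {table}")
    return cursor.fetchall()[0][0]


# Stand-in for the Presto cursor so the sketch runs without the pipeline.
demo_cursor = sqlite3.connect(":memory:").cursor()
demo_cursor.execute("create table guild_joins (region text)")
demo_cursor.executemany("insert into guild_joins values (?)", [("cali",), ("ny",)])

print("Requests to join a guild:", count_rows(demo_cursor, "guild_joins"))
```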


<hr>

#### Now we load a snapshot of each table into a pandas DataFrame for prettier viewing and some analysis.

In [13]:

cursor.execute('select * from sword_purchases')
sword_purchases = pd.DataFrame(cursor.fetchall(), columns=['raw_event','timestamp', 'Accept', 'Host', 'User_Agent', 'event_type',
                                             'metal', 'power_level','magical', 'remote_addr'])

print(sword_purchases.shape)
sword_purchases.head()

(110, 10)


| | raw_event | timestamp | Accept | Host | User_Agent | event_type | metal | power_level | magical | remote_addr |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | `{"event_type": "purchase_sword", "remote_addr"...` | 2020-08-01 03:56:46.808 | `*/*` | user1.att.com | | purchase_sword | gold | 100 | False | 127.0.0.1 |
| 1 | `{"event_type": "purchase_sword", "remote_addr"...` | 2020-08-01 03:56:46.824 | `*/*` | user1.att.com | | purchase_sword | gold | 100 | False | 127.0.0.1 |
| 2 | `{"event_type": "purchase_sword", "remote_addr"...` | 2020-08-01 03:56:46.835 | `*/*` | user1.att.com | | purchase_sword | gold | 100 | False | 127.0.0.1 |
| 3 | `{"event_type": "purchase_sword", "remote_addr"...` | 2020-08-01 03:56:46.843 | `*/*` | user1.att.com | | purchase_sword | gold | 100 | False | 127.0.0.1 |
| 4 | `{"event_type": "purchase_sword", "remote_addr"...` | 2020-08-01 03:56:46.855 | `*/*` | user1.att.com | | purchase_sword | gold | 100 | False | 127.0.0.1 |


In [9]:
cursor.execute('select * from guild_joins')
guild_joins = pd.DataFrame(cursor.fetchall(), columns=['raw_event','timestamp', 'Accept', 'Host', 'User_Agent', 'event_type',
                                             'region', 'remote_addr'])

print(guild_joins.shape)
guild_joins.head()

(110, 8)


| | raw_event | timestamp | Accept | Host | User_Agent | event_type | region | remote_addr |
|---|---|---|---|---|---|---|---|---|
| 0 | `{"Host": "user2.comcast.com", "event_type": "j...` | 2020-08-01 03:55:22.346 | `*/*` | user2.comcast.com | | join_a_guild | cali | 127.0.0.1 |
| 1 | `{"Host": "user2.comcast.com", "event_type": "j...` | 2020-08-01 03:55:22.352 | `*/*` | user2.comcast.com | | join_a_guild | cali | 127.0.0.1 |
| 2 | `{"Host": "user2.comcast.com", "event_type": "j...` | 2020-08-01 03:55:22.358 | `*/*` | user2.comcast.com | | join_a_guild | cali | 127.0.0.1 |
| 3 | `{"Host": "user2.comcast.com", "event_type": "j...` | 2020-08-01 03:55:22.362 | `*/*` | user2.comcast.com | | join_a_guild | cali | 127.0.0.1 |
| 4 | `{"Host": "user2.comcast.com", "event_type": "j...` | 2020-08-01 03:55:22.375 | `*/*` | user2.comcast.com | | join_a_guild | cali | 127.0.0.1 |
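
The cells above list the column names by hand. A possible alternative, sketched here under the assumption that the cursor exposes the standard DB-API ```description``` attribute, is to read the names from the cursor after the query runs. The helper name ```fetch_with_columns``` is hypothetical, and SQLite again stands in for Presto so the example is runnable on its own.

```python
import sqlite3


def fetch_with_columns(cursor, query):
    """Run `query` and return (column_names, rows) using cursor.description."""
    cursor.execute(query)
    # Each description entry is a sequence whose first item is the column name.
    columns = [col[0] for col in cursor.description]
    return columns, cursor.fetchall()


# Stand-in for the Presto cursor so the sketch runs without the pipeline.
demo_cursor = sqlite3.connect(":memory:").cursor()
demo_cursor.execute("create table guild_joins (event_type text, region text)")
demo_cursor.execute("insert into guild_joins values ('join_a_guild', 'cali')")

columns, rows = fetch_with_columns(demo_cursor, "select * from guild_joins")
print(columns)
print(rows)
```

The two return values can then be passed to ```pd.DataFrame(rows, columns=columns)```, which keeps the DataFrame in step with the table if its schema changes.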


### Some Analysis

We can see that each table above has 110 rows.

#### Run a simple group by operation to see what types of swords were bought!

In [18]:
cursor.execute('select metal, count(*) from sword_purchases GROUP BY metal')
print(cursor.fetchall())

[('copper', 50), ('gold', 60)]


#### Run a simple group by operation to see how many people are in each region's guild!

In [19]:
cursor.execute('select region, count(*) from guild_joins GROUP BY region')
print(cursor.fetchall())

[('ny', 60), ('cali', 50)]
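
For readers less familiar with SQL, the same aggregation can be expressed in plain Python. This is only an illustration of what ```GROUP BY``` with ```count(*)``` computes: the helper name is made up, and the sample rows are constructed to mirror the totals reported above rather than read from HDFS.

```python
from collections import Counter


def group_counts(rows, key):
    """Count how many rows share each value of `key`, like GROUP BY + count(*)."""
    return Counter(row[key] for row in rows)


# Rows constructed to match the reported totals, not fetched from the pipeline.
sample_rows = [{"metal": "copper"}] * 50 + [{"metal": "gold"}] * 60
print(sorted(group_counts(sample_rows, "metal").items()))
```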


### And that's it!

We have now walked through an end-to-end pipeline: generating events, landing them in HDFS, and querying them. Let us go out into the world and use the knowledge gained from w205! :)

### #docker-compose-down 😅