# Project 3 Report: Peter Morgan, Bruce Lam, Eda Kavlakoglu

In [1]:
# Importing Modules
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType

## Executive Summary

 The data engineering court at Renaissance Games is pleased to present our lords and ladies with an analytics pipeline to keep a watchful eye on the activities of the merchants within the market and also to note the comings of knights and ladies in the guilds. 
 
 Our sorcery (stack) of choice is as follows:
 
 - Apache Bench - "game client" that sends player events into the data pipeline
 - Flask - web framework that runs the game Application Programming Interface (API)
 - Kafka - platform for ingesting streaming data and passing to downstream applications
 - Spark - tool to filter and transform data and push to or pull from HDFS (Hadoop Distributed File System)
 - Hadoop - distributed file system for managing parquet files
 - Hive - intermediary to track and agree upon schema and create tables
 - Presto - query tool for summarizing and reporting analytics on purchases and guild activity
 
Prithee see the current summary of activity in the market and guilds.
 
 ### Sword sale summary:
 
 Most popular sword: tie between sharp (154) and normal (154)
 
 Total swords purchased: 308
 
 ### Guild summary
 
 Kavlakoglu guild member count: 199
 
 Morgan guild member count: 201
 
 Lam guild member count: 216
 
 For a detailed breakdown of randomly and manually generated events, prithee see the Business Analytics Questions section.
 
Gramercy

## Repository Description
* Project_3.ipynb - documentation of steps, calls, and code
* docker-compose.yml - configuration of the docker containers used for this analytics pipeline
* game_api.py - application programming interface to create events

## Detailed Pipeline Breakdown

### Set Up
Below we will describe in detail how the pipeline is spun up. First, we need to change into the w205 directory and clone the project repository.

```console
cd ~/w205
git clone https://github.com/mids-w205-martin-mims/project-3-superpeter55.git
```

Next, we move into our new directory and create a new branch named "assignment". The commands below create the branch, switch to it, and confirm that we are on it.

```console
cd project-3-superpeter55
git branch assignment
git checkout assignment
git status
```
```console
On branch assignment
```

We then copy the docker-compose file from the week 13 course content to run our docker images. The period "." at the end of the command copies the file into the current directory. Next, we update the copied file so that our docker containers run properly, by:
- Commenting out two lines in the cloudera service that read "ports:" and "8888:8888"
- Uncommenting these lines in the spark service, enabling us to run a jupyter notebook with pyspark in port 8888

```console
cp ~/w205/course-content/13-Understanding-Data/docker-compose.yml .
```

Now, we spin up our cluster. The -d flag runs the containers in detached mode (in the background), so we can keep using the same console. Next, we confirm that all the containers are up, for example with docker-compose ps. If every container is in an "Up" state, we can proceed.

```console
docker-compose up -d
```

With the cluster running, we create our kafka topic. We use the exec command to run a single command inside the kafka container; in this case, the kafka-topics command with the --create option creates a topic. The --topic option names the topic "events", since we are collecting game event data for this project. The --partitions 1 and --replication-factor 1 options specify a single partition with a single copy, which is sufficient for our single-broker development cluster. The --if-not-exists option makes the command skip creation, rather than fail, if a topic with that name already exists. The --zookeeper option gives the address (host:port) of the ZooKeeper instance that Kafka uses to store topic metadata. If the console outputs "Created topic events", the command executed successfully.

```console
docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
```

Next, we set up the game_api.py file to define different event types. We copy the game_api.py file from the week 12 folder using the command below and then make some modifications:

```console
cp ~/w205/course-content/12-Querying-Data-II/game_api.py .
```

The game_api.py file from week 12 does not contain our "buy a sword" or "join a guild" functions, so we need to create them. For purchases, we give the user the option to buy a normal sword or a sharp sword. We also have 3 guilds that the user can join, each of which represents a team member on this project. These include:
- the Morgan guild
- the Lam guild
- the Kavlakoglu guild

Separate events for each of these guilds have been created for a user to join that specific guild; the generic join_guild event will randomly select a guild to join.

These functions are very similar to the purchase_a_sword() function that already exists in game_api.py. The @app.route decorator above each function specifies the URL path that triggers the event. More specifically, /buy_a_sword or /purchase_a_sharp_sword triggers a sword purchase event. Similarly, /join_guild joins a random guild, while the guild-specific paths let the user choose a particular guild. Each function defines a dictionary of event metadata, and every event has an event_type. For buy_sword events, we specify the sword_type as either normal or sharp. For join_guild events, we specify the guild_name and the assignment_type, which indicates whether the user was assigned a guild randomly or chose it. The log_to_kafka call sends the event and its metadata to the kafka topic "events". Finally, the return statement sends a confirmation message back to the user.

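```python
# Excerpt from game_api.py: app (the Flask instance) and log_to_kafka are
# defined earlier in the file copied from week 12.
import random  # needed by join_guild() below

@app.route("/buy_a_sword")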
def buy_a_sword():
    buy_sword_event = {'event_type': 'buy_sword',
                       'sword_type': 'normal'}
    log_to_kafka('events', buy_sword_event)
    return "Sword Bought!\n"

@app.route("/join_guild")
def join_guild():
    guilds = ['Morgan','Lam','Kavlakoglu']
    guild = random.choice(guilds)
    join_guild_event = {'event_type': 'join_guild',
                        'guild_name': guild,
                        'assignment_type': 'random'}
    log_to_kafka('events', join_guild_event)
    return "Joined " + guild + " Guild!\n"

@app.route("/purchase_a_sharp_sword")
def purchase_a_sharp_sword():
    purchase_sword_event = {'event_type': 'buy_sword',
                            'sword_type': 'sharp'}
    log_to_kafka('events', purchase_sword_event)
    return "Sharp Sword Purchased!\n"

@app.route("/join_guild_morgan")
def join_guild_morgan():
    join_guild_event = {'event_type': 'join_guild',
                        'guild_name': 'Morgan',
                        'assignment_type': 'manual'}
    log_to_kafka('events', join_guild_event)
    return "Joined Morgan Guild!\n"

@app.route("/join_guild_lam")
def join_guild_lam():
    join_guild_event = {'event_type': 'join_guild',
                        'guild_name': 'Lam',
                        'assignment_type': 'manual'}
    log_to_kafka('events', join_guild_event)
    return "Joined Lam Guild!\n"

@app.route("/join_guild_kavlakoglu")
def join_guild_kavlakoglu():
    join_guild_event = {'event_type': 'join_guild',
                        'guild_name': 'Kavlakoglu',
                        'assignment_type': 'manual'}
    log_to_kafka('events', join_guild_event)
    return "Joined Kavlakoglu Guild!\n"
```
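The schemas later in this report include Accept, Host, and User-Agent fields next to the event metadata, which suggests that log_to_kafka merges the HTTP request headers into each event dictionary before serializing it. The sketch below mimics that behavior in plain Python so the message shape can be inspected without Flask or Kafka. The helper build_kafka_message and the sample header values are hypothetical; they approximate, rather than reproduce, the week 12 helper.

```python
import json


def build_kafka_message(event, headers):
    """Hypothetical stand-in for log_to_kafka: merge request headers into the
    event and return the JSON-encoded bytes that would be sent to Kafka."""
    message = dict(event)      # copy so the caller's dict is not mutated
    message.update(headers)    # request headers become top-level fields
    return json.dumps(message).encode()


# Example: roughly what a sharp-sword purchase from Player_1 looks like on the wire
payload = build_kafka_message(
    {'event_type': 'buy_sword', 'sword_type': 'sharp'},
    {'Host': 'Player_1', 'User-Agent': 'ApacheBench/2.3', 'Accept': '*/*'},
)
print(payload.decode())
```

Because the headers are merged in last, a header sharing a name with an event field would overwrite it in this sketch; our event keys (event_type, sword_type, guild_name, assignment_type) do not collide with standard HTTP header names.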

Now that we have updated game_api.py, we can run our Flask app. The command below sets the FLASK_APP environment variable to point at game_api.py and launches the app with flask run inside the mids container. The --host 0.0.0.0 option makes the server accept connections on all network interfaces rather than only on localhost.

```console
docker-compose exec mids env FLASK_APP=/w205/project-3-superpeter55/game_api.py flask run --host 0.0.0.0
```

Note that the Flask app is now running in the foreground of this terminal window, so we need to open a new terminal for further commands. In the new terminal, we navigate to the project directory using the following commands.

```console
cd w205
cd project-3-superpeter55/
```

Next, we want to launch a jupyter notebook with pyspark, but first we must create a symbolic link to access our mounted w205 directory. The first line uses docker-compose exec to open a bash shell in the spark container. The second line creates the link using the ln command with the -s option, which makes the link symbolic. The third line exits the container and returns us to the host shell.

```console
docker-compose exec spark bash
ln -s /w205 w205
exit 
```

Now we are ready to launch our notebook with the command below. Once again, we use docker-compose exec, this time to run pyspark in the spark container. Setting PYSPARK_DRIVER_PYTHON to jupyter, together with the notebook options, makes pyspark start a notebook server on port 8888, listening on ip address 0.0.0.0, without opening a browser and with root access allowed. The command prints a URL; we replace 0.0.0.0 in that URL with our machine's ip address and paste the link into our browser to access the notebook.

```console
docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark
```

Now that we are in our notebook, we check that the spark session and spark context are accessible. The two code cells below show that spark is running properly in this instance and we are ready to proceed. Next, we move on to testing our pipeline in batch mode and generating data with Apache Bench.

In [2]:
spark

In [3]:
sc

## Batch Mode

### Generating Events

To test our pipeline, we use Apache Bench to generate events and send them through the pipeline: 3 events of each type from each of 2 separate users, which we call Player_1 and Player_2. Once again, we use docker compose exec, this time to run the Apache Bench commands below in the mids container. Each command calls "ab", which stands for Apache Bench. The -n argument specifies how many requests to send; every command below sends 3. The -H argument adds a custom header; we set the Host header to either Player_1 or Player_2 to simulate which user sent the request. Finally, the path at the end of the URL indicates which route in our game_api.py file is called.

We call every route in the api 3 times for each user to cover every action an individual user can take. Since we exercise 7 routes (the default route plus the 6 routes we added) and have 2 users generating test data, there are 14 calls to Apache Bench below.

Please note: run each line separately.

```console
docker-compose exec mids ab -n 3 -H "Host: Player_1" http://localhost:5000/
docker-compose exec mids ab -n 3 -H "Host: Player_2" http://localhost:5000/
docker-compose exec mids ab -n 3 -H "Host: Player_1" http://localhost:5000/purchase_a_sharp_sword
docker-compose exec mids ab -n 3 -H "Host: Player_2" http://localhost:5000/purchase_a_sharp_sword
docker-compose exec mids ab -n 3 -H "Host: Player_1" http://localhost:5000/buy_a_sword
docker-compose exec mids ab -n 3 -H "Host: Player_2" http://localhost:5000/buy_a_sword
docker-compose exec mids ab -n 3 -H "Host: Player_1" http://localhost:5000/join_guild
docker-compose exec mids ab -n 3 -H "Host: Player_2" http://localhost:5000/join_guild
docker-compose exec mids ab -n 3 -H "Host: Player_1" http://localhost:5000/join_guild_morgan
docker-compose exec mids ab -n 3 -H "Host: Player_2" http://localhost:5000/join_guild_morgan
docker-compose exec mids ab -n 3 -H "Host: Player_2" http://localhost:5000/join_guild_lam
docker-compose exec mids ab -n 3 -H "Host: Player_1" http://localhost:5000/join_guild_lam
docker-compose exec mids ab -n 3 -H "Host: Player_2" http://localhost:5000/join_guild_kavlakoglu
docker-compose exec mids ab -n 3 -H "Host: Player_1" http://localhost:5000/join_guild_kavlakoglu
```
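The 14 commands above follow a simple pattern of one command per (route, player) pair. The sketch below uses a hypothetical helper, build_ab_commands, which is not part of the project repository, to generate the same commands and to tally how many events of each type the run should produce. That tally gives an expected count to check the Spark output against.

```python
import shlex

# Routes exercised in the test run above ("/" is the default route)
ENDPOINTS = [
    '/',
    '/purchase_a_sharp_sword',
    '/buy_a_sword',
    '/join_guild',
    '/join_guild_morgan',
    '/join_guild_lam',
    '/join_guild_kavlakoglu',
]
PLAYERS = ['Player_1', 'Player_2']
REQUESTS_PER_COMMAND = 3


def build_ab_commands(players, endpoints, n=REQUESTS_PER_COMMAND,
                      base_url='http://localhost:5000'):
    """Return one docker-compose/ab command string per (endpoint, player) pair."""
    commands = []
    for endpoint in endpoints:
        for player in players:
            args = ['docker-compose', 'exec', 'mids', 'ab', '-n', str(n),
                    '-H', f'Host: {player}', base_url + endpoint]
            # shlex.quote wraps the header argument, which contains a space
            commands.append(' '.join(shlex.quote(a) for a in args))
    return commands


commands = build_ab_commands(PLAYERS, ENDPOINTS)
for cmd in commands:
    print(cmd)

# Expected Kafka events per category for the whole run
expected = {'purchase': 0, 'join': 0, 'default': 0}
for endpoint in ENDPOINTS:
    category = ('purchase' if 'sword' in endpoint
                else 'join' if 'guild' in endpoint
                else 'default')
    expected[category] += REQUESTS_PER_COMMAND * len(PLAYERS)
print(expected)  # {'purchase': 12, 'join': 24, 'default': 6}
```

Each generated string can be pasted into a terminal one at a time, as noted above. The expected counts (12 purchases, 24 joins, 6 defaults) match the row counts discussed in the batch-mode cells that follow.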

Below, we define Spark user-defined functions (UDFs) that determine whether each event is a purchase, join, or default event. We use them to classify events both in batch mode and later in streaming mode.

In [4]:
@udf('boolean')
def is_purchase(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'buy_sword':
        return True
    return False

@udf('boolean')
def is_join(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'join_guild':
        return True
    return False

@udf('boolean')
def is_default(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'default':
        return True
    return False
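The three UDFs above differ only in the event type they compare against. The sketch below shows one way to factor that logic into a plain Python function factory, which can be tested without a Spark session. The name make_event_type_predicate is hypothetical, and this version uses dict.get so that an event without an event_type field returns False instead of raising a KeyError.

```python
import json


def make_event_type_predicate(event_type):
    """Return a function that checks whether a JSON event has the given event_type."""
    def predicate(event_as_json):
        event = json.loads(event_as_json)
        # .get() returns None for events that lack an event_type field
        return event.get('event_type') == event_type
    return predicate


is_purchase_py = make_event_type_predicate('buy_sword')
is_join_py = make_event_type_predicate('join_guild')
is_default_py = make_event_type_predicate('default')

sample = '{"event_type": "join_guild", "guild_name": "Lam", "assignment_type": "random"}'
print(is_purchase_py(sample), is_join_py(sample), is_default_py(sample))  # False True False
```

In the notebook, each plain function could then be wrapped as a UDF, for example udf(make_event_type_predicate('buy_sword'), 'boolean'), to get the same behavior as is_purchase.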

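Next, we read in the raw events we generated with Apache Bench and show them to see what they look like. Setting startingOffsets to "earliest" and endingOffsets to "latest" reads everything currently in the events topic as a single batch.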

In [5]:
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

In [6]:
raw_events.show()

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 48 6F 73 7...|events|        0|     0|2021-12-08 03:32:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     1|2021-12-08 03:32:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     2|2021-12-08 03:32:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     3|2021-12-08 03:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     4|2021-12-08 03:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     5|2021-12-08 03:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     6|2021-12-08 03:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     7|2021-12-08 03:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0
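The value column holds the raw kafka message bytes, which show() renders in hexadecimal. Decoding the first five bytes displayed above confirms that they are the start of a JSON object, which is why the next step casts value to a string.

```python
# First five bytes of the value column as displayed by raw_events.show()
prefix = bytes.fromhex('7B 22 48 6F 73')
print(prefix.decode('utf-8'))  # {"Hos
```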

We don't need most of the information above; we really only need the value and timestamp. We use select to pull out these two columns, casting value from bytes to a string, and we use the is_purchase function defined above to keep only the purchase events for analysis. We then show the output.

In [7]:
purchase_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_purchase('raw'))

In [8]:
purchase_events.show()

+--------------------+--------------------+
|                 raw|           timestamp|
+--------------------+--------------------+
|{"Host": "Player_...|2021-12-08 03:33:...|
|{"Host": "Player_...|2021-12-08 03:33:...|
|{"Host": "Player_...|2021-12-08 03:33:...|
|{"Host": "Player_...|2021-12-08 03:33:...|
|{"Host": "Player_...|2021-12-08 03:33:...|
|{"Host": "Player_...|2021-12-08 03:33:...|
|{"Host": "Player_...|2021-12-08 03:33:...|
|{"Host": "Player_...|2021-12-08 03:33:...|
|{"Host": "Player_...|2021-12-08 03:33:...|
|{"Host": "Player_...|2021-12-08 03:33:...|
|{"Host": "Player_...|2021-12-08 03:33:...|
|{"Host": "Player_...|2021-12-08 03:33:...|
+--------------------+--------------------+



Next, we unpack the JSON in the raw column. The following cell shows the resulting schema, and the cell after that shows what the dataframe looks like. We have 12 events, as expected, because we have 2 players, each buying 3 normal swords and 3 sharp swords.

In [9]:
extracted_purchase_events = purchase_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()

In [10]:
extracted_purchase_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- sword_type: string (nullable = true)
 |-- timestamp: string (nullable = true)
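The columns in this schema are in alphabetical order (uppercase names sort before lowercase ones) rather than in the order of keys in the JSON, which starts with Host. In Spark versions before 3.0, Row(**kwargs) sorts its field names, so unpacking each event with Row(timestamp=..., **json.loads(raw)) yields exactly this order. The sketch below reproduces the same unpacking and ordering in plain Python; flatten_event is a hypothetical helper, the sample record is illustrative, and the timestamp value is a placeholder.

```python
import json


def flatten_event(raw, timestamp):
    """Merge the kafka timestamp into the decoded JSON event, mirroring
    Row(timestamp=timestamp, **json.loads(raw)) from the cell above."""
    record = dict(timestamp=timestamp, **json.loads(raw))
    # Pre-3.0 Spark sorted Row kwargs by name; sorted() applies the same ordering
    return {key: record[key] for key in sorted(record)}


raw = ('{"Host": "Player_1", "User-Agent": "ApacheBench/2.3", "Accept": "*/*", '
       '"event_type": "buy_sword", "sword_type": "sharp"}')
row = flatten_event(raw, '2021-12-08 03:33:00')  # placeholder timestamp
print(list(row))
# ['Accept', 'Host', 'User-Agent', 'event_type', 'sword_type', 'timestamp']
```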



In [11]:
extracted_purchase_events.show()

+------+--------+---------------+----------+----------+--------------------+
|Accept|    Host|     User-Agent|event_type|sword_type|           timestamp|
+------+--------+---------------+----------+----------+--------------------+
|   */*|Player_1|ApacheBench/2.3| buy_sword|     sharp|2021-12-08 03:33:...|
|   */*|Player_1|ApacheBench/2.3| buy_sword|     sharp|2021-12-08 03:33:...|
|   */*|Player_1|ApacheBench/2.3| buy_sword|     sharp|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3| buy_sword|     sharp|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3| buy_sword|     sharp|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3| buy_sword|     sharp|2021-12-08 03:33:...|
|   */*|Player_1|ApacheBench/2.3| buy_sword|    normal|2021-12-08 03:33:...|
|   */*|Player_1|ApacheBench/2.3| buy_sword|    normal|2021-12-08 03:33:...|
|   */*|Player_1|ApacheBench/2.3| buy_sword|    normal|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3| buy_sword|    normal|2021-12-08 03:33:...|

Finally, we write this data to parquet so that it can be queried.

In [12]:
extracted_purchase_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/purchases')

We need to repeat this process with both join events and default events. The process for this is identical except we use is_join to filter for join events and is_default to filter for default events. The first cell filters the raw events to only include guild joins and shows these events.

In [13]:
join_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_join('raw'))
        
join_events.show()

+--------------------+--------------------+
|                 raw|           timestamp|
+--------------------+--------------------+
|{"event_type": "j...|2021-12-08 03:33:...|
|{"event_type": "j...|2021-12-08 03:33:...|
|{"event_type": "j...|2021-12-08 03:33:...|
|{"event_type": "j...|2021-12-08 03:33:...|
|{"event_type": "j...|2021-12-08 03:33:...|
|{"event_type": "j...|2021-12-08 03:33:...|
|{"event_type": "j...|2021-12-08 03:33:...|
|{"event_type": "j...|2021-12-08 03:33:...|
|{"event_type": "j...|2021-12-08 03:33:...|
|{"event_type": "j...|2021-12-08 03:33:...|
|{"event_type": "j...|2021-12-08 03:33:...|
|{"event_type": "j...|2021-12-08 03:33:...|
|{"event_type": "j...|2021-12-08 03:33:...|
|{"event_type": "j...|2021-12-08 03:33:...|
|{"event_type": "j...|2021-12-08 03:33:...|
|{"event_type": "j...|2021-12-08 03:34:...|
|{"event_type": "j...|2021-12-08 03:34:...|
|{"event_type": "j...|2021-12-08 03:34:...|
|{"event_type": "j...|2021-12-08 03:34:...|
|{"event_type": "j...|2021-12-08

Below, we unwrap the JSON of the join events, print the schema, and show the data. Note that when the assignment type is random, the guild the user joins varies.

In [14]:
extracted_join_events = join_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()
        
extracted_join_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- assignment_type: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- guild_name: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [15]:
extracted_join_events.show()

+------+--------+---------------+---------------+----------+----------+--------------------+
|Accept|    Host|     User-Agent|assignment_type|event_type|guild_name|           timestamp|
+------+--------+---------------+---------------+----------+----------+--------------------+
|   */*|Player_1|ApacheBench/2.3|         random|join_guild|       Lam|2021-12-08 03:33:...|
|   */*|Player_1|ApacheBench/2.3|         random|join_guild|    Morgan|2021-12-08 03:33:...|
|   */*|Player_1|ApacheBench/2.3|         random|join_guild|Kavlakoglu|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|         random|join_guild|       Lam|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|         random|join_guild|Kavlakoglu|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|         random|join_guild|    Morgan|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|         manual|join_guild|    Morgan|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|         manual|join_guild|    Morgan

We then write our join data to parquet format.

In [16]:
extracted_join_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/joins')

Last, we turn to the default events. As before, we filter the raw data to include only our default events.

In [17]:
default_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_default('raw'))
        
default_events.show()

+--------------------+--------------------+
|                 raw|           timestamp|
+--------------------+--------------------+
|{"Host": "Player_...|2021-12-08 03:32:...|
|{"Host": "Player_...|2021-12-08 03:32:...|
|{"Host": "Player_...|2021-12-08 03:32:...|
|{"Host": "Player_...|2021-12-08 03:33:...|
|{"Host": "Player_...|2021-12-08 03:33:...|
|{"Host": "Player_...|2021-12-08 03:33:...|
+--------------------+--------------------+



Once again, we unwrap the JSON of our default events, print the schema, and show our data. As expected, we have 6 entries, since we generated 3 events for each player.

In [18]:
extracted_default_events = default_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()
        
extracted_default_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [19]:
extracted_default_events.show()

+------+--------+---------------+----------+--------------------+
|Accept|    Host|     User-Agent|event_type|           timestamp|
+------+--------+---------------+----------+--------------------+
|   */*|Player_1|ApacheBench/2.3|   default|2021-12-08 03:32:...|
|   */*|Player_1|ApacheBench/2.3|   default|2021-12-08 03:32:...|
|   */*|Player_1|ApacheBench/2.3|   default|2021-12-08 03:32:...|
|   */*|Player_2|ApacheBench/2.3|   default|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|   default|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|   default|2021-12-08 03:33:...|
+------+--------+---------------+----------+--------------------+



Now we write our data to parquet format in HDFS.

In [20]:
extracted_default_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/defaults')

Now that we have written our data to parquet, we read it back in and run some basic queries to make sure our pipeline is functioning properly. First, we read in our purchase data from HDFS and register it as a temporary table so that we can query it with Spark SQL.

In [21]:
purchases = spark.read.parquet('/tmp/purchases')
purchases.show()

+------+--------+---------------+----------+----------+--------------------+
|Accept|    Host|     User-Agent|event_type|sword_type|           timestamp|
+------+--------+---------------+----------+----------+--------------------+
|   */*|Player_1|ApacheBench/2.3| buy_sword|     sharp|2021-12-08 03:33:...|
|   */*|Player_1|ApacheBench/2.3| buy_sword|     sharp|2021-12-08 03:33:...|
|   */*|Player_1|ApacheBench/2.3| buy_sword|     sharp|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3| buy_sword|     sharp|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3| buy_sword|     sharp|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3| buy_sword|     sharp|2021-12-08 03:33:...|
|   */*|Player_1|ApacheBench/2.3| buy_sword|    normal|2021-12-08 03:33:...|
|   */*|Player_1|ApacheBench/2.3| buy_sword|    normal|2021-12-08 03:33:...|
|   */*|Player_1|ApacheBench/2.3| buy_sword|    normal|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3| buy_sword|    normal|2021-12-08 03:33:...|

In [22]:
purchases.registerTempTable('purchases')

Now, we run a simple query to show that it works. In this case, we select all purchase data from Player_1 and, as expected, get 6 entries: 3 normal sword purchases and 3 sharp sword purchases.

In [23]:
purchases_player1 = spark.sql("select * from purchases where host='Player_1'")
purchases_player1.show()

+------+--------+---------------+----------+----------+--------------------+
|Accept|    Host|     User-Agent|event_type|sword_type|           timestamp|
+------+--------+---------------+----------+----------+--------------------+
|   */*|Player_1|ApacheBench/2.3| buy_sword|     sharp|2021-12-08 03:33:...|
|   */*|Player_1|ApacheBench/2.3| buy_sword|     sharp|2021-12-08 03:33:...|
|   */*|Player_1|ApacheBench/2.3| buy_sword|     sharp|2021-12-08 03:33:...|
|   */*|Player_1|ApacheBench/2.3| buy_sword|    normal|2021-12-08 03:33:...|
|   */*|Player_1|ApacheBench/2.3| buy_sword|    normal|2021-12-08 03:33:...|
|   */*|Player_1|ApacheBench/2.3| buy_sword|    normal|2021-12-08 03:33:...|
+------+--------+---------------+----------+----------+--------------------+



We will repeat the process for joins. Below we read in our guild join data from HDFS and show it. This is followed by registering the data as a temp table.

In [24]:
joins = spark.read.parquet('/tmp/joins')
joins.show()

+------+--------+---------------+---------------+----------+----------+--------------------+
|Accept|    Host|     User-Agent|assignment_type|event_type|guild_name|           timestamp|
+------+--------+---------------+---------------+----------+----------+--------------------+
|   */*|Player_1|ApacheBench/2.3|         random|join_guild|       Lam|2021-12-08 03:33:...|
|   */*|Player_1|ApacheBench/2.3|         random|join_guild|    Morgan|2021-12-08 03:33:...|
|   */*|Player_1|ApacheBench/2.3|         random|join_guild|Kavlakoglu|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|         random|join_guild|       Lam|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|         random|join_guild|Kavlakoglu|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|         random|join_guild|    Morgan|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|         manual|join_guild|    Morgan|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|         manual|join_guild|    Morgan

In [25]:
joins.registerTempTable('joins')

This time, we query our data to show only the entries for Player_2. Interestingly, across Player_2's three random guild joins, they joined each guild exactly once!

In [26]:
joins_player2 = spark.sql("select * from joins where host='Player_2'")
joins_player2.show()

+------+--------+---------------+---------------+----------+----------+--------------------+
|Accept|    Host|     User-Agent|assignment_type|event_type|guild_name|           timestamp|
+------+--------+---------------+---------------+----------+----------+--------------------+
|   */*|Player_2|ApacheBench/2.3|         random|join_guild|       Lam|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|         random|join_guild|Kavlakoglu|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|         random|join_guild|    Morgan|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|         manual|join_guild|    Morgan|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|         manual|join_guild|    Morgan|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|         manual|join_guild|    Morgan|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|         manual|join_guild|       Lam|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|         manual|join_guild|       Lam
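Since join_guild picks a guild with random.choice over three guilds, each random join lands in any given guild with probability 1/3, independently of the others (assuming a uniform random choice). The chance that three random joins land in three different guilds is therefore 3!/3^3 = 6/27 = 2/9, roughly 22%, so Player_2's outcome is notable but far from rare. The short calculation below checks this exactly and with a seeded simulation; the seed and trial count are arbitrary choices.

```python
import math
import random
from fractions import Fraction

GUILDS = ['Morgan', 'Lam', 'Kavlakoglu']
JOINS = 3

# Exact: 3! orderings of distinct guilds out of 3**3 equally likely outcomes
exact = Fraction(math.factorial(len(GUILDS)), len(GUILDS) ** JOINS)
print(exact, float(exact))  # 2/9 0.2222222222222222

# Monte Carlo check using the same random.choice call as join_guild()
rng = random.Random(205)  # fixed seed so the estimate is reproducible
trials = 100_000
hits = sum(
    len({rng.choice(GUILDS) for _ in range(JOINS)}) == len(GUILDS)
    for _ in range(trials)
)
estimate = hits / trials
print(estimate)
```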

Finally, we finish up by reading in the default events and showing them. It is time to move on to streaming mode.

In [27]:
defaults = spark.read.parquet('/tmp/defaults')
defaults.show()

+------+--------+---------------+----------+--------------------+
|Accept|    Host|     User-Agent|event_type|           timestamp|
+------+--------+---------------+----------+--------------------+
|   */*|Player_1|ApacheBench/2.3|   default|2021-12-08 03:32:...|
|   */*|Player_1|ApacheBench/2.3|   default|2021-12-08 03:32:...|
|   */*|Player_1|ApacheBench/2.3|   default|2021-12-08 03:32:...|
|   */*|Player_2|ApacheBench/2.3|   default|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|   default|2021-12-08 03:33:...|
|   */*|Player_2|ApacheBench/2.3|   default|2021-12-08 03:33:...|
+------+--------+---------------+----------+--------------------+



## Streaming Mode
### Setup Event Catching

Before we begin streaming, we must define the schemas of our events. We have 3 separate schemas, one each for purchase events, guild join events, and default events, defined below. Note that these schemas do not include a timestamp: they describe only the fields we extract from the JSON payload, while the timestamp comes separately from the kafka message metadata.

In [None]:
def purchase_sword_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- sword_type: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("sword_type", StringType(), True)
    ])

def join_guild_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- assignment_type: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- guild_name: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("assignment_type", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("guild_name", StringType(), True)
    ])

def default_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True)
    ])
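When from_json parses a message against one of these schemas, fields named in the schema are extracted, JSON keys that are not in the schema are ignored, and schema fields missing from the JSON come back as null. The pure-Python function below is only an analogy for that behavior, not how Spark implements from_json; project_to_schema is a hypothetical helper. It illustrates why timestamp can be left out of the schemas and attached afterwards from the kafka metadata.

```python
import json

# Field names from purchase_sword_event_schema() above
PURCHASE_FIELDS = ['Accept', 'Host', 'User-Agent', 'event_type', 'sword_type']


def project_to_schema(raw_json, field_names):
    """Keep only the schema's fields; missing fields become None (null in Spark)."""
    parsed = json.loads(raw_json)
    return {name: parsed.get(name) for name in field_names}


raw = '{"Host": "Player_1", "event_type": "buy_sword", "sword_type": "sharp", "extra": 1}'
print(project_to_schema(raw, PURCHASE_FIELDS))
# {'Accept': None, 'Host': 'Player_1', 'User-Agent': None, 'event_type': 'buy_sword', 'sword_type': 'sharp'}
```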

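The first cell below uses readStream to subscribe to the events topic on our kafka server. We have not generated any new events yet, but the stream is now ready to catch them; because no startingOffsets option is set, a streaming query defaults to the latest offsets and only picks up events that arrive after the query is started. The second cell defines which data we extract from each type of event (purchase, join, default): each query filters with the matching UDF, parses the JSON with the schema defined above, and keeps the kafka timestamp to complete the record.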

In [6]:
raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()

In [7]:
sword_purchases = raw_events \
        .filter(is_purchase(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          purchase_sword_event_schema()).alias('json')) \
        .select('json.*','timestamp')
        
guild_joins = raw_events \
        .filter(is_join(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          join_guild_event_schema()).alias('json')) \
        .select('json.*','timestamp')
        
defaults = raw_events \
        .filter(is_default(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          default_event_schema()).alias('json')) \
        .select('json.*','timestamp')
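To make the routing and parsing step concrete, here is a minimal pure-Python sketch of what the three streaming queries do to a single Kafka message. It assumes, hypothetically, that the `is_purchase`, `is_join` and `is_default` UDFs defined earlier route on the `event_type` field (`buy_sword`, `join_guild`, `default`, the values visible in the Presto output below). The `project` helper and the `ROUTES` mapping are illustrative only. As with `from_json` in its default permissive mode, schema fields missing from the JSON come back as null (`None`) and JSON fields not in the schema are dropped.

```python
import json

# Column names taken from the three StructType schemas defined above.
SCHEMAS = {
    "sword_purchases": ["Accept", "Host", "User-Agent", "event_type", "sword_type"],
    "guild_joins": ["Accept", "Host", "User-Agent", "assignment_type", "event_type", "guild_name"],
    "defaults": ["Accept", "Host", "User-Agent", "event_type"],
}

# Hypothetical event_type -> table routing; mirrors the event_type values in Presto.
ROUTES = {"buy_sword": "sword_purchases", "join_guild": "guild_joins", "default": "defaults"}


def project(raw_value, kafka_timestamp):
    """Mimic filter(...) + from_json(...).alias('json') + select('json.*', 'timestamp').

    Returns (table_name, row) or (None, None) if no stream would accept the event.
    """
    event = json.loads(raw_value)
    table = ROUTES.get(event.get("event_type"))
    if table is None:
        return None, None
    # Missing schema fields become None (null); fields outside the schema are dropped.
    row = {name: event.get(name) for name in SCHEMAS[table]}
    row["timestamp"] = kafka_timestamp
    return table, row
```

For example, a sharp-sword purchase carrying an extra, unknown key lands in `sword_purchases` with exactly the six columns printed by `sword_purchases.printSchema()` below, and the unknown key is discarded.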

Below we print each schema to confirm that it matches the schema from batch mode, which it does.

In [8]:
sword_purchases.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- sword_type: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [9]:
guild_joins.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- assignment_type: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- guild_name: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [10]:
defaults.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- timestamp: string (nullable = true)



Finally, we start a sink for each type of event. Each sink writes its stream to HDFS as parquet every 10 seconds (`processingTime="10 seconds"`) and tracks its progress in its own checkpoint directory. Since we have not fed any data yet, there is nothing to write for now; once we begin streaming, the sinks will write every event we send to HDFS.

In [11]:
sink_sword = sword_purchases \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases") \
        .option("path", "/tmp/sword_purchases") \
        .trigger(processingTime="10 seconds") \
        .start()
        
sink_guild = guild_joins \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_guild_joins") \
        .option("path", "/tmp/guild_joins") \
        .trigger(processingTime="10 seconds") \
        .start()
        
sink_default = defaults \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_defaults") \
        .option("path", "/tmp/defaults") \
        .trigger(processingTime="10 seconds") \
        .start()

### Generating Events

Open another terminal window and run the while loop below to generate events continuously. On every pass, each player performs every event the API offers exactly once, and the loop then pauses for 10 seconds. As in batch mode, Apache Bench generates the events; the difference is that each `ab` call now sends a single request (`-n 1`) and the whole set of calls repeats every pass, so data streams in continuously.

```console
while true; do
    docker-compose exec mids ab -n 1 -H "Host: Player_1" http://localhost:5000/
    docker-compose exec mids ab -n 1 -H "Host: Player_2" http://localhost:5000/
    docker-compose exec mids ab -n 1 -H "Host: Player_1" http://localhost:5000/purchase_a_sharp_sword
    docker-compose exec mids ab -n 1 -H "Host: Player_2" http://localhost:5000/purchase_a_sharp_sword
    docker-compose exec mids ab -n 1 -H "Host: Player_1" http://localhost:5000/buy_a_sword
    docker-compose exec mids ab -n 1 -H "Host: Player_2" http://localhost:5000/buy_a_sword
    docker-compose exec mids ab -n 1 -H "Host: Player_1" http://localhost:5000/join_guild
    docker-compose exec mids ab -n 1 -H "Host: Player_2" http://localhost:5000/join_guild
    docker-compose exec mids ab -n 1 -H "Host: Player_1" http://localhost:5000/join_guild_morgan
    docker-compose exec mids ab -n 1 -H "Host: Player_2" http://localhost:5000/join_guild_morgan
    docker-compose exec mids ab -n 1 -H "Host: Player_2" http://localhost:5000/join_guild_lam
    docker-compose exec mids ab -n 1 -H "Host: Player_1" http://localhost:5000/join_guild_lam
    docker-compose exec mids ab -n 1 -H "Host: Player_2" http://localhost:5000/join_guild_kavlakoglu
    docker-compose exec mids ab -n 1 -H "Host: Player_1" http://localhost:5000/join_guild_kavlakoglu
    sleep 10
done
```
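Because every `ab` call sends exactly one request (`-n 1`), each pass through the loop adds a fixed number of rows to each table. The sketch below tallies that. It assumes, consistent with the `event_type` values in the Presto tables, that `/` lands in `defaults`, the two sword endpoints in `sword_purchases`, and every `join_guild*` endpoint in `guild_joins`; `table_for` and `rows_per_iteration` are hypothetical helpers, not part of the project.

```python
from collections import Counter

PLAYERS = ["Player_1", "Player_2"]

# Endpoints each player hits once per loop iteration, copied from the loop above.
ENDPOINTS = [
    "/",
    "/purchase_a_sharp_sword",
    "/buy_a_sword",
    "/join_guild",
    "/join_guild_morgan",
    "/join_guild_lam",
    "/join_guild_kavlakoglu",
]


def table_for(endpoint):
    """Assumed endpoint-to-table mapping (inferred from the Presto output)."""
    if endpoint == "/":
        return "defaults"
    if endpoint.startswith("/join_guild"):
        return "guild_joins"
    return "sword_purchases"


def rows_per_iteration():
    """Count the rows one loop iteration adds to each table (ab -n 1 => one request per call)."""
    return Counter(table_for(e) for _ in PLAYERS for e in ENDPOINTS)


print(rows_per_iteration())
```

Under these assumptions, each iteration adds 2 default, 4 sword-purchase and 8 guild-join rows, so the three tables should grow in a 1:2:4 ratio.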

Now we are ready to start Hive and define our schemas:

```console
docker-compose exec cloudera hive
```

In Hive, we run the commands below to create external tables over the parquet output, which makes their schemas available to Presto.

```console
create external table if not exists default.sword_purchases (Accept string, Host string, `User-Agent` string, event_type string, sword_type string, timestamp string) stored as parquet location '/tmp/sword_purchases'  tblproperties ("parquet.compress"="SNAPPY");

create external table if not exists default.guild_joins (Accept string, Host string, `User-Agent` string, assignment_type string, event_type string, guild_name string, timestamp string) stored as parquet location '/tmp/guild_joins'  tblproperties ("parquet.compress"="SNAPPY");

create external table if not exists default.defaults (Accept string, Host string, `User-Agent` string, event_type string, timestamp string) stored as parquet location '/tmp/defaults'  tblproperties ("parquet.compress"="SNAPPY");
```
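Since the Hive column lists must mirror the Spark schemas printed earlier, one way to keep them in sync is to generate the DDL from a single list of column names. The helper below, `hive_ddl`, is hypothetical and not part of the project. It backtick-quotes `User-Agent` because a hyphen is not allowed in an unquoted Hive identifier, and it reproduces the statements above character for character.

```python
def hive_ddl(table, columns, location):
    """Build a CREATE EXTERNAL TABLE statement for string columns stored as Snappy parquet."""

    def quote(name):
        # Identifiers with characters other than letters, digits or '_' need backticks.
        return name if name.replace("_", "").isalnum() else f"`{name}`"

    cols = ", ".join(f"{quote(c)} string" for c in columns)
    return (
        f"create external table if not exists default.{table} ({cols}) "
        f"stored as parquet location '{location}'  "
        'tblproperties ("parquet.compress"="SNAPPY");'
    )


print(hive_ddl("defaults", ["Accept", "Host", "User-Agent", "event_type", "timestamp"], "/tmp/defaults"))
```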

Next, we exit Hive and start Presto:

```console
exit;
docker-compose exec presto presto --server presto:8080 --catalog hive --schema default
```

We run a few test queries to make sure the data has landed in Presto correctly. First, we look at five rows from each table (without an `order by`, `limit 5` returns an arbitrary five) and check that they look correct, which they do.

```console
select * from sword_purchases limit 5;
```
```console
 accept |   host   |   user-agent    | event_type | sword_type |        timestamp        
--------+----------+-----------------+------------+------------+-------------------------
 */*    | Player_1 | ApacheBench/2.3 | buy_sword  | sharp      | 2021-12-09 14:24:01.578 
 */*    | Player_2 | ApacheBench/2.3 | buy_sword  | sharp      | 2021-12-09 14:24:02.376 
 */*    | Player_1 | ApacheBench/2.3 | buy_sword  | normal     | 2021-12-09 14:24:02.856 
 */*    | Player_2 | ApacheBench/2.3 | buy_sword  | normal     | 2021-12-09 14:24:03.35  
 */*    | Player_1 | ApacheBench/2.3 | buy_sword  | sharp      | 2021-12-09 14:25:47.571 
(5 rows)
```
```console
select * from guild_joins limit 5;
```
```console
 accept |   host   |   user-agent    | assignment_type | event_type | guild_name |        timestamp        
--------+----------+-----------------+-----------------+------------+------------+-------------------------
 */*    | Player_1 | ApacheBench/2.3 | manual          | join_guild | Kavlakoglu | 2021-12-09 14:29:30.718 
 */*    | Player_1 | ApacheBench/2.3 | random          | join_guild | Kavlakoglu | 2021-12-09 14:24:21.025 
 */*    | Player_2 | ApacheBench/2.3 | random          | join_guild | Kavlakoglu | 2021-12-09 14:24:22.19  
 */*    | Player_1 | ApacheBench/2.3 | manual          | join_guild | Morgan     | 2021-12-09 14:24:22.931 
 */*    | Player_2 | ApacheBench/2.3 | manual          | join_guild | Morgan     | 2021-12-09 14:24:23.399 
(5 rows)
```
```console
select * from defaults limit 5;
```
```console
 accept |   host   |   user-agent    | event_type |        timestamp        
--------+----------+-----------------+------------+-------------------------
 */*    | Player_1 | ApacheBench/2.3 | default    | 2021-12-09 14:23:59.827 
 */*    | Player_1 | ApacheBench/2.3 | default    | 2021-12-09 14:29:41.517 
 */*    | Player_2 | ApacheBench/2.3 | default    | 2021-12-09 14:29:41.96  
 */*    | Player_2 | ApacheBench/2.3 | default    | 2021-12-09 14:24:00.601 
 */*    | Player_1 | ApacheBench/2.3 | default    | 2021-12-09 14:30:35.119 
(5 rows)
```

Next, we test whether streaming works. To do this, we count the rows in each table, wait a few minutes, and run the same queries again to confirm that each table has grown. The initial set of queries is below.

```console
select count(accept) as num_rows_purchases 
       from sword_purchases;
```
```console
 num_rows_purchases 
--------------------
                216 
(1 row)
```
```console
select count(accept) as num_rows_joins 
       from guild_joins;
```
```console
 num_rows_joins 
----------------
            448 
(1 row)
```
```console
select count(accept) as num_rows_defaults 
       from defaults;
```
```console
 num_rows_defaults 
-------------------
               116 
(1 row)
```
After a few minutes, we run the queries again and see that each table has grown as expected. Streaming appears to be working, so it is time to move on to our analytics questions.
```console
select count(accept) as num_rows 
       from sword_purchases;
```
```console
 num_rows 
----------
      248 
(1 row)
```
```console
select count(accept) as num_rows_joins 
       from guild_joins;
```
```console
 num_rows_joins 
----------------
            504 
(1 row)
```
```console
select count(accept) as num_rows_defaults 
       from defaults;
```
```console
 num_rows_defaults 
-------------------
               128 
(1 row)
```
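The growth between the two snapshots can also be sanity-checked against the per-iteration rates implied by the loop: 2 default, 4 purchase and 8 join rows per pass, assuming the endpoint-to-table mapping inferred from the Presto output. The counts below are the ones reported above; `iterations_added` is a hypothetical helper.

```python
# Rows one pass of the Apache Bench loop adds to each table (assumed mapping, ab -n 1 per call).
ROWS_PER_ITERATION = {"sword_purchases": 4, "guild_joins": 8, "defaults": 2}

# Row counts from the two rounds of Presto queries above.
FIRST = {"sword_purchases": 216, "guild_joins": 448, "defaults": 116}
SECOND = {"sword_purchases": 248, "guild_joins": 504, "defaults": 128}


def iterations_added(before, after, rate=ROWS_PER_ITERATION):
    """Return {table: (whole iterations, leftover rows)} explaining each table's growth.

    Leftover rows are possible if a snapshot caught a micro-batch that ended
    part-way through a loop iteration.
    """
    return {t: divmod(after[t] - before[t], per_pass) for t, per_pass in rate.items()}


print(iterations_added(FIRST, SECOND))
```

Each table grew by a whole number of iterations (8, 7 and 6 respectively). The figures differ slightly most likely because the three queries in each round ran one after another while the stream was still being written.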

## Business Analytics Questions

First, we stop the three streaming sinks; then we perform our analytics.

In [12]:
sink_sword.stop()
sink_guild.stop()
sink_default.stop()

The first set of queries counts the final size of our tables.

```console
select count(accept) as num_rows 
       from sword_purchases;
```
```console
 num_rows 
----------
      308 
(1 row)
```
```console
select count(accept) as num_rows_joins 
       from guild_joins;
```
```console
 num_rows_joins 
----------------
            616 
(1 row)
```
```console
select count(accept) as num_rows_defaults 
       from defaults;
```
```console
 num_rows_defaults 
-------------------
               154 
(1 row)
```
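The final counts fit the loop structure exactly. With 2 default, 4 purchase and 8 join rows per iteration (the assumed endpoint-to-table mapping), 154 defaults, 308 purchases and 616 joins all correspond to the same number of completed iterations, 77. That figure is inferred from the counts rather than something we recorded. A minimal check, using the hypothetical helper `implied_iterations`:

```python
# Final row counts reported by Presto above.
FINAL = {"sword_purchases": 308, "guild_joins": 616, "defaults": 154}

# Rows one loop iteration adds to each table (assumed mapping, ab -n 1 per call).
ROWS_PER_ITERATION = {"sword_purchases": 4, "guild_joins": 8, "defaults": 2}


def implied_iterations(counts, rate=ROWS_PER_ITERATION):
    """Return the loop-iteration count implied by each table's final row count."""
    return {t: counts[t] / per_pass for t, per_pass in rate.items()}


implied = implied_iterations(FINAL)
# The tables agree if every table implies the same iteration count.
consistent = len(set(implied.values())) == 1
print(implied, consistent)
```

The same figure predicts 2 x 77 = 154 manual joins per guild and 154 random joins in total, which is what the guild queries below report.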

### Sword Sale Summary:

For our sword sale summary, we would like to know which type of sword is more popular and how many swords were purchased in total. We expect no preference for either type, since the loop above sends sharp-sword and normal-sword purchases at the same rate.

Most popular sword:

 ```console
 select sword_type, count(sword_type) as amount 
        from sword_purchases 
        group by sword_type;
 ```
 ```console
 sword_type | amount 
------------+--------
 sharp      |    154 
 normal     |    154 
(2 rows)
 ```

Total swords purchased:

 ```console
 select count(accept) as num_swords_purchased 
        from sword_purchases;
 ```
 ```console
 num_swords_purchased 
----------------------
                  308 
(1 row)
 ```

### Guild Summary:

 For our guild summary, the first question we would like to answer is how many members each guild has. As shown below, the Lam guild has the most members, followed by the Morgan guild, while the Kavlakoglu guild has the fewest.
 
 ```console
 select guild_name, count(guild_name) as members 
        from guild_joins 
        group by guild_name 
        order by members desc;
 ```
 ```console
 guild_name | members 
------------+---------
 Lam        |     216 
 Morgan     |     201 
 Kavlakoglu |     199 
(3 rows)
 ```
 
 Next, we would like to know how many members each guild gained from players choosing the guild they join. We expect these counts to be equal, since each player manually joins every guild once per pass of the loop. As the results below show, the manually joined counts are indeed equal.
 
 ```console
 select guild_name, count(guild_name) as manually_joined_members 
        from guild_joins 
        where assignment_type = 'manual' 
        group by guild_name 
        order by manually_joined_members desc;
 ```
 ```console
 guild_name | manually_joined_members 
------------+-------------------------
 Morgan     |                     154 
 Lam        |                     154 
 Kavlakoglu |                     154 
(3 rows)
 ```
 
 We would now like to know how many members each guild gained through random assignment. The Lam guild had the most members randomly assigned, followed by the Morgan guild and then the Kavlakoglu guild. Because manual joins are identical across guilds, this ordering determines the ordering of total membership, and it matches the totals above as expected.
 
 ```console
 select guild_name, count(guild_name) as randomly_joined_members 
        from guild_joins 
        where assignment_type = 'random' 
        group by guild_name 
        order by randomly_joined_members desc;
 ```
 ```console
 guild_name | randomly_joined_members 
------------+-------------------------
 Lam        |                      62 
 Morgan     |                      47 
 Kavlakoglu |                      45 
(3 rows)
 ```
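Total membership is simply manual plus random joins per guild. The short sketch below combines the two query results above and reproduces the totals from the first guild query; `total_members` is a hypothetical helper.

```python
# Per-guild counts from the manual and random assignment queries above.
MANUAL = {"Lam": 154, "Morgan": 154, "Kavlakoglu": 154}
RANDOM_JOINS = {"Lam": 62, "Morgan": 47, "Kavlakoglu": 45}


def total_members(manual, random_joins):
    """Add manual and random joins per guild and sort by total, largest first."""
    guilds = manual.keys() | random_joins.keys()
    totals = {g: manual.get(g, 0) + random_joins.get(g, 0) for g in guilds}
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)


print(total_members(MANUAL, RANDOM_JOINS))
```

The random joins also sum to 154, two per loop iteration, matching the one `/join_guild` call per player per pass.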
 
 Finally, we look at how many times each player joined each guild, counting manual and random joins together. Both players joined the Lam guild most often, and Player_2 favored it more heavily (110 joins versus 106 for Player_1).
 
 ```console
 select host as player, count(case when guild_name = 'Morgan' then 1 end) as morgan_joins,
        count(case when guild_name = 'Lam' then 1 end) as lam_joins,
        count(case when guild_name = 'Kavlakoglu' then 1 end) as kavlakoglu_joins
        from guild_joins group by host;
 ```
 ```console
  player  | morgan_joins | lam_joins | kavlakoglu_joins 
----------+--------------+-----------+------------------
 Player_1 |          101 |       106 |              101 
 Player_2 |          100 |       110 |               98 
(2 rows)
 ```
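The `count(case when ... then 1 end)` pattern is conditional aggregation: a `case` without an `else` yields null when it does not match, and `count` skips nulls, so each column counts only one guild. Below is a pure-Python equivalent over `(host, guild_name)` rows, plus a check that the per-player columns add up to the guild totals reported earlier. `pivot_joins` is illustrative and not part of the project.

```python
from collections import defaultdict

GUILDS = ["Morgan", "Lam", "Kavlakoglu"]


def pivot_joins(rows, guilds=GUILDS):
    """Count joins per (player, guild), like count(case when guild_name = g then 1 end)."""
    table = defaultdict(lambda: dict.fromkeys(guilds, 0))
    for host, guild_name in rows:
        counts = table[host]  # every host gets a row, as with GROUP BY host
        if guild_name in counts:  # a non-matching CASE yields NULL, which count() skips
            counts[guild_name] += 1
    return dict(table)


# Per-player results reported by Presto above.
PER_PLAYER = {
    "Player_1": {"Morgan": 101, "Lam": 106, "Kavlakoglu": 101},
    "Player_2": {"Morgan": 100, "Lam": 110, "Kavlakoglu": 98},
}

# Column sums should reproduce the guild totals from the first guild query.
column_sums = {g: sum(p[g] for p in PER_PLAYER.values()) for g in GUILDS}
print(column_sums)
```

Each player's row also sums to 308 joins, four per loop iteration, as the loop structure implies.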
 
 Gramercy
