## Introduction

**TO-DO:** Keeping for reference before submission. Insert more details here on what can be done in the pipeline, and the business questions we will be answering.

In this report, we demonstrate how to build a pipeline that enables a data science team to obtain valuable insights from game events.

The latest mobile game from a game development company has three events: join a guild, purchase a sword, and purchase a horse. To enable analytics on these events and their metadata, we have created a pipeline that gives analysts access to the data.

In Kafka, a single topic called "events" has been created to track the join a guild, purchase a sword, and purchase a horse events. Using a single topic is intentional: all events land in one place and are separated downstream by filtering on their type.

A separate Python filter script has been created for each event type, and each script writes its events to a separate table in HDFS. Error handling has also been added to ensure that events are added correctly. This approach makes it quick to adjust how each event is handled independently. For example, if more metadata is needed for swords, write_sword_events.py can be edited. If a new event, such as purchasing a spell book, is needed in the future, a new write_spellbook_events.py script should be created.
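The per-event filtering idea can be sketched in plain Python. This is a minimal illustration, not the project's Spark scripts: `purchase_sword` and `purchase_horse` match the event types the app emits, and the sword and horse paths match the `/tmp/sword_purchases` and `/tmp/horse_purchases` tables used later, but the guild event type `join_guild`, its path, and the `route_events` helper are hypothetical.

```python
import json
from collections import defaultdict

# Map each event type to the HDFS path its filter script writes to.
# The sword and horse entries come from the pipeline; the guild entry
# is a hypothetical placeholder.
EVENT_TABLES = {
    "purchase_sword": "/tmp/sword_purchases",
    "purchase_horse": "/tmp/horse_purchases",
    "join_guild": "/tmp/guild_events",  # hypothetical event type and path
}


def route_events(raw_events):
    """Group raw JSON event strings by the table they belong in.

    Events whose type has no registered table are collected under None,
    so new event types are surfaced rather than silently dropped.
    """
    routed = defaultdict(list)
    for raw in raw_events:
        event_type = json.loads(raw).get("event_type")
        routed[EVENT_TABLES.get(event_type)].append(raw)
    return dict(routed)


events = [
    '{"event_type": "purchase_sword", "color": "red", "quantity": "2"}',
    '{"event_type": "purchase_horse", "speed": "10", "size": "small", "quantity": "1"}',
    '{"event_type": "purchase_spellbook"}',
]
routed = route_events(events)
```

In this sketch the spell book event ends up under `None` until a table is registered for it, which mirrors the need for a new write_spellbook_events.py script in the project's layout.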

In the Flask app, events can be triggered with the corresponding metadata and host information for analysis. Each event's values are saved as key-value pairs in a single dictionary, which keeps reporting simple and lets it scale: if values are added or existing values are replaced, the schema is easy to adjust.

Metadata for each event:

* purchase_a_sword: accepts color and quantity parameters  
* purchase_a_horse: accepts speed, size, and quantity parameters  
* guild: accepts inputs to join or leave the guild  
* default_response: accepts no parameters and runs when the user submits a blank input (no event specified)

Metadata captured about the user whenever an event is triggered, in addition to the event parameters and the user agent:

* timestamp: when the event occurred. Use this value to understand state changes over time.
* Host: identifies the user and serves as the user ID in analysis.
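The Host, User-Agent and Accept columns in the sword purchases table suggest that request headers are merged into each event before it is published, while the timestamp column comes from the Kafka record timestamp read in the Spark job. A minimal sketch of such a merge, assuming the headers arrive as a plain dictionary (in Flask, `dict(request.headers)` produces one); the `enrich_event` helper is hypothetical and is not the project's `log_to_kafka`.

```python
import json


def enrich_event(event, headers):
    """Return a copy of `event` with the request headers merged in.

    Event fields take precedence over headers with the same name, so a
    client cannot overwrite event_type by sending an `event_type` header.
    The input dictionaries are not modified.
    """
    enriched = dict(headers)
    enriched.update(event)
    return enriched


event = {"event_type": "purchase_sword", "color": "red", "quantity": "2"}
headers = {
    "Host": "user1.comcast.com",
    "User-Agent": "ApacheBench/2.3",
    "Accept": "*/*",
}
# The serialized message is what would be published to the "events" topic.
message = json.dumps(enrich_event(event, headers))
```

Letting event fields win over headers is a design choice in this sketch; merging in the opposite order would let headers override event fields.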

The data is then saved in Parquet format and queried to answer the following business questions:

* Guild Stats - How many people are in the guild?   
* Guild Stats - How many people joined the guild in the past year?   
* Horse Stats - How many horses have been purchased by size?   
* Sword Stats - How many swords have been purchased by color?   
* Sword Stats - How many swords have been purchased in the past year?  

Tables store a positive or negative quantity so that the state (total value) of swords and horses, and whether a user is in a guild, can be determined. To get the totals, sum the quantity of swords and horses. For guilds, a value of +1 indicates being in the guild and -1 indicates not being in the guild.
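Because each row carries a signed value, the current state can be derived in a few lines. A plain-Python sketch with hypothetical sample rows: item totals are a grouped sum, and for guilds it assumes a user's most recent value (+1 or -1) gives their current state. In the pipeline itself these would be Spark SQL queries over the Parquet tables.

```python
from collections import defaultdict


def total_quantity(rows):
    """Sum signed quantities per (host, item) pair.

    Each row is (host, item, quantity). Quantity may arrive as a string,
    as it does from the Flask app, so it is converted with int().
    A negative quantity reduces the running total.
    """
    totals = defaultdict(int)
    for host, item, quantity in rows:
        totals[(host, item)] += int(quantity)
    return dict(totals)


def guild_members(guild_rows):
    """Return the hosts whose most recent guild value is +1.

    Each row is (timestamp, host, value) with value +1 (in the guild)
    or -1 (not in the guild). Timestamps are ISO-style strings, so
    sorting them as strings also sorts them chronologically.
    """
    latest = {}
    for timestamp, host, value in sorted(guild_rows):
        latest[host] = value  # later rows overwrite earlier ones
    return {host for host, value in latest.items() if value == 1}


# Hypothetical sample rows
sword_rows = [
    ("user1.example.com", "red", "2"),
    ("user1.example.com", "red", "2"),
    ("user2.example.com", "blue", "3"),
    ("user2.example.com", "blue", "-1"),
]
guild_rows = [
    ("2021-07-20 03:47:01", "user1.example.com", 1),
    ("2021-07-20 03:50:00", "user1.example.com", -1),
    ("2021-07-20 03:48:00", "user2.example.com", 1),
]

print(total_quantity(sword_rows))
print(len(guild_members(guild_rows)))  # people currently in the guild
```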

For testing, Apache Bench was used to send batches of requests that create mock data in Parquet, so that the reporting can be tested. The test scenarios were scripted with Apache Bench, so the expected output is known in advance: if 10 values are created, we expect to see those 10 values in the reporting and analysis.
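The expectation described above can be written as an explicit check. A minimal sketch; the `scenario_matches` helper is hypothetical, and the sample numbers come from the sword report in the analysis section (five rows with quantity 2 for user1.comcast.com, reported total 10.0).

```python
def scenario_matches(n_requests, quantity, reported_total):
    """Check a reported total against an Apache Bench scenario.

    Sending `n_requests` identical purchase requests, each for `quantity`
    items, should produce a total of n_requests * quantity.
    """
    expected = n_requests * quantity
    return reported_total == expected


# Five requests for 2 red swords each should report a total of 10.
assert scenario_matches(n_requests=5, quantity=2, reported_total=10.0)
```

Comparing 10.0 with 10 works here because Python treats equal int and float values as equal, which matters since Spark reports the summed quantity as a double.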


## Setting up the pipeline
First, we show how we built the pipeline. We chose Kafka, Spark and Hadoop to transport, transform and store the game events, respectively. Each service is configured in the **docker-compose.yml** file; each part of that file is described below.

### Zookeeper & Kafka
Zookeeper coordinates and manages the Kafka broker. Kafka provides a pipeline that we can publish game events to, and from which Spark can consume them.

We have exposed ports in Zookeeper, including client port 32181, which the Kafka configuration references through `KAFKA_ZOOKEEPER_CONNECT`; this lets Kafka connect to Zookeeper and allows us to create topics in Kafka via Zookeeper. We have also exposed ports in Kafka (9092 and 29092) that allow us to publish and consume messages from the topics.

```yaml
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
    expose:
      - "2181"
      - "2888"
      - "32181"
      - "3888"
    extra_hosts:
      - "moby:127.0.0.1"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    expose:
      - "9092"
      - "29092"
    extra_hosts:
      - "moby:127.0.0.1"
```

### Hadoop
We use a Cloudera (CDH) image to provide Hadoop. It exposes the HDFS NameNode ports and Hue.

```yaml
  cloudera:
    image: midsw205/cdh-minimal:latest
    expose:
      - "8020" # nn
      - "50070" # nn http
      - "8888" # hue
    extra_hosts:
      - "moby:127.0.0.1"
```

### Spark
We set up Spark using the MIDS W205 Spark Python base image. We specify this service's dependency on Cloudera, along with the Hadoop NameNode that Spark uses when writing to HDFS. In addition, we expose ports that allow us to connect to Spark from a notebook.

```yaml
  spark:
    image: midsw205/spark-python:0.0.5
    stdin_open: true
    tty: true   
    volumes:
      - "~/w205:/w205"
    command: bash
    depends_on:
      - cloudera
    environment:
      HADOOP_NAMENODE: cloudera
    expose:
      - "8888"
      - "7000" #jupyter notebook      
    ports:
      - "8888:8888"
      - "7000:7000" # map instance:service port   
    extra_hosts:
      - "moby:127.0.0.1"
```

### MIDS Base Image
This is the base container we work in. It lets us run bash commands and use kafkacat to publish messages to Kafka. It also exposes port 5000 (Flask's default port), and we mount the w205 volume so that we have access to our files.

```yaml
  mids:
    image: midsw205/base:latest
    stdin_open: true
    tty: true
    expose:
      - "5000"
    ports:
      - "5000:5000"
    volumes:
      - "~/w205:/w205"
    extra_hosts:
      - "moby:127.0.0.1"
```

## Bring up the pipeline
To start all the services defined in **docker-compose.yml** in detached mode, we run the following command:

```bash
docker-compose up -d
```

## Interacting with the pipeline

### Flask app

Our Flask app is built in the game_api.py file. The app connects to Kafka so that any events generated by a user are captured in our Kafka topic called "events". The app consists of four functions:

1. `purchase_a_sword`: accepts color and quantity parameters 
2. `purchase_a_horse`: accepts speed, size, and quantity parameters
3. `guild`: accepts inputs to join or leave the guild
4. `default_response`: accepts no parameters and runs when the user submits a blank input (no event specified)

Each function handles an HTTP POST request. As an example, see the code below for the purchase_a_horse function. The function takes its parameters from the URL of the API call and captures them in a dictionary. Next, error handling ensures that the function only accepts valid user inputs. Finally, the dictionary is written to our Kafka topic, "events", which captures the user inputs from all four functions.

```python
@app.route("/purchase_a_horse/<speed>/<size>/<quantity>", methods=["POST"])
def purchase_a_horse(speed, size, quantity):
    """
    Inputs:
    - speed
    - size (small, medium, or large)
    - quantity
    """
    
    # collect user inputs
    purchase_horse_event = {
        "event_type": "purchase_horse", 
        "speed": speed, 
        "size": size, 
        "quantity": quantity}
    
    # error handling
    if purchase_horse_event['size'].lower() not in ['small', 'medium', 'large']:
        raise Exception("Please enter either 'small', 'medium' or 'large' for horse size")
        
    elif float(purchase_horse_event['speed']) < 0:
        raise Exception("Please enter a non-negative value for speed")
        
    elif float(purchase_horse_event['quantity']) < 0:
        raise Exception("Please enter a non-negative value for quantity")
        
    else:
        # clean inputs to collect only lower case values for consistency
        purchase_horse_event['size'] = purchase_horse_event['size'].lower()
        
        # log event to kafka
        log_to_kafka("events", purchase_horse_event)
        return "Horse Purchased!\n"
```
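The validation above raises a bare `Exception`, which Flask turns into a 500 error, and a non-numeric speed or quantity makes `float()` raise before the checks run. A minimal sketch of the same checks as a pure function, so they can be tested without Flask or Kafka; `validate_horse_purchase` and `VALID_SIZES` are hypothetical names, and unlike the `elif` chain it collects every error rather than stopping at the first.

```python
VALID_SIZES = {"small", "medium", "large"}


def validate_horse_purchase(speed, size, quantity):
    """Return (event, errors) for a horse purchase request.

    Mirrors the checks in purchase_a_horse, but also reports non-numeric
    speed or quantity instead of letting float() raise. `event` is None
    whenever any error was found.
    """
    errors = []
    if size.lower() not in VALID_SIZES:
        errors.append("Please enter either 'small', 'medium' or 'large' for horse size")
    for name, value in (("speed", speed), ("quantity", quantity)):
        try:
            if float(value) < 0:
                errors.append(f"Please enter a non-negative value for {name}")
        except ValueError:
            errors.append(f"Please enter a number for {name}")
    if errors:
        return None, errors
    event = {
        "event_type": "purchase_horse",
        "speed": speed,
        "size": size.lower(),  # lower case for consistency, as in the route
        "quantity": quantity,
    }
    return event, errors


event, errors = validate_horse_purchase("12", "Medium", "1")
```

Inside the route, a non-empty `errors` list could be returned as `return "\n".join(errors) + "\n", 400`, so clients receive a 400 Bad Request instead of a 500.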


### Extracting events

After users have used the app and generated events in the Kafka topic, we use PySpark to extract the events and land them in HDFS. We wrote three separate Python scripts, one per event type (sword, horse, and guild). Each script has three main steps: (1) read the events from the Kafka topic, (2) filter them to keep events of a specific type, and (3) write them to an HDFS table. This produces three HDFS tables, each with its own schema specific to its event type.

As an example, the code below shows how we filter the horse events and write them to HDFS in parquet format. 

```python
horse_purchases = raw_events \
    .filter(is_horse_purchase(raw_events.value.cast('string'))) \
    .select(raw_events.value.cast('string').alias('raw_event'),
            raw_events.timestamp.cast('string'),
            from_json(raw_events.value.cast('string'),
                      purchase_horse_event_schema()).alias('json')) \
    .select('raw_event', 'timestamp', 'json.*')

horse_purchases.show()
horse_purchases.write.mode("overwrite")\
    .parquet("/tmp/horse_purchases")
```
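The job relies on an `is_horse_purchase` filter that this section does not show. A sketch of what such a predicate could look like, assuming it parses the message and checks the `event_type` value set by the Flask app ("purchase_horse"); the body below is an assumption, not the project's code.

```python
import json


def is_horse_purchase(event_as_json):
    """Return True if a raw Kafka message is a horse purchase event.

    Malformed or non-object JSON is treated as "not a horse purchase",
    so a single bad message does not fail the whole Spark job.
    """
    try:
        event = json.loads(event_as_json)
    except (TypeError, ValueError):  # JSONDecodeError subclasses ValueError
        return False
    return isinstance(event, dict) and event.get("event_type") == "purchase_horse"
```

To use it in `.filter(...)` as above, the function would be wrapped as a Spark UDF, for example by decorating it with `@udf('boolean')` from `pyspark.sql.functions`.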


## Analyzing the events
**TO-DO:** business analysis goes here

We first load the sword purchases from HDFS and inspect them:

```python
sword_purchases = spark.read.parquet('/tmp/sword_purchases')
sword_purchases.show()
```

```
+--------------------+--------------------+------+-----------------+---------------+--------------+-----+--------+
|           raw_event|           timestamp|Accept|             Host|     User-Agent|    event_type|color|quantity|
+--------------------+--------------------+------+-----------------+---------------+--------------+-----+--------+
|{"event_type": "p...|2021-07-20 03:47:...|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  red|       2|
|{"event_type": "p...|2021-07-20 03:47:...|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  red|       2|
|{"event_type": "p...|2021-07-20 03:47:...|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  red|       2|
|{"event_type": "p...|2021-07-20 03:47:...|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  red|       2|
|{"event_type": "p...|2021-07-20 03:47:...|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  red|       2|
+--------------------+--------------------+------+-----------------+---------------+--------------+-----+--------+
```

We then register the table for Spark SQL and total the swords purchased by each user and color. Because `quantity` is stored as a string, Spark casts it to a double when summing, which is why the total appears as 10.0:

```python
sword_purchases.createOrReplaceTempView('sword_purchases')
total_swords = spark.sql("select Host, color, sum(quantity) total from sword_purchases group by Host, color")
total_swords.show()
```

```
+-----------------+-----+-----+
|             Host|color|total|
+-----------------+-----+-----+
|user1.comcast.com|  red| 10.0|
+-----------------+-----+-----+
```

