# Video Game Event Logging

First, I want to spin up my Docker containers:

```
docker-compose up -d
docker-compose ps
```

If everything goes well, we should see the zookeeper, kafka, cloudera, spark, presto, and mids containers being created:

```
Creating network "project3mbearwberkeley_default" with the default driver
Creating project3mbearwberkeley_presto_1
Creating project3mbearwberkeley_cloudera_1
Creating project3mbearwberkeley_zookeeper_1
Creating project3mbearwberkeley_mids_1
Creating project3mbearwberkeley_kafka_1
Creating project3mbearwberkeley_spark_1
```

Next, I want to create a Kafka topic called `events`, where the item-purchase events are published.

```
docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
```

Then I start my `game_api.py` Flask app:


```
docker-compose exec mids env FLASK_APP=/w205/project-3-mbearw-berkeley/game_api.py flask run --host 0.0.0.0
```
If it starts correctly, Flask reports that it is running on 0.0.0.0 (port 5000 by default, which is the port the load test below targets).
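The `game_api.py` source isn't shown here, but the Spark schema used later in the notebook (`Accept`, `Host`, `User-Agent`, `event_type`, `description`) suggests that each event is the endpoint's own dictionary merged with the HTTP request headers before it is published to Kafka. A minimal stdlib sketch of that shaping step, assuming that design; `build_event` is a hypothetical helper, the User-Agent value is an example, and the actual Kafka producer call is left out:

```python
import json


def build_event(event_type, headers, **extra):
    """Hypothetical helper: merge request headers into an event and serialize it.

    Assumes the Flask app copies HTTP request headers into each event, which is
    what the Accept/Host/User-Agent schema fields suggest. Not taken from game_api.py.
    """
    event = {"event_type": event_type, **extra}  # e.g. extra may carry a description
    event.update(headers)  # headers such as Host and User-Agent become event fields
    return json.dumps(event)


# Roughly what one Apache Bench request to /purchase_a_sword might publish
message = build_event(
    "purchase_sword",
    {"Host": "user1.comcast.com", "User-Agent": "ApacheBench/2.3", "Accept": "*/*"},  # example headers
)
print(message)
```

Because the headers are merged in, the `Host` value passed to Apache Bench with `-H` is what lets us tell users apart in the event log.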

Next, in a separate terminal, start kafkacat to watch item purchases in real time:

```
docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning
```
It won't return anything at first, but that will change once users start purchasing items!

Since BOTW2 isn't out yet, let's simulate some user activity with Apache Bench (`ab`):

```
while true; do
  docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword
  docker-compose exec mids ab -n 4 -H "Host: user3.tmobile.com" http://localhost:5000/purchase_a_blade
  docker-compose exec mids ab -n 1 -H "Host: user3.tmobile.com" http://localhost:5000/join_mages_guild
  sleep 5
done
```
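Each pass through the loop sends 15 requests: 10 sword purchases from `user1.comcast.com`, plus 4 blade purchases and 1 guild join from `user3.tmobile.com`. A small sketch that tallies the expected requests per (host, endpoint) after a given number of passes, useful as a sanity check against what shows up in kafkacat; it assumes every request succeeds and is logged as exactly one event, and `LOOP_REQUESTS` and `expected_requests` are hypothetical names:

```python
from collections import Counter

# (host, endpoint, requests per pass), mirroring the three `ab` calls in the loop
LOOP_REQUESTS = [
    ("user1.comcast.com", "/purchase_a_sword", 10),
    ("user3.tmobile.com", "/purchase_a_blade", 4),
    ("user3.tmobile.com", "/join_mages_guild", 1),
]


def expected_requests(passes):
    """Count requests per (host, endpoint) after `passes` loop iterations."""
    counts = Counter()
    for host, endpoint, n in LOOP_REQUESTS:
        counts[(host, endpoint)] += n * passes
    return counts


counts = expected_requests(3)
print(sum(counts.values()))  # → 45
```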

Before we can use a PySpark notebook to filter these records, we have to symlink the `/w205` directory into the Spark container's working directory.

```
docker-compose exec spark bash
ln -s /w205 w205
exit
```

Now we can open the notebook.

```python
import json
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType
```

Next, we define the event schema and a UDF for filtering events:

```python
def purchase_sword_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- description: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("description", StringType(), True),
    ])
```
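To see what `from_json` does with this schema, here is a plain-Python approximation (not Spark's implementation): keys named in the schema are kept, keys missing from an event come back as null (`None`) because every field is nullable, and keys not in the schema are dropped. `SCHEMA_FIELDS` and `project_onto_schema` are hypothetical names used only for illustration:

```python
import json

# Field names from purchase_sword_event_schema(); all are nullable strings
SCHEMA_FIELDS = ["Accept", "Host", "User-Agent", "event_type", "description"]


def project_onto_schema(event_as_json, fields=SCHEMA_FIELDS):
    """Rough stand-in for from_json: keep schema fields, fill missing ones with None."""
    event = json.loads(event_as_json)
    return {name: event.get(name) for name in fields}


row = project_onto_schema(
    '{"event_type": "purchase_sword", "Host": "user1.comcast.com", "extra": 1}'
)
print(row)
```

Here `Accept`, `User-Agent`, and `description` come back as `None`, and the unknown `extra` key disappears, which is the same shape `json.*` expands into columns later.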

```python
@udf('boolean')
def is_sword_purchase(event_as_json):
    """UDF for filtering events: keep sword and blade purchases only."""
    event = json.loads(event_as_json)
    # .get() returns None for events without an event_type, which fails the check
    return event.get('event_type') in ('purchase_sword', 'purchase_blade')
```
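Because the UDF body is ordinary Python, the filtering rule can be checked without a Spark session. A minimal sketch of the same rule as an undecorated function (`is_weapon_purchase` is a hypothetical name, and the `join_mages_guild` event type is an assumed name for the guild endpoint's event); it uses `dict.get` so an event without an `event_type` key is rejected instead of raising `KeyError`:

```python
import json

WEAPON_EVENTS = {"purchase_sword", "purchase_blade"}


def is_weapon_purchase(event_as_json):
    """Plain-Python version of the is_sword_purchase filtering rule."""
    event = json.loads(event_as_json)
    return event.get("event_type") in WEAPON_EVENTS


print(is_weapon_purchase('{"event_type": "purchase_blade"}'))    # True
print(is_weapon_purchase('{"event_type": "join_mages_guild"}'))  # False (assumed event name)
```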


Let's read the data from the `events` topic as a stream:

```python
# Subscribe to the Kafka "events" topic as an unbounded streaming DataFrame
raw_events = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "events") \
    .load()
```

Then we keep only the sword and blade purchases and parse each event's JSON with the schema:

```python
# Keep only weapon purchases, then flatten the parsed JSON into columns
sword_purchases = raw_events \
    .filter(is_sword_purchase(raw_events.value.cast('string'))) \
    .select(raw_events.value.cast('string').alias('raw_event'),
            raw_events.timestamp.cast('string'),
            from_json(raw_events.value.cast('string'),
                      purchase_sword_event_schema()).alias('json')) \
    .select('raw_event', 'timestamp', 'json.*')
```
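Per record, the filter and the two `select` calls produce one flat row: the raw JSON string, the Kafka message timestamp, and one column per schema field (`json.*`). A rough plain-Python picture of that shape, assuming records arrive as `(value_bytes, timestamp_string)` pairs; this is not how Spark executes the query, `shape_records` is a hypothetical helper, and the sample records below are made-up example values:

```python
import json

SCHEMA_FIELDS = ["Accept", "Host", "User-Agent", "event_type", "description"]
WEAPON_EVENTS = {"purchase_sword", "purchase_blade"}


def shape_records(records):
    """Yield flat rows for weapon purchases, like filter + select('raw_event', 'timestamp', 'json.*')."""
    for value, timestamp in records:
        raw_event = value.decode("utf-8")  # like value.cast('string')
        event = json.loads(raw_event)
        if event.get("event_type") not in WEAPON_EVENTS:
            continue  # dropped by the is_sword_purchase filter
        row = {"raw_event": raw_event, "timestamp": timestamp}
        row.update({name: event.get(name) for name in SCHEMA_FIELDS})  # json.*
        yield row


# Made-up example records: one weapon purchase, one guild join
records = [
    (b'{"event_type": "purchase_sword", "Host": "user1.comcast.com"}', "2020-12-07 06:49:01"),
    (b'{"event_type": "join_mages_guild", "Host": "user3.tmobile.com"}', "2020-12-07 06:49:02"),
]
rows = list(shape_records(records))
print(len(rows))  # → 1
```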


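Finally, we start a streaming query that writes the filtered events to Parquet files under `/tmp/sword_purchases` every 10 seconds, so a Hive table defined over that directory can pick up new data as it arrives: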

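```python
# Write the filtered stream to Parquet every 10 seconds; the checkpoint
# directory records which Kafka offsets have already been processed
sink = sword_purchases \
    .writeStream \
    .format("parquet") \
    .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases") \
    .option("path", "/tmp/sword_purchases") \
    .trigger(processingTime="10 seconds") \
    .start()
```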
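The `checkpointLocation` option is what lets a restarted query resume where it stopped instead of reprocessing or skipping Kafka messages. Spark's checkpoint directory is more involved (it keeps offset and commit logs, among other metadata), but the core idea can be sketched in plain Python: after each batch, record the next offset to read, and start the following batch from there. `run_batch` and the `offsets.json` file are hypothetical, and this is a simplified analogue rather than Spark's implementation:

```python
import json
import os
import tempfile


def run_batch(messages, checkpoint_path):
    """Return the messages not yet processed and checkpoint the next offset.

    Simplified analogue of checkpointLocation: the next offset to read is
    stored in a JSON file, so a restarted job resumes where it left off.
    """
    start = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            start = json.load(f)["next_offset"]
    batch = messages[start:]
    # Record progress so the next call (or a restarted job) skips these messages
    with open(checkpoint_path, "w") as f:
        json.dump({"next_offset": len(messages)}, f)
    return batch


checkpoint = os.path.join(tempfile.mkdtemp(), "offsets.json")
topic = ["event-0", "event-1"]        # stand-in for messages already in the topic
print(run_batch(topic, checkpoint))   # ['event-0', 'event-1']
topic.append("event-2")               # a new event arrives before the next trigger
print(run_batch(topic, checkpoint))   # ['event-2']
```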

When you're done streaming, stop the query:

```python
sink.stop()  # stops the streaming query
```

Let's check out our files in HDFS:

```
docker-compose exec cloudera hadoop fs -ls /tmp/sword_purchases
```

We should see a `_spark_metadata` directory and at least one Parquet file:

```
drwxr-xr-x   - root supergroup          0 2020-12-07 06:49 /tmp/sword_purchases/_spark_metadata
-rw-r--r--   1 root supergroup        777 2020-12-07 06:49 /tmp/sword_purchases/part-00000-dc691dfc-cd95-49a2-97dc-a7c4492cd18e-c000.snappy.parquet
```

Next, let's create a Hive table over these files and query it with Presto.

Open Hive
```
docker-compose exec cloudera hive
```
and run

```
create external table if not exists default.sword_purchases (
    raw_event string,
    timestamp string,
    Accept string,
    Host string,
    User_Agent string,
    event_type string
)
stored as parquet
location '/tmp/sword_purchases'
tblproperties ("parquet.compress"="SNAPPY");
```
which will return 

```
OK
Time taken: 0.764 seconds
```

Exit Hive and start the Presto CLI to query the data:

```
exit;

docker-compose exec presto presto --server presto:8080 --catalog hive --schema default
```
I'm interested in which weapons people have bought so far. Let's find out:

```
select distinct (event_type) from sword_purchases;
```

which returns

```
   event_type   
----------------
 purchase_sword 
 purchase_blade 
(2 rows)
Query 20201207_065455_00002_xc2rd, FINISHED, 1 node
Splits: 35 total, 19 done (54.29%)
0:03 [340 rows, 46.2KB] [99 rows/s, 13.6KB/s]
```
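Two types of weapons have been purchased so far: swords and blades. The `join_mages_guild` requests from the load loop don't appear because the `is_sword_purchase` UDF only keeps `purchase_sword` and `purchase_blade` events.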
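The same result can be reproduced offline: take one example event per endpoint hit by the load loop, apply the weapon filter, and collect the distinct event types, which is what `select distinct event_type` does over the filtered table. The event JSON below is illustrative, and the `join_mages_guild` event type name is an assumption:

```python
import json

WEAPON_EVENTS = {"purchase_sword", "purchase_blade"}

# One example event per endpoint in the ab loop (guild event name assumed)
events = [
    '{"event_type": "purchase_sword"}',
    '{"event_type": "purchase_blade"}',
    '{"event_type": "join_mages_guild"}',
]

# Filter like the UDF, then take distinct event types like the Presto query
parsed = (json.loads(e) for e in events)
distinct_types = sorted({ev["event_type"] for ev in parsed if ev["event_type"] in WEAPON_EVENTS})
print(distinct_types)  # ['purchase_blade', 'purchase_sword']
```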

Once we're finished with our table analysis (more weapons to come!), we can shut everything down with:

```
docker-compose down
```


## Looks like we need more weapons for a complete game! 

This concludes the report~