# Project 3: Understanding User Behavior

#### By: Shanie Hsieh

As data scientists at a game development company, we want to look at two main events in our mobile game: `buy a sword` and `join guild`. We also explore other events such as `buy armor` and `buy a shield`. In this notebook, we walk through the data pipeline that captures these events, generate test data, and perform some basic analysis. This notebook uses **Docker**, **Kafka**, **Apache Bench**, **Spark**, **Hadoop**, and **Presto**.

##### **Setup**

##### Docker File

I start from the initial files in the course materials and create the files needed to run through the pipeline smoothly. First, we copy `docker-compose.yml` from Week 13.

```
cp ~/w205/course-content/13-Understanding-Data/docker-compose.yml .
```

This file contains the containers needed:
- zookeeper: coordinates the Kafka broker
- kafka: stores past and real-time streaming events and serves them to readers
- cloudera: provides Hadoop (HDFS) for storage and the Hive metastore that holds our table definitions
- spark: processes, transforms, and queries the data
- presto: SQL query engine that supports a wider range of SQL syntax than Spark
- mids: UC Berkeley MIDS container with the command-line tools used in this project (e.g. Flask and kafkacat)

##### game_api File

I also copy `game_api.py` from Week 13 and add a few functions for the analysis.

```
cp ~/w205/course-content/13-Understanding-Data/game_api.py .
```

This file already contains `log_to_kafka()`, `default_response()`, and `purchase_a_sword()`. For the analysis of `join guild`, I defined a `join_guild()` event, and to offer more purchasing options I added `purchase_armor()` and `purchase_a_shield()`. The `log_to_kafka()` function formats each event as readable JSON and sends it to Kafka; all the other functions define the individual events.
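The core of `log_to_kafka()` can be sketched as follows: merge the HTTP request headers (`Host`, `User-Agent`, `Accept`) into the event dictionary, encode it as JSON, and send the bytes to a Kafka topic. This is a minimal sketch under assumptions, not the project's file. To keep it runnable without Flask or a broker, the producer's send method and the request headers are passed in as parameters; in the Flask app they would presumably be a kafka-python `KafkaProducer(bootstrap_servers='kafka:29092').send` and `request.headers`. The `EVENT_TYPES` mapping is hypothetical, although its values are the `event_type` strings that `stream_and_hive.py` filters on.

```python
import json

# Hypothetical mapping from the endpoint functions in game_api.py to the
# event_type strings that stream_and_hive.py filters on.
EVENT_TYPES = {
    "purchase_a_sword": "purchase_sword",
    "purchase_armor": "purchase_armor",
    "purchase_a_shield": "purchase_shield",
    "join_guild": "join_guild",
}


def log_to_kafka(send, topic, event, headers):
    """Merge request headers into `event` and send it to `topic` as JSON bytes.

    `send` stands in for a Kafka producer's send(topic, value) method and
    `headers` for Flask's request.headers (both are assumptions of this sketch).
    """
    message = dict(event)        # copy so the caller's dict is not modified
    message.update(headers)      # adds Host, User-Agent, Accept, ...
    payload = json.dumps(message).encode("utf-8")
    send(topic, payload)
    return payload


# Usage: collect "sent" messages in a list instead of writing to a real broker.
sent = []
log_to_kafka(
    lambda topic, value: sent.append((topic, value)),
    "events",
    {"event_type": EVENT_TYPES["join_guild"]},
    {"Host": "user1.comcast.com", "Accept": "*/*"},
)
```

Because the headers are merged into every event, the `Host` value ends up as its own JSON field, which is what later lets Presto group events by user.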

##### **Spin up cluster**

Now we're ready to run through the pipeline. Let's spin up our containers with docker-compose.

```
docker-compose up -d
```

##### **Create Topic**

In a new terminal, we create the Kafka topic `events`, which will carry the game events.

```
# older setups required creating the topic explicitly; brokers with topic auto-creation enabled create it on first write
docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
# confirm the topic exists; older Kafka versions take --zookeeper zookeeper:32181 here instead of --bootstrap-server
docker-compose exec kafka kafka-topics --describe --topic events --bootstrap-server kafka:29092
```

##### **Run Flask**

Let's start the Flask web server running `game_api.py`.

```
docker-compose exec mids env FLASK_APP=/w205/project-3-shhsieh99/game_api.py flask run --host 0.0.0.0
```

We leave this running in its own terminal and continue in a new one.

##### **Stream and Hive**

Most of the pipeline's work happens here. Game events stream from Kafka into Spark, which filters and transforms them and writes them to HDFS (Hadoop) as Parquet files backed by Hive tables. This is all done in `stream_and_hive.py`, reproduced below with comments explaining each step:

```
# import statements
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType

# define the schema of the JSON events written by game_api.py
def event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)

    The timestamp column comes from the Kafka message metadata, not the JSON payload.
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
    ])

# filtering events
# This UDF matches every purchase a user makes, whether it be a sword, armor, or a shield
PURCHASE_EVENTS = {'purchase_sword', 'purchase_armor', 'purchase_shield'}

@udf('boolean')
def is_purchase(event_as_json):
    event = json.loads(event_as_json)
    # .get() avoids a KeyError for messages that lack an event_type field
    return event.get('event_type') in PURCHASE_EVENTS

# This UDF matches a user joining a guild
@udf('boolean')
def is_guild_join(event_as_json):
    event = json.loads(event_as_json)
    return event.get('event_type') == 'join_guild'


def main():
    """main
    """
    # starts Spark session, notice we enable Hive here
    spark = SparkSession \
        .builder \
        .appName("ExtractEventsJob") \
        .enableHiveSupport()\
        .getOrCreate()

    # reads stream from Kafka and loads in the data
    raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()

    # the next 2 chunks filter the raw events and transform them into tables with all the necessary columns
    all_purchases = raw_events \
        .filter(is_purchase(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')
    guilds_join = raw_events \
        .filter(is_guild_join(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')
    
    # here we create the actual tables to send to HDFS
    spark.sql("drop table if exists all_purchases")
    purchase_sql_string = """
        create external table if not exists all_purchases (
            raw_event string,
            timestamp string,
            Accept string,
            Host string,
            `User-Agent` string,
            event_type string
            )
            stored as parquet
            location '/tmp/all_purchases'
            tblproperties ("parquet.compress"="SNAPPY")
            """
    spark.sql(purchase_sql_string)   
    spark.sql("drop table if exists guilds_join")
    guild_sql_string = """
        create external table if not exists guilds_join (
            raw_event string,
            timestamp string,
            Accept string,
            Host string,
            `User-Agent` string,
            event_type string
            )
            stored as parquet
            location '/tmp/guilds_join'
            tblproperties ("parquet.compress"="SNAPPY")
            """
    spark.sql(guild_sql_string)
    
    # write both streams to HDFS as Parquet files, triggering a micro-batch every 10 seconds
    purchase_sink = all_purchases \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_purchases") \
        .option("path", "/tmp/all_purchases") \
        .trigger(processingTime="10 seconds") \
        .start()
    guild_sink = guilds_join \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_guilds") \
        .option("path", "/tmp/guilds_join") \
        .trigger(processingTime="10 seconds") \
        .start()
     
    # wait until either streaming query terminates; blocking on purchase_sink alone
    # would not notice if guild_sink stopped or failed
    spark.streams.awaitAnyTermination()


if __name__ == "__main__":
    main()
```
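The two `create external table` statements above differ only in the table name and the HDFS location. As a possible refactoring (not what this project's script does), a hypothetical helper, `external_table_ddl`, could generate both statements from one column list, so the two tables cannot drift apart:

```python
# Column definitions shared by all_purchases and guilds_join.
COLUMNS = [
    ("raw_event", "string"),
    ("timestamp", "string"),
    ("Accept", "string"),
    ("Host", "string"),
    ("`User-Agent`", "string"),  # backticks are needed because of the hyphen
    ("event_type", "string"),
]


def external_table_ddl(table, location, columns=COLUMNS):
    """Return a Hive CREATE EXTERNAL TABLE statement for a Parquet directory."""
    cols = ",\n    ".join(f"{name} {dtype}" for name, dtype in columns)
    return (
        f"create external table if not exists {table} (\n    {cols}\n)\n"
        "stored as parquet\n"
        f"location '{location}'\n"
        'tblproperties ("parquet.compress"="SNAPPY")'
    )
```

Inside `main()`, a loop over `[("all_purchases", "/tmp/all_purchases"), ("guilds_join", "/tmp/guilds_join")]` could then call `spark.sql(f"drop table if exists {table}")` followed by `spark.sql(external_table_ddl(table, location))`.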

With this file in place, we open a new terminal and submit it to Spark, which starts the streaming queries and registers the tables in Hive.

```
docker-compose exec spark spark-submit /w205/project-3-shhsieh99/stream_and_hive.py
```

##### **Set Up to Watch Kafka**

Now we use kafkacat to continuously read the events from the `events` topic as they arrive.

```
docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning
```
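To summarize what kafkacat prints instead of scrolling through it, a minimal sketch (the helper `count_event_types` is hypothetical) tallies the `event_type` of each message. It assumes one JSON message per line, which is kafkacat's default consumer output for these events, and skips anything that does not parse as a JSON object. One way to feed it is to add kafkacat's `-e` flag (exit at the end of the topic) and pipe the output of `docker-compose exec -T mids kafkacat ...` into a script that calls `count_event_types(sys.stdin)`.

```python
import json
from collections import Counter


def count_event_types(lines):
    """Count event_type values in an iterable of JSON lines.

    Blank lines, non-JSON lines, and JSON values that are not objects are skipped.
    """
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue
        if isinstance(event, dict):
            counts[event.get("event_type", "unknown")] += 1
    return counts
```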

##### **Apache Bench to Generate Data**

This is the testing part of my report. We use Apache Bench to generate data for analysis: every 10 seconds, it sends each event 10 times for each of 2 users. For readability, these commands are stored in `feed_data.sh`. To begin generating data, we run this script in a new terminal.

```
bash feed_data.sh
```
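`feed_data.sh` itself is not reproduced here. As an illustration only, the same loop can be sketched in Python under several assumptions: each request runs `ab -n 10 -H "Host: ..."` inside the `mids` container against the Flask server on its default port 5000, the endpoint paths match the function names in `game_api.py`, and the host names are the two that appear in the Presto output below. The helpers `ab_commands` and `feed_forever` are hypothetical.

```python
import subprocess
import time

# Assumed values: hosts match the Presto output; endpoint paths are assumed
# to match the function names in game_api.py.
HOSTS = ["user1.comcast.com", "user2.att.com"]
ENDPOINTS = ["purchase_a_sword", "purchase_armor", "purchase_a_shield", "join_guild"]


def ab_commands(n_requests=10, base_url="http://localhost:5000"):
    """Build one Apache Bench command per (host, endpoint) pair."""
    return [
        ["docker-compose", "exec", "-T", "mids", "ab",
         "-n", str(n_requests), "-H", f"Host: {host}", f"{base_url}/{endpoint}"]
        for host in HOSTS
        for endpoint in ENDPOINTS
    ]


def feed_forever(interval_s=10):
    """Send every batch of requests, then pause; runs until interrupted."""
    while True:
        for cmd in ab_commands():
            subprocess.run(cmd, check=True)
        time.sleep(interval_s)
```

Calling `feed_forever()` from a terminal would play the role of `bash feed_data.sh`; the `-H "Host: ..."` header is what makes each batch look like it came from a different user.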

##### **See in HDFS**

In a new terminal, we can confirm that Parquet files are being written to HDFS for `all_purchases` and `guilds_join`.

```
docker-compose exec cloudera hadoop fs -ls /tmp/all_purchases
docker-compose exec cloudera hadoop fs -ls /tmp/guilds_join
```
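While the stream runs, the number of Parquet part files in each directory should grow as new micro-batches are written. A minimal sketch, assuming the usual `hadoop fs -ls` layout in which each entry line ends with the full HDFS path, extracts the Parquet file paths so the count can be compared across runs. The helper `parquet_files` is hypothetical.

```python
def parquet_files(ls_output):
    """Return the paths of *.parquet files listed in `hadoop fs -ls` output.

    The "Found N items" header and directory entries (lines starting with "d",
    such as _spark_metadata) are skipped; the last field of a line is its path.
    """
    paths = []
    for line in ls_output.splitlines():
        fields = line.split()
        if not fields or line.startswith(("Found", "d")):
            continue
        if fields[-1].endswith(".parquet"):
            paths.append(fields[-1])
    return paths
```

The listing itself could be captured with `subprocess.run(["docker-compose", "exec", "-T", "cloudera", "hadoop", "fs", "-ls", "/tmp/all_purchases"], capture_output=True, text=True).stdout`.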

##### **Queries from Presto**

With the data continuously updating in HDFS, we can perform some analysis using Presto. First, we start the Presto CLI.

```
docker-compose exec presto presto --server presto:8080 --catalog hive --schema default
```

We first check what tables currently exist in Presto.

```
presto:default> show tables;
     Table     
---------------
 all_purchases 
 guilds_join   
(2 rows)

Query 20211206_112323_00001_sja3y, FINISHED, 1 node
Splits: 2 total, 2 done (100.00%)
0:05 [2 rows, 66B] [0 rows/s, 12B/s]
```

Describe the `all_purchases` table

```
presto:default> describe all_purchases;
   Column   |  Type   | Comment 
------------+---------+---------
 raw_event  | varchar |         
 timestamp  | varchar |         
 accept     | varchar |         
 host       | varchar |         
 user-agent | varchar |         
 event_type | varchar |         
(6 rows)

Query 20211206_112350_00003_sja3y, FINISHED, 1 node
Splits: 2 total, 1 done (50.00%)
0:02 [6 rows, 434B] [3 rows/s, 280B/s]
```

See the total number of guild joins

```
presto:default> select count(*) from guilds_join;
 _col0 
-------
   140 
(1 row)

Query 20211206_112432_00004_sja3y, FINISHED, 1 node
Splits: 11 total, 8 done (72.73%)
0:11 [110 rows, 13.8KB] [9 rows/s, 1.24KB/s]
```

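Guild joins grouped by `Host` (the total of 160 is higher than the count above because new events kept arriving between the two queries)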

```
presto:default> select Host, count(*) from guilds_join group by Host;
       Host        | _col1 
-------------------+-------
 user1.comcast.com |    80 
 user2.att.com     |    80 
(2 rows)

Query 20211206_112459_00005_sja3y, FINISHED, 1 node
Splits: 14 total, 7 done (50.00%)
0:03 [110 rows, 13.8KB] [38 rows/s, 4.89KB/s]
```

Let's take a look at `all_purchases` and check the count for each type of purchase.

```
presto:default> select event_type, count(*) from all_purchases group by event_type;
   event_type    | _col1 
-----------------+-------
 purchase_sword  |   230 
 purchase_armor  |   230 
 purchase_shield |   220 
(3 rows)

Query 20211206_112545_00006_sja3y, FINISHED, 1 node
Splits: 19 total, 13 done (68.42%)
0:01 [480 rows, 28KB] [538 rows/s, 31.4KB/s]
```
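To put a number on how even the purchase mix is, a small sketch (the helper `event_shares` is hypothetical) converts the `(event_type, count)` rows, typed in here from the query output above, into fractions of the total:

```python
def event_shares(rows):
    """Return each event type's fraction of the total count."""
    total = sum(count for _, count in rows)
    return {event_type: count / total for event_type, count in rows}


# Rows copied from the Presto output above.
rows = [("purchase_sword", 230), ("purchase_armor", 230), ("purchase_shield", 220)]
shares = event_shares(rows)
for event_type, share in shares.items():
    print(f"{event_type}: {share:.1%}")
```

This prints `33.8%` for swords and armor and `32.4%` for shields, so every purchase type is within one percentage point of an even one-third split.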

What about the types of purchases from user 1?

```
presto:default> select event_type, count(*) from all_purchases where Host = 'user1.comcast.com' group by event_type;
   event_type    | _col1 
-----------------+-------
 purchase_sword  |   140 
 purchase_armor  |   140 
 purchase_shield |   140 
(3 rows)

Query 20211206_112622_00007_sja3y, FINISHED, 1 node
Splits: 23 total, 19 done (82.61%)
0:02 [780 rows, 45.8KB] [359 rows/s, 21.1KB/s]
```

What about user 2?

```
presto:default> select event_type, count(*) from all_purchases where Host = 'user2.att.com' group by event_type;
   event_type    | _col1 
-----------------+-------
 purchase_sword  |   150 
 purchase_armor  |   150 
 purchase_shield |   150 
(3 rows)

Query 20211206_112646_00008_sja3y, FINISHED, 1 node
Splits: 25 total, 16 done (64.00%)
0:01 [650 rows, 38.1KB] [607 rows/s, 35.7KB/s]
```

Finally, let's check that our data is constantly being generated and updated.

```
presto:default> select count(*) from guilds_join;
 _col0 
-------
   380 
(1 row)

Query 20211206_112746_00011_sja3y, FINISHED, 1 node
Splits: 29 total, 20 done (68.97%)
0:01 [290 rows, 36.7KB] [303 rows/s, 38.5KB/s]
```

The guild-join count grew from 140 to 380 in about three minutes, which confirms that data is constantly being generated and flowing through the pipeline. The purchases are also going through evenly: each user shows equal counts for the sword, armor, and shield, so the tables are ready for analyses of both purchases and guild joins. We can now stop the processes in all running terminals and shut down the cluster with docker-compose.

```
docker-compose down
```