# Project 3: Understanding User Behavior
**Project Team: Jude Wentian Zhu, Rohit Barkshi, Rathin Bector**

## Description of Project Files


<span style="color:red">**Need to fill in**</span>

## Project Goal

<span style="color:red">**Need to fill in**</span>

## Summary of Data Pipeline

<span style="color:red">**Need to fill in**</span>

## Explanation of Game (game_api.py)

Our game is a simple multiplayer fighting game. Each player starts with a set amount of health and money and a basic weapon (a stick). Players can spend money on a better weapon or on a shield that reduces the damage they take, and they can earn more money by "digging for gold". Players attack each other with their weapons, aiming to kill other players (bring their health to 0). A player who kills another player inherits all of that player's money.

### Explanation of function calls:
- /initialize
    - A player calls this to join the game with a given username. The player starts with 100 money, 20 health, a stick (weapon), and no shield.
    - The event is logged to Kafka.
- /purchase_weapon
    - A player calls this to buy a weapon, passing their username and the desired weapon. The API checks that the username and weapon are valid, that the player is alive, and that the player has enough money. On a successful purchase, the price is subtracted from the player's wallet and the new weapon is stored for the player.
    - Weapons (price, damage, success rate):
        - Stick: 0, 0, 1
        - Knife: 5, 2, 0.7
        - Sword: 10, 3, 0.65
        - Grenade: 15, 5, 0.4
        - Gun: 50, 9, 0.75
        - Bazooka: 100, 20, 0.5
        - Nuke: 500, 100, 1
    - A successful purchase is logged to Kafka.
- /purchase_shield
    - A player calls this to buy a shield. The API checks that the username is valid, that the player is alive, and that the player has enough money. On a successful purchase, the price is subtracted from the wallet and the player is equipped with a shield.
    - A shield costs 20 and reduces the damage of each incoming attack by 5.
    - A successful purchase is logged to Kafka.
- /dig_for_gold
    - A player calls this to "dig for gold", i.e. to earn more money. Digging for gold costs 5 (the price of a shovel). The API checks that the username is valid, that the player is alive, and that the player has enough money to buy the shovel. If so, the player digs: a random draw determines how much gold is found, and that amount is added to the player's wallet.
    - A successful "dig for gold" event is logged to Kafka.
- /attack
    - A player calls this to attack another player. The API checks that both usernames are valid and that both players are alive. Whether the attack succeeds is random, based on the success rate of the attacker's weapon. On success, the defender's health is reduced by the weapon's damage, minus 5 if the defender has a shield. A defender whose health reaches 0 is killed, and the attacker inherits all of the money in the defender's wallet.
    - Both successful and failed attacks are logged to Kafka, as `successful_attack` and `failed_attack` events respectively.
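The attack rules above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the code in `game_api.py`: players are assumed to be dicts with `username`, `money`, `health`, `weapon` and `shield` keys, weapon names are assumed to be lowercase strings, damage after a shield and remaining health are both assumed to be floored at 0, and the random source is passed in so the outcome can be controlled. The event field names are taken from the Spark schemas defined later in this notebook.

```python
import random

# Weapon stats from the list above: name -> (price, damage, success_rate).
# Lowercase names are an assumption about how game_api.py spells them.
WEAPONS = {
    "stick": (0, 0, 1.0),
    "knife": (5, 2, 0.7),
    "sword": (10, 3, 0.65),
    "grenade": (15, 5, 0.4),
    "gun": (50, 9, 0.75),
    "bazooka": (100, 20, 0.5),
    "nuke": (500, 100, 1.0),
}
SHIELD_REDUCTION = 5  # a shield reduces each hit's damage by 5


def resolve_attack(attacker, defender, rng=random):
    """Resolve one attack, mutating both player dicts, and return the event.

    Players are plain dicts (a hypothetical in-memory layout for this sketch).
    """
    if attacker["health"] <= 0 or defender["health"] <= 0:
        raise ValueError("both players must be alive")
    _, damage, success_rate = WEAPONS[attacker["weapon"]]
    event = {
        "attacker": attacker["username"],
        "defender": defender["username"],
        "weapon_used": attacker["weapon"],
    }
    # rng.random() is uniform on [0, 1), so the hit lands with probability success_rate.
    if rng.random() >= success_rate:
        event["event_type"] = "failed_attack"
        return event
    if defender["shield"]:
        # Assumption: damage is floored at 0, so a shield never heals.
        damage = max(damage - SHIELD_REDUCTION, 0)
    health_before = defender["health"]
    # Assumption: health is clamped at 0 rather than going negative.
    defender["health"] = max(health_before - damage, 0)
    killed = defender["health"] == 0
    if killed:
        # The attacker inherits the defender's whole wallet.
        attacker["money"] += defender["money"]
        defender["money"] = 0
    event.update({
        "event_type": "successful_attack",
        "defender_has_shield": bool(defender["shield"]),
        "defender_health_before": health_before,
        "defender_health_after": defender["health"],
        "defender_killed": killed,
    })
    return event
```

Passing `rng` explicitly (any object with a `random()` method) keeps the game logic deterministic under test while defaulting to the `random` module in normal play.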

### Bonus (Redis)
We use Redis, an in-memory key-value store, to track player state and store player data during the game, populating it with different types of keys and values. Redis state tracking is also what prevents two players from registering the same username.
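One way to lay out player state in Redis is one hash per player. The sketch below is an assumption, not the project's actual key scheme: the `player:<username>` key format, the field names, and the `create_player`/`load_player` helpers are all hypothetical. It accepts any redis-py client, for example `redis.Redis(host="redis", port=6379)` (assuming the compose service name `redis` and Redis's default port), and uses `HSETNX` so that a username can only be claimed once.

```python
# Hypothetical key layout: one Redis hash per player, keyed "player:<username>".
# shield is stored as 0/1 because redis-py 3.x refuses Python bool values.
STARTING_STATE = {"money": 100, "health": 20, "weapon": "stick", "shield": 0}
INT_FIELDS = ("money", "health", "shield")


def player_key(username):
    return "player:" + username


def _text(value):
    # redis-py returns bytes unless the client was built with decode_responses=True.
    return value.decode("utf-8") if isinstance(value, bytes) else value


def create_player(client, username):
    """Create a player hash; return False if the username is already taken."""
    key = player_key(username)
    # HSETNX writes the field only if it is missing, so if two requests race
    # for the same username only one of them claims it.
    if not client.hsetnx(key, "money", STARTING_STATE["money"]):
        return False
    rest = {k: v for k, v in STARTING_STATE.items() if k != "money"}
    client.hset(key, mapping=rest)
    return True


def load_player(client, username):
    """Return the player's state as a dict, or None if the player does not exist."""
    raw = client.hgetall(player_key(username))
    if not raw:
        return None
    state = {_text(k): _text(v) for k, v in raw.items()}
    for field in INT_FIELDS:
        state[field] = int(state[field])
    return state
```

`hset(..., mapping=...)` needs redis-py 3.5 or newer, which matches the `redis-3.5.3` package installed in Step 2.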

## Data Pipeline Steps

### Step 1: Spin up Docker-Compose and Link PySpark to Jupyter Notebook

1. Spin up the cluster

In [1]:
!docker-compose up -d

Creating network "project_3_default" with the default driver
Creating project_3_presto_1    ... 
Creating project_3_redis_1     ... 
Creating project_3_cloudera_1  ... done
Creating project_3_zookeeper_1 ... 
Creating project_3_mids_1      ... 
Creating project_3_spark_1     ... 
Creating project_3_kafka_1     ... done

2. Exec a bash shell in the spark container from a terminal.
```console
docker-compose exec spark bash
```


3. Create a symbolic link named `w205`, pointing to `/w205`, in the Spark container's working directory:
```console
ln -s /w205 w205
```


4. Exit the container
```console
exit
```


5. Check the `/tmp/` directory in Hadoop (HDFS)

In [2]:
!docker-compose exec cloudera hadoop fs -ls /tmp/

Found 3 items
drwxrwxrwt   - mapred mapred              0 2016-04-06 02:26 /tmp/hadoop-yarn
drwx-wx-wx   - hive   supergroup          0 2021-08-05 04:36 /tmp/hive
drwxrwxrwt   - mapred hadoop              0 2016-04-06 02:28 /tmp/logs


### Step 2: Launch Kafka and Flask

1. Create a Kafka topic called `events`

In [3]:
!docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181

Created topic events.


2. Install the Flask app's dependencies in the mids container

In [4]:
!docker-compose exec mids pip install redis
!docker-compose exec mids pip install numpy==1.14.6

The directory '/w205/.cache/pip/http' or its parent directory is not owned by the current user and the cache has been disabled. Please check the permissions and owner of that directory. If executing pip with sudo, you may want sudo's -H flag.
The directory '/w205/.cache/pip' or its parent directory is not owned by the current user and caching wheels has been disabled. check the permissions and owner of that directory. If executing pip with sudo, you may want sudo's -H flag.
Collecting redis
  Downloading https://files.pythonhosted.org/packages/a7/7c/24fb0511df653cf1a5d938d8f5d19802a88cef255706fdda242ff97e91b7/redis-3.5.3-py2.py3-none-any.whl (72kB)
    100% |################################| 81kB 6.1MB/s eta 0:00:01
Installing collected packages: redis
Successfully installed redis-3.5.3
You are using pip version 8.1.1, however version 21.2.2 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.

3. Bring up the game API (`game_api.py`) with Flask

In [None]:

!docker-compose exec mids env FLASK_APP=/w205/project_3/game_api.py flask run --host 0.0.0.0

4. Start a Jupyter notebook with a PySpark kernel

In [None]:
!docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark

### Step 3: Data Streaming Setup


In [3]:
# Import Packages
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType


**Define the schemas**

In [22]:
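# Define one schema per event type. The `timestamp` listed in each docstring
# is not part of the JSON payload: it is taken from the Kafka message
# (raw_events.timestamp) when the events are extracted below.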
def player_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- username: string (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("username", StringType(), True),
    ])

def purchase_weapon_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- username: string (nullable = true)
    |-- weapon: string (nullable = true)
    |-- wallet_before:integer  (nullable = true)
    |-- wallet_after:integer  (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("username", StringType(), True),
        StructField("weapon", StringType(), True),
        StructField("wallet_before", IntegerType(), True),
        StructField("wallet_after", IntegerType(), True),
    ])

def purchase_shield_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- username: string (nullable = true)
    |-- wallet_before:integer  (nullable = true)
    |-- wallet_after:integer  (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("username", StringType(), True),
        StructField("wallet_before", IntegerType(), True),
        StructField("wallet_after", IntegerType(), True),
    ])

def dig_for_gold_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- username: string (nullable = true)
    |-- gold_found: integer (nullable = true)
    |-- wallet_before:integer  (nullable = true)
    |-- wallet_after:integer  (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("username", StringType(), True),
        StructField("gold_found", IntegerType(), True),
        StructField("wallet_before", IntegerType(), True),
        StructField("wallet_after", IntegerType(), True),
    ])

def successful_attack_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- attacker: string (nullable = true)
    |-- defender: string (nullable = true)
    |-- weapon_used: string (nullable = true)
    |-- defender_has_shield: boolean (nullable = true)
    |-- defender_health_before:integer  (nullable = true)
    |-- defender_health_after:integer  (nullable = true)
    |-- defender_killed: boolean (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("attacker", StringType(), True),
        StructField("defender", StringType(), True),
        StructField("weapon_used", StringType(), True),
        StructField("defender_has_shield", BooleanType(), True),
        StructField("defender_health_before", IntegerType(), True),
        StructField("defender_health_after", IntegerType(), True),
        StructField("defender_killed", BooleanType(), True),
    ])

def failed_attack_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- attacker: string (nullable = true)
    |-- defender: string (nullable = true)
    |-- weapon_used: string (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("attacker", StringType(), True),
        StructField("defender", StringType(), True),
        StructField("weapon_used", StringType(), True),
    ])

In [5]:
# Define UDFs that match each event by its event_type
@udf('boolean')
def is_player(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'initialize_player':
        return True
    return False


@udf('boolean')
def is_purchase_weapon(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_weapon':
        return True
    return False

@udf('boolean')
def is_purchase_shield(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_shield':
        return True
    return False


@udf('boolean')
def is_dig_for_gold(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'dig_for_gold':
        return True
    return False


@udf('boolean')
def is_successful_attack(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'successful_attack':
        return True
    return False

@udf('boolean')
def is_failed_attack(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'failed_attack':
        return True
    return False
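The predicates above differ only in the `event_type` string they compare against, so they could be generated by one factory. The sketch below is a hypothetical refactor (the name `make_event_type_check` is ours, not the project's); unlike the originals, it also returns `False` for null or malformed messages instead of raising inside the Spark job.

```python
import json


def make_event_type_check(event_type):
    """Return a predicate that tests a raw Kafka value for one event_type."""
    def check(event_as_json):
        try:
            event = json.loads(event_as_json)
        except (TypeError, ValueError):
            # TypeError: null value; ValueError (incl. JSONDecodeError): bad JSON.
            return False
        return isinstance(event, dict) and event.get("event_type") == event_type
    return check
```

Each predicate can then be wrapped like the decorated functions, e.g. `is_dig_for_gold = udf(make_event_type_check('dig_for_gold'), 'boolean')`, with `udf` imported from `pyspark.sql.functions` as in the imports cell.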


### Stream Events

Start reading events from the `events` Kafka topic.

In [44]:
spark = SparkSession \
    .builder \
    .appName("ExtractEventsJob") \
    .getOrCreate()
    
raw_events = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "events") \
    .load()

**Start writing the streams (one parquet sink per event type)**

In [45]:
# Extract initialize_player events
extracted_initialize_player = raw_events \
    .filter(is_player(raw_events.value.cast('string'))) \
    .select(raw_events.value.cast('string').alias('raw_event'),
            raw_events.timestamp.cast('string'),
            from_json(raw_events.value.cast('string'),
                      player_event_schema()).alias('json')) \
    .select('raw_event', 'timestamp', 'json.*')

sink_player = extracted_initialize_player \
    .writeStream \
    .format("parquet") \
    .option("path","/tmp/player") \
    .option("checkpointLocation", "/tmp/checkpoint_player") \
    .start()
    
    
# Extract purchase_weapon events
extracted_purchase_weapon = raw_events \
    .filter(is_purchase_weapon(raw_events.value.cast('string'))) \
    .select(raw_events.value.cast('string').alias('raw_event'),
            raw_events.timestamp.cast('string'),
            from_json(raw_events.value.cast('string'),
                      purchase_weapon_event_schema()).alias('json')) \
    .select('raw_event', 'timestamp', 'json.*')
    
sink_purchase_weapon = extracted_purchase_weapon \
    .writeStream \
    .format("parquet") \
    .option("path","/tmp/purchase_weapon") \
    .option("checkpointLocation", "/tmp/checkpoint_purchase_weapon") \
    .start()

    
# Extract purchase_shield events
extracted_purchase_shield = raw_events \
    .filter(is_purchase_shield(raw_events.value.cast('string'))) \
    .select(raw_events.value.cast('string').alias('raw_event'),
            raw_events.timestamp.cast('string'),
            from_json(raw_events.value.cast('string'),
                      purchase_shield_event_schema()).alias('json')) \
    .select('raw_event', 'timestamp', 'json.*')
            
sink_purchase_shield = extracted_purchase_shield \
    .writeStream \
    .format("parquet") \
    .option("path","/tmp/purchase_shield") \
    .option("checkpointLocation", "/tmp/checkpoint_purchase_shield") \
    .start()


# Extract dig_for_gold events
extracted_dig_for_gold = raw_events \
    .filter(is_dig_for_gold(raw_events.value.cast('string'))) \
    .select(raw_events.value.cast('string').alias('raw_event'),
            raw_events.timestamp.cast('string'),
            from_json(raw_events.value.cast('string'),
                      dig_for_gold_event_schema()).alias('json')) \
    .select('raw_event', 'timestamp', 'json.*')
    
sink_dig_for_gold = extracted_dig_for_gold \
    .writeStream \
    .format("parquet") \
    .option("path","/tmp/dig_for_gold") \
    .option("checkpointLocation", "/tmp/checkpoint_dig_for_gold") \
    .start()    

# Extract successful_attack events
extracted_successful_attack = raw_events \
    .filter(is_successful_attack(raw_events.value.cast('string'))) \
    .select(raw_events.value.cast('string').alias('raw_event'),
            raw_events.timestamp.cast('string'),
            from_json(raw_events.value.cast('string'),
                      successful_attack_event_schema()).alias('json')) \
    .select('raw_event', 'timestamp', 'json.*')
    
sink_successful_attack = extracted_successful_attack \
    .writeStream \
    .format("parquet") \
    .option("path","/tmp/successful_attack") \
    .option("checkpointLocation", "/tmp/checkpoint_successful_attack") \
    .start()
            
            
# Extract failed_attack events
extracted_failed_attack = raw_events \
    .filter(is_failed_attack(raw_events.value.cast('string'))) \
    .select(raw_events.value.cast('string').alias('raw_event'),
            raw_events.timestamp.cast('string'),
            from_json(raw_events.value.cast('string'),
                      failed_attack_event_schema()).alias('json')) \
    .select('raw_event', 'timestamp', 'json.*')
    
sink_failed_attack = extracted_failed_attack \
    .writeStream \
    .format("parquet") \
    .option("path","/tmp/failed_attack") \
    .option("checkpointLocation", "/tmp/checkpoint_failed_attack") \
    .start()
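The six extract-and-sink blocks above repeat one pattern. A possible consolidation is sketched below under assumptions: `EVENT_SINKS`, `sink_options`, and `start_event_sink` are hypothetical names, and instead of the Python UDFs it filters on the parsed `json.event_type` column, which keeps the filter inside Spark's JVM rather than calling into Python for every row. The output paths and checkpoint locations reproduce the ones used above, including the fact that `initialize_player` events go to `/tmp/player`.

```python
# Hypothetical mapping: Kafka event_type -> output name under /tmp.
EVENT_SINKS = {
    "initialize_player": "player",
    "purchase_weapon": "purchase_weapon",
    "purchase_shield": "purchase_shield",
    "dig_for_gold": "dig_for_gold",
    "successful_attack": "successful_attack",
    "failed_attack": "failed_attack",
}


def sink_options(name, base="/tmp"):
    """Parquet output path and checkpoint location for one sink."""
    return {
        "path": "{}/{}".format(base, name),
        "checkpointLocation": "{}/checkpoint_{}".format(base, name),
    }


def start_event_sink(raw_events, event_type, schema):
    """Start one streaming parquet sink for a single event type (needs PySpark)."""
    from pyspark.sql.functions import col, from_json

    value = col("value").cast("string")
    parsed = raw_events.select(
        value.alias("raw_event"),
        col("timestamp").cast("string").alias("timestamp"),
        from_json(value, schema).alias("json"),
    )
    return (parsed
            .filter(col("json.event_type") == event_type)  # replaces the UDF filter
            .select("raw_event", "timestamp", "json.*")
            .writeStream
            .format("parquet")
            .options(**sink_options(EVENT_SINKS[event_type]))
            .start())
```

For example, `start_event_sink(raw_events, 'purchase_weapon', purchase_weapon_event_schema())` writes to the same path and checkpoint location as `sink_purchase_weapon`; each returned query still needs `.stop()` when the run is over.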

**Start generating events with a bash script (the loop runs 1000 times)**

In [3]:
!bash data_generator.sh

data_generator.sh: line 43: docker-compose: command not found
data_generator.sh: line 27: docker-compose: command not found
data_generator.sh: line 28: docker-compose: command not found

The remaining output repeats the same `docker-compose: command not found` error for other lines of the script. Every call failed because this notebook runs inside the `spark` container (it was started with `docker-compose exec spark ...`), where `docker-compose` is not installed, so this run generated no events. `data_generator.sh` should be run from a terminal on the host instead.

**Stop writing the streams**

In [46]:
sink_player.stop()
sink_purchase_weapon.stop()
sink_purchase_shield.stop()
sink_dig_for_gold.stop()
sink_successful_attack.stop()
sink_failed_attack.stop()

**Check Kafka**

In [None]:
!docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning -e

### Transform Events and Write to Hive Tables

In [47]:
#player_events
transformed_player_events = spark.read.parquet('/tmp/player')
transformed_player_events.createOrReplaceTempView('player')

player_query = "create external table player_events stored as parquet location '/tmp/player_events' as select * from player"
spark.sql(player_query)

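AnalysisException: '`default`.`player_events` already exists.;'

This exception means that `player_events` already existed from an earlier run of this cell, so the table was not recreated.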

In [12]:
#purchase_weapon_events
transformed_purchase_weapon_events = spark.read.parquet('/tmp/purchase_weapon')
transformed_purchase_weapon_events.createOrReplaceTempView('purchase_weapon')

purchase_weapon_query = "create external table purchase_weapon_events stored as parquet location '/tmp/purchase_weapon_events' as select * from purchase_weapon"
spark.sql(purchase_weapon_query)

DataFrame[]

In [48]:
#purchase_shield_events
transformed_purchase_shield_events = spark.read.parquet('/tmp/purchase_shield')
transformed_purchase_shield_events.createOrReplaceTempView('purchase_shield')

purchase_shield_query = "create external table purchase_shield_events stored as parquet location '/tmp/purchase_shield_events' as select * from purchase_shield"
spark.sql(purchase_shield_query)

DataFrame[]

In [49]:
#dig_for_gold_events
transformed_dig_for_gold_events = spark.read.parquet('/tmp/dig_for_gold')
transformed_dig_for_gold_events.createOrReplaceTempView('dig_for_gold')

dig_for_gold_query = "create external table dig_for_gold_events stored as parquet location '/tmp/dig_for_gold_events' as select * from dig_for_gold"
spark.sql(dig_for_gold_query)

DataFrame[]

In [50]:
#successful_attack_events
transformed_successful_attack_events = spark.read.parquet('/tmp/successful_attack')
transformed_successful_attack_events.createOrReplaceTempView('successful_attack')

successful_attack_query = "create external table successful_attack_events stored as parquet location '/tmp/successful_attack_events' as select * from successful_attack"
spark.sql(successful_attack_query)

DataFrame[]

In [51]:
#failed_attack_events
transformed_failed_attack_events = spark.read.parquet('/tmp/failed_attack')
transformed_failed_attack_events.createOrReplaceTempView('failed_attack')

failed_attack_query = "create external table failed_attack_events stored as parquet location '/tmp/failed_attack_events' as select * from failed_attack"
spark.sql(failed_attack_query)

DataFrame[]
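The six cells above follow the same read, register, and create-table pattern, so they could be driven by a small helper. This sketch is hypothetical (`ctas_query`, `write_hive_table`, and `EVENT_TABLES` are our names); it builds exactly the SQL used above and registers views with `createOrReplaceTempView`, the Spark 2.x replacement for the deprecated `registerTempTable`.

```python
def ctas_query(name, base="/tmp"):
    """CREATE EXTERNAL TABLE ... AS SELECT statement for one temp view."""
    return ("create external table {0}_events stored as parquet "
            "location '{1}/{0}_events' as select * from {0}").format(name, base)


def write_hive_table(spark, name, base="/tmp"):
    """Read <base>/<name> parquet, register it as a temp view, and run the CTAS."""
    df = spark.read.parquet("{}/{}".format(base, name))
    df.createOrReplaceTempView(name)
    return spark.sql(ctas_query(name, base))


# Temp view names, matching the parquet directories written by the streams.
EVENT_TABLES = ["player", "purchase_weapon", "purchase_shield",
                "dig_for_gold", "successful_attack", "failed_attack"]
```

Running `for name in EVENT_TABLES: write_hive_table(spark, name)` a second time would raise the same kind of `AnalysisException` seen for `player_events`, because the tables would already exist.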

**Use pandas to check the transformed events**

In [29]:
transformed_successful_attack_pd = transformed_successful_attack_events.toPandas()
transformed_successful_attack_pd.head()

Empty DataFrame
Columns: [raw_event, timestamp, Accept, Host, User-Agent, event_type, attacker, defender, weapon_used, defender_has_shield, defender_health_before, defender_health_after, defender_killed]
Index: []

**Another check**

In [30]:
transformed_failed_attack_pd = transformed_failed_attack_events.toPandas()
transformed_failed_attack_pd.head()

Empty DataFrame
Columns: [raw_event, timestamp, Accept, Host, User-Agent, event_type, attacker, defender, weapon_used]
Index: []

### Analysis using Presto

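1. Start the Presto CLI from a terminal on the host (the CLI is interactive). The queries below are entered at the `presto:default>` prompt.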

In [None]:
!docker-compose exec presto presto --server presto:8080 --catalog hive --schema default

2. Check and describe the tables

In [None]:
SHOW TABLES;

In [None]:
DESCRIBE player_events;

3. Start the business analysis in Presto

**Q1: How many distinct players have been created?**

In [None]:
SELECT COUNT(DISTINCT username) FROM player_events;


18 distinct players have been created. Duplicate usernames are not possible in the first place, because the game uses Redis state tracking to reject a username that already exists.

**Q2: How often is each weapon purchased?**


In [None]:
SELECT weapon, COUNT(weapon) AS weapon_purchase_count FROM purchase_weapon_events GROUP BY weapon ORDER BY weapon_purchase_count DESC;

**Q3: What is the average wallet size after a shield purchase?**

In [None]:
SELECT AVG(wallet_after) FROM purchase_shield_events;

**Q4: How many players purchased a shield more than once?**

In [None]:
SELECT COUNT(*) AS repeat_shield_buyers FROM (SELECT username, COUNT(*) AS shield_purchases FROM purchase_shield_events GROUP BY username HAVING COUNT(*) > 1) AS t;

**Q5: What are the maximum, minimum, and average amounts of gold found in a single dig?**

In [None]:
SELECT MAX(gold_found) AS max, MIN(gold_found) AS min, AVG(gold_found) AS avg FROM dig_for_gold_events;

**Q6: Which weapon fails most often?**

In [None]:
SELECT weapon_used, COUNT(weapon_used) AS count_weapon_used FROM failed_attack_events GROUP BY weapon_used ORDER BY count_weapon_used DESC;

**Q7: Which weapon kills most often?**

In [None]:
SELECT weapon_used, COUNT(weapon_used) AS count_weapon_used FROM successful_attack_events WHERE defender_killed = true GROUP BY weapon_used ORDER BY count_weapon_used DESC;

**Q8: How many players have been killed?**

In [None]:
SELECT COUNT(defender_killed) AS count_defender_killed FROM successful_attack_events WHERE defender_killed = true;


**Q9: How many distinct defenders had a shield when they were attacked?**

In [None]:
SELECT COUNT(DISTINCT defender) AS count_has_shield FROM successful_attack_events WHERE defender_has_shield = true;
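Before trusting a query's grouping logic, it can help to mirror the question in plain Python on a handful of rows. The helpers below are a hypothetical cross-check, not part of the pipeline: they assume rows are dicts keyed by the table's column names (for instance from `transformed_purchase_shield_events.toPandas().to_dict('records')`) and mirror Q4 (players with more than one shield purchase), Q6 (the most frequently failing weapon), and Q9 (distinct defenders who had a shield).

```python
from collections import Counter


def players_with_repeat_shield_purchases(shield_rows):
    """Q4: how many usernames appear in more than one purchase_shield row."""
    counts = Counter(row["username"] for row in shield_rows)
    return sum(1 for n in counts.values() if n > 1)


def most_frequent_weapon(attack_rows):
    """Q6: weapon_used with the most rows, or None when there are no rows."""
    counts = Counter(row["weapon_used"] for row in attack_rows)
    top = counts.most_common(1)
    return top[0][0] if top else None


def distinct_shielded_defenders(successful_rows):
    """Q9: distinct defenders who had a shield in at least one successful attack."""
    return len({row["defender"] for row in successful_rows if row["defender_has_shield"]})
```

The Q4 helper makes the role of `HAVING COUNT(*) > 1` explicit: without that filter, every player with at least one shield purchase would be counted.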


### Bring Down the Docker Cluster

In [None]:
!docker-compose down