# Jeffry Zheng Project 3: Understanding User Behavior

### Tasks

- Instrument an API server to log events to Kafka

- Assemble a data pipeline to catch these events: use Spark streaming to filter select event types from Kafka, land them into HDFS/parquet to make them available for analysis using Presto

- Use Apache Bench to generate test data for the pipeline

- Produce an analytics report which provides a description of the pipeline and some analysis of the events

## Spinning up the pipeline

Copying and editing the YAML file:
    
    cp ~/w205/course-content/13-Understanding-Data/docker-compose.yml ~/w205/project-3-jeffry-zheng

The YAML file was edited to expose ports on the spark service instead of the cloudera service.
Its services are zookeeper, kafka, cloudera, spark, presto, and mids.

Spinning up the cluster:
    
    docker-compose up -d
    docker-compose ps
    docker ps -a

Creating and checking topic "events":

    docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
    docker-compose exec kafka kafka-topics --describe --topic events --zookeeper zookeeper:32181

Watching the `events` topic with kafkacat:
    
    docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning

Copying and editing the Flask app:

    cp ~/w205/course-content/11-Storing-Data-III/game_api.py ~/w205/project-3-jeffry-zheng

The web app was edited to track two mobile game events, "buy a sword" and "join guild". Each event carries metadata describing it (e.g., sword type or guild type).
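
The edited `game_api.py` is not reproduced here. As a rough sketch of what each logged event might look like, assuming (as the extracted schemas later suggest) that the app merges the request headers into the event dict and sends it to the `events` topic as UTF-8 JSON, the payload could be built as below. `build_event` and `encode_event` are hypothetical helper names, and the Kafka producer call is deliberately left out so the sketch runs without a broker.

```python
import json


def build_event(event_type, headers, **metadata):
    """Hypothetical helper: an event dict made of its type, its metadata and the request headers."""
    event = {"event_type": event_type, **metadata}
    # Merging the request headers is assumed to be what yields the Accept, Host
    # and User-Agent fields that appear in the extracted events.
    event.update(headers)
    return event


def encode_event(event):
    """Serialize an event as UTF-8 JSON bytes, the form a Kafka producer would send."""
    return json.dumps(event).encode("utf-8")


# Header values copied from the ApacheBench-generated events in this report
headers = {"Host": "user1.comcast.com", "User-Agent": "ApacheBench/2.3", "Accept": "*/*"}
payload = encode_event(build_event("buy_a_sword", headers, sword_type="rusty"))
print(payload)
```

Keeping the metadata as plain top-level keys is what lets the Spark side flatten each message into columns with a single `json.loads`.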


Running the web app with Flask:

    docker-compose exec mids env FLASK_APP=/w205/project-3-jeffry-zheng/game_api.py flask run --host 0.0.0.0


Generate initial test data using Apache Bench (non-streaming):

    docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/
    docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword
    docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/buy_a_sword
    docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/join_guild

    docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/
    docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword
    docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/buy_a_sword
    docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_guild
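
Each `ab -n 10` call sends 10 requests, so the eight commands above issue 80 requests, 20 per endpoint across the two hosts. The tally below spells out that arithmetic. It assumes each request is logged as exactly one event, which is an assumption about the edited app rather than something shown in this report.

```python
from collections import Counter

REQUESTS_PER_CALL = 10  # the -n 10 flag passed to each ab call
hosts = ["user1.comcast.com", "user2.att.com"]
paths = ["/", "/purchase_a_sword", "/buy_a_sword", "/join_guild"]

# One (host, path) pair per ab invocation above
plan = [(host, path) for host in hosts for path in paths]

requests_per_path = Counter()
for _host, path in plan:
    requests_per_path[path] += REQUESTS_PER_CALL

print(len(plan) * REQUESTS_PER_CALL)      # → 80
print(requests_per_path["/buy_a_sword"])  # → 20
```

Under that assumption the batch data should hold 20 events of each type, which lines up with the `swords_bought` count of 20 reported in the analytics section.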

Linking the Spark container to the /w205 mount point:

    docker-compose exec spark bash
    ln -s /w205 w205
    exit

Starting a Jupyter Notebook with a PySpark kernel:

    docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark

## Filtering and extracting events

```python
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf

spark
```

```python
@udf('boolean')
def is_purchase(event_as_json):
    """UDF that is True only for purchase_sword events"""
    event = json.loads(event_as_json)
    return event.get('event_type') == 'purchase_sword'


@udf('boolean')
def is_buy(event_as_json):
    """UDF that is True only for buy_a_sword events"""
    event = json.loads(event_as_json)
    return event.get('event_type') == 'buy_a_sword'


@udf('boolean')
def is_join(event_as_json):
    """UDF that is True only for join_guild events"""
    event = json.loads(event_as_json)
    return event.get('event_type') == 'join_guild'
```
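
The three UDFs differ only in the event type they match. One way to avoid the repetition, sketched here in plain Python so it runs without Spark, is a small factory that builds the predicate for a given event type; in the notebook each predicate would then be wrapped with `udf(predicate, 'boolean')`. The name `make_event_type_filter` and the `_py` variable names are hypothetical.

```python
import json


def make_event_type_filter(event_type):
    """Hypothetical factory: return a predicate that is True for JSON events of one type."""
    def predicate(event_as_json):
        event = json.loads(event_as_json)
        # .get avoids a KeyError for events that carry no event_type field
        return event.get('event_type') == event_type
    return predicate


# In the notebook these would be wrapped, e.g. udf(make_event_type_filter('join_guild'), 'boolean')
is_purchase_py = make_event_type_filter('purchase_sword')
is_buy_py = make_event_type_filter('buy_a_sword')
is_join_py = make_event_type_filter('join_guild')

sample = '{"event_type": "join_guild", "guild_type": "Magic"}'
print(is_join_py(sample), is_buy_py(sample))  # → True False
```

The captured `event_type` is serialized along with the function when PySpark ships the UDF to executors, so each wrapped predicate keeps its own event type.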

```python
raw_events = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()
```

```python
raw_events.show()
```

```
+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 48 6F 73 7...|events|        0|     0|2020-12-09 09:58:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     1|2020-12-09 09:58:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     2|2020-12-09 09:58:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     3|2020-12-09 09:58:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     4|2020-12-09 09:58:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     5|2020-12-09 09:58:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     6|2020-12-09 09:58:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     7|2020-12-09 09:58:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0
```
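
The `value` column holds the raw Kafka message as bytes, which is why each filter step casts it with `.cast('string')` before parsing it as JSON. Decoding the visible prefix of the first value shows it is the start of a JSON object (presumably the opening of a `"Host"` key):

```python
# Hex bytes copied from the truncated `value` column shown above
prefix = bytes.fromhex("7B 22 48 6F 73")
print(prefix.decode("utf-8"))  # → {"Hos
```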

```python
# filtering for purchase events
purchase_events = raw_events \
    .select(raw_events.value.cast('string').alias('raw'),
            raw_events.timestamp.cast('string')) \
    .filter(is_purchase('raw'))
```

```python
# extracting purchase events
extracted_purchase_events = purchase_events \
    .rdd \
    .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
    .toDF()
extracted_purchase_events.printSchema()
```

```
root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- sword_quality: string (nullable = true)
 |-- timestamp: string (nullable = true)
```
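
The printed schema lists the columns in plain string order (uppercase names first) rather than in the order the fields were passed. That matches how `Row(**kwargs)` behaves in Spark 2.x, which sorts keyword-constructed fields by name; Spark 3.0 dropped this sorting for Python 3.6 and later. The per-row transformation can be mimicked in plain Python. `flatten_event` is a hypothetical name, and the sample record is assembled from values shown in this report.

```python
import json


def flatten_event(timestamp, raw):
    """Hypothetical helper mirroring Row(timestamp=..., **json.loads(raw)) under Spark 2.x."""
    fields = {"timestamp": timestamp, **json.loads(raw)}
    # Spark 2.x sorts keyword-constructed Row fields by name (plain string ordering)
    return dict(sorted(fields.items()))


raw = json.dumps({"Host": "user1.comcast.com", "event_type": "purchase_sword",
                  "sword_quality": "new", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"})
row = flatten_event("2020-12-09 09:58:49.478", raw)
print(list(row))
# → ['Accept', 'Host', 'User-Agent', 'event_type', 'sword_quality', 'timestamp']
```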



```python
extracted_purchase_events.show()
```

```
+------+-----------------+---------------+--------------+-------------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_quality|           timestamp|
+------+-----------------+---------------+--------------+-------------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new
```

```python
# filtering for buy sword events
buy_events = raw_events \
    .select(raw_events.value.cast('string').alias('raw'),
            raw_events.timestamp.cast('string')) \
    .filter(is_buy('raw'))
```

```python
# extracting buy sword events
extracted_buy_events = buy_events \
    .rdd \
    .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
    .toDF()
extracted_buy_events.printSchema()
```

```
root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- sword_type: string (nullable = true)
 |-- timestamp: string (nullable = true)
```



```python
extracted_buy_events.show()
```

```
+------+-----------------+---------------+-----------+----------+--------------------+
|Accept|             Host|     User-Agent| event_type|sword_type|           timestamp|
+------+-----------------+---------------+-----------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|buy_a_sword|     rusty|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|buy_a_sword|     rusty|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|buy_a_sword|     rusty|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|buy_a_sword|     rusty|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|buy_a_sword|     rusty|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|buy_a_sword|     rusty|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|buy_a_sword|     rusty|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|buy_a_sword|     rusty|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|b
```

```python
# filtering for join guild events
join_events = raw_events \
    .select(raw_events.value.cast('string').alias('raw'),
            raw_events.timestamp.cast('string')) \
    .filter(is_join('raw'))
```

```python
# extracting join guild events
extracted_join_events = join_events \
    .rdd \
    .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
    .toDF()
extracted_join_events.printSchema()
```

```
root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- guild_type: string (nullable = true)
 |-- timestamp: string (nullable = true)
```



```python
extracted_join_events.show()
```

```
+------+-----------------+---------------+----------+----------+--------------------+
|Accept|             Host|     User-Agent|event_type|guild_type|           timestamp|
+------+-----------------+---------------+----------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|     Magic|2020-12-09 09:59:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|     Magic|2020-12-09 09:59:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|     Magic|2020-12-09 09:59:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|     Magic|2020-12-09 09:59:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|     Magic|2020-12-09 09:59:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|     Magic|2020-12-09 09:59:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|     Magic|2020-12-09 09:59:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|     Magic|2020-12-09 09:59:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild| 
```

## Landing events in HDFS/parquet

```python
# writing purchase events to HDFS
extracted_purchase_events \
    .write \
    .mode('overwrite') \
    .parquet('/tmp/purchases')

# writing buy sword events to HDFS
extracted_buy_events \
    .write \
    .mode('overwrite') \
    .parquet('/tmp/buy_a_sword')

# writing join guild events to HDFS
extracted_join_events \
    .write \
    .mode('overwrite') \
    .parquet('/tmp/join_guild')
```

## Checking results in hadoop

All directories:

    docker-compose exec cloudera hadoop fs -ls /tmp/

Individual directories:

    docker-compose exec cloudera hadoop fs -ls /tmp/purchases
    docker-compose exec cloudera hadoop fs -ls /tmp/buy_a_sword
    docker-compose exec cloudera hadoop fs -ls /tmp/join_guild

## Analytics using Spark SQL on the Spark Dataframe in Memory

```python
purchases = spark.read.parquet('/tmp/purchases')
purchases.show()
```

```
+------+-----------------+---------------+--------------+-------------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_quality|           timestamp|
+------+-----------------+---------------+--------------+-------------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new
```

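```python
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement
purchases.createOrReplaceTempView('purchases')
```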

Analytics question: Which purchase_sword events were made by user1.comcast.com?

```python
purchases_by_host1 = spark.sql("select * from purchases where Host = 'user1.comcast.com'")
purchases_by_host1.show()
```

```
+------+-----------------+---------------+--------------+-------------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_quality|           timestamp|
+------+-----------------+---------------+--------------+-------------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new|2020-12-09 09:58:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|          new
```

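```python
df1 = purchases_by_host1.toPandas()
df1.describe()
```

|        | Accept | Host              | User-Agent      | event_type     | sword_quality | timestamp               |
|--------|--------|-------------------|-----------------|----------------|---------------|-------------------------|
| count  | 10     | 10                | 10              | 10             | 10            | 10                      |
| unique | 1      | 1                 | 1               | 1              | 1             | 10                      |
| top    | `*/*`  | user1.comcast.com | ApacheBench/2.3 | purchase_sword | new           | 2020-12-09 09:58:49.478 |
| freq   | 10     | 10                | 10              | 10             | 10            | 1                       |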


Analytics question: How many times was a sword bought?

```python
buys = spark.read.parquet('/tmp/buy_a_sword')
buys.createOrReplaceTempView('buys')
buys_count = spark.sql("select count(*) as swords_bought from buys")
df2 = buys_count.toPandas()
df2
```

|   | swords_bought |
|---|---------------|
| 0 | 20            |


Analytics question: Which user joined a guild most recently, and when?

```python
joins = spark.read.parquet('/tmp/join_guild')
joins.createOrReplaceTempView('joins')
latest_join = spark.sql("select Host as newest_member, timestamp from joins "
                        "where timestamp = (select max(timestamp) from joins)")
df3 = latest_join.toPandas()
df3
```

|   | newest_member | timestamp               |
|---|---------------|-------------------------|
| 0 | user2.att.com | 2020-12-09 09:59:32.119 |
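
The `timestamp` column is stored as a string, so `max(timestamp)` compares strings rather than times. That still picks the latest event here because the `YYYY-MM-DD HH:MM:SS.fff` layout orders its fields from most to least significant, so string order and chronological order agree as long as every value uses that same layout. A quick check in plain Python, using two timestamps that appear in this report:

```python
from datetime import datetime

# Timestamp strings taken from the query results in this report
stamps = ["2020-12-09 09:58:49.478", "2020-12-09 09:59:32.119"]


def parse(s):
    """Parse the string layout used by the timestamp column."""
    return datetime.strptime(s, "%Y-%m-%d %H:%M:%S.%f")


# String max and chronological max agree for this layout
print(max(stamps) == max(stamps, key=parse))  # → True
```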


## Add data streaming into the pipeline

Use Apache Bench to generate additional test data:

    while true; do docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/buy_a_sword; sleep 5; done
    while true; do docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_guild; sleep 5; done


```python
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType
```

```python
def buy_a_sword_event_schema():
    """
    Schema of the buy_a_sword JSON payload
    (timestamp is taken from the Kafka record, not the payload):
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- sword_type: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("sword_type", StringType(), True),
    ])
```

```python
def join_guild_event_schema():
    """
    Schema of the join_guild JSON payload
    (timestamp is taken from the Kafka record, not the payload):
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- guild_type: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("guild_type", StringType(), True),
    ])
```
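
`from_json` parses each message against the given schema: JSON keys the schema does not list are ignored, and schema fields missing from a message come back as null. Neither schema lists `timestamp`; that column comes from the Kafka record's own timestamp in the streaming `select`. A rough plain-Python analogue of the projection follows. The helper name `project_to_schema` is hypothetical, and the sketch ignores type coercion and malformed input.

```python
import json

BUY_A_SWORD_FIELDS = ["Accept", "Host", "User-Agent", "event_type", "sword_type"]


def project_to_schema(raw_event, field_names):
    """Hypothetical analogue of from_json(...).alias('json') followed by selecting 'json.*'."""
    parsed = json.loads(raw_event)
    # Keys outside the schema are dropped; missing fields become None (null in Spark)
    return {name: parsed.get(name) for name in field_names}


message = '{"Host": "user1.comcast.com", "event_type": "buy_a_sword", "sword_type": "rusty", "extra": 1}'
projected = project_to_schema(message, BUY_A_SWORD_FIELDS)
print(projected)
```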

```python
raw_events_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "events") \
    .load()
```

```python
# filtering for buy a sword event
buy_swords = raw_events_stream \
    .filter(is_buy(raw_events_stream.value.cast('string'))) \
    .select(raw_events_stream.value.cast('string').alias('raw_event'),
            raw_events_stream.timestamp.cast('string'),
            from_json(raw_events_stream.value.cast('string'),
                      buy_a_sword_event_schema()).alias('json')) \
    .select('raw_event', 'timestamp', 'json.*')
```

```python
# writing event stream to HDFS
sink1 = buy_swords \
    .writeStream \
    .format("parquet") \
    .option("checkpointLocation", "/tmp/checkpoints_for_buy_a_sword") \
    .option("path", "/tmp/buy_a_sword") \
    .trigger(processingTime="10 seconds") \
    .start()
```

```python
# filtering for join guild event
join_guilds = raw_events_stream \
    .filter(is_join(raw_events_stream.value.cast('string'))) \
    .select(raw_events_stream.value.cast('string').alias('raw_event'),
            raw_events_stream.timestamp.cast('string'),
            from_json(raw_events_stream.value.cast('string'),
                      join_guild_event_schema()).alias('json')) \
    .select('raw_event', 'timestamp', 'json.*')
```

```python
# writing event stream to HDFS
sink2 = join_guilds \
    .writeStream \
    .format("parquet") \
    .option("checkpointLocation", "/tmp/checkpoints_for_join_guild") \
    .option("path", "/tmp/join_guild") \
    .trigger(processingTime="10 seconds") \
    .start()
```

```python
# stop the streams
sink1.stop()
sink2.stop()
```

## Using Hive to create an external table for schema on read

    docker-compose exec cloudera hive
    
        create external table if not exists default.buy_a_sword (Accept string, Host string, User_Agent string, event_type string, sword_type string, timestamp string) stored as parquet location '/tmp/buy_a_sword'  tblproperties ("parquet.compress"="SNAPPY");
    
        create external table if not exists default.join_guild (Accept string, Host string, User_Agent string, event_type string, guild_type string, timestamp string) stored as parquet location '/tmp/join_guild'  tblproperties ("parquet.compress"="SNAPPY");

## Using Presto to query against the external table

    docker-compose exec presto presto --server presto:8080 --catalog hive --schema default
    
        show tables;
```
                        Table    
                    -------------
                     buy_a_sword 
                     join_guild  
                    (2 rows)
```
        describe buy_a_sword;
```
                       Column   |  Type   | Comment 
                    ------------+---------+---------
                     accept     | varchar |         
                     host       | varchar |         
                     user_agent | varchar |         
                     event_type | varchar |         
                     sword_type | varchar |         
                     timestamp  | varchar |         
                    (6 rows)
```
        describe join_guild;
```
                       Column   |  Type   | Comment 
                    ------------+---------+---------
                     accept     | varchar |         
                     host       | varchar |         
                     user_agent | varchar |         
                     event_type | varchar |         
                     guild_type | varchar |         
                     timestamp  | varchar |         
                    (6 rows)
```

Analytics question: How many total events occurred for buy_a_sword and join_guild?

    select count(*) as num_buys from buy_a_sword;
```
                     num_buys 
                    ----------
                         1140 
                    (1 row)
```
    select count(*) as num_joins from join_guild;
```
                    num_joins 
                    -----------
                          1060 
                    (1 row)
```

## Shutting down the cluster

    docker-compose down
    docker-compose ps
    docker ps -a
