# Tracking User Behavior in our Game

#copy the YAML file and game_api.py file into our working directory

cp ~/w205/course-content/13-Understanding-Data/docker-compose.yml ~/w205/project-3-jchung738/
cp ~/w205/course-content/11-Storing-Data-III/game_api.py .

#start the cluster and create the Kafka topic "events"

docker-compose up -d
docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181

#start up the Flask server

docker-compose exec mids env FLASK_APP=/w205/project-3-jchung738/game_api.py flask run --host 0.0.0.0

#use Apache Bench commands to send events to our server from 2 users

docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/buy_a_sword
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/join_a_guild
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/buy_a_sword
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_a_guild

#check with Kafkacat to confirm the events are received

docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning -e

#start pyspark kernel and link to Jupyter notebook

docker-compose exec spark bash
ln -s /w205 w205
exit
docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark

#external IP

35.247.16.198




## Read in events using Spark

In [8]:
#import necessary packages
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf


In [12]:


@udf('string')
def munge_event(event_as_json):
    """Overwrite two header fields of a JSON-encoded event.

    The incoming JSON string is decoded, ``Host`` is set to ``"moe"`` and
    ``Cache-Control`` is set to ``"no-cache"``, and the modified event is
    returned re-encoded as a JSON string.
    """
    forced_fields = (('Host', "moe"), ('Cache-Control', "no-cache"))
    payload = json.loads(event_as_json)
    for field, forced_value in forced_fields:
        payload[field] = forced_value
    return json.dumps(payload)

In [13]:
# Load raw events with spark.
# This is a batch read (spark.read) of the Kafka topic "events" from the
# broker at kafka:29092, covering offsets from "earliest" through "latest".
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

        

In [14]:
raw_events.show()

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 48 6F 73 7...|events|        0|     0|2020-12-07 17:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     1|2020-12-07 17:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     2|2020-12-07 17:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     3|2020-12-07 17:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     4|2020-12-07 17:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     5|2020-12-07 17:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     6|2020-12-07 17:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     7|2020-12-07 17:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0

In [15]:
 #transform the data into strings
    
munged_events = raw_events \
    .select(raw_events.value.cast('string').alias('raw'),
            raw_events.timestamp.cast('string')) \
    .withColumn('munged', munge_event('raw'))

In [16]:
munged_events.show()

+--------------------+--------------------+--------------------+
|                 raw|           timestamp|              munged|
+--------------------+--------------------+--------------------+
|{"Host": "user1.c...|2020-12-07 17:33:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-07 17:33:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-07 17:33:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-07 17:33:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-07 17:33:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-07 17:33:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-07 17:33:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-07 17:33:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-07 17:33:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-07 17:33:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-07 17:33:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-07 17:33:...|{"Host": "moe", "...|
|{"Host": "user1.c...|202

In [17]:
#Extract events with lambda transform
# Each row's munged JSON string is decoded and its keys are expanded into Row
# fields alongside that row's timestamp; the resulting RDD is then converted
# back into a DataFrame.
extracted_events = munged_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.munged))) \
        .toDF()

In [8]:
extracted_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Cache-Control: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [18]:
#filter for only purchase_sword events

sword_purchases = extracted_events \
        .filter(extracted_events.event_type == 'purchase_sword')

In [19]:
sword_purchases.printSchema()


root
 |-- Accept: string (nullable = true)
 |-- Cache-Control: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [20]:
#boolean function to select for purchase_sword events
@udf('boolean')
def is_purchase(event_as_json):
    """Tell whether a JSON-encoded event is a ``purchase_sword`` event.

    The event is decoded and its ``event_type`` value is compared with
    ``'purchase_sword'``. The ``event_type`` lookup is unguarded, so the
    event is expected to carry that key.
    """
    return json.loads(event_as_json)['event_type'] == 'purchase_sword'


In [21]:
#boolean function to select for join_a_guild events

@udf('boolean')
def is_guild(event_as_json):
    """Tell whether a JSON-encoded event is a ``join_a_guild`` event.

    The event is decoded and its ``event_type`` value is compared with
    ``'join_a_guild'``. The ``event_type`` lookup is unguarded, so the
    event is expected to carry that key.
    """
    return json.loads(event_as_json)['event_type'] == 'join_a_guild'


In [24]:
#read the events using spark
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

In [25]:
raw_events.show()

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 48 6F 73 7...|events|        0|     0|2020-12-07 17:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     1|2020-12-07 17:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     2|2020-12-07 17:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     3|2020-12-07 17:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     4|2020-12-07 17:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     5|2020-12-07 17:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     6|2020-12-07 17:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     7|2020-12-07 17:33:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0

In [26]:
#filter using spark with the boolean function is_purchase
purchase_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_purchase('raw'))
        

In [29]:
#filter using spark with the boolean function is_guild

guild_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_guild('raw'))
        

In [30]:
#Use lambda transforms to turn the data into a DataFrame 
extracted_guild_events = guild_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()

ValueError: RDD is empty

In [31]:
 #Use lambda transforms to turn the data into a DataFrame 

extracted_purchase_events = purchase_events \
    .rdd \
    .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
    .toDF()

In [32]:
    extracted_purchase_events.printSchema()

    

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- sword_type: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [37]:
extracted_guild_events.show()

+------+-----------------+---------------+------------+----------+--------------------+
|Accept|             Host|     User-Agent|  event_type|guild_type|           timestamp|
+------+-----------------+---------------+------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|join_a_guild|roundtable|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_a_guild|roundtable|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_a_guild|roundtable|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_a_guild|roundtable|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_a_guild|roundtable|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_a_guild|roundtable|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_a_guild|roundtable|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_a_guild|roundtable|2020-12-07 16:40:...|
|   */*|user1.comcast.com|Apache

In [38]:
    extracted_purchase_events.show()


+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|us

In [39]:
#save the DataFrames into parquet
extracted_guild_events\
        .write \
        .mode('overwrite') \
        .parquet('/tmp/guilds')

In [40]:
#save the DataFrames into parquet

extracted_purchase_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/purchases')

# Read Parquet files from HDFS to be queried against with SQL


In [41]:
#read parquet from tmp directory
purchases = spark.read.parquet('/tmp/purchases')


In [42]:
#read parquet from tmp directory

guilds = spark.read.parquet('/tmp/guilds')

In [43]:
purchases.show()


+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|us

In [45]:
guilds.show()

+------+-----------------+---------------+------------+----------+--------------------+
|Accept|             Host|     User-Agent|  event_type|guild_type|           timestamp|
+------+-----------------+---------------+------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|join_a_guild|roundtable|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_a_guild|roundtable|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_a_guild|roundtable|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_a_guild|roundtable|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_a_guild|roundtable|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_a_guild|roundtable|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_a_guild|roundtable|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_a_guild|roundtable|2020-12-07 16:40:...|
|   */*|user1.comcast.com|Apache

In [46]:
#register the parquet as a temp table for SQL queries
guilds.registerTempTable('guilds')

In [47]:
#register the parquet as a temp table for SQL queries

purchases.registerTempTable('purchases')


In [48]:
#SQL query

purchases_by_example2 = spark.sql("select * from purchases where Host = 'user1.comcast.com'")


In [49]:
purchases_by_example2.show()


+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword| longsword|2020-12-07 16:40:...|
|   */*|us

In [16]:
#save query as Pandas DataFrame
df = purchases_by_example2.toPandas()


In [17]:
df.describe()


Unnamed: 0,Accept,Host,User-Agent,event_type,sword_type,timestamp
count,10,10,10,10,10,10
unique,1,1,1,1,1,10
top,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,longsword,2020-12-07 00:30:38.117
freq,10,10,10,10,10,1


# Analysis of our data

## What size is the Roundtable guild?


In [57]:
#SQL query

roundtable = spark.sql("select count(*) from guilds where guild_type ='roundtable'")

In [58]:
roundtable.show()

+--------+
|count(1)|
+--------+
|      20|
+--------+



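We see that 20 `join_a_guild` events were recorded for the Roundtable guild. Note that this counts join events, not distinct users: each of our two test hosts sent 10 requests.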

In [59]:
roundtable_df = roundtable.toPandas()

In [62]:
roundtable_df

Unnamed: 0,count(1)
0,20


## How many times has a longsword been purchased?


In [68]:
#SQL query

longsword = spark.sql("select count(Host) from purchases where sword_type = 'longsword'")

In [70]:
longsword.show()

+-----------+
|count(Host)|
+-----------+
|         20|
+-----------+



The longsword has been purchased 20 times.

# Read in a Stream of Data


#infinite loop for Apache Bench to send tests to our server
 
while true; do docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword; sleep 5; done

#startup hive to create schema on read

docker-compose exec cloudera hive

#hive command to create an external table for schema on read

create external table if not exists default.sword_purchases (Accept string, Host string, User_Agent string, event_type string, timestamp string) stored as parquet location '/tmp/sword_purchases'  tblproperties ("parquet.compress"="SNAPPY");
exit;

#Start up Presto

docker-compose exec presto presto --server presto:8080 --catalog hive --schema default

#Query against the external table using Presto commands

select * from sword_purchases;
select count(*) from sword_purchases;
exit;


In [1]:
import json
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType


In [2]:
#enforce schema for spark

def purchase_sword_event_schema():
    """Build the schema applied to the JSON payload of a sword-purchase event.

    The returned StructType holds four nullable string fields, in this order:

    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)

    No ``timestamp`` field is defined here.
    """
    field_names = ("Accept", "Host", "User-Agent", "event_type")
    return StructType(
        [StructField(name, StringType(), True) for name in field_names]
    )



In [3]:
#boolean function to select for purchase_sword events

@udf('boolean')
def is_sword_purchase(event_as_json):
    """udf for filtering events

    Decodes the JSON-encoded event and reports whether its ``event_type``
    equals ``'purchase_sword'``. The ``event_type`` lookup is unguarded, so
    the event is expected to carry that key.
    """
    return json.loads(event_as_json)['event_type'] == 'purchase_sword'

In [4]:
#read in event using spark
raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()


In [5]:
#enforce schema with our purchase_sword_event_schema function
# Keep only the records accepted by is_sword_purchase, then expose three
# things per record: the raw JSON string (raw_event), the Kafka record
# timestamp cast to string, and the JSON fields parsed with
# purchase_sword_event_schema(), flattened into top-level columns by 'json.*'.
sword_purchases = raw_events \
        .filter(is_sword_purchase(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          purchase_sword_event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')

In [6]:
#Write hdfs files in streaming mode
# Starts a streaming query that writes sword_purchases as parquet under
# /tmp/sword_purchases, triggering every 10 seconds and checkpointing to
# /tmp/checkpoints_for_sword_purchases. The handle returned by start() is
# kept in `sink`.
    sink = sword_purchases \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases") \
        .option("path", "/tmp/sword_purchases") \
        .trigger(processingTime="10 seconds") \
        .start()

In [33]:
#Stop writing the hdfs files
sink.stop()

#shutdown the cluster

docker-compose down