# Project 3: Understanding User Behavior
### UC Berkeley W205 MIDS Spring 2021
### Kieran Berton

In this notebook I'll show how we can use Kafka to pull in event data from the Flask API web server that runs our mobile gaming app, use Spark to filter these events and land them in the Hadoop Distributed File System (HDFS), and then use Hive and Presto to query the stored data and analyze it.

This notebook focuses primarily on the filtering done with Spark. Along the way I include notes on how the data is generated and how it moves through the pipeline into HDFS and then into a queryable table, and I finish with some simple examples of the analysis we can do with the data.

The Flask server can be started with the following command, using the "game_api.py" file included in this repository:

docker-compose exec mids env FLASK_APP=/w205/project-3-kieran-berton/game_api.py flask run --host 0.0.0.0

Once the server is up and running, we create the Kafka topic 'events' with the command below. The web server writes its events to this topic, from which they are later landed in HDFS.

docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181

For this project we used several methods of generating sample hits to our web server. We began by using Apache Bench (ab) to send batches of requests with the commands below.

docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/

docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword

docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/buy_a_sword

docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/join_guild

docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/

docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword

docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/buy_a_sword

docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_guild

As you can see, these commands hit four different pages on our web server: the home page and the purchase_a_sword, buy_a_sword, and join_guild sub pages. We contact each page as if from two 'users', identified by the Host headers 'user1.comcast.com' and 'user2.att.com'.

Later we used Apache Bench to generate a continuous stream of server calls with the following command, which sends 10 requests to the 'purchase_a_sword' sub page, sleeps for 5 seconds, and repeats until interrupted.

while true; do docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword; sleep 5; done
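If Apache Bench is not available, a similar load can be generated from Python. The sketch below is a hypothetical standard-library equivalent of the loop above, not part of the original project: build_requests, send_batch, and run_forever are illustrative names, and it assumes the Flask server is reachable at http://localhost:5000 as in the ab commands. Setting the Host header explicitly plays the role of ab's -H "Host: ..." flag.

```python
import time
import urllib.request

BASE_URL = "http://localhost:5000"  # assumption: same address the ab commands use


def build_requests(path, host, n=10, base_url=BASE_URL):
    """Build n GET requests for `path` that carry a custom Host header,
    similar to `ab -n 10 -H "Host: ..."`."""
    return [urllib.request.Request(base_url + path, headers={"Host": host})
            for _ in range(n)]


def send_batch(requests):
    """Send each request in turn and return the HTTP status codes."""
    statuses = []
    for req in requests:
        with urllib.request.urlopen(req) as resp:
            statuses.append(resp.status)
    return statuses


def run_forever(path="/purchase_a_sword", host="user1.comcast.com", pause=5):
    """Mirror `while true; do ab -n 10 ...; sleep 5; done`; stop with Ctrl-C."""
    while True:
        send_batch(build_requests(path, host))
        time.sleep(pause)
```

Calling run_forever() from a script would keep sending purchase_a_sword requests as user1.comcast.com until interrupted.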

Once the data is being written to Kafka, we can use Spark to extract the data, filter it, and then write it to HDFS. 

In [1]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf

We first define a user-defined function (UDF) that rewrites each event, then read the JSON events from the Kafka 'events' topic into a Spark DataFrame called raw_events.

In [2]:
#This UDF rewrites each JSON event: it overwrites the Host field with "moe" and adds a Cache-Control field

@udf('string')
def munge_event(event_as_json):
    event = json.loads(event_as_json)
    event['Host'] = "moe"
    event['Cache-Control'] = "no-cache"
    return json.dumps(event)
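To see what munge_event does to a single event, here is the same logic as a plain Python function that runs without Spark. The sample event is hypothetical, modelled on the columns shown in the outputs below, and munge_event_py is an illustrative name. Because the UDF is declared with @udf('string'), Spark applies this logic to each row's raw JSON string and stores the returned string in the munged column.

```python
import json


def munge_event_py(event_as_json):
    """Same body as the munge_event UDF, without the Spark decorator."""
    event = json.loads(event_as_json)
    event['Host'] = "moe"                # overwrite the original Host header
    event['Cache-Control'] = "no-cache"  # add a new field
    return json.dumps(event)


# Hypothetical raw event, shaped like the ones written to Kafka
sample = json.dumps({"Host": "user1.comcast.com", "Accept": "*/*",
                     "User-Agent": "curl/7.47.0", "event_type": "purchase_sword"})
munged = json.loads(munge_event_py(sample))
print(munged)
```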

In [3]:
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

In [4]:
munged_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .withColumn('munged', munge_event('raw'))

In [5]:
munged_events.show()

+--------------------+--------------------+--------------------+
|                 raw|           timestamp|              munged|
+--------------------+--------------------+--------------------+
|{"Host": "localho...|2021-04-16 20:16:...|{"Host": "moe", "...|
|{"Host": "localho...|2021-04-16 20:16:...|{"Host": "moe", "...|
|{"Host": "localho...|2021-04-16 20:16:...|{"Host": "moe", "...|
|{"Host": "localho...|2021-04-16 20:16:...|{"Host": "moe", "...|
|{"Host": "localho...|2021-04-16 20:16:...|{"Host": "moe", "...|
|{"Host": "localho...|2021-04-16 20:16:...|{"Host": "moe", "...|
|{"Host": "localho...|2021-04-16 20:16:...|{"Host": "moe", "...|
|{"Host": "localho...|2021-04-16 20:16:...|{"Host": "moe", "...|
|{"Host": "localho...|2021-04-16 20:16:...|{"Host": "moe", "...|
|{"Host": "localho...|2021-04-16 20:16:...|{"Host": "moe", "...|
|{"Host": "localho...|2021-04-16 20:16:...|{"Host": "moe", "...|
|{"Host": "localho...|2021-04-16 20:16:...|{"Host": "moe", "...|
|{"Host": "localho...|202

We then parse the munged JSON strings into an extracted_events DataFrame in which each JSON field becomes its own column.

In [6]:
extracted_events = munged_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.munged))) \
        .toDF()
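The rdd/map/toDF step can be pictured in plain Python: each row's timestamp is combined with the keys of its parsed JSON into one flat record. One detail worth knowing is that in Spark 2.x, Row(**kwargs) sorts its fields by name, which is why the columns appear in alphabetical order (uppercase names first) in the tables below. The sketch reproduces that with sorted(); extract_events and the sample rows are hypothetical.

```python
import json


def extract_events(rows):
    """rows: iterable of (timestamp, munged_json) pairs.

    Returns one flat dict per event, combining the timestamp with every JSON
    field. Keys are sorted by name, mimicking Row(**kwargs) in Spark 2.x.
    """
    records = []
    for timestamp, munged in rows:
        record = dict(timestamp=timestamp, **json.loads(munged))
        records.append({key: record[key] for key in sorted(record)})
    return records


# Hypothetical (timestamp, munged) pairs
rows = [
    ("2021-04-16 20:16:01", '{"Host": "moe", "event_type": "default", "Accept": "*/*"}'),
    ("2021-04-16 20:16:02", '{"Host": "moe", "event_type": "purchase_sword", "Accept": "*/*"}'),
]
for record in extract_events(rows):
    print(record)
```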

We can then filter the events on their event_type to see only sword purchases, or any other event type.

In [7]:
sword_purchases = extracted_events \
        .filter(extracted_events.event_type == 'purchase_sword')
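Filtering on event_type works much like a list comprehension over the extracted records. The hypothetical plain-Python sketch below (illustrative names and data) also counts events per type, which is a quick way to see which other event types are available to filter on; in Spark, the equivalent would be extracted_events.groupBy('event_type').count().

```python
from collections import Counter

# Hypothetical extracted records, shaped like the extracted_events rows
events = [
    {"Host": "moe", "event_type": "default"},
    {"Host": "moe", "event_type": "purchase_sword"},
    {"Host": "moe", "event_type": "purchase_sword"},
]


def filter_by_type(records, event_type):
    """Keep only the records whose event_type matches."""
    return [r for r in records if r.get("event_type") == event_type]


sword_purchases_py = filter_by_type(events, "purchase_sword")
type_counts = Counter(r.get("event_type") for r in events)
print(len(sword_purchases_py), dict(type_counts))
```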

In [8]:
sword_purchases.show()

+------+-------------+----+-----------+--------------+--------------------+
|Accept|Cache-Control|Host| User-Agent|    event_type|           timestamp|
+------+-------------+----+-----------+--------------+--------------------+
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2021-04-16 20:16:...|
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2021-04-16 20:16:...|
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2021-04-16 20:16:...|
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2021-04-16 20:16:...|
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2021-04-16 20:16:...|
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2021-04-16 20:16:...|
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2021-04-16 20:16:...|
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2021-04-16 20:16:...|
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2021-04-16 20:16:...|
+------+-------------+----+-----------+--------------+--------------------+



In [9]:
default_hits = extracted_events \
        .filter(extracted_events.event_type == 'default')

In [10]:
default_hits.show()

+------+-------------+----+-----------+----------+--------------------+
|Accept|Cache-Control|Host| User-Agent|event_type|           timestamp|
+------+-------------+----+-----------+----------+--------------------+
|   */*|     no-cache| moe|curl/7.47.0|   default|2021-04-16 20:16:...|
|   */*|     no-cache| moe|curl/7.47.0|   default|2021-04-16 20:16:...|
|   */*|     no-cache| moe|curl/7.47.0|   default|2021-04-16 20:16:...|
|   */*|     no-cache| moe|curl/7.47.0|   default|2021-04-16 20:16:...|
|   */*|     no-cache| moe|curl/7.47.0|   default|2021-04-16 20:16:...|
|   */*|     no-cache| moe|curl/7.47.0|   default|2021-04-16 20:16:...|
|   */*|     no-cache| moe|curl/7.47.0|   default|2021-04-16 20:16:...|
|   */*|     no-cache| moe|curl/7.47.0|   default|2021-04-16 20:16:...|
|   */*|     no-cache| moe|curl/7.47.0|   default|2021-04-16 20:16:...|
|   */*|     no-cache| moe|curl/7.47.0|   default|2021-04-16 20:16:...|
+------+-------------+----+-----------+----------+--------------------+

# New in Week 12

In week 12 we add two more event types, buy_sword and join_guild, each associated with its own page on the Flask server. The extraction process is similar: we read from Kafka with Spark, filter for the event types we are interested in, and display them in a table. This time, however, we filter the raw JSON strings with boolean UDFs before parsing them into columns, and we also write the extracted events to HDFS.

In [1]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf

In [2]:
@udf('boolean')
def is_buy_sword(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'buy_sword':
        return True
    return False

In [3]:
@udf('boolean')
def is_join_guild(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'join_guild':
        return True
    return False
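is_buy_sword and is_join_guild differ only in the event type they compare against. One way to avoid writing a new function per event type is a small factory that builds the predicate. The plain-Python sketch below (make_event_type_predicate is a hypothetical name) also uses dict.get, so an event without an event_type field yields False instead of raising KeyError. In PySpark, the returned function could then be wrapped with udf(predicate, 'boolean'); that wrapping is an assumption not exercised here.

```python
import json


def make_event_type_predicate(event_type):
    """Return a function that reports whether a raw JSON event has this event_type."""
    def predicate(event_as_json):
        event = json.loads(event_as_json)
        # .get avoids a KeyError for events that lack an event_type field
        return event.get('event_type') == event_type
    return predicate


is_buy_sword_py = make_event_type_predicate('buy_sword')
is_join_guild_py = make_event_type_predicate('join_guild')

raw = '{"Host": "user1.comcast.com", "event_type": "join_guild"}'
print(is_buy_sword_py(raw), is_join_guild_py(raw))
```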

In [4]:
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

In [5]:
raw_events.show(5)

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 48 6F 73 7...|events|        0|     0|2021-04-16 20:21:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     1|2021-04-16 20:21:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     2|2021-04-16 20:21:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     3|2021-04-16 20:21:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     4|2021-04-16 20:21:...|            0|
+----+--------------------+------+---------+------+--------------------+-------------+
only showing top 5 rows



### Join Guild Events
Here we filter for join_guild events only, using the is_join_guild UDF.

In [6]:
join_guild_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_join_guild('raw'))

In [7]:
join_guild_events.show()

+--------------------+--------------------+
|                 raw|           timestamp|
+--------------------+--------------------+
|{"Accept": "*/*",...|2021-04-16 20:21:...|
|{"Accept": "*/*",...|2021-04-16 20:21:...|
|{"Accept": "*/*",...|2021-04-16 20:21:...|
|{"Accept": "*/*",...|2021-04-16 20:21:...|
|{"Accept": "*/*",...|2021-04-16 20:21:...|
|{"Accept": "*/*",...|2021-04-16 20:21:...|
|{"Accept": "*/*",...|2021-04-16 20:21:...|
|{"Accept": "*/*",...|2021-04-16 20:21:...|
|{"Accept": "*/*",...|2021-04-16 20:21:...|
|{"Accept": "*/*",...|2021-04-16 20:21:...|
|{"Accept": "*/*",...|2021-04-16 20:22:...|
|{"Accept": "*/*",...|2021-04-16 20:22:...|
|{"Accept": "*/*",...|2021-04-16 20:22:...|
|{"Accept": "*/*",...|2021-04-16 20:22:...|
|{"Accept": "*/*",...|2021-04-16 20:22:...|
|{"Accept": "*/*",...|2021-04-16 20:22:...|
|{"Accept": "*/*",...|2021-04-16 20:22:...|
|{"Accept": "*/*",...|2021-04-16 20:22:...|
|{"Accept": "*/*",...|2021-04-16 20:22:...|
|{"Accept": "*/*",...|2021-04-16

Then we parse each raw JSON string into separate columns, giving a more readable table.

In [8]:
extracted_join_guild_events = join_guild_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()

In [9]:
extracted_join_guild_events.show(5)

+------+-----------------+---------------+----------+-------------------+--------------------+
|Accept|             Host|     User-Agent|event_type|         guild_type|           timestamp|
+------+-----------------+---------------+----------+-------------------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|swordsmith alliance|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|swordsmith alliance|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|swordsmith alliance|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|swordsmith alliance|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|swordsmith alliance|2021-04-16 20:21:...|
+------+-----------------+---------------+----------+-------------------+--------------------+
only showing top 5 rows



In [10]:
#write the join_guild events to HDFS in Parquet format, under /tmp/join_guild
extracted_join_guild_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/join_guild')

In [11]:
#read back in what we just wrote to HDFS
join_guild = spark.read.parquet('/tmp/join_guild')

In [12]:
join_guild.show(5)

+------+-----------------+---------------+----------+-------------------+--------------------+
|Accept|             Host|     User-Agent|event_type|         guild_type|           timestamp|
+------+-----------------+---------------+----------+-------------------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|swordsmith alliance|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|swordsmith alliance|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|swordsmith alliance|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|swordsmith alliance|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|swordsmith alliance|2021-04-16 20:21:...|
+------+-----------------+---------------+----------+-------------------+--------------------+
only showing top 5 rows



We can even register what we read back from HDFS as a temporary table and query it with SQL.

In [13]:
join_guild.createOrReplaceTempView('join_guild')  # registerTempTable is deprecated since Spark 2.0

In [14]:
join_guild_spark_df = spark.sql("select * from join_guild where Host = 'user1.comcast.com'")
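Registering a temp table and querying it with spark.sql is conceptually similar to loading rows into any SQL engine. As a self-contained illustration, the sketch below swaps in Python's built-in sqlite3 module (an in-memory database) for Spark SQL; the table layout and sample rows are hypothetical, and only the query text mirrors the one above.

```python
import sqlite3

# Hypothetical join_guild rows: (Host, event_type, guild_type, timestamp)
rows = [
    ("user1.comcast.com", "join_guild", "swordsmith alliance", "2021-04-16 20:21:52.078"),
    ("user1.comcast.com", "join_guild", "swordsmith alliance", "2021-04-16 20:21:52.082"),
    ("user2.att.com", "join_guild", "swordsmith alliance", "2021-04-16 20:22:01.005"),
]

conn = sqlite3.connect(":memory:")  # throwaway in-memory database
conn.execute("create table join_guild (Host text, event_type text, guild_type text, timestamp text)")
conn.executemany("insert into join_guild values (?, ?, ?, ?)", rows)

# Same query text as the spark.sql call above
result = conn.execute("select * from join_guild where Host = 'user1.comcast.com'").fetchall()
for row in result:
    print(row)
conn.close()
```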

In [15]:
join_guild_spark_df.show()

+------+-----------------+---------------+----------+-------------------+--------------------+
|Accept|             Host|     User-Agent|event_type|         guild_type|           timestamp|
+------+-----------------+---------------+----------+-------------------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|swordsmith alliance|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|swordsmith alliance|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|swordsmith alliance|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|swordsmith alliance|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|swordsmith alliance|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|swordsmith alliance|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|swordsmith alliance|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_gui

Then we can take the DataFrame returned by our SQL query and convert it into a pandas DataFrame for typical Python analysis. Note that toPandas() collects every row to the driver, so it is best suited to small query results like this one.

In [16]:
join_guild_df = join_guild_spark_df.toPandas()
join_guild_df.head()

|   | Accept | Host | User-Agent | event_type | guild_type | timestamp |
|---|---|---|---|---|---|---|
| 0 | `*/*` | user1.comcast.com | ApacheBench/2.3 | join_guild | swordsmith alliance | 2021-04-16 20:21:52.078 |
| 1 | `*/*` | user1.comcast.com | ApacheBench/2.3 | join_guild | swordsmith alliance | 2021-04-16 20:21:52.082 |
| 2 | `*/*` | user1.comcast.com | ApacheBench/2.3 | join_guild | swordsmith alliance | 2021-04-16 20:21:52.084 |
| 3 | `*/*` | user1.comcast.com | ApacheBench/2.3 | join_guild | swordsmith alliance | 2021-04-16 20:21:52.088 |
| 4 | `*/*` | user1.comcast.com | ApacheBench/2.3 | join_guild | swordsmith alliance | 2021-04-16 20:21:52.09 |


In [17]:
join_guild_df.describe()

|   | Accept | Host | User-Agent | event_type | guild_type | timestamp |
|---|---|---|---|---|---|---|
| count | 10 | 10 | 10 | 10 | 10 | 10 |
| unique | 1 | 1 | 1 | 1 | 1 | 10 |
| top | `*/*` | user1.comcast.com | ApacheBench/2.3 | join_guild | swordsmith alliance | 2021-04-16 20:21:52.093 |
| freq | 10 | 10 | 10 | 10 | 10 | 1 |


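We can ask questions such as: how many unique users (hosts) joined a guild? Keep in mind that our SQL query kept only rows where Host = 'user1.comcast.com', so the answer here is necessarily 1; to count every user, ask the same question of the unfiltered join_guild table.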

In [24]:
join_guild_df.Host.nunique()

1
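Because join_guild_df was built from a query restricted to Host = 'user1.comcast.com', the count above cannot exceed 1. To count distinct users per event type across all events, one can collect the distinct hosts in a set per type. The plain-Python sketch below uses hypothetical records with the two hosts from our Apache Bench commands; in Spark, a similar result could come from groupBy('event_type').agg(countDistinct('Host')).

```python
from collections import defaultdict

# Hypothetical records covering both simulated users
events = [
    {"Host": "user1.comcast.com", "event_type": "join_guild"},
    {"Host": "user1.comcast.com", "event_type": "join_guild"},
    {"Host": "user2.att.com", "event_type": "join_guild"},
    {"Host": "user2.att.com", "event_type": "buy_sword"},
]


def unique_hosts_per_type(records):
    """Map each event_type to the number of distinct Host values that produced it."""
    hosts = defaultdict(set)
    for r in records:
        hosts[r["event_type"]].add(r["Host"])
    return {event_type: len(h) for event_type, h in hosts.items()}


print(unique_hosts_per_type(events))
```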

We can also ask on what day of the month people joined guilds:

In [32]:
join_guild_df['Day']=[d.split(' ')[0].split('-')[2] for d in join_guild_df.timestamp]
join_guild_df.Day

0    16
1    16
2    16
3    16
4    16
5    16
6    16
7    16
8    16
9    16
Name: Day, dtype: object

As we can see, every guild join in this result happened on the 16th of the month, the day I ran this analysis.
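Splitting the timestamp string on spaces and dashes works, but parsing it as a datetime is less fragile and gives access to other fields such as the hour. One wrinkle: Spark's string form of a timestamp drops trailing zeros from the fractional seconds (note values like 20:21:52.09 above) and can omit the fraction entirely when it is zero, so the hypothetical helper below picks a format accordingly. The first two sample timestamps come from the output above; the third is made up. Unlike the list comprehension above, this returns the day as an integer rather than a string.

```python
from datetime import datetime


def parse_spark_timestamp(ts):
    """Parse Spark's string form of a timestamp.

    The fractional seconds may have 1-6 digits (trailing zeros dropped)
    or be missing altogether.
    """
    fmt = "%Y-%m-%d %H:%M:%S.%f" if "." in ts else "%Y-%m-%d %H:%M:%S"
    return datetime.strptime(ts, fmt)


timestamps = ["2021-04-16 20:21:52.078", "2021-04-16 20:21:52.09", "2021-04-16 20:21:53"]
days = [parse_spark_timestamp(ts).day for ts in timestamps]
print(days)
```

With pandas, pd.to_datetime(join_guild_df.timestamp).dt.day is the vectorized equivalent.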

### Buy Sword Events

Now we filter for buy_sword events only and repeat the same process: extract the columns from the raw Kafka events, write them to HDFS, read back the data we just wrote, register a queryable temp table, and convert the table returned by SQL into a pandas DataFrame for some simple analysis.

In [33]:
#filter raw events for only buy_sword events using the 'is_buy_sword' function
buy_sword_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_buy_sword('raw'))

In [34]:
buy_sword_events.show(5)

+--------------------+--------------------+
|                 raw|           timestamp|
+--------------------+--------------------+
|{"Host": "user1.c...|2021-04-16 20:21:...|
|{"Host": "user1.c...|2021-04-16 20:21:...|
|{"Host": "user1.c...|2021-04-16 20:21:...|
|{"Host": "user1.c...|2021-04-16 20:21:...|
|{"Host": "user1.c...|2021-04-16 20:21:...|
+--------------------+--------------------+
only showing top 5 rows



In [35]:
extracted_buy_sword_events = buy_sword_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()

In [36]:
extracted_buy_sword_events.show(5)

+------+-----------------+---------------+----------+----------+--------------------+
|Accept|             Host|     User-Agent|event_type|sword_type|           timestamp|
+------+-----------------+---------------+----------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3| buy_sword|  scimitar|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3| buy_sword|  scimitar|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3| buy_sword|  scimitar|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3| buy_sword|  scimitar|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3| buy_sword|  scimitar|2021-04-16 20:21:...|
+------+-----------------+---------------+----------+----------+--------------------+
only showing top 5 rows



In [37]:
#write to HDFS
extracted_buy_sword_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/buy_sword')

In [38]:
#read back in what we just wrote to HDFS
buy_sword = spark.read.parquet('/tmp/buy_sword')

In [39]:
buy_sword.show(5)

+------+-----------------+---------------+----------+----------+--------------------+
|Accept|             Host|     User-Agent|event_type|sword_type|           timestamp|
+------+-----------------+---------------+----------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3| buy_sword|  scimitar|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3| buy_sword|  scimitar|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3| buy_sword|  scimitar|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3| buy_sword|  scimitar|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3| buy_sword|  scimitar|2021-04-16 20:21:...|
+------+-----------------+---------------+----------+----------+--------------------+
only showing top 5 rows



In [40]:
#make an SQL-queryable temp view (createOrReplaceTempView replaces registerTempTable, deprecated since Spark 2.0)
buy_sword.createOrReplaceTempView('buy_sword')

In [41]:
#query it with SQL
buy_sword_spark_df = spark.sql("select * from buy_sword where Host = 'user1.comcast.com'")

In [42]:
buy_sword_spark_df.show(5)

+------+-----------------+---------------+----------+----------+--------------------+
|Accept|             Host|     User-Agent|event_type|sword_type|           timestamp|
+------+-----------------+---------------+----------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3| buy_sword|  scimitar|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3| buy_sword|  scimitar|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3| buy_sword|  scimitar|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3| buy_sword|  scimitar|2021-04-16 20:21:...|
|   */*|user1.comcast.com|ApacheBench/2.3| buy_sword|  scimitar|2021-04-16 20:21:...|
+------+-----------------+---------------+----------+----------+--------------------+
only showing top 5 rows



In [43]:
#convert to a pandas dataframe
buy_sword_df = buy_sword_spark_df.toPandas()
buy_sword_df.head()

|   | Accept | Host | User-Agent | event_type | sword_type | timestamp |
|---|---|---|---|---|---|---|
| 0 | `*/*` | user1.comcast.com | ApacheBench/2.3 | buy_sword | scimitar | 2021-04-16 20:21:48.404 |
| 1 | `*/*` | user1.comcast.com | ApacheBench/2.3 | buy_sword | scimitar | 2021-04-16 20:21:48.407 |
| 2 | `*/*` | user1.comcast.com | ApacheBench/2.3 | buy_sword | scimitar | 2021-04-16 20:21:48.41 |
| 3 | `*/*` | user1.comcast.com | ApacheBench/2.3 | buy_sword | scimitar | 2021-04-16 20:21:48.413 |
| 4 | `*/*` | user1.comcast.com | ApacheBench/2.3 | buy_sword | scimitar | 2021-04-16 20:21:48.419 |


In [44]:
buy_sword_df.describe()

|   | Accept | Host | User-Agent | event_type | sword_type | timestamp |
|---|---|---|---|---|---|---|
| count | 10 | 10 | 10 | 10 | 10 | 10 |
| unique | 1 | 1 | 1 | 1 | 1 | 10 |
| top | `*/*` | user1.comcast.com | ApacheBench/2.3 | buy_sword | scimitar | 2021-04-16 20:21:48.422 |
| freq | 10 | 10 | 10 | 10 | 10 | 1 |


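How many unique users bought a sword? As above, the SQL query kept only rows for user1.comcast.com, so this count is necessarily 1.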

In [46]:
buy_sword_df.Host.nunique()

1

What day of the month did people buy a sword?

In [48]:
buy_sword_df['Day']=[d.split(' ')[0].split('-')[2] for d in buy_sword_df.timestamp]
buy_sword_df.Day

0    16
1    16
2    16
3    16
4    16
5    16
6    16
7    16
8    16
9    16
Name: Day, dtype: object

# New in Week 13

This week, instead of reading a fixed batch of events from Kafka, we use Spark Structured Streaming to read events continuously as they arrive and write them to HDFS in micro-batches.

In [1]:
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType

In [2]:
#this function defines the schema used to parse the JSON events read from Kafka
def purchase_sword_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)

    The timestamp column is not part of this schema; it is taken from
    the Kafka message metadata (raw_events.timestamp) instead.
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
    ])
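from_json parses each JSON string against this schema: keys that are not in the schema are dropped, and schema fields missing from the JSON become null. The plain-Python sketch below approximates that behaviour for flat events; PURCHASE_SWORD_FIELDS and apply_schema are hypothetical names, and the sketch does not reproduce Spark's type conversions or its handling of malformed JSON.

```python
import json

# Field names from purchase_sword_event_schema(), all nullable strings
PURCHASE_SWORD_FIELDS = ["Accept", "Host", "User-Agent", "event_type"]


def apply_schema(raw_event, fields=PURCHASE_SWORD_FIELDS):
    """Keep only the schema's fields; any that are missing become None."""
    event = json.loads(raw_event)
    return {name: event.get(name) for name in fields}


raw = '{"Host": "user1.comcast.com", "event_type": "purchase_sword", "Cache-Control": "no-cache"}'
print(apply_schema(raw))
```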

In [3]:
#again we have a filter for purchase_sword events
@udf('boolean')
def is_sword_purchase(event_as_json):
    """udf for filtering events
    """
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False

In [4]:
#we read the raw events from Kafka
raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()

In [5]:
#then we filter for only the purchase_sword events
sword_purchases = raw_events \
        .filter(is_sword_purchase(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          purchase_sword_event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')

In [6]:
#then we create a 'sink' to capture the data being streamed in through Kafka
sink = sword_purchases \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases") \
        .option("path", "/tmp/sword_purchases") \
        .trigger(processingTime="10 seconds") \
        .start()
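Two options in this cell do most of the streaming work. processingTime="10 seconds" asks Spark to start a micro-batch every 10 seconds, and checkpointLocation is where Spark records which Kafka offsets have already been processed, so a restarted query can resume where it left off instead of reprocessing or skipping events. The toy model below illustrates only the offset-tracking idea in plain Python; it is a hypothetical sketch, not how Spark actually stores checkpoints, and process_new_events is an illustrative name.

```python
def process_new_events(topic_log, checkpoint, sink):
    """Run one micro-batch over a toy topic.

    topic_log:  list of events; an event's index plays the role of its Kafka offset
    checkpoint: dict holding the next offset to read (stands in for checkpointLocation)
    sink:       list receiving one entry per non-empty batch (stands in for the Parquet path)
    """
    start = checkpoint.get("next_offset", 0)
    batch = topic_log[start:]
    if batch:
        sink.append(list(batch))
    checkpoint["next_offset"] = start + len(batch)
    return batch


topic = ["e0", "e1"]
checkpoint, sink = {}, []
process_new_events(topic, checkpoint, sink)  # first trigger picks up e0, e1
topic.append("e2")                           # a new event arrives
process_new_events(topic, checkpoint, sink)  # next trigger picks up only e2
print(sink)
```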

In [7]:
#stop the stream once it has had time to write a few micro-batches
sink.stop()

#### Check that the files are being written to HDFS by running this from a Linux command line on your machine

docker-compose exec cloudera hadoop fs -ls /tmp/sword_purchases

#### Then you can set up an external Hive table over the event data with these commands

docker-compose exec cloudera hive

create external table if not exists default.sword_purchases (Accept string, Host string, User_Agent string, event_type string, timestamp string) stored as parquet location '/tmp/sword_purchases'  tblproperties ("parquet.compress"="SNAPPY");

exit;

#### And finally you can query the data using Presto

docker-compose exec presto presto --server presto:8080 --catalog hive --schema default

(Check what tables are available from Hive)

show tables;

(Query tables)

select * from sword_purchases;

(Count the sword purchase events, i.e. hits to the purchase_a_sword page)

select count(*) from sword_purchases;


Hopefully this has been an enlightening demonstration of how a big data pipeline can extract information from a mobile app. Kafka, Spark, Hadoop (HDFS), Hive, and Presto are great tools for transferring, storing, and querying relevant business data.