In [1]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType

# Linux Commands

### cd into project 3 directory:
cd ~/w205/project-3-juliejlai

### copy in project 3 yml file:
cp ~/w205/course-content/13-Understanding-Data/docker-compose.yml .

### start up cluster:
docker-compose up -d

docker-compose ps

docker ps -a

### copy game_api.py
cp ~/w205/course-content/11-Storing-Data-III/game_api.py .

### modify game_api.py
vi game_api.py

### create a topic:
docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181

### take flask up:
docker-compose exec mids env FLASK_APP=/w205/project-3-juliejlai/game_api.py flask run --host 0.0.0.0

### generating events using curl:
docker-compose exec mids curl http://localhost:5000/

docker-compose exec mids curl http://localhost:5000/purchase_a_sword

docker-compose exec mids curl http://localhost:5000/buy_a_sword

docker-compose exec mids curl http://localhost:5000/join_guild

### generating events using apache bench:
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/

docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword

docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com"  http://localhost:5000/buy_a_sword

docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com"  http://localhost:5000/join_guild

docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/

docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword

docker-compose exec mids ab -n 10 -H "Host: user2.att.com"  http://localhost:5000/buy_a_sword

docker-compose exec mids ab -n 10 -H "Host: user2.att.com"  http://localhost:5000/join_guild

### infinite loop to run the apache bench command:
while true; do docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword; sleep 5; done

### read from kafka:
docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning -e

### jupyter notebook:
docker-compose exec spark bash

ln -s /w205 w205

exit

docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark

### hive command to create an external table for schema on read:
docker-compose exec cloudera hive

create external table if not exists default.sword_purchases (Accept string, Host string, User_Agent string, event_type string, timestamp string) stored as parquet location '/tmp/sword_purchases'  tblproperties ("parquet.compress"="SNAPPY");

### presto query against the external table:
docker-compose exec presto presto --server presto:8080 --catalog hive --schema default

select * from sword_purchases;

select count (\*) from sword_purchases;

### tear down cluster:
docker-compose down

docker-compose ps

docker ps -a

# Extracting events from kafka and write them to hdfs

In [2]:
@udf('string')
def munge_event(event_as_json):
    event = json.loads(event_as_json)
    event['Host'] = "moe"
    event['Cache-Control'] = "no-cache"
    return json.dumps(event)

In [3]:
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

In [4]:
raw_events.show(5)

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 48 6F 73 7...|events|        0|     0|2020-12-08 17:36:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     1|2020-12-08 17:36:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     2|2020-12-08 17:36:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     3|2020-12-08 17:36:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     4|2020-12-08 17:36:...|            0|
+----+--------------------+------+---------+------+--------------------+-------------+
only showing top 5 rows



In [5]:
munged_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .withColumn('munged', munge_event('raw'))

In [6]:
munged_events.show(5)

+--------------------+--------------------+--------------------+
|                 raw|           timestamp|              munged|
+--------------------+--------------------+--------------------+
|{"Host": "user1.c...|2020-12-08 17:36:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-08 17:36:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-08 17:36:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-08 17:36:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-08 17:36:...|{"Host": "moe", "...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [7]:
extracted_events = munged_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.munged))) \
        .toDF()

In [8]:
extracted_events.show(5)

+------+-------------+----+---------------+--------------+----------+--------------------+
|Accept|Cache-Control|Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-------------+----+---------------+--------------+----------+--------------------+
|   */*|     no-cache| moe|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|     no-cache| moe|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|     no-cache| moe|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|     no-cache| moe|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|     no-cache| moe|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
+------+-------------+----+---------------+--------------+----------+--------------------+
only showing top 5 rows



In [9]:
sword_purchases = extracted_events \
        .filter(extracted_events.event_type == 'purchase_sword')

In [10]:
sword_purchases.show(5)

+------+-------------+----+---------------+--------------+----------+--------------------+
|Accept|Cache-Control|Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-------------+----+---------------+--------------+----------+--------------------+
|   */*|     no-cache| moe|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|     no-cache| moe|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|     no-cache| moe|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|     no-cache| moe|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|     no-cache| moe|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
+------+-------------+----+---------------+--------------+----------+--------------------+
only showing top 5 rows



In [11]:
default_hits = extracted_events \
        .filter(extracted_events.event_type == 'default')

In [12]:
@udf('boolean')
def is_purchase(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False

In [13]:
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

In [14]:
purchase_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_purchase('raw'))

In [15]:
extracted_purchase_events = purchase_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()
extracted_purchase_events.printSchema()
extracted_purchase_events.show(5)

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- sword_type: string (nullable = true)
 |-- timestamp: string (nullable = true)

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
+------+-----------------+-------

In [16]:
extracted_purchase_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/purchases')

## Pyspark code and Basic Analytics using SQL

In [17]:
purchases = spark.read.parquet('/tmp/purchases')

purchases.show(5)

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
+------+-----------------+---------------+--------------+----------+--------------------+
only showing top 5 rows



In [18]:
purchases.registerTempTable('purchases')

## How many hosts are there?

In [19]:
hosts = spark.sql("select distinct(Host) from purchases")

hosts.show()

+-----------------+
|             Host|
+-----------------+
|    user2.att.com|
|user1.comcast.com|
+-----------------+



In [20]:
host_df = hosts.toPandas()

host_df.describe()

Unnamed: 0,Host
count,2
unique,2
top,user1.comcast.com
freq,1


There are two hosts: user1.comcast.com and user2.att.com

## Get information from Host = 'user1.comcast.com'

In [21]:
comcast = spark.sql("select * from purchases where Host = 'user1.comcast.com'")

comcast.show(5)

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 17:36:...|
+------+-----------------+---------------+--------------+----------+--------------------+
only showing top 5 rows



In [22]:
comcast_df = comcast.toPandas()

comcast_df.describe()

Unnamed: 0,Accept,Host,User-Agent,event_type,sword_type,timestamp
count,5750,5750,5750,5750,5750,5750
unique,1,1,1,1,1,5750
top,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,knights,2020-12-08 17:38:16.923
freq,5750,5750,5750,5750,5750,1


## Get information from Host = 'user2.att.com'

In [23]:
att = spark.sql("select * from purchases where Host = 'user2.att.com'")

att.show(5)

+------+-------------+---------------+--------------+----------+--------------------+
|Accept|         Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-------------+---------------+--------------+----------+--------------------+
|   */*|user2.att.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 18:02:...|
|   */*|user2.att.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 18:02:...|
|   */*|user2.att.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 18:02:...|
|   */*|user2.att.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 18:02:...|
|   */*|user2.att.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 18:02:...|
+------+-------------+---------------+--------------+----------+--------------------+
only showing top 5 rows



In [24]:
att_df = att.toPandas()

att_df.describe()

Unnamed: 0,Accept,Host,User-Agent,event_type,sword_type,timestamp
count,10,10,10,10,10,10
unique,1,1,1,1,1,10
top,*/*,user2.att.com,ApacheBench/2.3,purchase_sword,knights,2020-12-08 18:02:26.698
freq,10,10,10,10,10,1


## How many event_types exist?

In [25]:
event = spark.sql("select event_type, count(event_type)\
                            from purchases group by event_type")

event.show(5)

+--------------+-----------------+
|    event_type|count(event_type)|
+--------------+-----------------+
|purchase_sword|             5760|
+--------------+-----------------+



In [26]:
event_df = event.toPandas()
event_df

Unnamed: 0,event_type,count(event_type)
0,purchase_sword,5760


## How many sword_types exist?

In [27]:
sword = spark.sql("select sword_type, count(sword_type)\
                            from purchases group by sword_type")

sword.show(5)

+----------+-----------------+
|sword_type|count(sword_type)|
+----------+-----------------+
|   knights|             5760|
+----------+-----------------+



In [28]:
sword_df = event.toPandas()
sword_df

Unnamed: 0,event_type,count(event_type)
0,purchase_sword,5760


In [29]:
def purchase_sword_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
    ])


In [30]:
@udf('boolean')
def is_sword_purchase(event_as_json):
    """udf for filtering events
    """
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False

In [31]:
raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()

In [32]:
sword_purchases = raw_events \
        .filter(is_sword_purchase(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          purchase_sword_event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')

In [33]:
sink = sword_purchases \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases") \
        .option("path", "/tmp/sword_purchases") \
        .trigger(processingTime="10 seconds") \
        .start()

In [34]:
sink.stop()