In [1]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf

In [2]:
@udf('string')
def munge_event(event_as_json):
    """Normalize one raw JSON event string and return it re-serialized.

    Every event gets its ``Host`` field overwritten with ``"moe"`` and its
    ``Cache-Control`` field set to ``"no-cache"``; all other fields are kept.

    Args:
        event_as_json: The Kafka message value cast to a string. It is None
            when the Kafka record has a null value.

    Returns:
        The munged event as a JSON string, or None (stored by Spark as SQL
        NULL in the result column) when ``event_as_json`` is None.
    """
    # A null Kafka value would make json.loads raise TypeError and fail the
    # whole Spark task; propagate it as NULL instead.
    if event_as_json is None:
        return None
    event = json.loads(event_as_json)
    event['Host'] = "moe"
    event['Cache-Control'] = "no-cache"
    return json.dumps(event)

In [3]:
# Display the active SparkSession. NOTE(review): `spark` is never created in
# this notebook — presumably it is injected by the PySpark kernel/launcher.
spark

In [4]:
# Batch-read every record in the Kafka "events" topic, from the earliest to
# the latest available offset, into a DataFrame of raw Kafka records.
raw_events = (
    spark.read
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:29092",
        "subscribe": "events",
        "startingOffsets": "earliest",
        "endingOffsets": "latest",
    })
    .load()
)

In [5]:
# Preview the raw Kafka records (first 20 rows, long cells truncated).
raw_events.show(n=20, truncate=True)

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 48 6F 73 7...|events|        0|     0|2020-11-06 00:27:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     1|2020-11-06 00:28:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     2|2020-11-06 00:30:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     3|2020-11-06 00:30:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     4|2020-11-06 00:30:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     5|2020-11-06 00:30:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     6|2020-11-06 00:30:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     7|2020-11-06 00:30:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0

In [6]:
# Decode the Kafka payload bytes into a JSON string ('raw'), keep the Kafka
# record timestamp as a string, and attach the UDF-munged JSON ('munged').
munged_events = (
    raw_events
    .select(
        raw_events['value'].cast('string').alias('raw'),
        raw_events['timestamp'].cast('string').alias('timestamp'),
    )
    .withColumn('munged', munge_event('raw'))
)

In [7]:
# Preview raw vs. munged payloads side by side (first 20 rows, truncated).
munged_events.show(n=20, truncate=True)

+--------------------+--------------------+--------------------+
|                 raw|           timestamp|              munged|
+--------------------+--------------------+--------------------+
|{"Host": "localho...|2020-11-06 00:27:...|{"Host": "moe", "...|
|{"Host": "localho...|2020-11-06 00:28:...|{"Host": "moe", "...|
|{"Host": "localho...|2020-11-06 00:30:...|{"Host": "moe", "...|
|{"Host": "localho...|2020-11-06 00:30:...|{"Host": "moe", "...|
|{"Host": "localho...|2020-11-06 00:30:...|{"Host": "moe", "...|
|{"Host": "localho...|2020-11-06 00:30:...|{"Host": "moe", "...|
|{"Host": "localho...|2020-11-06 00:30:...|{"Host": "moe", "...|
|{"Host": "localho...|2020-11-06 00:30:...|{"Host": "moe", "...|
|{"Host": "localho...|2020-11-06 00:30:...|{"Host": "moe", "...|
|{"Host": "localho...|2020-11-06 00:30:...|{"Host": "moe", "...|
+--------------------+--------------------+--------------------+



In [8]:
# Flatten each munged JSON event into one column per JSON key, alongside the
# Kafka timestamp. Going through the RDD lets Row(**dict) turn arbitrary JSON
# keys (including non-identifier names like "Cache-Control") into columns;
# toDF() then infers the schema from the resulting Rows.
# NOTE(review): schema inference assumes every event carries the same keys —
# heterogeneous events may fail or be mis-typed; confirm, or switch to an
# explicit schema. An event that itself contains a "timestamp" key would
# collide with the timestamp= keyword and raise TypeError.
extracted_events = (
    munged_events
    .rdd
    # Skip rows whose munged payload is NULL: json.loads(None) raises TypeError.
    .filter(lambda r: r.munged is not None)
    .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.munged)))
    .toDF()
)

In [9]:
# Preview the flattened events (first 20 rows, long cells truncated).
extracted_events.show(n=20, truncate=True)

+------+-------------+----+-----------+--------------+--------------------+
|Accept|Cache-Control|Host| User-Agent|    event_type|           timestamp|
+------+-------------+----+-----------+--------------+--------------------+
|   */*|     no-cache| moe|curl/7.47.0|       default|2020-11-06 00:27:...|
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2020-11-06 00:28:...|
|   */*|     no-cache| moe|curl/7.47.0|       default|2020-11-06 00:30:...|
|   */*|     no-cache| moe|curl/7.47.0|       default|2020-11-06 00:30:...|
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2020-11-06 00:30:...|
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2020-11-06 00:30:...|
|   */*|     no-cache| moe|curl/7.47.0|       default|2020-11-06 00:30:...|
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2020-11-06 00:30:...|
|   */*|     no-cache| moe|curl/7.47.0|       default|2020-11-06 00:30:...|
|   */*|     no-cache| moe|curl/7.47.0|       default|2020-11-06 00:30:...|
+------+----

In [10]:
# Keep only the sword-purchase events.
sword_purchases = extracted_events.where(
    extracted_events['event_type'] == 'purchase_sword'
)

In [11]:
# Preview the sword purchases (first 20 rows, long cells truncated).
sword_purchases.show(n=20, truncate=True)

+------+-------------+----+-----------+--------------+--------------------+
|Accept|Cache-Control|Host| User-Agent|    event_type|           timestamp|
+------+-------------+----+-----------+--------------+--------------------+
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2020-11-06 00:28:...|
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2020-11-06 00:30:...|
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2020-11-06 00:30:...|
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2020-11-06 00:30:...|
+------+-------------+----+-----------+--------------+--------------------+



In [12]:
# Keep only the events whose type is 'default'.
default_hits = extracted_events.where(
    extracted_events['event_type'] == 'default'
)

In [13]:
# Preview the 'default' events (first 20 rows, long cells truncated).
default_hits.show(n=20, truncate=True)

+------+-------------+----+-----------+----------+--------------------+
|Accept|Cache-Control|Host| User-Agent|event_type|           timestamp|
+------+-------------+----+-----------+----------+--------------------+
|   */*|     no-cache| moe|curl/7.47.0|   default|2020-11-06 00:27:...|
|   */*|     no-cache| moe|curl/7.47.0|   default|2020-11-06 00:30:...|
|   */*|     no-cache| moe|curl/7.47.0|   default|2020-11-06 00:30:...|
|   */*|     no-cache| moe|curl/7.47.0|   default|2020-11-06 00:30:...|
|   */*|     no-cache| moe|curl/7.47.0|   default|2020-11-06 00:30:...|
|   */*|     no-cache| moe|curl/7.47.0|   default|2020-11-06 00:30:...|
+------+-------------+----+-----------+----------+--------------------+

