In [3]:
# Batch (non-streaming) read of the "events" Kafka topic, from the earliest
# to the latest available offset.
raw_events = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

In [4]:
# The Kafka source exposes the message payload as a binary `value` column;
# keep only that payload, decoded to a string, still named `value`.
events = raw_events.select(raw_events["value"].cast("string").alias("value"))

In [3]:
import json

from pyspark.sql import Row
from pyspark.sql.functions import udf

In [5]:
# Parse every message payload as JSON and let Spark infer a DataFrame schema
# from the resulting dicts.
extracted_events = (
    events.rdd
    .map(lambda message: json.loads(message["value"]))
    .toDF()
)



In [6]:
# Preview the parsed events (first 20 rows, long values truncated).
extracted_events.show(n=20, truncate=True)

+------+--------------+-----------+--------------+
|Accept|          Host| User-Agent|    event_type|
+------+--------------+-----------+--------------+
|   */*|localhost:5000|curl/7.47.0|       default|
|   */*|localhost:5000|curl/7.47.0|purchase_sword|
|   */*|localhost:5000|curl/7.47.0|purchase_knife|
|   */*|localhost:5000|curl/7.47.0| purchase_frog|
|   */*|localhost:5000|curl/7.47.0|    ride_horse|
|   */*|localhost:5000|curl/7.47.0|climb_mountain|
|   */*|localhost:5000|curl/7.47.0|    ride_horse|
|   */*|localhost:5000|curl/7.47.0| purchase_frog|
+------+--------------+-----------+--------------+



In [7]:
# Persist the parsed events as Parquet. mode("overwrite") keeps this cell
# re-runnable: the default save mode ("errorifexists") raises as soon as
# /tmp/extracted_events already exists, e.g. on a second Run All.
extracted_events \
    .write \
    .mode("overwrite") \
    .parquet("/tmp/extracted_events")

In [9]:
@udf('string')
def munge_event(event_as_json):
    """Rewrite one JSON-encoded event for downstream storage.

    Registered as a Spark SQL UDF returning a string column. It overwrites
    ``Host`` with "moe" and sets ``Cache-Control`` to "no-cache".

    :param event_as_json: the event serialized as JSON (presumably a JSON
        object -- item assignment below requires it), or ``None`` for a
        null column value.
    :returns: the modified event re-serialized with ``json.dumps``, or
        ``None`` when the input is ``None``.
    """
    # Kafka message values are nullable; json.loads(None) raises TypeError
    # and would fail the whole Spark job, so propagate the null instead.
    if event_as_json is None:
        return None
    event = json.loads(event_as_json)
    event['Host'] = "moe"
    event['Cache-Control'] = "no-cache"
    return json.dumps(event)

raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

# Keep the raw payload next to its munged version. The Kafka timestamp is
# cast to string so it can be carried into the extracted rows.
# Null payloads are dropped first because json.loads below cannot parse them.
munged_events = raw_events \
    .where(raw_events.value.isNotNull()) \
    .select(raw_events.value.cast('string').alias('raw'),
            raw_events.timestamp.cast('string')) \
    .withColumn('munged', munge_event('raw'))
munged_events.show()

# Flatten each munged JSON object into a Row and add the Kafka timestamp.
# cache() lets the write below reuse rows already computed for show() rather
# than re-reading the topic and re-running the Python UDF for each action.
extracted_events = munged_events \
    .rdd \
    .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.munged))) \
    .toDF() \
    .cache()
extracted_events.show()

extracted_events \
    .write \
    .mode("overwrite") \
    .parquet("/tmp/extracted_events")

+--------------------+--------------------+--------------------+
|                 raw|           timestamp|              munged|
+--------------------+--------------------+--------------------+
|{"Host": "localho...|2019-11-10 05:04:...|{"Host": "moe", "...|
|{"Host": "localho...|2019-11-10 05:05:...|{"Host": "moe", "...|
|{"Host": "localho...|2019-11-10 05:06:...|{"Host": "moe", "...|
|{"Host": "localho...|2019-11-10 05:07:...|{"Host": "moe", "...|
|{"Host": "localho...|2019-11-10 05:07:...|{"Host": "moe", "...|
|{"Host": "localho...|2019-11-10 05:07:...|{"Host": "moe", "...|
|{"Host": "localho...|2019-11-10 05:41:...|{"Host": "moe", "...|
|{"Host": "localho...|2019-11-10 05:42:...|{"Host": "moe", "...|
+--------------------+--------------------+--------------------+

+------+-------------+----+-----------+--------------+--------------------+
|Accept|Cache-Control|Host| User-Agent|    event_type|           timestamp|
+------+-------------+----+-----------+--------------+-------------

In [10]:
# separate.py
import json

from pyspark.sql import Row
from pyspark.sql.functions import udf

@udf('string')
def munge_event(event_as_json):
    """Rewrite one JSON-encoded event for downstream storage.

    Registered as a Spark SQL UDF returning a string column. It overwrites
    ``Host`` with "moe" and sets ``Cache-Control`` to "no-cache".

    :param event_as_json: the event serialized as JSON (presumably a JSON
        object -- item assignment below requires it), or ``None`` for a
        null column value.
    :returns: the modified event re-serialized with ``json.dumps``, or
        ``None`` when the input is ``None``.
    """
    # Kafka message values are nullable; json.loads(None) raises TypeError
    # and would fail the whole Spark job, so propagate the null instead.
    if event_as_json is None:
        return None
    event = json.loads(event_as_json)
    event['Host'] = "moe"
    event['Cache-Control'] = "no-cache"
    return json.dumps(event)

raw_events = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

# Drop null payloads (json.loads cannot parse them). Then keep the raw JSON,
# the Kafka timestamp cast to string, and the munged JSON side by side.
munged_events = raw_events \
    .where(raw_events.value.isNotNull()) \
    .select(raw_events.value.cast('string').alias('raw'),
            raw_events.timestamp.cast('string')) \
    .withColumn('munged', munge_event('raw'))

# Flatten each munged JSON object into a Row and add the Kafka timestamp.
# cache() because this DataFrame feeds several actions below; without it,
# every show()/write re-reads the Kafka topic and re-runs the Python UDF.
extracted_events = munged_events \
    .rdd \
    .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.munged))) \
    .toDF() \
    .cache()


def write_events_of_type(events, event_type, path):
    """Display the rows of ``events`` with the given ``event_type`` and write
    them to ``path`` as Parquet, overwriting any previous output.

    :param events: DataFrame with an ``event_type`` column.
    :param event_type: the ``event_type`` value to keep, e.g. 'purchase_sword'.
    :param path: output directory for the Parquet files.
    :returns: the filtered DataFrame.
    """
    selected = events.filter(events.event_type == event_type)
    selected.show()
    selected \
        .write \
        .mode("overwrite") \
        .parquet(path)
    return selected


sword_purchases = write_events_of_type(
    extracted_events, 'purchase_sword', "/tmp/sword_purchases")
default_hits = write_events_of_type(
    extracted_events, 'default', "/tmp/default_hits")

+------+-------------+----+-----------+--------------+--------------------+
|Accept|Cache-Control|Host| User-Agent|    event_type|           timestamp|
+------+-------------+----+-----------+--------------+--------------------+
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2019-11-10 05:05:...|
+------+-------------+----+-----------+--------------+--------------------+

+------+-------------+----+-----------+----------+--------------------+
|Accept|Cache-Control|Host| User-Agent|event_type|           timestamp|
+------+-------------+----+-----------+----------+--------------------+
|   */*|     no-cache| moe|curl/7.47.0|   default|2019-11-10 05:04:...|
+------+-------------+----+-----------+----------+--------------------+

