In [1]:
import pyspark
import json
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType

In [2]:
spark

In [3]:
def purchase_sword_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |--
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
    ])

@udf('boolean')
def is_sword_purchase(event_as_json):
    """udf for filtering events
    """
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False

@udf('string')
def event_type(event_as_json):
    event = json.loads(event_as_json)
    return event['event_type']

In [4]:
raw_events = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "events") \
    .load()
    
event_types = raw_events.select(event_type(raw_events.value.cast('string')).alias('event_type')) \
    .groupby('event_type') \
    .count()
    
sword_purchases = raw_events \
    .filter(is_sword_purchase(raw_events.value.cast('string'))) \
    .select(raw_events.value.cast('string').alias('raw_event'),
            raw_events.timestamp.cast('string'),
            from_json(raw_events.value.cast('string'),
                      purchase_sword_event_schema()).alias('json')) \
    .select('raw_event', 'timestamp', 'json.*')
    
query = event_types \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
    
sink = sword_purchases \
    .writeStream \
    .format("parquet") \
    .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases") \
    .option("path", "/tmp/sword_purchases") \
    .trigger(processingTime="20 seconds") \
    .start()
    
    
query.awaitTermination()
sink.awaitTermination()

StreamingQueryException: 'Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 206, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n  File "/spark-2.2.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main\n    process()\n  File "/spark-2.2.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process\n    serializer.dump_stream(func(split_index, iterator), outfile)\n  File "/spark-2.2.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 220, in dump_stream\n    self.serializer.dump_stream(self._batched(iterator), stream)\n  File "/spark-2.2.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 138, in dump_stream\n    for obj in iterator:\n  File "/spark-2.2.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 209, in _batched\n    for item in iterator:\n  File "<string>", line 1, in <lambda>\n  File "/spark-2.2.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 71, in <lambda>\n    return lambda *a: f(*a)\n  File "<ipython-input-3-a470ad8b57d7>", line 30, in event_type\nKeyError: \'event_type\'\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)\n\tat org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)\n\tat org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)\n\tat org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:287)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:287)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:287)\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:108)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n\nDriver stacktrace:\n=== Streaming Query ===\nIdentifier: [id = 8b066c75-94ff-44e1-82d0-d9d2c424d667, runId = 380336df-1184-4eaa-8cd9-3e4d37d2bc70]\nCurrent Committed Offsets: {KafkaSource[Subscribe[events]]: {"events":{"0":0}}}\nCurrent Available Offsets: {KafkaSource[Subscribe[events]]: {"events":{"0":1}}}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nAggregate [event_type#15], [event_type#15, count(1) AS count#21L]\n+- Project [event_type(cast(value#1 as string)) AS event_type#15]\n   +- StreamingExecutionRelation KafkaSource[Subscribe[events]], [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]\n'

In [5]:
import redis
r = redis.Redis(host='redis', port='6379')

In [9]:
r.set('user1', json.dumps({"Guild": None, "Inventory": {}}).encode())
r.get('user1')

b'{"Guild": null, "Inventory": {}}'

In [10]:
json.loads(r.get('user1'))

{'Guild': None, 'Inventory': {}}