In [1]:
import json
from time import time
from operator import add
import argparse

from tdigest import TDigest
from kafka import KeyedProducer, RoundRobinPartitioner, KafkaClient

from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext



In [2]:
# Current popularity threshold, held as a Spark broadcast variable.
# compute_percentile (re)assigns it from the latest per-user scores;
# filter_most_popular reads it and treats None as "no threshold yet".
percentile_broadcast = None


def load_msg(msg):
    """Turn one Kafka record into a ``(user_id, points)`` pair.

    :param msg: record from ``KafkaUtils.createDirectStream``; ``msg[1]`` is
        the JSON-encoded message value.
    :return: ``(user_id, points)``, where the points for the message's
        ``activity`` come from the ``scores_b`` broadcast. ``scores_b`` is a
        module-level global that must be defined before the stream is wired up.
    :raises KeyError: if the message lacks ``user_id`` or ``activity``, or the
        activity has no entry in the score table. This fails the Spark task.
    """
    event = json.loads(msg[1])
    user_id = event['user_id']
    return user_id, scores_b.value[event['activity']]


def update_scorecount(new_scores, score_sum):
    """Fold this batch's scores for one key into its running total.

    State-update function for ``updateStateByKey``.

    :param new_scores: iterable of the scores received for the key in this batch.
    :param score_sum: the previous running total, or a falsy value (such as
        ``None`` for a key with no state yet), which counts as 0.
    :return: the sum of ``new_scores`` plus the previous total.
    """
    batch_total = sum(new_scores)
    return batch_total + (score_sum or 0)


def digest_partitions(values):
    """Summarise one partition's values as a single t-digest.

    Intended for ``RDD.mapPartitions``. It always emits exactly one digest per
    partition, even when the partition is empty, so the digests can be merged
    afterwards with ``+``.

    :param values: iterable of numeric values from one partition.
    :return: a one-element list holding the partition's ``TDigest``.
    """
    sketch = TDigest()
    sketch.batch_update(values)
    return [sketch]


def publish_popular_users(popular_rdd):
    """Publish every record of ``popular_rdd`` to the ``popular_users`` Kafka topic.

    All records from one call share the message key ``popular_<unix seconds>``,
    taken from the driver clock when this function runs. Each record is
    JSON-encoded (e.g. a ``(user_id, score)`` tuple becomes a two-element JSON
    array) and sent as UTF-8 bytes.

    Meant for ``DStream.foreachRDD``. Keep the single-argument signature:
    PySpark chooses how to call the function from its argument count.

    :param popular_rdd: RDD whose records are JSON-serialisable.
    """
    # A closure variable is enough for a short string. Broadcasting it created
    # a new broadcast variable on every batch that was never released.
    # kafka-python requires the key as bytes (str is rejected on Python 3).
    message_key = 'popular_{}'.format(int(time())).encode('utf-8')

    def publish_partition(partition):
        # Payloads must be bytes as well; encoding is a no-op for ASCII on Py2.
        payloads = [json.dumps(user).encode('utf-8') for user in partition]
        if not payloads:
            # Don't open a broker connection just to send nothing.
            return
        kafka = KafkaClient('kafka:9092')
        try:
            # `async` is a reserved word from Python 3.7 on, so it cannot be
            # written as a literal keyword argument. Passing it via ** keeps
            # the original option; kafka-python versions that renamed it to
            # `async_send` still read `async` from **kwargs.
            producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner,
                                     **{'async': True, 'batch_send': True})
            try:
                producer.send_messages('popular_users', message_key, *payloads)
            finally:
                # Flush the async queue and stop its sender before the task
                # finishes; otherwise queued messages may never be delivered.
                producer.stop()
        finally:
            # One client is opened per partition per batch, so close it.
            kafka.close()

    popular_rdd.foreachPartition(publish_partition)


def compute_percentile(rdd):
    """Recompute the popularity threshold from the current per-user scores.

    Builds one t-digest per partition over the score column (``row[1]``),
    merges them, and publishes the result of ``percentile(0.95)`` as a
    broadcast in the module-level ``percentile_broadcast``, which
    ``filter_most_popular`` reads.

    Meant for ``DStream.foreachRDD``. Keep the single-argument signature:
    PySpark chooses how to call the function from its argument count.

    :param rdd: RDD of ``(user_id, score)`` pairs.
    """
    global percentile_broadcast
    # No users yet (e.g. before the first message arrives). A percentile of an
    # empty digest is meaningless and can raise, and reduce() raises on an RDD
    # with no partitions. Either would abort the streaming job, so keep the
    # current threshold instead.
    if rdd.isEmpty():
        return
    merged_digest = (rdd.map(lambda row: row[1])
                     .mapPartitions(digest_partitions)
                     .reduce(add))
    # NOTE(review): recent `tdigest` releases take p in [0, 100] (the 95th
    # percentile is percentile(95)); 0.95 is only the 95th percentile if the
    # installed version expects a fraction. Confirm against the pinned version.
    percentile_limit = merged_digest.percentile(0.95)
    previous = percentile_broadcast
    percentile_broadcast = rdd.context.broadcast(percentile_limit)
    if previous is not None:
        # Release executor-side copies of the old threshold. unpersist() is
        # non-destructive: a job still holding the old broadcast re-fetches it.
        previous.unpersist()


def filter_most_popular(rdd):
    """Keep only the rows whose score is above the current popularity threshold.

    Meant for ``DStream.transform``. Keep the single-argument signature:
    PySpark chooses how to call the function from its argument count.

    :param rdd: RDD of ``(user_id, score)`` pairs.
    :return: the rows whose ``row[1]`` is strictly greater than the value held
        in ``percentile_broadcast``, or an empty RDD while no threshold has
        been published yet.
    """
    # Bind the broadcast once, so the closure uses the same object that was checked.
    threshold = percentile_broadcast
    if not threshold:
        return rdd.context.parallelize([])
    return rdd.filter(lambda row: row[1] > threshold.value)


In [3]:
# `sc` is not created in this notebook. Presumably it is the SparkContext
# supplied by the PySpark kernel; confirm when running elsewhere.
# 1-second micro-batches. updateStateByKey (used later) needs a checkpoint dir.
ssc = StreamingContext(sc, batchDuration=1)
ssc.checkpoint('checkpoint')
kvs = KafkaUtils.createDirectStream(
    ssc,
    topics=['messages'],
    kafkaParams={"metadata.broker.list": "kafka:9092"},
)

In [4]:
# Points awarded per activity type.
scores = {'profile.picture.like': 2, 'profile.view': 1, 'message.private': 3}
# load_msg looks this broadcast up as a global named `scores_b`.
scores_b = sc.broadcast(scores)

mapped = kvs.map(load_msg)                             # (user_id, points) per message
updated = mapped.updateStateByKey(update_scorecount)   # running total per user
updated.foreachRDD(compute_percentile)                 # refresh the threshold
popular_stream = updated.transform(filter_most_popular)
popular_stream.foreachRDD(publish_popular_users)

ssc.start()
try:
    ssc.awaitTermination()
finally:
    # Interrupting the kernel only aborts the wait. Without an explicit stop,
    # the streaming jobs keep running in the background, and no new
    # StreamingContext can be started in this kernel (only one may be active
    # per JVM). Keep `sc` alive so the notebook can be re-run.
    ssc.stop(stopSparkContext=False, stopGraceFully=True)

Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/streaming/util.py", line 62, in call
    r = self.func(t, *rdds)
  File "/usr/local/spark/python/pyspark/streaming/dstream.py", line 159, in <lambda>
    func = lambda t, rdd: old_func(rdd)
  File "<ipython-input-2-21c82552202c>", line 31, in publish_popular_users
    popular_rdd.foreachPartition(publish_partition)
  File "/usr/local/spark/python/pyspark/rdd.py", line 766, in foreachPartition
    self.mapPartitions(func).count()  # Force evaluation
  File "/usr/local/spark/python/pyspark/rdd.py", line 1006, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/local/spark/python/pyspark/rdd.py", line 997, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/usr/local/spark/python/pyspark/rdd.py", line 871, in fold
    vals = self.mapPartitions(func).collect()
  File "/usr/local/spark/python/pyspark/rdd.py", line 773, in collect
    port = self

KeyboardInterrupt: 