In [1]:
import os
# spark-submit arguments for this notebook's PySpark driver:
#   * point the Cassandra connector at the host named "cassandra";
#   * pull the Kafka 0.8 streaming and Cassandra connector packages (Scala 2.11).
# PySpark reads this variable when the first SparkContext launches the JVM
# gateway, so it has to be set before the context is created further down.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join([
    '--conf spark.cassandra.connection.host=cassandra',
    '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,com.datastax.spark:spark-cassandra-connector_2.11:2.0.2',
    'pyspark-shell',
])

In [2]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [3]:
# Driver-side contexts shared by every later cell (sc, ssc, sql are globals).
sc = SparkContext(appName="BigDataRiver")
# Only WARN and above, so batch output is not buried in INFO logs.
sc.setLogLevel(logLevel="WARN")
# NOTE(review): this is the RDD checkpoint directory only; it does not enable
# DStream/Kafka-offset checkpointing (that would be ssc.checkpoint(...)).
sc.setCheckpointDir(dirName='checkpoint/')
# A new micro-batch is cut every 60 seconds.
ssc = StreamingContext(sc, batchDuration=60)
# Entry point for the DataFrame reads/writes and SQL queries below.
sql = SQLContext(sparkContext=sc)

In [4]:
# Direct (receiver-less) Kafka 0.8 stream subscribed to the 'bdr' topic.
kafkaStream = KafkaUtils.createDirectStream(
    ssc,
    topics=['bdr'],
    kafkaParams={"metadata.broker.list": 'kafka:9092'},
)

In [5]:
# Records from the direct stream are (key, value) pairs; keep just the value
# (the raw JSON payload consumed by processStream).
parsed = kafkaStream.map(lambda record: record[1])

In [6]:
def usersWhoBoughtXAlsoBought(df):
    """Count co-purchases for every ordered pair of distinct products.

    Self-joins ``df`` on ``user_id`` so that each product a user bought is
    paired with every *other* product the same user bought, then counts the
    joined rows per pair. Pairs are ordered: (a, b) and (b, a) both appear.

    NOTE(review): ``count`` equals the number of users who bought both
    products only if each (user_id, product) occurs once in ``df``; duplicate
    rows multiply the count -- confirm against the C* table's primary key.

    :param df: DataFrame with at least ``user_id`` and ``product`` columns.
    :returns: DataFrame with columns ``product``, ``other_product``, ``count``.
    """
    productDf = df.select('user_id', 'product')
    otherProductDf = productDf.withColumnRenamed('product', 'other_product')
    # Joining on the column *name* keeps a single ``user_id`` column and avoids
    # a join condition built from two DataFrames that share lineage (a common
    # Spark self-join ambiguity pitfall).
    matchedProductsDf = productDf.join(otherProductDf, 'user_id', 'inner').\
        filter(F.col('product') != F.col('other_product')).\
        groupBy('product', 'other_product').\
        count()  # result column is already named "count"
    return matchedProductsDf

In [7]:
def selectTopProducts(df, n=5):
    """Keep, for every product, its ``n`` most frequently co-purchased products.

    :param df: DataFrame with ``product``, ``other_product`` and ``count``
        columns (the shape produced by ``usersWhoBoughtXAlsoBought``).
    :param n: number of co-purchased products kept per product (default 5).
    :returns: DataFrame with columns ``product`` and ``other_products``, where
        ``other_products`` is an array ordered from highest to lowest ``count``.

    Side effect: registers ``df`` as the temporary view ``products``.
    """
    # createOrReplaceTempView is the Spark 2.x name for the deprecated
    # registerTempTable.
    df.createOrReplaceTempView("products")
    # ``other_product`` is a secondary sort key so ties on ``count`` are broken
    # deterministically; otherwise the top-n set can differ between batches.
    rankedDf = sql.sql("""
        SELECT
            *,
            ROW_NUMBER() OVER(PARTITION BY product ORDER BY `count` DESC, other_product) rn
        FROM products
    """).where(F.col("rn") <= n)
    # collect_list does not preserve row order across the shuffle, so collect
    # (rn, other_product) structs, sort them by rn, then project the product ids.
    topProductsDf = rankedDf.groupBy("product").\
        agg(F.sort_array(F.collect_list(F.struct("rn", "other_product"))).alias("ranked_pairs")).\
        select("product", F.col("ranked_pairs.other_product").alias("other_products"))
    return topProductsDf

In [8]:
def processStream(rdd):
    """Persist one micro-batch to Cassandra and refresh the top co-purchase table.

    Invoked once per batch with an RDD of raw JSON strings whose records carry
    ``user_id`` and ``product`` fields.

    Steps:
      1. Append the batch's (user_id, product) rows to ``bdr.all_user_products``.
      2. Re-read the whole table and recompute co-purchase counts over all of it.
      3. Show the top co-purchased products per product and append them to
         ``bdr.top_other_products_kappa``.

    Prints "Empty" when the batch has no usable data, "Done" after a write.
    """
    # Skip empty batches before read.json, which would otherwise run a
    # schema-inference job just to find there are no columns.
    if rdd.isEmpty():
        print("Empty")
        return

    df = sql.read.json(rdd)
    if not df.columns:
        # e.g. a batch made only of empty JSON objects
        print("Empty")
        return

    # Store all user products from the stream into C*.
    df.select('user_id', 'product').\
        write.format("org.apache.spark.sql.cassandra").\
        mode('append').options(table="all_user_products", keyspace="bdr").save()

    # Read everything back from C* and run the business logic. The frame is
    # cached because it feeds both sides of the self-join and two actions
    # (show + save); it is released in ``finally`` so the long-running stream
    # does not pile up one cached copy of the full table per batch.
    allUserProductsDf = sql.read.format("org.apache.spark.sql.cassandra").\
        options(table="all_user_products", keyspace="bdr").load().cache()
    try:
        topDf = selectTopProducts(usersWhoBoughtXAlsoBought(allUserProductsDf))
        topDf.show()

        topDf.write.format("org.apache.spark.sql.cassandra").\
            mode('append').options(table="top_other_products_kappa", keyspace="bdr").save()
    finally:
        allUserProductsDf.unpersist()

    print("Done")

In [9]:
# Hand every micro-batch RDD to processStream (runs on the driver). The lambda
# looks processStream up at call time, so redefining it takes effect.
parsed.foreachRDD(lambda batch_rdd: processStream(batch_rdd))

In [10]:
# Start the streaming job and block this cell until it terminates.
ssc.start()
try:
    ssc.awaitTermination()
finally:
    # Interrupting the kernel surfaces as a Py4JError from awaitTermination
    # but does not by itself stop the StreamingContext; stop it explicitly.
    # The SparkContext is kept alive so the notebook session stays usable, and
    # in-flight batches are allowed to finish before shutdown.
    ssc.stop(stopSparkContext=False, stopGraceFully=True)

Empty
+-------+--------------+
|product|other_products|
+-------+--------------+
|      6|      [18, 99]|
|     72|       [5, 51]|
|     51|       [5, 72]|
|      5|      [72, 51]|
|     18|       [99, 6]|
|     99|       [18, 6]|
+-------+--------------+

Done
+-------+--------------------+
|product|      other_products|
+-------+--------------------+
|     77|                [55]|
|     57|            [30, 85]|
|     43|                [41]|
|     25|             [2, 99]|
|      6|            [18, 99]|
|     68| [21, 17, 9, 83, 11]|
|     72|             [5, 51]|
|      9|[11, 97, 83, 17, 21]|
|     51|             [5, 72]|
|     17| [11, 9, 21, 97, 68]|
|     41|                [43]|
|     33|                 [5]|
|      5|        [72, 33, 51]|
|     89|                [67]|
|     85|            [57, 30]|
|     67|                [89]|
|     61|                [99]|
|     83| [21, 9, 11, 68, 17]|
|     55|                [77]|
|     62|                [30]|
+-------+----------------

Traceback (most recent call last):
  File "/opt/conda/envs/python2/lib/python2.7/SocketServer.py", line 290, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/envs/python2/lib/python2.7/SocketServer.py", line 318, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/envs/python2/lib/python2.7/SocketServer.py", line 331, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/envs/python2/lib/python2.7/SocketServer.py", line 652, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 235, in handle
    num_updates = read_int(self.rfile)
  File "/usr/local/spark/python/pyspark/serializers.py", line 545, in read_int
    raise EOFError
EOFError
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 883, in send_command
    resp

Py4JError: An error occurred while calling o27.awaitTermination