In [1]:
import os
# Arguments handed to spark-submit when PySpark launches: the Cassandra host,
# the two connector packages (Kafka 0.8 streaming + Cassandra), and the
# mandatory trailing 'pyspark-shell' token.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join((
    '--conf spark.cassandra.connection.host=cassandra',
    '--packages ' + ','.join((
        'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2',
        'com.datastax.spark:spark-cassandra-connector_2.11:2.0.2',
    )),
    'pyspark-shell',
))

In [2]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [3]:
# Driver-side entry points shared by every cell below.
sc = SparkContext(appName="BigDataRiver")
sc.setLogLevel("WARN")              # keep only warnings and errors in the log output
sc.setCheckpointDir('checkpoint/')  # directory Spark uses for RDD checkpoints

# Streaming context on top of `sc`, with a batch duration of 60.
ssc = StreamingContext(sparkContext=sc, batchDuration=60)

# SQL entry point used by the functions below for reads and ad-hoc SQL.
sql = SQLContext(sparkContext=sc)

In [4]:
# Stream of records from the 'bdr' topic, read via the broker at kafka:9092.
kafkaStream = KafkaUtils.createDirectStream(
    ssc,
    topics=['bdr'],
    kafkaParams={"metadata.broker.list": 'kafka:9092'}
)

In [5]:
# Keep only element 1 of every stream record (presumably the message payload of
# a (key, value) pair -- it is later parsed as JSON in processStream).
parsed = kafkaStream.map(lambda record: record[1])

In [6]:
def usersWhoBoughtXAlsoBought(df):
    """Count how often two different products appear under the same user.

    The (user_id, product) rows of ``df`` are self-joined on ``user_id``;
    pairs where both sides name the same product are dropped and the
    remaining (product, other_product) combinations are counted. The count is
    the number of joined row pairs, so repeated input rows contribute more
    than once.

    Args:
        df: DataFrame exposing at least the columns ``user_id`` and ``product``.

    Returns:
        DataFrame with the columns ``product``, ``other_product`` and ``count``.
    """
    purchases = df.select('user_id', 'product')
    # Same rows again, with the product column renamed so that both sides of
    # the self-join stay distinguishable.
    coPurchases = purchases.toDF('user_id', 'other_product')

    sameUser = coPurchases['user_id'] == purchases['user_id']
    joined = purchases.join(coPurchases, sameUser, 'inner')

    # A product is never reported as bought together with itself.
    pairs = joined.filter("`product` != `other_product`").select('product', 'other_product')

    pairCounts = pairs.groupby('product', 'other_product').count()
    return pairCounts.toDF("product", "other_product", "count")

In [7]:
def selectTopProducts(df, n=5):
    """Collect, per product, the ``n`` most frequently co-bought products.

    ``df`` is exposed to Spark SQL as the temporary view ``products`` (replacing
    any earlier view of that name), every row is numbered within its ``product``
    partition by descending ``count``, and the rows numbered 1..n are gathered
    into one list per product.

    Relies on the notebook-level ``sql`` (SQLContext) and ``F``
    (pyspark.sql.functions) objects.

    Args:
        df: DataFrame with the columns ``product``, ``other_product`` and
            ``count``.
        n: Maximum number of other products kept per product. Defaults to 5,
            the value that used to be hard-coded.

    Returns:
        DataFrame with the columns ``product`` and ``other_products`` (a list
        of at most ``n`` ``other_product`` values).
    """
    # registerTempTable is deprecated as of Spark 2.0; this is its replacement
    # and keeps the same "overwrite the existing view" semantics.
    df.createOrReplaceTempView("products")

    ranked = sql.sql("""
        SELECT
            *,
            ROW_NUMBER() OVER(PARTITION BY product ORDER BY count DESC) rn
        FROM products
    """)

    # Column predicate instead of a formatted SQL string, so `n` is never
    # spliced into query text.
    topRows = ranked.where(F.col("rn") <= n)

    # NOTE(review): collect_list does not promise any element order after the
    # shuffle; if consumers need the list ranked by count, sort it explicitly.
    topProductsDf = topRows.groupBy("product").agg(
        F.collect_list("other_product").alias("other_products")
    )
    return topProductsDf

In [9]:
def processStream(rdd):
    """Store one micro-batch of purchases and refresh the recommendations.

    The batch is parsed as JSON and its ``user_id``/``product`` pairs are
    appended to the Cassandra table ``bdr.all_user_products``. The whole table
    is then read back, run through
    ``selectTopProducts(usersWhoBoughtXAlsoBought(...))``, shown, and appended
    to ``bdr.top_other_products_kappa``.

    Relies on the notebook-level ``sql`` (SQLContext) object.

    Args:
        rdd: RDD holding the JSON strings of the current batch.

    Returns:
        None. Prints "Done" after a processed batch, "Empty" when the batch
        yields no columns at all, and a "Skipped" message when the batch has
        columns but not the ones this pipeline needs.
    """
    requiredColumns = ('user_id', 'product')

    df = sql.read.json(rdd)

    # No columns at all means there was nothing to infer a schema from.
    if not df.columns:
        print("Empty")
        return

    # A batch can have columns and still lack the ones needed here (for
    # example when every record failed to parse). Selecting them would raise
    # inside the streaming callback, so skip such a batch instead.
    missing = [name for name in requiredColumns if name not in df.columns]
    if missing:
        print("Skipped: batch is missing column(s) " + ", ".join(missing))
        return

    # Store all user products from the stream into C*.
    df.select(*requiredColumns).\
        write.format("org.apache.spark.sql.cassandra").\
        mode('append').options(table="all_user_products", keyspace="bdr").save()

    # Read everything back from C* and perform the business logic. The frame is
    # cached because it feeds two actions below (show and write).
    allUserProductsDf = sql.read.format("org.apache.spark.sql.cassandra").\
        options(table="all_user_products", keyspace="bdr").load().cache()
    try:
        topDf = selectTopProducts(usersWhoBoughtXAlsoBought(allUserProductsDf))
        topDf.show()

        topDf.write.format("org.apache.spark.sql.cassandra").\
            mode('append').options(table="top_other_products_kappa", keyspace="bdr").save()
    finally:
        # A new frame is cached on every batch; release it so cached copies of
        # the full table do not pile up for the lifetime of the job.
        allUserProductsDf.unpersist()

    print("Done")

In [10]:
# Register processStream as the per-batch callback. It is wrapped in a lambda
# so the name `processStream` is looked up each time a batch is handled.
parsed.foreachRDD(lambda batchRdd: processStream(batchRdd))

In [None]:
# Start the streaming computation; the callback registered with foreachRDD
# begins to run from here on.
ssc.start()
# Block this cell until the streaming context terminates.
ssc.awaitTermination()

+-------+--------------------+
|product|      other_products|
+-------+--------------------+
|     26|[84, 58, 96, 73, 92]|
|     29|[61, 62, 36, 82, 30]|
|     65|[89, 52, 96, 66, 87]|
|     19|[64, 33, 98, 41, 28]|
|     54|[10, 18, 91, 15, 21]|
|      0|[96, 83, 41, 48, 21]|
|     22| [64, 82, 58, 46, 2]|
|      7|[15, 85, 91, 90, 39]|
|     77|[16, 51, 50, 91, 21]|
|     34|[82, 36, 31, 58, 46]|
|     50|[47, 10, 32, 16, 91]|
|     94|[91, 84, 41, 15, 12]|
|     57|[12, 81, 61, 60, 30]|
|     32|[50, 15, 39, 10, 76]|
|     43|[82, 85, 30, 89, 68]|
|     84|[92, 26, 96, 44, 36]|
|     31|[89, 78, 34, 25, 80]|
|     39|[91, 15, 32, 10, 48]|
|     98|[68, 97, 37, 64, 21]|
|     25|[64, 82, 11, 97, 58]|
+-------+--------------------+
only showing top 20 rows

Done
+-------+--------------------+
|product|      other_products|
+-------+--------------------+
|     26|[84, 58, 96, 73, 92]|
|     29|[62, 61, 36, 82, 81]|
|     65|[89, 52, 96, 66, 87]|
|     19| [33, 98, 21, 64, 9]|
|     54