In [None]:
import os
# Arguments handed to spark-submit when the PySpark shell is launched:
# the Cassandra contact point, then the Kafka and Cassandra connector packages.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join([
    '--conf', 'spark.cassandra.connection.host=cassandra',
    '--packages', ','.join([
        'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2',
        'com.datastax.spark:spark-cassandra-connector_2.11:2.0.2',
    ]),
    'pyspark-shell',
])

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [None]:
# Spark entry point for this notebook; only warnings and errors are logged.
sc = SparkContext(appName="BigDataRiver")
sc.setLogLevel("WARN")

# Streaming context with a batch duration of 20, and a SQL context on the same SparkContext.
ssc = StreamingContext(sparkContext=sc, batchDuration=20)
sql = SQLContext(sparkContext=sc)

In [None]:
# Direct (receiver-less) Kafka stream over the 'bdr' topic.
kafkaStream = KafkaUtils.createDirectStream(
    ssc,
    topics=['bdr'],
    kafkaParams={"metadata.broker.list": 'kafka:9092'},
)

In [None]:
# Keep only the second element (index 1) of every stream record.
parsed = kafkaStream.map(lambda record: record[1])

In [None]:
# Struct returned by the `add` UDF: three non-nullable long fields.
schema = (
    StructType()
    .add("product", LongType(), False)
    .add("other_product", LongType(), False)
    .add("count", LongType(), False)
)

def add(p1, o1, c1, p2, o2, c2):
    """Merge two (product, other_product, count) triples into one.

    If the second triple is missing (``p2 is None``) the first triple is
    returned unchanged; if the first is missing (``p1 is None``) the second
    is returned. When both are present, the result keeps the first triple's
    product pair and carries the sum of both counts.
    """
    first = (p1, o1, c1)
    second = (p2, o2, c2)

    # Guard clauses: a missing side means there is nothing to add up.
    if p2 is None:
        return first
    if p1 is None:
        return second

    return (p1, o1, c1 + c2)

# Expose `add` to Spark SQL as a UDF whose return type is the struct `schema`.
add_udf = F.udf(f=add, returnType=schema)


In [None]:
def bdr(rdd):
    """Process one micro-batch of JSON events and refresh the Cassandra tables.

    Steps performed for a non-empty batch:

    1. Parse ``rdd`` as JSON and keep the ``user_id`` and ``product`` columns.
    2. Self-join on ``user_id`` to obtain every (product, other_product) pair
       with ``product != other_product`` and count occurrences per pair.
    3. Outer-join those counts with the rows of ``bdr.all_products`` and merge
       them with ``add_udf``.
    4. Append the merged counts to ``bdr.all_products``.
    5. For every product, take the ``other_product`` values of the five rows
       with the highest count and append them to ``bdr.top_other_products``.

    When the parsed batch has no columns, "Empty" is printed and nothing is
    written.

    NOTE: this function must keep exactly one positional parameter, because
    ``DStream.foreachRDD`` inspects the callback's argument count to decide
    whether to pass the batch time as well.

    :param rdd: RDD of JSON strings for the current batch.
    """
    df0 = sql.read.json(rdd)
    if not df0.columns:
        # print(...) with a single string behaves the same on Python 2 and 3.
        print("Empty")
        return

    df = df0.select('user_id', 'product').cache()
    new_products = None
    try:
        users = df.toDF('user_id', 'other_product')

        # Pair counts derived from the current batch.
        s_products = (
            df.join(users, users['user_id'] == df['user_id'], 'inner')
            .filter("`product` != `other_product`")
            .select('product', 'other_product')
            .groupby('product', 'other_product')
            .count()
            .toDF("p1", "o1", "c1")
        )

        # Pair counts already stored in Cassandra.
        c_products = (
            sql.read.format("org.apache.spark.sql.cassandra")
            .options(table="all_products", keyspace="bdr")
            .load()
            .toDF("p2", "o2", "c2")
        )

        # Outer join keeps pairs that exist on only one side; add_udf merges them.
        products = s_products.join(
            c_products,
            (s_products['p1'] == c_products['p2']) & (s_products['o1'] == c_products['o2']),
            'outer',
        )
        merged = add_udf(
            products['p1'], products['o1'], products['c1'],
            products['p2'], products['o2'], products['c2'],
        )
        new_products = (
            products.withColumn('add_column', merged)
            .select("add_column.product", "add_column.other_product", "add_column.count")
            .cache()
        )

        # Store back the latest counts.
        (new_products.write.format("org.apache.spark.sql.cassandra")
            .mode('append')
            .options(table="all_products", keyspace="bdr")
            .save())

        # Top other-products per product. A list comprehension (not map) is used so
        # the value is a concrete list on Python 3 as well as Python 2.
        p_o_rdd = (
            new_products.rdd
            .map(lambda r: (r[0], (r[1], r[2])))
            .groupByKey()
            .mapValues(lambda pairs: [
                pair[0]
                for pair in sorted(pairs, reverse=True, key=lambda pair: pair[1])[:5]
            ])
        )
        # Named distinctly so it does not shadow the module-level `schema`.
        top_schema = StructType([
            StructField("product", LongType(), False),
            StructField("other_products", ArrayType(LongType()), False)
        ])
        (sql.createDataFrame(p_o_rdd, top_schema)
            .write.format("org.apache.spark.sql.cassandra")
            .mode('append')
            .options(table="top_other_products", keyspace="bdr")
            .save())
    finally:
        # Release per-batch caches; otherwise every micro-batch leaves cached
        # blocks behind for the lifetime of the streaming job.
        if new_products is not None:
            new_products.unpersist()
        df.unpersist()
    

In [None]:
# Run bdr on the RDD of every micro-batch.
parsed.foreachRDD(lambda batch_rdd: bdr(batch_rdd))

In [None]:
# Start the streaming computation, then block this cell until it terminates.
ssc.start()
ssc.awaitTermination()