In [1]:
!pip install kafka-python
!pip install redis



In [2]:
import os
import requests
import json
import redis
import pyspark
import base64
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from datetime import datetime
import uuid

In [3]:
# --- Runtime toggles ---------------------------------------------------------
# debug: when true, the pipeline cells pprint() their intermediate DStreams.
debug = True
# saveToLocalDisk: when true, the pipeline appends incoming messages to CSV.
saveToLocalDisk = True

# --- Spark bootstrap ---------------------------------------------------------
# Submit arguments are exported before the SparkContext below is created:
# local master with 2 threads.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--master local[2] pyspark-shell'

# The Kafka 0.8 streaming assembly jar is handed to Spark via "spark.jars".
conf = SparkConf()
conf = conf.set(
    "spark.jars",
    "/home/jovyan/work/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar",
)

sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")

# Streaming context with a 60 second batch interval.
ssc = StreamingContext(sc, batchDuration=60)

# SQL entry point sharing the same SparkContext.
spark = SparkSession(sc)

In [4]:
from pyspark.sql import Window,WindowSpec;
from pyspark.sql.functions import *
import math
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType, MapType

def get_bucket(rank):
    """Return the index of the ten-wide bucket that ``rank`` falls into.

    The result is ``floor(rank / 10)``: ranks 0-9 give 0, 10-19 give 1, and
    so on.
    """
    tenth = rank / 10
    return math.floor(tenth)

def get_json(summary, title, link, timestamp):
    """Bundle the fields of one news item into a dict.

    The keys are "Summary", "Title", "Link" and "Timestamp", in that order.
    """
    record = dict(Summary=summary, Title=title, Link=link, Timestamp=timestamp)
    return record

def get_unicode_escaped_string(text):
    """Decode Python-style backslash escapes (``\\n``, ``\\xe9``, ``\\u20ac``) in ``text``.

    Parameters
    ----------
    text : str or None
        Text that may contain literal backslash escape sequences.

    Returns
    -------
    str
        ``""`` when ``text`` is None, otherwise ``text`` with its escape
        sequences turned back into the characters they stand for.

    Raises
    ------
    UnicodeDecodeError
        If ``text`` contains a malformed escape sequence (for example a
        trailing lone backslash).
    """
    if text is None:
        return ""
    # "backslashreplace" rewrites any character outside ASCII as an escape
    # sequence instead of raising UnicodeEncodeError, so text that was never
    # escaped (or only partly escaped) round-trips unchanged instead of
    # aborting the caller.
    escaped_bytes = text.encode("ascii", "backslashreplace")
    return escaped_bytes.decode("unicode_escape")

def SetValHistory(x):
    """Push a batch of news items onto the Redis-backed linked list of batches.

    A new batch record ``{"tasks": <x as JSON>, "id": <new uuid>, "next": <id
    of the previous head or "">}`` is stored under its own id and under the
    'LatestNews' key.

    Parameters
    ----------
    x : JSON-serialisable object
        The batch content; it is serialised with ``json.dumps`` and stored as
        the "tasks" value.

    Raises
    ------
    Whatever the Redis client raises when a read or write fails. Such
    failures are deliberately not swallowed so a half-written batch is
    visible to the caller.
    """
    r = redis.StrictRedis(host = 'redis-store', port = 6379)
    idVal = str(uuid.uuid4())

    # Read the current head exactly once; a second read could observe a
    # different value than the one that was checked.
    previous_id = ""
    current_head = r.get('LatestNews')
    if current_head is not None:
        # Only the parsing is guarded: an unreadable or unexpected head simply
        # means the new batch starts a fresh chain ("next" stays empty).
        try:
            head = json.loads(current_head)
        except (TypeError, ValueError):
            head = None
        if isinstance(head, dict) and isinstance(head.get("id"), str):
            previous_id = head["id"]

    # json.dumps(previous_id) yields the id wrapped in double quotes, escaping
    # it if it ever contains characters that would break the JSON document.
    payload = "{\"tasks\":"+json.dumps(x)+", \"id\":\""+idVal+"\", \"next\":"+json.dumps(previous_id)+"}"

    # Store the batch under its own id first and publish it as the head last,
    # so a failure between the two writes leaves the existing head untouched.
    r.set(idVal, payload)
    r.set('LatestNews', payload)
        
def SetVal(x):
    """Publish one batch as the new head of the Redis-backed list of batches.

    A record ``{"tasks": [<x>], "id": <new uuid>, "next": <id of the previous
    head or "">}`` is stored under its own id and under the 'LatestNews' key.

    Parameters
    ----------
    x : object
        Inserted verbatim (via ``str``) between the brackets of the "tasks"
        array, so it must already be valid JSON array content, e.g. a string
        of comma-separated JSON objects or a number.

    Returns
    -------
    None

    Raises
    ------
    Whatever the Redis client raises when a read or write fails. Such
    failures are deliberately not swallowed so a half-written batch is
    visible to the caller.
    """
    r = redis.StrictRedis(host = 'redis-store', port = 6379)
    idVal = str(uuid.uuid4())

    # Work out which batch (if any) the new one should point at. Only the
    # parsing is guarded: an unreadable or unexpected head simply means the
    # new batch starts a fresh chain ("next" stays empty).
    previous_id = ""
    news = r.get('LatestNews')
    if news is not None:
        try:
            batchSet = json.loads(news)
        except (TypeError, ValueError):
            batchSet = None
        if isinstance(batchSet, dict) and isinstance(batchSet.get("id"), str):
            previous_id = batchSet["id"]
    # NOTE: an earlier draft tried to bootstrap the list from the saved
    # NewsData.csv history at this point when no head existed yet; that code
    # was disabled, and a batch with an empty "next" is written instead.

    # json.dumps(previous_id) yields the id wrapped in double quotes, escaping
    # it if it ever contains characters that would break the JSON document.
    payload = "{\"tasks\":["+str(x)+"], \"id\":\""+idVal+"\", \"next\":"+json.dumps(previous_id)+"}"

    # Store the batch under its own id first and publish it as the head last,
    # so a failure between the two writes leaves the existing head untouched.
    r.set(idVal, payload)
    r.set('LatestNews', payload)
        
def SaveToNewsFile(rdd):
    """Append the records of ``rdd`` as CSV under /home/jovyan/work/NewsData.csv.

    Empty RDDs are skipped. Output files are capped at 1000 records each.
    """
    if rdd.isEmpty():
        return
    frame = rdd.toDF()
    frame.write.save(
        "/home/jovyan/work/NewsData.csv",
        format="csv",
        mode="append",
        maxRecordsPerFile=1000,
    )
    
def SaveToClickFile(rdd):
    """Append the records of ``rdd`` as CSV under /home/jovyan/work/ClickData.csv.

    Empty RDDs are skipped. Output files are capped at 1000 records each.
    """
    if rdd.isEmpty():
        return
    frame = rdd.toDF()
    frame.write.save(
        "/home/jovyan/work/ClickData.csv",
        format="csv",
        mode="append",
        maxRecordsPerFile=1000,
    )
    
def SaveToFile(rdd):
    """Append the records of ``rdd`` as CSV under /home/jovyan/work/FinanceData.csv.

    The records are converted to a DataFrame with the columns Ticker,
    Timestamp and Price. Empty RDDs are skipped. Output files are capped at
    1000 records each.
    """
    if rdd.isEmpty():
        return
    columns = [ "Ticker", "Timestamp", "Price" ]
    frame = rdd.toDF(columns)
    frame.write.save(
        "/home/jovyan/work/FinanceData.csv",
        format="csv",
        mode="append",
        maxRecordsPerFile=1000,
    )
        
def ConvertToTuple(a):
    """Return ``(a['Timestamp'], a['Title'])``, or None when ``a`` is a plain list."""
    # Exact type check (not isinstance): only objects whose type is exactly
    # `list` are rejected.
    if type(a) is list:
        return None
    timestamp, title = a['Timestamp'], a['Title']
    return (timestamp, title)

In [None]:
def _is_message_of_type(expected_type):
    """Return a predicate that is true for dict messages whose 'Type' equals ``expected_type``."""
    def _predicate(message):
        # `and` short-circuits and .get() tolerates a missing key, so messages
        # without a 'Type' field (or that are not dicts at all) are filtered
        # out instead of raising inside the Spark task.
        return isinstance(message, dict) and message.get('Type') == expected_type
    return _predicate


# Receiver-based Kafka stream (located through ZooKeeper) over the three
# input topics.
kafkaStream = KafkaUtils.createStream(
    ssc=ssc,
    zkQuorum='zk-cs:2181',
    #zkQuorum='zookeeper:2181',
    groupId='test-consumer-group',  # consumer group
    topics={'finance_ticker':1,'news_feed':2, 'click':3})

# Keep only the second element of each Kafka record and decode it as JSON.
olines = kafkaStream.map(lambda x: x[1]).map(lambda a: json.loads(a))
if(debug):
    olines.pprint()

# One filtered view per message type, shared by the save and publish steps.
finance_messages = olines.filter(_is_message_of_type("Finance"))
news_messages = olines.filter(_is_message_of_type("News"))
click_messages = olines.filter(_is_message_of_type("Click"))

if(saveToLocalDisk):
    finance_messages \
    .map(lambda a: (a['Ticker'],a['Timestamp'],a['Price'])) \
    .foreachRDD(SaveToFile)

    # The summary is stored with non-ASCII characters and control characters
    # backslash-escaped.
    news_messages \
    .map(lambda a: (a['Timestamp'],a['Title'],a['Link'],str(a['Summary'].encode("unicode_escape").decode("utf8")))) \
    .foreachRDD(SaveToNewsFile)

    click_messages \
    .foreachRDD(SaveToClickFile)

# All finance prices of a batch joined into one comma-separated string.
lines = finance_messages \
        .map(lambda a: str(a['Price'])) \
        .reduce(lambda v,agg: agg+", "+v)

if(debug):
    lines.pprint()

# All news messages of a batch re-serialised and joined into one
# comma-separated string of JSON objects.
lines = news_messages \
        .map(lambda a: json.dumps(a)).reduce(lambda v,agg: agg+", "+v)

# Publish through an output operation. Doing this inside a map() only ran
# when the debug pprint() below happened to force evaluation, so nothing was
# published with debug switched off.
lines.foreachRDD(lambda rdd: rdd.foreach(SetVal))

if(debug):
    # Shows the payload that is published (SetVal itself returns None).
    lines.pprint()

ssc.start()
print("Going to wait termination")
ssc.awaitTermination()

Going to wait termination
-------------------------------------------
Time: 2020-01-05 17:46:00
-------------------------------------------

-------------------------------------------
Time: 2020-01-05 17:46:00
-------------------------------------------

-------------------------------------------
Time: 2020-01-05 17:46:00
-------------------------------------------



In [None]:
# NOTE(review): this cell sits after the cell that calls ssc.start() and
# ssc.awaitTermination(), and it builds on whatever DStream the name `lines`
# was last bound to there -- confirm the intended execution order and input.

def _request_prediction(instances):
    """POST ``instances`` to the half_plus_two model and return the response text."""
    request_body = "{\"instances\": ["+instances+"]}"
    response = requests.post(
        "http://serving:8501/v1/models/half_plus_two:predict",
        request_body)
    return response.text

def _extract_predictions(response_text):
    """Return the "predictions" entry of a JSON response body."""
    return json.loads(response_text)["predictions"]

# Ask the model for predictions for each element of the stream.
lines = lines.map(_request_prediction)
if(debug):
    lines.pprint()

# Flatten every response into its individual predictions.
lines = lines.flatMap(_extract_predictions)
if(debug):
    lines.pprint()

# Pass every prediction to SetVal.
lines = lines.map(SetVal)
if(debug):
    lines.pprint()

In [None]:
## Bootstrap the Redis batch list from the news history previously saved as CSV.
## Author's note: causes error due to mixing up Spark and SQL Contexts. Needs redesigning to support bootstrapping
try:
    past_data = spark.read.option("header", "false").csv("/home/jovyan/work/NewsData.csv/*.csv")
    # Rank the rows from newest to oldest timestamp.
    window = Window.orderBy(col("Timestamp").desc())
    # The raw summary column is named "Summary1" because the decoded
    # "Summary" column is derived from it further down.
    df = past_data.toDF("Timestamp","Title","Link","Summary1")
    ranked = df.withColumn("Rank",dense_rank().over(window)).cache().drop_duplicates()
    get_bucket_udf = udf(get_bucket, IntegerType())
    get_json_udf = udf(get_json, MapType(StringType(),StringType()))
    get_unicode_escaped_string_udf = udf(get_unicode_escaped_string, StringType())
    # Group the ranked rows into buckets of ten ranks, one list of item dicts
    # per bucket. Buckets are sorted highest (oldest) first: SetValHistory
    # pushes each batch in front of the previous head, so the bucket pushed
    # last -- the newest -- ends up as 'LatestNews'.
    branked = ranked.withColumn("Bucket", get_bucket_udf('Rank')) \
                    .withColumn("Summary", get_unicode_escaped_string_udf("Summary1")) \
                    .withColumn("JSON",get_json_udf("Summary","Title","Link","Timestamp")) \
                    .groupBy("Bucket").agg(collect_list("JSON").alias("Dictionary")) \
                    .orderBy(col("Bucket").desc())
    for val in branked.select("Dictionary").rdd.collect():
        # Each collected row holds a single list column; flatten it into a
        # plain list of item dicts.
        SetValHistory([item for sublist in val for item in sublist])
except Exception as exc:
    # Bootstrapping is best-effort: report the failure instead of touching
    # Redis with values that do not exist in this scope.
    print("News history bootstrap failed: " + repr(exc))