In [1]:
import threading

# Helper thread that keeps the Spark StreamingContext from blocking the Jupyter kernel
        
class StreamingThread(threading.Thread):
    """Thread wrapper that drives a Spark StreamingContext in the background.

    The streaming context's ``awaitTermination()`` call blocks its caller, so
    it is executed inside this thread's ``run`` method rather than in the
    notebook cell that starts the stream.

    Attributes:
        ssc: The streaming context object handed to the constructor. It must
            provide ``start()``, ``awaitTermination()`` and ``stop(...)``.
    """

    def __init__(self, ssc):
        """Remember the streaming context; the thread itself is not started here."""
        super(StreamingThread, self).__init__()
        self.ssc = ssc

    def run(self):
        """Thread body: start the streaming context, then block until it terminates."""
        streaming_context = self.ssc
        streaming_context.start()
        streaming_context.awaitTermination()

    def stop(self):
        """Print a notice and ask the streaming context to stop.

        The context is stopped with ``stopSparkContext=False`` and
        ``stopGraceFully=True`` (the capital ``F`` is the keyword's actual
        spelling in the PySpark API).
        """
        notice = '----- Stopping... this may take a few seconds -----'
        print(notice)
        self.ssc.stop(stopGraceFully=True, stopSparkContext=False)

In [2]:
# Display the SparkContext. `sc` is not defined anywhere in this notebook;
# presumably it is injected by the PySpark kernel/launcher — TODO confirm.
sc

In [3]:
# Display the SparkSession. `spark` is not defined anywhere in this notebook;
# presumably it is injected by the PySpark kernel/launcher — TODO confirm.
spark

In [4]:
import random
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType

In [5]:
from pyspark.ml.pipeline import PipelineModel

In [52]:
# Notebook-level state shared across calls to process():
# `models_loaded` records whether the model has been loaded yet, and
# `my_model` holds the loaded model (None until the first load).
models_loaded = False
my_model = None

# UDF-based predict function (currently disabled). As written it does not return a random probability; it applies the loaded globals()['my_model'], which process() below now does directly.
#def predict(df):
    #print(globals()['my_model'].keys)
#    return globals()['my_model'].transform(df)

#predict_udf = udf(predict, StringType())




def process(time, rdd):
    """Score one batch of the stream with the saved pipeline model.

    Intended as the callback passed to ``foreachRDD``. Empty batches are
    skipped silently. For a non-empty batch the function prints a banner with
    the batch time, parses the RDD as JSON into a DataFrame and shows it,
    loads the pipeline model on first use, applies it, and shows the
    prediction columns.

    Args:
        time: Identifier of the batch; only used (via ``str``) in the banner.
        rdd: The batch contents. Must support ``isEmpty()`` and be readable
            by ``spark.read.json``.

    Returns:
        None. All results are written to standard output.

    Notes:
        Reads the notebook-level names ``spark`` and ``PipelineModel`` and
        reads/writes the notebook-level state ``models_loaded`` / ``my_model``.
    """
    # Skip batches in which nothing was received.
    if rdd.isEmpty():
        return

    banner = "========= %s =========" % str(time)
    print(banner)

    # Parse the batch as JSON into a DataFrame and display the raw rows.
    batch_df = spark.read.json(rdd)
    batch_df.show()

    # The model is cached in the notebook's global namespace so that it is
    # loaded only on the first non-empty batch instead of on every call.
    notebook_state = globals()
    if not notebook_state['models_loaded']:
        # NOTE(review): "#Path" looks like a placeholder — point it at the
        # directory of the saved PipelineModel before running.
        notebook_state['my_model'] = PipelineModel.load("#Path")
        notebook_state['models_loaded'] = True

    # Apply the cached pipeline and display only the columns of interest.
    shown_columns = ['app_id', 'review_id', 'review_text', 'probability', 'prediction']
    scored_df = notebook_state['my_model'].transform(batch_df)
    scored_df.select(*shown_columns).show()

In [53]:
# Create the streaming context on top of the existing SparkContext `sc`.
# The second argument (10) is the batch duration, in seconds per the PySpark API.
ssc = StreamingContext(sc, 10)

In [54]:
# Open a text stream from the TCP socket at seppe.net:7778.
lines = ssc.socketTextStream("seppe.net", 7778)
# Register `process` as the callback for each RDD produced by the stream.
lines.foreachRDD(process)

In [55]:
# Run the streaming context on a background StreamingThread so that the
# blocking awaitTermination() call happens off the notebook's main thread.
ssc_t = StreamingThread(ssc)
ssc_t.start()

+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|2086140|    0|137833533|Excited to play i...|
|1294810|    1|137832855|I think the game ...|
+-------+-----+---------+--------------------+

+-------+---------+--------------------+--------------------+----------+
| app_id|review_id|         review_text|         probability|prediction|
+-------+---------+--------------------+--------------------+----------+
|2086140|137833533|Excited to play i...|[0.00997483202920...|       1.0|
|1294810|137832855|I think the game ...|[5.50898886634599...|       1.0|
+-------+---------+--------------------+--------------------+----------+

+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|1294810|    0|137831921|In it's current s...|
|1294810|    0|137830511|I mean the game i...|
+-------+-----+---------+------------------

In [51]:
# Ask the streaming context to stop via StreamingThread.stop(), which passes
# stopSparkContext=False and stopGraceFully=True to the context's stop().
ssc_t.stop()

+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|1494420|    1|137833699|It's good for wha...|
|1494420|    0|137833643|Game is in very r...|
+-------+-----+---------+--------------------+

----- Stopping... this may take a few seconds -----
+---------+--------------------+--------------------+----------+
|review_id|         review_text|         probability|prediction|
+---------+--------------------+--------------------+----------+
|137833699|It's good for wha...|[0.99999993429189...|       0.0|
|137833643|Game is in very r...|[2.25301279080010...|       1.0|
+---------+--------------------+--------------------+----------+

