In [10]:
import threading

# Helper thread that keeps the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    """Background thread that drives a Spark ``StreamingContext``.

    Running the context on its own thread lets the notebook cell return while
    the stream keeps being processed.
    """

    def __init__(self, ssc):
        """Remember the streaming context to drive; the thread is not started here."""
        threading.Thread.__init__(self)
        self.ssc = ssc

    def run(self):
        """Thread body: start the context, then block until it terminates."""
        context = self.ssc
        context.start()
        context.awaitTermination()

    def stop(self):
        """Print a notice, then stop the streaming context but not the SparkContext."""
        print('----- Stopping... this may take a few seconds -----')
        # 'stopGraceFully' (capital F) is the keyword name PySpark uses; not a typo.
        self.ssc.stop(stopGraceFully=True, stopSparkContext=False)

In [11]:
# Display the SparkContext (not defined in this excerpt; presumably provided by the notebook environment).
sc

In [12]:
# Display the SparkSession (not defined in this excerpt; presumably provided by the notebook environment).
spark

In [13]:
import random
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType
import pandas as pd

In [19]:
from pyspark.ml.classification import LogisticRegression 
from pyspark.ml.pipeline import PipelineModel
# Module-level cache read by `process`: the pipeline is loaded on the first non-empty batch.
models_loaded = False
my_model = None
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF, Word2Vec
from pyspark.ml.clustering import LDA

# Toy predict function that returns a random probability. Normally you'd use your loaded globals()['my_model'] here
#def predict(df):
 #   return random.random()

#predict_udf = udf(predict, StringType())

def process(time, rdd):
    """Handle one streaming micro-batch: parse it, show it, then show model predictions.

    Args:
        time: Batch timestamp handed over by ``foreachRDD``; only used in the printed header.
        rdd: RDD with the batch's records, which are parsed as JSON.

    Returns:
        None. Empty batches are skipped without printing anything.
    """
    if rdd.isEmpty():
        return

    # The module namespace doubles as a cache so the model is loaded only once.
    state = globals()
    input_columns = ['votes', 'comments', 'source_text']
    output_columns = [
        'votes', 'comments', "w2v", "topicDistribution",
        "finalFeatures", 'rawPrediction', 'probability', 'prediction',
    ]

    print("========= %s =========" % str(time))

    # Parse the JSON records and keep only the columns shown above.
    batch_df = spark.read.json(rdd).select(input_columns)
    batch_df.show()

    # Load the saved pipeline the first time a non-empty batch arrives.
    if not state['models_loaded']:
        state['my_model'] = PipelineModel.load("LR")
        state['models_loaded'] = True

    scored_df = state['my_model'].transform(batch_df)
    scored_df.select(*output_columns).show()



In [20]:
# Streaming context on top of the existing SparkContext; 10 is the batch interval in seconds.
ssc = StreamingContext(sc, 10)

In [21]:
# Read text lines from a TCP socket (host seppe.net, port 7778) ...
lines = ssc.socketTextStream("seppe.net", 7778)
# ... and hand every batch RDD to `process`.
lines.foreachRDD(process)

In [22]:
# Run the streaming context on a helper thread; batch output is printed as batches arrive.
ssc_t = StreamingThread(ssc)
ssc_t.start()

+-----+--------+--------------------+
|votes|comments|         source_text|
+-----+--------+--------------------+
|    1|       0|China Builds Worl...|
|    2|       0|GitHub - srush/LL...|
|    2|       0|Reddit-OpenAI dea...|
|    3|       0|Asmi 24.04: Ubunt...|
|    1|       0|Life and Death of...|
+-----+--------+--------------------+

+-----+--------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|votes|comments|                 w2v|   topicDistribution|       finalFeatures|       rawPrediction|         probability|prediction|
+-----+--------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    1|       0|[0.13116427509906...|[3.11554042620429...|[1.0,0.0,0.131164...|[4.19727054445399...|[0.98518618634509...|       0.0|
|    2|       0|[0.01696339680946...|[4.09609587297592...|[2.0,0.0,0.016963...|[3.63897832336453...|[0.97439373260810...|    

In [23]:
# Stop the streaming context via the helper thread's stop() method.
ssc_t.stop()

----- Stopping... this may take a few seconds -----
