In [1]:
# Load modules
import os
import json
import sys
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.ml import PipelineModel
import threading

In [None]:
# Helper thread that keeps the Spark StreamingContext from blocking the Jupyter kernel
class StreamingThread(threading.Thread):
    """Background thread that drives a Spark ``StreamingContext``.

    ``run`` starts the context and then waits on ``awaitTermination()``;
    doing that on a separate thread keeps the call from blocking Jupyter.
    """

    def __init__(self, ssc):
        """Keep a reference to ``ssc``, the streaming context to drive."""
        threading.Thread.__init__(self)
        self.ssc = ssc

    def run(self):
        """Thread body: start the context, then wait until it terminates."""
        streaming_context = self.ssc
        streaming_context.start()
        streaming_context.awaitTermination()

    def stop(self):
        """Print a notice, then stop the streaming context.

        The context's ``stop`` is called with ``stopSparkContext=False`` and
        ``stopGraceFully=True``.
        """
        print('----- Stopping... this may take a few seconds -----')
        shutdown_options = {'stopSparkContext': False, 'stopGraceFully': True}
        self.ssc.stop(**shutdown_options)

In [None]:
# If the saved model fails to load, uncomment the code below to retrain it.
# from nltk.corpus import stopwords
# from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
# from pyspark.ml.classification import LogisticRegression
# from pyspark.ml import Pipeline

# df_pandas_orig = pd.read_csv("DF_Streaming.csv")
# df_pandas = df_pandas_orig.dropna(inplace=False)
# df_spark = spark.createDataFrame(df_pandas)
# training_data, test_data = df_spark.randomSplit([0.8, 0.2], seed = 100)

# stop_words = set(stopwords.words('english'))
# tokenizer = Tokenizer(inputCol="review_text", outputCol="words") # stop words
# stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(list(stop_words))
# count_vectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)
# lr_model = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0.0)
# pipeline_lr = Pipeline(stages=[tokenizer, stopwords_remover, count_vectors, lr_model])
# fitted_pipeline_lr = pipeline_lr.fit(training_data)

In [32]:
# Prediction code from saved model
# Plain top-level assignments: in a notebook cell these bind the same global
# names (`models_loaded`, `my_model`) as writing into globals() would.
# NOTE(review): the Spark session is created in a cell further down this
# notebook; loading the model presumably needs it to exist first - confirm
# the cell order before a fresh "Run All".
models_loaded = True
my_model = PipelineModel.load('models/score_classifier_lr')

def process(time, rdd):
    """Score one streaming batch with the loaded model and print the result.

    Parameters
    ----------
    time
        First callback argument; only used (via ``str``) in the printed banner.
    rdd
        The batch to score. It is parsed with ``spark.read.json``.

    Returns
    -------
    None
        Empty batches return immediately without printing anything.

    Notes
    -----
    Depends on two module-level names defined elsewhere in the notebook:
    ``spark`` (the session) and ``my_model`` (the fitted pipeline).
    """
    if rdd.isEmpty():
        return
    banner = "========= %s =========" % str(time)
    print(banner)
    # Parse the batch into a DataFrame
    batch_df = spark.read.json(rdd)
    classifier = globals()['my_model']
    scored_df = classifier.transform(batch_df)
    # Alternative when retraining in-notebook: use fitted_pipeline_lr.transform(batch_df)
    shown_columns = ("review_id", "app_id", "review_text", "label", "prediction")
    scored_df.select(*shown_columns).show()

In [2]:
# Set both PySpark interpreter variables to the Python running this kernel
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

# Get (or reuse) the Spark session and keep a handle on its SparkContext
spark = (
    SparkSession.builder
    .appName('Spark_Predictions')
    .getOrCreate()
)
sc = spark.sparkContext
sc

In [38]:
# Streaming context on the existing SparkContext, batch duration 10
ssc = StreamingContext(sparkContext=sc, batchDuration=10)
# Text stream read from the socket at seppe.net:7778
lines = ssc.socketTextStream(hostname="seppe.net", port=7778)
# Hand every batch RDD of the stream to process()
lines.foreachRDD(process)

In [39]:
# Start the review stream on the helper thread, which calls ssc.start() in run()
ssc_t = StreamingThread(ssc=ssc)
ssc_t.start()

+---------+-------+--------------------+-----+----------+
|review_id| app_id|         review_text|label|prediction|
+---------+-------+--------------------+-----+----------+
|139164398|1559600|It's actually a w...|    1|       1.0|
|139164444|2355750|           Good game|    1|       1.0|
|139165010| 705040|I'm really hurt.\...|    0|       1.0|
+---------+-------+--------------------+-----+----------+

+---------+-------+--------------------+-----+----------+
|review_id| app_id|         review_text|label|prediction|
+---------+-------+--------------------+-----+----------+
|139161195|2413140|Titor's Time Trav...|    1|       0.0|
|139165474|1934780|8/10 - Great grap...|    1|       1.0|
|139163329|1934780|really fun its li...|    1|       1.0|
+---------+-------+--------------------+-----+----------+

+---------+-------+--------------------+-----+----------+
|review_id| app_id|         review_text|label|prediction|
+---------+-------+--------------------+-----+----------+
|139162663|1

In [36]:
# Stop the review stream through the helper thread's stop() method, which
# stops the StreamingContext with stopSparkContext=False.
ssc_t.stop()

----- Stopping... this may take a few seconds -----
