# Assignment 3 - Step 4: Prediction
Group 11:
- Lisa Driessen - r0675727
- Laura Fernández López - r0877908
- Silvia María Goñi Mendia - r0877434
- Peter Day - r0866276

In [66]:
import threading

# Helper thread that keeps the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    """Thread that drives a streaming context so the calling cell is not blocked."""

    def __init__(self, ssc):
        """Remember the streaming context `ssc` that this thread will drive."""
        threading.Thread.__init__(self)
        self.ssc = ssc

    def run(self):
        """Start the streaming context, then wait on this thread until it terminates."""
        context = self.ssc
        context.start()
        context.awaitTermination()

    def stop(self):
        """Print a notice and stop the streaming context.

        The stop is requested with ``stopGraceFully=True`` and
        ``stopSparkContext=False``.
        """
        print('----- Stopping... this may take a few seconds -----')
        stop_options = {'stopSparkContext': False, 'stopGraceFully': True}
        self.ssc.stop(**stop_options)

In [67]:
# Echo `sc` as the cell result. It is not defined in this file; presumably the
# SparkContext injected by the notebook environment (TODO confirm).
sc

In [68]:
# Echo `spark` as the cell result. It is not defined in this file; presumably the
# SparkSession injected by the notebook environment (TODO confirm).
spark

In [69]:
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.ml.feature import Tokenizer, StopWordsRemover, StringIndexer, HashingTF, IDF, Word2Vec
from pyspark.ml.classification import LogisticRegression, NaiveBayes, LogisticRegressionModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [70]:
# Module-level state for lazy model loading: process() flips the flag and stores
# the model on first use; predict() reads the stored model.
models_loaded = False
my_model = None


def predict(df):
    """Preprocess a batch of messages and score it with the loaded model.

    The input and the result are both printed with ``show()``.

    Args:
        df: Spark DataFrame. The code reads the ``channel``, ``username`` and
            ``message`` columns and drops a ``datetime`` column at the end.

    Returns:
        The DataFrame returned by ``my_model.transform`` with the intermediate
        columns (and ``username``, ``channel``, ``datetime``) dropped.
    """
    df.show()

    # --- Preprocessing ---
    # NOTE(review): the StringIndexer and Word2Vec below are fitted on each
    # incoming batch, so label indices and word vectors are learned from this
    # batch alone. Confirm that this matches how the saved model was trained.
    indexer = StringIndexer(inputCol='channel', outputCol='label', handleInvalid='keep')
    df = indexer.fit(df).transform(df)

    # Prefix the message with the username so both feed the text features.
    df = df.withColumn('message0', f.concat(f.col('username'), f.lit(' '), f.col('message')))
    # Raw string: "\$" is an invalid escape sequence in a normal string literal.
    df = df.withColumn("words1", f.regexp_replace(f.col("message0"), r"[\$#,<>+@=?!]", ""))
    # Collapse runs of two or more spaces into a single space.
    df = df.withColumn("words2", f.regexp_replace(f.col("words1"), "  +", " "))
    df = df.dropna()

    tokenizer = Tokenizer(inputCol="words2", outputCol="words3")
    df = tokenizer.transform(df)
    remover = StopWordsRemover(inputCol="words3", outputCol="words")
    df = remover.transform(df)

    word2Vec = Word2Vec(vectorSize=10, minCount=0, inputCol="words", outputCol="w2v")
    df = word2Vec.fit(df).transform(df)

    # --- Predict ---
    # my_model is the module-level model that process() loads before calling us.
    df_result = my_model.transform(df)
    df_result = df_result.drop(
        "username", "words", "message0", "channel",
        "words1", "words2", "words3", "datetime", "w2v",
    )
    df_result.show()
    return df_result

# NOTE(review): predict() takes a whole DataFrame and process() calls it directly;
# this UDF wrapper is not referenced anywhere else in the visible code.
predict_udf = udf(predict, StringType())

def process(time, rdd):
    """Score one streaming batch and print the predictions.

    Empty batches are skipped. On the first non-empty batch the model is loaded
    from ``"A3.model"`` and cached in the module-level ``my_model``; later
    batches reuse it.

    Args:
        time: Batch time handed over by ``foreachRDD``; only printed in the banner.
        rdd: RDD for this batch; it is read with ``spark.read.json``.

    Returns:
        None.
    """
    global models_loaded, my_model

    if rdd.isEmpty():
        return

    print(f"========= {time} =========")

    # Load the model only once; report reuse on every later batch.
    # (The original had the `else:` commented out, so the "already loaded"
    # message was printed right after loading and never afterwards.)
    if not models_loaded:
        print("Loading the model!")
        my_model = LogisticRegressionModel.load("A3.model")
        models_loaded = True
    else:
        print("Model's already loaded")

    # Convert to data frame
    df = spark.read.json(rdd)
    df.show(n=3)

    # Utilize our predict function
    predictions = predict(df)
    predictions.show()

In [71]:
# Create the streaming context on top of `sc`; 10 is passed as the batch
# duration (seconds, per the PySpark StreamingContext API).
ssc = StreamingContext(sc, 10)

In [72]:
# Read a text stream from the socket at localhost:8080 and hand every batch RDD
# to process().
lines = ssc.socketTextStream("localhost", 8080)
lines.foreachRDD(process)

In [73]:
# Run the streaming context on a helper thread so this cell returns right away.
ssc_t = StreamingThread(ssc)
ssc_t.start()

Loading the model!
Model's already loaded
+---------+--------------------+--------------------+--------+
|  channel|            datetime|             message|username|
+---------+--------------------+--------------------+--------+
|#jinnytty|2022-05-29T10:20:...|                 yes|   v2ran|
|#jinnytty|2022-05-29T10:20:...|            Clueless| mesh_17|
|#jinnytty|2022-05-29T10:20:...|guyfromswitzerlan...| jorgrim|
+---------+--------------------+--------------------+--------+

+---------+--------------------+--------------------+--------+
|  channel|            datetime|             message|username|
+---------+--------------------+--------------------+--------+
|#jinnytty|2022-05-29T10:20:...|                 yes|   v2ran|
|#jinnytty|2022-05-29T10:20:...|            Clueless| mesh_17|
|#jinnytty|2022-05-29T10:20:...|guyfromswitzerlan...| jorgrim|
+---------+--------------------+--------------------+--------+



In [74]:
# Ask the helper thread to stop the streaming context (see StreamingThread.stop).
ssc_t.stop()

----- Stopping... this may take a few seconds -----
+--------------------+-----+--------------------+--------------------+----------+
|             message|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|                 yes|  0.0|[8.89869569456102...|[0.57317590306392...|       0.0|
|            Clueless|  0.0|[8.98592539053067...|[0.53327532540359...|       0.0|
|guyfromswitzerlan...|  0.0|[9.05674753646123...|[0.59642602083634...|       0.0|
+--------------------+-----+--------------------+--------------------+----------+

+--------------------+-----+--------------------+--------------------+----------+
|             message|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|                 yes|  0.0|[8.89869569456102...|[0.57317590306392...|       0.0|
|            Clueless|  0.0|[8.98592539053067