# DATA STREAMING AND SET UP

In [1]:
from IPython.core.display import display, HTML
display(HTML("<style>.container {width: 95% ! important;}</style>"))

In [2]:
sc

In [3]:
spark

In [None]:
from threading import Thread

class StreamingThread(Thread):
    def __init__(self, ssc):
        Thread.__init__(self)
        self.ssc = ssc
    def run(self):
        ssc.start()
        ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [None]:
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc,10)

lines = ssc.socketTextStream("seppe.net", 7778)

In [None]:
lines.saveAsTextFiles("file:///C:/Users/didie/OneDrive/Bureaublad/spark/TrainingData/TrainingData")

In [None]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

In [None]:
# Wait a bit before running this cell until you see output appear in the previous cell
ssc_t.stop()

In [None]:
data = sc.textFile("file:///C:/Users/didie/OneDrive/Bureaublad/spark/TrainingData/TrainingData-*")
## Didier: ik heb problemen om de data op deze manier in te lezen
## FYI: ik lees data in in mapje trainingdata met alle bestanden met naam TrainingData-... - hiervoor dient asterix in commando zodat we alles in 1 keer kunnen doen

In [None]:
data.count()

In [None]:
data.first()

In [None]:
df = spark.read.json(data)

In [None]:
df.show()
display(df)

In [None]:
## DataFrame aanmaken via pandas
import pandas as pd
from IPython.display import display

df = pd.read_csv("file:///C:/Users/didie/OneDrive/Bureaublad/AA/all_data_extra.csv")
display(df)

In [4]:
## DataFrame aanmaken via Spark zodat MLIB Package werkt:
spark = SparkSession.builder.appName("new").getOrCreate()
df = (spark.read.format("csv").option('header', 'true').load("file:///C:/Users/didie/OneDrive/Bureaublad/AA/all_data_extra.csv"))

# PREDICTIVE MODEL A - USING MLIB





In [5]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import col, isnan, when, count

# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="review_text", outputCol="text-label", pattern="\\W")

# stop words are based upon NLTK - https://towardsdatascience.com/text-pre-processing-stop-words-removal-using-different-libraries-f20bac19929a
add_stopwords = ['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', "you're", "you've", "you'll", "you'd", 'your', 'yours', 'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she', "she's", 'her', 'hers', 'herself', 'it', "it's", 'its', 'itself', 'they', 'them', 'their', 'theirs', 'themselves', 'what', 'which', 'who', 'whom', 'this', 'that', "that'll", 'these', 'those', 'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into', 'through', 'during', 'before', 'after', 'above', 'below', 'to', 'from', 'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under', 'again', 'further', 'then', 'once', 'here', 'there', 'when', 'where', 'why', 'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such', 'no', 'nor', 'not', 'only', 'own', 'same', 'so', 'than', 'too', 'very', 's', 't', 'can', 'will', 'just', 'don', "don't", 'should', "should've", 'now', 'd', 'll', 'm', 'o', 're', 've', 'y', 'ain', 'aren', "aren't", 'couldn', "couldn't", 'didn', "didn't", 'doesn', "doesn't", 'hadn', "hadn't", 'hasn', "hasn't", 'haven', "haven't", 'isn', "isn't", 'ma', 'mightn', "mightn't", 'mustn', "mustn't", 'needn', "needn't", 'shan', "shan't", 'shouldn', "shouldn't", 'wasn', "wasn't", 'weren', "weren't", 'won', "won't", 'wouldn', "wouldn't"] 
stopwordsRemover = StopWordsRemover(inputCol="text-label", outputCol="text-cleaned").setStopWords(add_stopwords)

# bag of words count
countVectors = CountVectorizer(inputCol="text-cleaned", outputCol="features", vocabSize=10000, minDF=5)

# IDF 
idf = IDF(inputCol="features", outputCol="featuresIDF")

In [6]:
df.select("review_text").na.drop().show(5)

+--------------------+
|         review_text|
+--------------------+
|10000000000000000...|
|A Raft style surv...|
| resources might ...|
| and then it wasn...|
| access your craf...|
+--------------------+
only showing top 5 rows



In [7]:
df.select([count(when(isnan(col("review_text")) | col("review_text").isNull(), True))]).show()
df = df.na.drop()

+----------------------------------------------------------------------------+
|count(CASE WHEN (isnan(review_text) OR (review_text IS NULL)) THEN true END)|
+----------------------------------------------------------------------------+
|                                                                        1682|
+----------------------------------------------------------------------------+



In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

label_stringIdx = StringIndexer(inputCol = "label", outputCol = "label_code", handleInvalid="keep")

assembler = VectorAssembler(inputCols=["featuresIDF"], outputCol="features_combined")

logreg = LogisticRegression(maxIter=10, regParam=0.2, elasticNetParam=0, labelCol="label_code", featuresCol="featuresIDF")

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, idf, label_stringIdx, logreg, assembler])

In [9]:
train, test = df.randomSplit([0.7, 0.3], seed=42)

In [10]:
# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(train)
predictions = pipelineFit.transform(test)
predictions.show(5)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+----------+--------------------+
|           review_id|              app_id|         review_text|               label|          text-label|        text-cleaned|            features|         featuresIDF|label_code|       rawPrediction|         probability|prediction|   features_combined|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+----------+--------------------+
|(Don't let your m...| asking if you're...|      I am winning)"|                   1|    [i, am, winning]|           [winning]|         (418,[],[])|         (418,[],[])|       0.0|[5.85514010630536...|[0.46021023950044...|       0.0|  

In [11]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label_code", predictionCol="prediction")
evaluator.evaluate(predictions)

0.3693371517384297

In [12]:
eval_accuracy = MulticlassClassificationEvaluator(labelCol="label_code", predictionCol="prediction", metricName="accuracy")
eval_precision = MulticlassClassificationEvaluator(labelCol="label_code", predictionCol="prediction", metricName="weightedPrecision")
eval_recall = MulticlassClassificationEvaluator(labelCol="label_code", predictionCol="prediction", metricName="weightedRecall")
eval_f1 = MulticlassClassificationEvaluator(labelCol="label_code", predictionCol="prediction", metricName="f1")

In [13]:
accuracy = eval_accuracy.evaluate(predictions)
precision = eval_precision.evaluate(predictions)
recall = eval_recall.evaluate(predictions)
f1score = eval_f1.evaluate(predictions)

In [14]:
accuracy

0.5145348837209303

In [15]:
precision

0.33850049838421936

In [16]:
recall

0.5145348837209303

In [17]:
f1score

0.3693371517384297

# MODEL PREDICTION

In [18]:
from threading import Thread

class StreamingThread(Thread):
    def __init__(self, ssc):
        Thread.__init__(self)
        self.ssc = ssc
    def run(self):
        ssc.start()
        ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [19]:
sc

In [20]:
spark

In [21]:
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType

In [22]:
    # we define 'process' which is actually the link between the offline model we make & online live streaming
    # we load in our model A into the globals [my model]
    # and use this model A to make predictions
    # and via 'process' we try to get it online again

In [23]:
globals()['models_loaded'] = False
globals()['my_model'] = None

# Toy predict function that returns a random probability. Normally you'd use your loaded globals()['my_model'] here
def predict(df):
    return globals()['my_model'].transform(df)

predict_udf = udf(predict, StringType())

def process(time, rdd):
    if rdd.isEmpty():
        return
    
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df = spark.read.json(rdd)
    df.show()
    
    # Utilize our predict function
    #df_withpreds = df.withColumn("pred", predict_udf(
    #    struct([df[x] for x in df.columns])
    #))
    #df_withpreds.show()
    
    # Normally, you wouldn't use a UDF (User Defined Function) Python function to predict as we did here (you can)
    # but an MLlib model you've built and saved with Spark
    # In this case, you need to prevent loading your model in every call to "process" as follows:
    
    # Load in the model if not yet loaded:
    if not globals()['models_loaded']:
        # load in your models here
        globals()['my_model'] = pipelineFit.load("lrm_model.model")
        globals()['models_loaded'] = True
        
    # And then predict using the loaded model: 
    dataset_result = globals()['my_model'].transform(df)
    dataset_result.select('review_id', 'review_text', 'label_code', 'prediction').show()

In [24]:
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 10)
lines = ssc.socketTextStream("seppe.net", 7778)

In [25]:
### earlier, we defined 'process' which is actually the link between the offline model we make & online live streaming

lines.foreachRDD(process)

In [26]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

+------+-----+---------+--------------------+
|app_id|label|review_id|         review_text|
+------+-----+---------+--------------------+
|824600|    1|139152723|A very fun and fu...|
|824600|    1|139152686|Easily the best b...|
|824600|    1|139152124|very well made sh...|
+------+-----+---------+--------------------+

+---------+--------------------+----------+----------+
|review_id|         review_text|label_code|prediction|
+---------+--------------------+----------+----------+
|139152723|A very fun and fu...|       0.0|       0.0|
|139152686|Easily the best b...|       0.0|       0.0|
|139152124|very well made sh...|       0.0|       0.0|
+---------+--------------------+----------+----------+

+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
| 705040|    0|139152760|#TLDR:   Hawken: ...|
| 705040|    1|139148742|Ignore the idiots...|
|1268750|    1|139153706|i love the game i...|
|1268750|

In [None]:
ssc_t.stop()