In [1]:
# Stretch the notebook's `.container` element to 95% of the page width.
# `display` and `HTML` are imported from the public `IPython.display` module;
# importing `display` from `IPython.core.display` is deprecated.
from IPython.display import display, HTML
display(HTML("<style>.container { width:95% !important; }</style>"))

In [2]:
from threading import Thread

class StreamingThread(Thread):
    """Run a streaming context on a background thread.

    ``run`` starts the wrapped context and blocks on ``awaitTermination()``;
    ``stop`` asks the context to stop while leaving the SparkContext alive.

    Parameters
    ----------
    ssc :
        Object exposing ``start()``, ``awaitTermination()`` and
        ``stop(stopSparkContext=..., stopGraceFully=...)``.
    """

    def __init__(self, ssc):
        super().__init__()
        self.ssc = ssc

    def run(self):
        """Start the wrapped context and wait until it terminates."""
        # Use the context passed to the constructor. Referring to a global
        # `ssc` here would ignore the argument and raise NameError whenever
        # no such global exists.
        self.ssc.start()
        self.ssc.awaitTermination()

    def stop(self):
        """Stop the streaming context, keeping the SparkContext running."""
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [3]:
# Echo the SparkContext handle as the cell output. `sc` is not defined in any
# visible cell, so it is presumably injected by the PySpark kernel — TODO confirm.
sc

In [4]:
# Echo the `spark` session handle as the cell output. Like `sc`, it is not
# defined in any visible cell, so it is presumably provided by the kernel — TODO confirm.
spark

In [5]:
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.sql.types import StructType, StructField, StringType, LongType
import pyspark.sql.functions as f
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import ltrim
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer, StringIndexer, IndexToString

In [6]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasOutputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import lit # for the dummy _transform
import pyspark.sql.functions as f
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import ltrim
from pyspark.ml import Pipeline, Transformer
from pyspark.sql import DataFrame
from typing import Iterable
import pandas as pd

class RegexReplacerWritable(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that cleans the ``tweet_text`` column in place.

    The stage declares no Params of its own; the column name and the regular
    expressions are fixed in ``_transform``. The ``DefaultParams*`` mixins are
    what allow it to be saved and loaded as part of a pipeline.
    """

    @keyword_only
    def __init__(self):
        super().__init__()
        # `keyword_only` stores the keyword arguments of the call on the
        # instance; with this signature there are none to forward.
        self._set(**self._input_kwargs)

    @keyword_only
    def setParams(self):
        """
        setParams(self)
        Sets params for this RegexReplacerWritable (it currently declares none).
        """
        return self._set(**self._input_kwargs)

    def _transform(self, df: DataFrame) -> DataFrame:
        """Return ``df`` with ``tweet_text`` lower-cased and regex-cleaned.

        Steps, applied in this order:
        1. lower-case the text;
        2. delete every character that is not a space, an ASCII letter or
           an apostrophe;
        3. replace ``http`` plus the shortest run up to a word boundary
           with a single space;
        4. collapse runs of the listed whitespace characters into one space;
        5. strip leading spaces.
        """
        substitutions = (
            ('([^ a-zA-Z\'])', ''),
            ('http.*?\\b', ' '),
            ('[\r\n\t\f\v ]+', ' '),
        )
        cleaned = f.lower(f.col("tweet_text"))
        for pattern, replacement in substitutions:
            cleaned = f.regexp_replace(cleaned, pattern, replacement)
        return df.withColumn("tweet_text", f.ltrim(cleaned))

In [7]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasOutputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import lit # for the dummy _transform
import pyspark.sql.functions as f
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import ltrim
from pyspark.ml import Pipeline, Transformer
from pyspark.sql import DataFrame
from typing import Iterable
import pandas as pd



class UDShortWordsRemover(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that drops short tokens from the ``filtered`` column.

    The result is written to ``filtered2`` and the ``filtered`` column is
    removed. The stage declares no Params of its own.
    """

    @keyword_only
    def __init__(self):
        super().__init__()
        # `keyword_only` stores the keyword arguments of the call on the
        # instance; with this signature there are none to forward.
        self._set(**self._input_kwargs)

    @keyword_only
    def setParams(self):
        """
        setParams(self)
        Sets params for this UDShortWordsRemover (it currently declares none).
        """
        return self._set(**self._input_kwargs)

    def _transform(self, df: DataFrame) -> DataFrame:
        """Keep only tokens of length 3 or more and drop emptied rows.

        ``filtered2`` receives the elements of ``filtered`` whose length is
        not below 3. Only rows where ``filtered2`` has a size greater than 0
        are kept, and ``filtered`` is dropped from the result.
        """
        long_tokens = f.expr("filter(filtered, x -> not(length(x) < 3))")
        with_long_tokens = df.withColumn("filtered2", long_tokens)
        non_empty = with_long_tokens.where(f.size(f.col("filtered2")) > 0)
        return non_empty.drop("filtered")

In [8]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasOutputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import lit # for the dummy _transform
import pyspark.sql.functions as f
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import ltrim
from pyspark.ml import Pipeline, Transformer
from pyspark.sql import DataFrame
from typing import Iterable
import pandas as pd
from nltk.stem import WordNetLemmatizer



class UDLemmatization(
    Transformer, DefaultParamsReadable, DefaultParamsWritable,
):
    """Pipeline stage that lemmatizes the tokens in the ``words`` column.

    A new ``lemmatized`` column is appended, holding one WordNet lemma per
    token of ``words``. The stage declares no Params of its own.
    """

    @keyword_only
    def __init__(self):
        super(UDLemmatization, self).__init__()
        kwargs = self._input_kwargs
        self._set(**kwargs)

    @keyword_only
    def setParams(self):
        """
        setParams(self)
        Sets params for this UDLemmatization (it currently declares none).
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, df: DataFrame) -> DataFrame:
        """Return ``df`` with an added ``lemmatized`` array<string> column.

        The lemmatization runs as a column UDF, so the data stays in Spark.
        This avoids collecting the whole frame to the driver with
        ``toPandas()`` and rebuilding it through a notebook-global ``spark``
        session, and it also works for an empty DataFrame.

        NOTE: the UDF runs in Spark's Python workers, so NLTK and its
        WordNet data must be available wherever those workers run.
        """
        # Local import: ArrayType is not imported at file level.
        from pyspark.sql.types import ArrayType, StringType

        def lemmatize_tokens(tokens):
            # Pass nulls through instead of failing on iteration.
            if tokens is None:
                return None
            lemmatizer = WordNetLemmatizer()
            return [lemmatizer.lemmatize(token) for token in tokens]

        lemmatize_udf = f.udf(lemmatize_tokens, ArrayType(StringType()))
        return df.withColumn("lemmatized", lemmatize_udf(f.col("words")))

In [9]:
# Module-level state read by `process`: a flag recording that the model has
# been loaded, and the fitted pipeline itself, read from the "lr_model" path.
models_loaded = True
my_model = PipelineModel.load("lr_model")

def process(time, rdd):
    """Score one micro-batch of JSON records and print the predictions.

    Parameters
    ----------
    time :
        Batch timestamp; only used in the printed header.
    rdd :
        RDD of JSON strings for this batch. Empty batches are skipped.

    Side effects
    ------------
    Prints the batch header, shows the parsed DataFrame, and prints one
    ``Row(labelOriginal, labelPredicted)`` per record. Loads the pipeline
    into the module globals ``my_model`` / ``models_loaded`` if they have
    not been set yet.
    """
    if rdd.isEmpty():
        return

    print("========= %s =========" % str(time))

    # Convert to data frame
    df = spark.read.json(rdd)
    df.show()

    # Load the model lazily if it is not available yet. The previous
    # placeholder assigned the string '***' here, which would have failed on
    # `.transform` as soon as this branch was taken.
    state = globals()
    if not state.get('models_loaded', False):
        state['my_model'] = PipelineModel.load("lr_model")
        state['models_loaded'] = True
    model = state['my_model']

    prediction = model.transform(df)

    # Stage 7 of the loaded pipeline must expose `labels`; presumably it is
    # the fitted label indexer — TODO confirm against the training notebook.
    label_stage_index = 7
    ind_str = IndexToString(
        inputCol='prediction',
        outputCol='labelPredicted',
        labels=model.stages[label_stage_index].labels,
    )
    prediction_text = ind_str.transform(prediction)
    selected = prediction_text.select("labelOriginal", "labelPredicted")
    for row in selected.collect():
        print(row)

In [10]:
# Build the streaming context on top of the existing SparkContext `sc`.
# The second argument (10) is the batch interval; PySpark documents it as
# seconds — confirm if the interval matters for your use case.
ssc = StreamingContext(sc, 10)

In [11]:
# Read a text stream from the socket at seppe.net:7778 and register
# `process` to be called for every RDD the stream produces.
lines = ssc.socketTextStream("seppe.net", 7778)
lines.foreachRDD(process)

In [12]:
# Run the streaming context on a background StreamingThread so this cell
# returns immediately; `process` prints its output as batches arrive.
ssc_t = StreamingThread(ssc)
ssc_t.start()

+------+-------------------+--------------------+
| label|           tweet_id|          tweet_text|
+------+-------------------+--------------------+
|#biden|1398604868047228931|#███████ #███████...|
+------+-------------------+--------------------+

Row(labelOriginal='#biden', labelPredicted='#biden')
+--------+-------------------+--------------------+
|   label|           tweet_id|          tweet_text|
+--------+-------------------+--------------------+
|#vaccine|1398605056929374208|@MWM76 For social...|
+--------+-------------------+--------------------+

Row(labelOriginal='#vaccine', labelPredicted='#vaccine')
+--------+-------------------+--------------------+
|   label|           tweet_id|          tweet_text|
+--------+-------------------+--------------------+
|#vaccine|1398604983486926850|@VicGovDH @ScottM...|
+--------+-------------------+--------------------+

Row(labelOriginal='#vaccine', labelPredicted='#vaccine')
+--------+-------------------+--------------------+
|   labe

Row(labelOriginal='#biden', labelPredicted='#biden')
+------+-------------------+--------------------+
| label|           tweet_id|          tweet_text|
+------+-------------------+--------------------+
|#china|1398607877271691264|DM us for timely ...|
+------+-------------------+--------------------+

Row(labelOriginal='#china', labelPredicted='#covid')


In [13]:
# Stop streaming. StreamingThread.stop calls the context's stop() with
# stopSparkContext=False and stopGraceFully=True, so the SparkContext stays alive.
ssc_t.stop()

----- Stopping... this may take a few seconds -----
+------+-------------------+--------------------+
| label|           tweet_id|          tweet_text|
+------+-------------------+--------------------+
|#china|1398607783830994948|Did Covid leak fr...|
+------+-------------------+--------------------+

Row(labelOriginal='#china', labelPredicted='#vaccine')
