# Connect to Spark and Collect Tweets

In [2]:
from pyspark import SparkContext
sc=SparkContext("local[*]","PySparkShell")
sc

In [3]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('PySparkShell').getOrCreate()
spark

In [36]:
from threading import Thread

class StreamingThread(Thread):
    def __init__(self, ssc):
        Thread.__init__(self)
        self.ssc = ssc
    def run(self):
        ssc.start()
        ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [37]:
from pyspark.streaming import StreamingContext

In [33]:
ssc = StreamingContext(sc, 2)

Save the tweets on local folder

In [34]:
lines = ssc.socketTextStream("seppe.net", 7778)
lines.saveAsTextFiles("/home/lukabeverin/Documents/Leuven/Second Semester/Advanced Analytics/Assignment 3/TextData/SavedTexts")

In [35]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

In [36]:
ssc_t.stop()

----- Stopping... this may take a few seconds -----


In [7]:
data=sc.textFile("/home/lukabeverin/Documents/Leuven/Second Semester/Advanced Analytics/Assignment 3/TextData/SavedTexts-*")

In [8]:
data.first()

'{"tweet_id": 1392894115793326090, "tweet_text": "@PeterSchiff @qryptoo Are you serious right now? Let me get this straight if your kid tumbles while making his first baby steps you shoot him? #\\u2588\\u2588\\u2588\\u2588\\u2588\\u2588\\u2588 #\\u2588\\u2588\\u2588\\u2588\\u2588\\u2588\\u2588 #\\u2588\\u2588\\u2588\\u2588\\u2588\\u2588\\u2588 #\\u2588\\u2588\\u2588\\u2588\\u2588\\u2588\\u2588 #\\u2588\\u2588\\u2588\\u2588\\u2588\\u2588\\u2588iscoming https://t.co/uds83arn6X", "label": "#inflation"}'

In [9]:
df=spark.read.json(data)

In [10]:
df.show()

+--------------+-------------------+--------------------+
|         label|           tweet_id|          tweet_text|
+--------------+-------------------+--------------------+
|    #inflation|1392894115793326090|@PeterSchiff @qry...|
|      #vaccine|1392894443426963456|@ReallySwara You ...|
|      #vaccine|1392894360635707398|PARENTS - What to...|
|    #inflation|1392894738995548166|Odds of a lower c...|
|    #inflation|1392894448758108160|Strongest #██████...|
|        #biden|1392894964850331648|Brad and Britt Ca...|
|        #biden|1392894942943580169|"#███████ #██████...|
|        #biden|1392894911700144134|⏰Running OUT⌛️ #█...|
|    #inflation|1392895011964997633|#███████ ground s...|
|    #inflation|1392895003618385922|#███████ #███████...|
|    #inflation|1392895225111011329|We all can use so...|
|    #inflation|1392895201409175563|#███████ ground s...|
|        #covid|1392895857033453571|@TomSwarbrick1 if...|
|        #covid|1392895823206359047|Oxygen cylinders ...|
|        #covi

In [11]:
data = df

In [12]:
drop_list = ['tweet_id']
data = data.select([column for column in data.columns if column not in drop_list])

# Data Cleaning

In [15]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import udf, col, lower, regexp_replace
from pyspark.ml.feature import HashingTF, IDF

In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

In [18]:
# regular expression tokenizer
#A regex based tokenizer that extracts tokens either by using the provided regex pattern 
#to split the text (default) or repeatedly matching the regex (if gaps is false). 
#Optional parameters also allow filtering tokens using a minimal length.
#It returns an array of strings that can be empty.


#still looking how to remove all mention @
#data = data.withColumn('tweet_text', regexp_replace('tweet_text', '/ @*/', ''))

regexTokenizer = RegexTokenizer(inputCol="tweet_text", outputCol="words", pattern="\\W")# stop words



#add custom stop words we want to remove
add_stopwords = ["http","https","amp","rt","t","c","the"] 
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)# bag of words count

#countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)


label_stringIdx = StringIndexer(inputCol = "label", outputCol = "target")

In [21]:
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF,idf, label_stringIdx])

pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(1)

+----------+--------------------+--------------------+--------------------+--------------------+--------------------+------+
|     label|          tweet_text|               words|            filtered|         rawFeatures|            features|target|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+------+
|#inflation|@PeterSchiff @qry...|[peterschiff, qry...|[peterschiff, qry...|(10000,[125,763,1...|(10000,[125,763,1...|   5.0|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+------+
only showing top 1 row



# Modelling

Tfidf + Logistic Regression

In [22]:
# set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 300
Test Dataset Count: 105


possible parameters: maxIter=20, regParam=0.3, elasticNetParam=0

In [23]:
lr = LogisticRegression(labelCol="target",family = "multinomial", featuresCol="features")

lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)

In [24]:
predictions.filter(predictions['prediction'] == 0) \
    .select("tweet_text","label","probability","target","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+--------+------------------------------+------+----------+
|                    tweet_text|   label|                   probability|target|prediction|
+------------------------------+--------+------------------------------+------+----------+
|.@g20org @POTUS @SecBlinken...|  #covid|[1.0,4.1961085260023297E-23...|   2.0|       0.0|
|LET'S GO T-SHIRT ... Buy No...|  #china|[1.0,3.1776117061691716E-26...|   3.0|       0.0|
|Discuss the status of avail...|  #covid|[1.0,9.551039322298131E-27,...|   2.0|       0.0|
|Ya boy is going for his fir...|#vaccine|[1.0,4.013308477752283E-31,...|   0.0|       0.0|
|Round 1 of Covid vaccine do...|#vaccine|[1.0,1.998305810272253E-33,...|   0.0|       0.0|
|That's the way to go for an...|#vaccine|[1.0,1.8687542184506683E-33...|   0.0|       0.0|
|It’s a beautiful day AND ha...|#vaccine|[1.0,3.1127819257209254E-35...|   0.0|       0.0|
|I think the people who were...|#vaccine|[1.0,9.340228168335533E-40,...|   0.0|       0.0|

In [25]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [26]:
evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))

Accuracy = 0.342857


# Saving and loading the model

In [27]:
from pyspark.ml import Pipeline, PipelineModel

In [28]:
lrModel.write().overwrite().save("/home/lukabeverin/Documents/Leuven/Second Semester/Advanced Analytics/Assignment 3/Model")

In [29]:
reloaded_model = lrModel.load("/home/lukabeverin/Documents/Leuven/Second Semester/Advanced Analytics/Assignment 3/Model")

In [30]:
reloaded_model

LogisticRegressionModel: uid=LogisticRegression_31778765de73, numClasses=6, numFeatures=10000

# Make Predictions on Incoming Data

In [31]:
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType

In [32]:
globals()['models_loaded'] = False
globals()['my_model'] = None

def process(time, rdd):
    if rdd.isEmpty():
        return
    
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df = spark.read.json(rdd)
    df.show()
    
    #transform new incoming data
    dataset = pipelineFit.transform(df)

    # Load in the model if not yet loaded:
    if not globals()['models_loaded']:
        # load in your models here
        globals()['my_model'] = reloaded_model
        globals()['models_loaded'] = True
        
    # And then predict using the loaded model: 
    df_result = globals()['my_model'].transform(dataset)
    df_result = df_result.select("label","tweet_id","tweet_text","target","prediction")
    
    
    df_result.show()

In [33]:
ssc = StreamingContext(sc, 10)

In [34]:
lines = ssc.socketTextStream("seppe.net", 7778)
lines.foreachRDD(process)

In [38]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

+--------+-------------------+--------------------+
|   label|           tweet_id|          tweet_text|
+--------+-------------------+--------------------+
|#vaccine|1396131732777316363|#███████ passport...|
+--------+-------------------+--------------------+

+--------+-------------------+--------------------+------+----------+
|   label|           tweet_id|          tweet_text|target|prediction|
+--------+-------------------+--------------------+------+----------+
|#vaccine|1396131732777316363|#███████ passport...|   0.0|       4.0|
+--------+-------------------+--------------------+------+----------+

+--------+-------------------+--------------------+
|   label|           tweet_id|          tweet_text|
+--------+-------------------+--------------------+
|#vaccine|1396131648882630659|Why would you get...|
+--------+-------------------+--------------------+

+--------+-------------------+--------------------+------+----------+
|   label|           tweet_id|          tweet_text|target

In [39]:
ssc_t.stop()

----- Stopping... this may take a few seconds -----
+------+-------------------+--------------------+
| label|           tweet_id|          tweet_text|
+------+-------------------+--------------------+
|#covid|1396133109456834561|On a different no...|
+------+-------------------+--------------------+

+------+-------------------+--------------------+------+----------+
| label|           tweet_id|          tweet_text|target|prediction|
+------+-------------------+--------------------+------+----------+
|#covid|1396133109456834561|On a different no...|   2.0|       2.0|
+------+-------------------+--------------------+------+----------+



## Challenges

- Convert target and prediction column back into label so it is easier to visualise the performance of the model.
- Get more data for training the model
- Struggled to preproccess data in Spark ML as effectively as in Pandas(DataFrame)
- There are cons to bow and tfidf
- Explore word2vec embeddings in the future
- CNN in spark ML is challenging
- Fantastic skeleton code provided by the Prof
- Deploy on Github will be challenging
- Could try cv or gridsearch to select best model parameters


## Discussion

- The focus of this assignment is on getting the full pipeline as outlined above constructed, and not on getting spectacularly high accuracies, though TF-IDF, ... might be interesting to apply. We have accomplished this task.
- Preferably, your predictive model needs to be build using MLlib (so read documentation and tutorials). Our entire pipeline was built in Spark ML. Sklearn was used to explore the data, get a better understanding of the techniques and to validate models.
- Streaming server did not crash