In [59]:
import sys
import glob
import json
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row
import pyspark.sql.functions as f
from pyspark.streaming import StreamingContext
from pyspark.ml.feature import StopWordsRemover
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer, StringIndexer
from pyspark.ml.classification import LogisticRegression

In [60]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:95% !important; }</style>"))

from threading import Thread

class StreamingThread(Thread):
    def __init__(self, ssc):
        Thread.__init__(self)
        self.ssc = ssc
    def run(self):
        ssc.start()
        ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

sc
spark

In [61]:
df = spark.read.json('..\\data\\dump.json')
df.printSchema()

root
 |-- label: string (nullable = true)
 |-- tweet_id: long (nullable = true)
 |-- tweet_text: string (nullable = true)



In [62]:
type(df)

pyspark.sql.dataframe.DataFrame

In [63]:
df.show()

+--------------+-------------------+--------------------+
|         label|           tweet_id|          tweet_text|
+--------------+-------------------+--------------------+
|        #covid|1376807677934854152|Talking to my fri...|
|        #covid|1376807674843701256|Sufficient doses ...|
|        #covid|1376807578898993152|Is Dubai open for...|
|        #covid|1376807577879728128|Register now Econ...|
|        #biden|1376807944138977280|Fears over potent...|
|        #biden|1376806460772995072|court case agains...|
|        #biden|1376806405261365248|Journalists conti...|
|        #biden|1376806257445732352|Any tips on wind ...|
|        #biden|1376805813377921026|GOLD STOCKS: Spot...|
|    #inflation|1376806399930327042|Singapore’s core ...|
|    #inflation|1376806224218320899|US Dollar or Keer...|
|    #inflation|1376806050477830144|Re-#███████ resum...|
|    #inflation|1376805646742417413|#███████: IndiaEs...|
|    #inflation|1376803909038051332|@Pri_Kishore, Hea...|
|    #inflatio

In [64]:
df.show(2,truncate= True)

+------+-------------------+--------------------+
| label|           tweet_id|          tweet_text|
+------+-------------------+--------------------+
|#covid|1376807677934854152|Talking to my fri...|
|#covid|1376807674843701256|Sufficient doses ...|
+------+-------------------+--------------------+
only showing top 2 rows



In [65]:
print(df.count())

207993


In [66]:
df.filter(df.tweet_id == 1376807677934854152).show()

+------+-------------------+--------------------+
| label|           tweet_id|          tweet_text|
+------+-------------------+--------------------+
|#covid|1376807677934854152|Talking to my fri...|
+------+-------------------+--------------------+



In [67]:
df.filter(df.tweet_id == 1376807677934854152).show()

+------+-------------------+--------------------+
| label|           tweet_id|          tweet_text|
+------+-------------------+--------------------+
|#covid|1376807677934854152|Talking to my fri...|
+------+-------------------+--------------------+



In [68]:
df.describe().show()

+-------+--------+--------------------+--------------------+
|summary|   label|            tweet_id|          tweet_text|
+-------+--------+--------------------+--------------------+
|  count|  207993|              207993|              207993|
|   mean|    null|1.387666180356198...|                null|
| stddev|    null|  6.1606223029364E15|                null|
|    min|  #biden| 1376803909038051332|!   #███████ Pled...|
|    max|#vaccine| 1398244775682248709|🪡 Thread produce...|
+-------+--------+--------------------+--------------------+



In [69]:
df.select('tweet_text').show(5, truncate = False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|tweet_text                                                                                                                                                                                                                                                                                                         |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Talking to my friend she said that you can get covid from someone new

## cleaning data 

In [70]:
df.registerTempTable('table')
sqlCtx.sql(
    'SELECT label, COUNT(label) AS COUNT, ROUND(COUNT(label)/207993,3) AS PERCENTAGE       FROM table         GROUP BY label         order by COUNT desc'
).show()

+--------------+-----+----------+
|         label|COUNT|PERCENTAGE|
+--------------+-----+----------+
|        #covid|54391|     0.262|
|      #vaccine|47681|     0.229|
|        #china|45402|     0.218|
|        #biden|31891|     0.153|
|#stopasianhate|21501|     0.103|
|    #inflation| 7127|     0.034|
+--------------+-----+----------+



## duplicates

In [71]:
df.registerTempTable('table')
sqlCtx.sql(
    'SELECT count( distinct tweet_id),  count(  tweet_id)      FROM table        '
).show()

+------------------------+---------------+
|count(DISTINCT tweet_id)|count(tweet_id)|
+------------------------+---------------+
|                  207993|         207993|
+------------------------+---------------+



## prepocessing data

In [72]:
# lowering letters


df = df.withColumn("tweet_text",f.lower(f.col("tweet_text")))
df.show()

+--------------+-------------------+--------------------+
|         label|           tweet_id|          tweet_text|
+--------------+-------------------+--------------------+
|        #covid|1376807677934854152|talking to my fri...|
|        #covid|1376807674843701256|sufficient doses ...|
|        #covid|1376807578898993152|is dubai open for...|
|        #covid|1376807577879728128|register now econ...|
|        #biden|1376807944138977280|fears over potent...|
|        #biden|1376806460772995072|court case agains...|
|        #biden|1376806405261365248|journalists conti...|
|        #biden|1376806257445732352|any tips on wind ...|
|        #biden|1376805813377921026|gold stocks: spot...|
|    #inflation|1376806399930327042|singapore’s core ...|
|    #inflation|1376806224218320899|us dollar or keer...|
|    #inflation|1376806050477830144|re-#███████ resum...|
|    #inflation|1376805646742417413|#███████: indiaes...|
|    #inflation|1376803909038051332|@pri_kishore, hea...|
|    #inflatio

## length of the strings 

In [73]:
temp = df.withColumn("tweet_leng",f.length(f.col("tweet_text")))
temp.select('tweet_leng').sort(temp.tweet_leng.desc()).show(3, truncate=False) 
# minimum length is 13 
# max length is  978       

+----------+
|tweet_leng|
+----------+
|978       |
|961       |
|960       |
+----------+
only showing top 3 rows



In [74]:
temp.select('tweet_text').filter(temp.tweet_leng == 13).show(11, truncate=False)

+-------------+
|tweet_text   |
+-------------+
|stop #███████|
|#███████ done|
|also #███████|
|#███████ scam|
|also #███████|
|fuck #███████|
|fuck #███████|
|test #███████|
|#███████ test|
|fuck #███████|
|fuck #███████|
+-------------+



In [75]:
temp.select('tweet_text').filter(temp.tweet_leng == 978).show(11, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|tweet_text        

## clean the sentences from punctuation

In [76]:
# lowering letters
#import pyspark.sql.functions as f
df = df.withColumn("tweet_text",f.regexp_replace("tweet_text", r',|\.|&|$|%|\\|\||-|_', ''))
df.show(2, truncate = False)

+------+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label |tweet_id           |tweet_text                                                                                                                                                                                                                                                                                             |
+------+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|#covid|13768076779348541

In [77]:
# https://subscription.packtpub.com/book/big_data_and_business_intelligence/9781788474221/7/ch07lvl1sec63/removing-stop-words-from-the-text

In [78]:
## the most frequent word

## word2vec
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Word2Vec.html
# https://spark.apache.org/docs/latest/ml-features

## split into train & test

In [79]:
train, test = df.randomSplit([0.7, 0.3], 1234) # 70% of train set
print(train.count())
print(test.count())

145524
62469


## pipeline with word2vec
## model logistic regression

In [80]:
# https://github.com/hanhanwu/Hanhan-Spark-Python/blob/master/RandomForests.py

## training a model

In [81]:
from pyspark.ml import Pipeline

In [82]:
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
stage_0 = StringIndexer(inputCol="label", outputCol="label_encoded")
stage_1 = RegexTokenizer(inputCol= 'tweet_text' , outputCol= 'tokens', pattern= '\\W')
stage_2 = StopWordsRemover(inputCol= 'tokens', outputCol= 'filtered_stop_words')
word2Vec = Word2Vec(inputCol= 'filtered_stop_words', outputCol= 'vector', vectorSize= 500)
modelLR = LogisticRegression(featuresCol="vector", labelCol="label_encoded", maxIter=10000, regParam=0.05, elasticNetParam=0.05)

pipeline = Pipeline(stages=[stage_0,stage_1, stage_2, word2Vec, modelLR])

# Fit the pipeline to training documents.
model = pipeline.fit(train)

## testing a model

In [83]:
prediction = model.transform(test)
prediction.select('tweet_id','label', "label_encoded","rawPrediction","probability","prediction").show(5)

+-------------------+------+-------------+--------------------+--------------------+----------+
|           tweet_id| label|label_encoded|       rawPrediction|         probability|prediction|
+-------------------+------+-------------+--------------------+--------------------+----------+
|1376805813377921026|#biden|          3.0|[-0.7090734024493...|[0.03156344415520...|       2.0|
|1376806257445732352|#biden|          3.0|[0.31908328049681...|[0.12084199037427...|       2.0|
|1376807944138977280|#biden|          3.0|[0.36246240428153...|[0.23828668756625...|       3.0|
|1377203682635350016|#biden|          3.0|[-0.2180711526915...|[0.09051680446793...|       3.0|
|1377204875335979008|#biden|          3.0|[-0.5928024717694...|[0.07086371284188...|       2.0|
+-------------------+------+-------------+--------------------+--------------------+----------+
only showing top 5 rows



In [84]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label_encoded", predictionCol="prediction") 

accuracy = evaluator.evaluate(prediction, {evaluator.metricName: "accuracy" })
print("accuracy = %g " % (accuracy))

testError = evaluator.evaluate(prediction, {evaluator.metricName: "accuracy" })
print("Test Error = %g " % (1.0 - testError))

truPosit = evaluator.evaluate(prediction, {evaluator.metricName: "truePositiveRateByLabel"})
print("truPosit = %g " % ( truPosit))


falPosit = evaluator.evaluate(prediction, {evaluator.metricName: "falsePositiveRateByLabel"})
print("falPosit = %g " % ( falPosit))


precisionByLabel = evaluator.evaluate(prediction, {evaluator.metricName: "precisionByLabel"})
print("precisionByLabel = %g " % ( precisionByLabel))


accuracy = 0.634619 
Test Error = 0.365381 
truPosit = 0.669881 
falPosit = 0.16327 
precisionByLabel = 0.597442 


In [86]:
## roc?

## archieved results 

In [340]:
# accuracy 0.2595239700627065 
# maxIter=1 000

# accuracy 0.2595239700627065
# maxIter=1 000

# accuracy 0.25973035375608144
# maxIter= 10 000

#0.25973035375608144
#10 000


word2Vec = Word2Vec(inputCol= 'filtered_stop_words', outputCol= 'vector', vectorSize= 300)
model_w2v = word2Vec.fit(stopwordsData)
Accuracy = 0.5644635936340398
modelLR = LogisticRegression(featuresCol="vector", labelCol="label_encoded", maxIter=100000, regParam=0.2, elasticNetParam=0.2)

SyntaxError: invalid syntax (<ipython-input-340-e28a9c5d362f>, line 11)

In [None]:
# define stage 3: create a word vector 
word2Vec = Word2Vec(inputCol= 'filtered_stop_words', outputCol= 'vector', vectorSize= 500)
model_w2v = word2Vec.fit(stopwordsData)
# define stage 4: Logistic Regression Model
modelLR = LogisticRegression(featuresCol="vector", labelCol="label_encoded", maxIter=100000, regParam=0.2, elasticNetParam=0.2)
modelLR = modelLR.fit(train)
Accuracy = 0.5716514114510322

In [None]:
# define stage 3: create a word vector of the size 100
word2Vec = Word2Vec(inputCol= 'filtered_stop_words', outputCol= 'vector', vectorSize= 500)
model_w2v = word2Vec.fit(stopwordsData)
# define stage 4: Logistic Regression Model
modelLR = LogisticRegression(featuresCol="vector", labelCol="label_encoded", maxIter=10000, regParam=0.1, elasticNetParam=0.1)
modelLR = modelLR.fit(train)
Accuracy = 0.6250240510156401
totalIterations:248
    