In [1]:
import sparknlp
sparknlp.start()

from sparknlp.base import *
from sparknlp.annotator import *

from pyspark.ml import Pipeline
from sparknlp.pretrained import PretrainedPipeline
from pyspark.sql import SparkSession


# Python imports
import sys

# spark-nlp components. Each one is incorporated into our pipeline.
from sparknlp.annotator import Lemmatizer, Stemmer, Tokenizer, Normalizer
from sparknlp.base import DocumentAssembler, Finisher

# A Spark Session is how we interact with Spark SQL to create Dataframes
from pyspark.sql import SparkSession

# These allow us to create a schema for our data
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spark Pipelines allow us to sequentially add components such as transformers
from pyspark.ml import Pipeline

# These are components we will incorporate into our pipeline.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

# LDA is our model of choice for topic modeling
from pyspark.ml.clustering import LDA

# Some transformers require the usage of other Spark ML functions. We import them here
from pyspark.sql.functions import col, lit, concat

# This will help catch some PySpark errors
from pyspark.sql.utils import AnalysisException

from pyspark.ml.feature import StringIndexer

In [2]:
spark = SparkSession.builder.appName('Spark-model').getOrCreate()

In [3]:
df = spark.read.option("multiline", True).option("quote", "\"").option("escape", "\"").csv("gs://bigdata_project_hksv/labeled_tweets/Constraint_Train.csv", inferSchema=True, header=True)



                                                                                

In [4]:
df.show(10)

+---+--------------------+-----+
| id|               tweet|label|
+---+--------------------+-----+
|  1|The CDC currently...| real|
|  2|States reported 1...| real|
|  3|Politically Corre...| fake|
|  4|#IndiaFightsCoron...| real|
|  5|Populous states c...| real|
|  6|Covid Act Now fou...| real|
|  7|If you tested pos...| real|
|  8|Obama Calls Trump...| fake|
|  9|???Clearly, the O...| fake|
| 10|Retraction—Hydrox...| fake|
+---+--------------------+-----+
only showing top 10 rows



                                                                                

In [5]:
from pyspark.sql.functions import regexp_replace, col


df1 = df.dropna(thresh=1, subset=('tweet'))
df1 = df1.dropna(thresh=1, subset=('label'))


df2 = df1.withColumn('tweet1',regexp_replace(col('tweet'), '@', ''))
df2 = df2.withColumn('tweet2',regexp_replace(col('tweet1'), '#', ''))
df2 = df2.withColumn('tweet2',regexp_replace(col('tweet2'), 'RT', ''))
df2 = df2.withColumn('tweet2',regexp_replace(col('tweet2'), ':', ''))
df2 = df2.withColumn('tweet2',regexp_replace(col('tweet2'), 'http\S+', ''))

df2 = df2.select('tweet2', 'label')



document_assembler = DocumentAssembler().setInputCol("tweet2").setOutputCol("document")


tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")

# The Normalizer will group words together based on similar semantic meaning.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")

# The Stemmer takes objects of class "Token" and converts the words into their
# root meaning. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")

# The Finisher signals to spark-nlp allows us to access the data outside of spark-nlp
# components. For instance, we can now feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")

# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly "articles" such as the words "the"
# and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")

# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
# tf = CountVectorizer(inputCol="filtered", outputCol="raw_features",vocabSize=1000)

# # Here we implement the IDF portion. IDF (Inverse document frequency) reduces
# # the weights of commonly-appearing words.
# idf = IDF(inputCol="raw_features", outputCol="features")

countVectors  = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=1000, minDF=5)


indexer = StringIndexer(inputCol="label", outputCol="label1")

pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
#         tf,
#         idf,
        countVectors,
        indexer
    ]
)
train_data = pipeline.fit(df2).transform(df2)

                                                                                

In [6]:
train_data.show(10)

+--------------------+-----+--------------------+--------------------+--------------------+------+
|              tweet2|label|            to_spark|            filtered|            features|label1|
+--------------------+-----+--------------------+--------------------+--------------------+------+
|The CDC currently...| real|[the, cdc, curren...|[cdc, current, re...|(1000,[4,6,9,11,2...|   0.0|
|States reported 1...| real|[state, report, d...|[state, report, d...|(1000,[6,7,9,53,2...|   0.0|
|Politically Corre...| fake|[polit, correct, ...|[polit, correct, ...|(1000,[3,30,34,37...|   1.0|
|IndiaFightsCorona...| real|[indiafightscoron...|[indiafightscoron...|(1000,[0,2,17,29,...|   0.0|
|Populous states c...| real|[popul, state, ca...|[popul, state, ge...|(1000,[1,4,5,7,21...|   0.0|
|Covid Act Now fou...| real|[covid, act, now,...|[covid, act, foun...|(1000,[0,8,10,11,...|   0.0|
|If you tested pos...| real|[if, you, test, p...|[test, posit, cov...|(1000,[0,2,11,28,...|   0.0|
|Obama Cal

## Test data

In [7]:
test_df = spark.read.option("multiline", True).option("quote", "\"").option("escape", "\"").csv("gs://bigdata_project_hksv/labeled_tweets/english_test_with_labels.csv", inferSchema=True, header=True)


test_df.show(100)


[Stage 13:>                                                         (0 + 1) / 1]

+---+--------------------+-----+
| id|               tweet|label|
+---+--------------------+-----+
|  1|Our daily update ...| real|
|  2|Alfalfa is the on...| fake|
|  3|President Trump A...| fake|
|  4|States reported 6...| real|
|  5|This is the sixth...| real|
|  6|Low #vitaminD was...| real|
|  7|A common question...| real|
|  8|The government sh...| real|
|  9|Our daily update ...| real|
| 10|Breakdown of test...| real|
| 11|Two interesting c...| fake|
| 12|A photo shows a 1...| fake|
| 13|🇰🇼 Assistant Un...| real|
| 14|An audio file by ...| fake|
| 15|Says the Coronavi...| fake|
| 16|Kids reach ‘f**k ...| fake|
| 17|Households should...| fake|
| 18|An image of a man...| fake|
| 19|While #socialdist...| real|
| 20|3/10 About 8% of ...| real|
| 21|Korona virus, ver...| fake|
| 22|A 2009 tweet from...| fake|
| 23|ICUs are full for...| real|
| 24|President Donald ...| fake|
| 25|New York continue...| real|
| 26|The government's ...| real|
| 27|Brazil's health m...| fake|
| 28|Govt ha

                                                                                

In [8]:


test_df1 = test_df.dropna(thresh=1, subset=('tweet'))
test_df1 = test_df1.dropna(thresh=1, subset=('label'))


test_df2 = test_df1.withColumn('tweet1',regexp_replace(col('tweet'), '@', ''))
test_df2 = test_df2.withColumn('tweet2',regexp_replace(col('tweet1'), '#', ''))
test_df2 = test_df2.withColumn('tweet2',regexp_replace(col('tweet2'), 'RT', ''))
test_df2 = test_df2.withColumn('tweet2',regexp_replace(col('tweet2'), ':', ''))
test_df2 = test_df2.withColumn('tweet2',regexp_replace(col('tweet2'), 'http\S+', ''))

test_df2 = test_df2.select('tweet2', 'label')
test_data = pipeline.fit(test_df2).transform(df2)

                                                                                

In [9]:
test_data.groupBy('label1').count().show()

+------+-----+
|label1|count|
+------+-----+
|   0.0| 3360|
|   1.0| 3060|
+------+-----+



## Model pipeline

In [10]:
#Logistic Regression Model
from pyspark.ml.classification import LogisticRegression

from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
lr = LogisticRegression(maxIter=10, regParam=0.01, labelCol='label1')

lrm = lr.fit(train_data, )


21/12/02 23:19:17 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/12/02 23:19:17 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [11]:

predictions1 = lrm.transform(test_data)

#print evaluation metrics
evaluator = MulticlassClassificationEvaluator(labelCol="label1", predictionCol="prediction")

print(evaluator.evaluate(predictions1, {evaluator.metricName: "accuracy"}))
print(evaluator.evaluate(predictions1, {evaluator.metricName: "f1"}))

                                                                                

0.6361370716510903


[Stage 40:>                                                         (0 + 1) / 1]

0.635680592905639


                                                                                

In [12]:
#Hyperparameter tuning logistic regression
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0, labelCol='label1')
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
           .addGrid(lr.maxIter, [10, 20, 50]) #Number of iterations
#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features
             .build())
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=5)
cvModel = cv.fit(train_data)


predictions1 = cvModel.transform(test_data)

#print evaluation metrics
evaluator = MulticlassClassificationEvaluator(labelCol="label1", predictionCol="prediction")

print(evaluator.evaluate(predictions1, {evaluator.metricName: "accuracy"}))
print(evaluator.evaluate(predictions1, {evaluator.metricName: "f1"}))

                                                                                

0.6881619937694704


[Stage 3481:>                                                       (0 + 1) / 1]

0.688244110175366


                                                                                

In [13]:
#NaiveBayes
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(smoothing=1, labelCol="label1")
model = nb.fit(train_data)


                                                                                

In [14]:
predictions1 = model.transform(test_data)

#print evaluation metrics
evaluator = MulticlassClassificationEvaluator(labelCol="label1", predictionCol="prediction")

print(evaluator.evaluate(predictions1, {evaluator.metricName: "accuracy"}))
print(evaluator.evaluate(predictions1, {evaluator.metricName: "f1"}))

                                                                                

0.6121495327102804


[Stage 3488:>                                                       (0 + 1) / 1]

0.6039546507263915


                                                                                

In [15]:
#RandomForest
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label1", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)
# Train model with Training Data
rfModel = rf.fit(train_data)




                                                                                

In [16]:
predictions1 = rfModel.transform(test_data)

#print evaluation metrics
evaluator = MulticlassClassificationEvaluator(labelCol="label1", predictionCol="prediction")

print(evaluator.evaluate(predictions1, {evaluator.metricName: "accuracy"}))
print(evaluator.evaluate(predictions1, {evaluator.metricName: "f1"}))

                                                                                

0.6987538940809969


[Stage 3504:>                                                       (0 + 1) / 1]

0.6948183590210708


                                                                                

# Predicting on our tweet data

In [17]:
tweet_df = spark.read.option("multiline", True).option("quote", "\"").option("escape", "\"").csv("gs://bigdata_project_hksv/tweetData/2021-04_20211108-22.56.13.csv", inferSchema=True, header=True)


tweet_df.show(20)

+-------------------+----------+--------+-----+--------------+----------+--------------------+
|           tweet_id|     tdate|   ttime|tlang|tcountry_place|month_year|               tweet|
+-------------------+----------+--------+-----+--------------+----------+--------------------+
|1387861806589358087|2021-04-29|20:10:14|   en|          NULL|   2021-04|Thank God...let’s...|
|1387861807532896258|2021-04-29|20:10:14|   en|          NULL|   2021-04|👇💥👇💥👇OHH FFS...|
|1387861812369100800|2021-04-29|20:10:15|   en|          NULL|   2021-04|Significant news
...|
|1387861814399029251|2021-04-29|20:10:16|   en|          NULL|   2021-04|@fox12oregon Peop...|
|1387861820866809857|2021-04-29|20:10:17|   en|          NULL|   2021-04|Help out if you c...|
|1387861827992932356|2021-04-29|20:10:19|   en|          NULL|   2021-04|BioNTech to reque...|
|1387861829016334345|2021-04-29|20:10:19|   en|          NULL|   2021-04|#FordMustResign  ...|
|1387861832124227587|2021-04-29|20:10:20|   en|        

In [18]:
tweet_df1 = tweet_df.dropna(thresh=1, subset=('tweet'))
tweet_df1 = tweet_df1.dropna(thresh=1, subset=('tweet'))

tweet_df1.show(20)

+-------------------+----------+--------+-----+--------------+----------+--------------------+
|           tweet_id|     tdate|   ttime|tlang|tcountry_place|month_year|               tweet|
+-------------------+----------+--------+-----+--------------+----------+--------------------+
|1387861806589358087|2021-04-29|20:10:14|   en|          NULL|   2021-04|Thank God...let’s...|
|1387861807532896258|2021-04-29|20:10:14|   en|          NULL|   2021-04|👇💥👇💥👇OHH FFS...|
|1387861812369100800|2021-04-29|20:10:15|   en|          NULL|   2021-04|Significant news
...|
|1387861814399029251|2021-04-29|20:10:16|   en|          NULL|   2021-04|@fox12oregon Peop...|
|1387861820866809857|2021-04-29|20:10:17|   en|          NULL|   2021-04|Help out if you c...|
|1387861827992932356|2021-04-29|20:10:19|   en|          NULL|   2021-04|BioNTech to reque...|
|1387861829016334345|2021-04-29|20:10:19|   en|          NULL|   2021-04|#FordMustResign  ...|
|1387861832124227587|2021-04-29|20:10:20|   en|        

In [19]:
tweet_df1.select('tweet_id').count()

2235

In [20]:
tweet_df.select('tweet_id').count()

2600

In [21]:
tweet_df2 = tweet_df1.withColumn('tweet1',regexp_replace(col('tweet'), '@', ''))
tweet_df2 = tweet_df2.withColumn('tweet2',regexp_replace(col('tweet1'), '#', ''))
tweet_df2 = tweet_df2.withColumn('tweet2',regexp_replace(col('tweet2'), 'RT', ''))
tweet_df2 = tweet_df2.withColumn('tweet2',regexp_replace(col('tweet2'), ':', ''))
tweet_df2 = tweet_df2.withColumn('tweet2',regexp_replace(col('tweet2'), 'http\S+', ''))

tweet_df2 = tweet_df2.select('tweet_id', 'tweet2')

pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
#         tf,
#         idf,
        countVectors
    ]
)
tweet_dff = pipeline.fit(tweet_df2).transform(tweet_df2)

                                                                                

In [22]:
tweet_dff_classified = rfModel.transform(tweet_dff)



In [23]:
from pyspark.sql.functions import when

tweet_dff_classified = tweet_dff_classified.withColumn('prediction1',
                                when(tweet_dff_classified.prediction == 0, 'Real').otherwise('Fake'))

In [24]:
tweet_dff_classified.select('tweet2','prediction1').show(20, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|tweet2                                                                                                                                                                                                                                                                       |prediction1|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|Thank God...let’s hope this is the last time. Am not able for another lockup in September.                                                         

In [25]:
tweet_dff_classified.printSchema()

root
 |-- tweet_id: long (nullable = true)
 |-- tweet2: string (nullable = true)
 |-- to_spark: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)
 |-- prediction1: string (nullable = false)



In [26]:
tweet_dff_classified.filter(col("prediction1").contains("Fake")).select('tweet_id',
                                                                        'tweet2','prediction1').show(2000, truncate=False)

[Stage 3521:>                                                       (0 + 1) / 1]

+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|tweet_id           |tweet2                                                                                                                                                                                                                                                                                                                                              |prediction1|
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

# On all data

In [27]:
# import os
# folder = 'gs://bigdata_project_hksv/tweetData/'
# filelist = [file for file in os.listdir(folder)]

In [28]:
tweet_dff = spark.read.option("multiline", True).option("quote", "\"").option("escape", "\"").csv("gs://bigdata_project_hksv/tweetData/*.csv", inferSchema=True, header=True)


tweet_dff.show(20)

                                                                                

+-------------------+----------+--------+-----+--------------+----------+--------------------+
|           tweet_id|     tdate|   ttime|tlang|tcountry_place|month_year|               tweet|
+-------------------+----------+--------+-----+--------------+----------+--------------------+
|1313520869407371264|2020-10-06|16:45:53|   en|          NULL|   2020-10|Trump looked like...|
|1313520870183243777|2020-10-06|16:45:54|   en|          NULL|   2020-10|Secret Service Ag...|
|1313520870216916994|2020-10-06|16:45:54|   en|          NULL|   2020-10|                null|
|1313520870472781826|2020-10-06|16:45:54|   en|          NULL|   2020-10|                null|
|1313520870988681216|2020-10-06|16:45:54|   en|          NULL|   2020-10|                null|
|1313520872301363201|2020-10-06|16:45:54|   en|          NULL|   2020-10|Local experts fro...|
|1313520872691503106|2020-10-06|16:45:54|   en|          NULL|   2020-10|#NYC to test #tea...|
|1313520873048084481|2020-10-06|16:45:54|   en|   

In [29]:
duration=['2021-02', '2021-03', '2021-04', '2021-05']
tweet_dff1 = tweet_dff.filter(tweet_dff.month_year.isin(duration))
# tweet_dff.filter(col("month_year").contains('2020-10')).show(10)

In [30]:
tweet_dff2 = tweet_dff1.dropna(thresh=1, subset=('tweet'))
tweet_dff2 = tweet_dff2.dropna(thresh=1, subset=('tweet'))

tweet_dff2.show(20)



+-------------------+----------+--------+-----+--------------+----------+--------------------+
|           tweet_id|     tdate|   ttime|tlang|tcountry_place|month_year|               tweet|
+-------------------+----------+--------+-----+--------------+----------+--------------------+
|1369547251152945152|2021-03-10|07:14:44|   en|          NULL|   2021-03|Covid vaccination...|
|1369547270417510402|2021-03-10|07:14:48|   en|          NULL|   2021-03|US House poised t...|
|1369547270681632771|2021-03-10|07:14:48|   en|          NULL|   2021-03|Papua New Guinea ...|
|1369547291305123842|2021-03-10|07:14:53|   en|            PK|   2021-03|All set for 1B Vi...|
|1369547296396967937|2021-03-10|07:14:55|   en|          NULL|   2021-03|@KamyabJawanPK @P...|
|1369547298049556480|2021-03-10|07:14:55|   en|          NULL|   2021-03|You're allowed to...|
|1369547302579314689|2021-03-10|07:14:56|   en|          NULL|   2021-03|Good thread on Fe...|
|1369547303183396871|2021-03-10|07:14:56|   en|   

                                                                                

In [31]:
tweet_dff2.select('tweet_id').count()

                                                                                

11535172

In [32]:
tweet_dff1.select('tweet_id').count()

                                                                                

13341296

In [None]:
tweet_dff3 = tweet_dff2.withColumn('tweet1',regexp_replace(col('tweet'), '@', ''))
tweet_dff3 = tweet_dff3.withColumn('tweet2',regexp_replace(col('tweet1'), '#', ''))
tweet_dff3 = tweet_dff3.withColumn('tweet2',regexp_replace(col('tweet2'), 'RT', ''))
tweet_dff3 = tweet_dff3.withColumn('tweet2',regexp_replace(col('tweet2'), ':', ''))
tweet_dff3 = tweet_dff3.withColumn('tweet2',regexp_replace(col('tweet2'), 'http\S+', ''))

tweet_dff3 = tweet_dff3.select('tweet2', 'tcountry_place', 'month_year')

pipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
#         tf,
#         idf,
        countVectors
    ]
)
tweet_dfff = pipeline.fit(tweet_dff3).transform(tweet_dff3)



In [None]:
tweet_dfff_classified = rfModel.transform(tweet_dfff)



In [None]:
from pyspark.sql.functions import when

tweet_dfff_classified = tweet_dfff_classified.withColumn('prediction1',
                                when(tweet_dfff_classified.prediction == 0, 'Real').otherwise('Fake'))

In [None]:
tweet_dfff_classified.select('tweet2','prediction1').show(20, truncate=False)

In [None]:
tweet_dfff_classified.groupBy('prediction1').count().show()



+-----------+--------+
|prediction1|   count|
+-----------+--------+
|       Real|14361100|
|       Fake|10341931|
+-----------+--------+



                                                                                

In [None]:
# tweet_dfff_classified.groupBy(['tcountry_place', 'prediction1']).count().show(100)

In [None]:
# tweet_dfff_classified.groupBy(['month_year', 'prediction1']).count().show()

In [None]:
df_result4 = tweet_dfff_classified.groupby(tweet_dfff_classified.prediction1).count().sort('prediction1').orderBy(["prediction1"], ascending=False)


In [None]:
df_con = tweet_dfff_classified.select("tcountry_place", "prediction1").groupby("tcountry_place", "prediction1").count().sort('count').orderBy(['count'], ascending = False)

In [None]:
df_text = tweet_dfff_classified.select("tweet2", "prediction1")

In [None]:
df_month = tweet_dfff_classified.select("month_year", "prediction1").groupby("month_year", "prediction1").count().sort('count').orderBy(['month_year'], ascending = False)

In [None]:
pdf_sent = df_result4.toPandas()

In [None]:
fig, ax = plt.subplots(figsize=(11,7))  
sns.barplot(x='sentiment', y='count', data=pdf_sent, ax=ax)

In [None]:
pdf_con = df_con.toPandas()

In [None]:
pdf_con = pdf_con[pdf_con['tcountry_place'] != "NULL"]

In [None]:
fig, ax = plt.subplots(figsize=(30,10))
sns.barplot(x= 'tcountry_place', y='count', hue='prediction1', data = pdf_con.iloc[0:11], ax=ax)

In [None]:
from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator
def show_wordcloud_pos(data, title = None):
    wordcloud = WordCloud(
        background_color='white',
        stopwords=set(STOPWORDS),
        max_words=50,
        max_font_size=40, 
        scale=5,
        random_state=1,
        colormap='winter'
    ).generate(str(data))

    fig = plt.figure(1, figsize=(30,30))
    plt.axis('off')
    if title: 
        fig.suptitle(title, fontsize=20)
        fig.subplots_adjust(top=2.3)

    plt.imshow(wordcloud)
    plt.show()

In [None]:
pandf_pos = tweet_dfff_classified.filter(col("prediction1").contains("Real")).select('tweet2').toPandas()


In [None]:
show_wordcloud_pos(pandf_pos['tweet2'], title = 'Words for Real Tweets')

In [None]:
pandf_neg = tweet_dfff_classified.filter(col("prediction1").contains("Fake")).select('tweet2').toPandas()


In [None]:
show_wordcloud_pos(pandf_neg['tweet2'], title = 'Words for Fake Tweets')