Spark can be used to implement sentiment analysis and text features such as TF-IDF, n-grams, and word embeddings. It is well suited to large-scale data processing, which makes it a practical platform for working with large text datasets.

#### 1. Dataset collection: load the data and libraries
Dataset from Kaggle (fake news detection dataset): https://www.kaggle.com/datasets/emineyetm/fake-news-detection-datasets?resource=download

In [21]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import pyspark.sql.functions as F

In [22]:
spark = (
    SparkSession
    .builder
    .appName('FakeNewsDetection')
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "8g")
    .master('local[*]')
    .getOrCreate()
)

spark

### Dataset Schema

In [23]:
from pyspark.sql.types import (
    StringType, 
    StructField,
    StructType,
)

schema = StructType([
    StructField('title', StringType(), True),
    StructField('text', StringType(), True),
    StructField('subject', StringType(), True),
    StructField('date', StringType(), True),
])

In [24]:
subjects = [
    "politicsNews",
    "worldnews",
    "News",
    "politics",
    "left-news",
    "US_News",
    "Middle-east",
    "Government News"
]

In [25]:
df_fake = spark.read.csv("./data/Fake.csv", header=True, schema=schema)#.limit(5000)
df_true = spark.read.csv("./data/True.csv", header=True, schema=schema)#.limit(5000)

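1. Label each set (`fake` = 0 for true news, 1 for fake news), combine the two, shuffle the rows, and keep only rows with a known subject. Tokenizing the text, removing stop words, and computing TF-IDF happen in the pipeline stages defined afterwards.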

In [26]:
data = df_true.withColumn('fake', F.lit(0)).union(df_fake.withColumn('fake', F.lit(1))).orderBy(F.rand())

In [27]:
data = data.filter(F.col('subject').isin(subjects))
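
The filter keeps only rows whose `subject` is one of the eight known categories. One plausible reason for needing it (an assumption, not something the notebook states) is that article bodies contain line breaks inside quoted fields, and a reader that treats every newline as a record boundary yields shifted rows whose `subject` column holds stray text. The sketch below uses Python's `csv` module on made-up rows to show the effect. Spark's CSV reader has a `multiLine` option for this case, though whether these files need it has not been verified here.

```python
import csv
import io

# Hypothetical two-record CSV; the second record has a line break inside a quoted field.
raw = (
    "title,text,subject,date\n"
    '"Headline A","Single-line body",politicsNews,"December 1, 2017"\n'
    '"Headline B","First paragraph\nSecond paragraph",News,"December 2, 2017"\n'
)

# Naive line-by-line parsing breaks the second record in two.
naive_rows = [line.split(",") for line in raw.splitlines()[1:]]

# A quote-aware parser keeps the embedded newline inside the text field.
parsed_rows = list(csv.reader(io.StringIO(raw)))[1:]

print(len(naive_rows), "rows when splitting on newlines")
print(len(parsed_rows), "rows with quote-aware parsing")
print([row[2] for row in parsed_rows])
```

With naive splitting, the broken record has no valid value in the subject position, so a filter on known subjects would drop it.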


In [28]:
data.count()

43638

#### Build the feature pipeline

In [29]:
from pyspark.ml.feature import SQLTransformer, RegexTokenizer, StopWordsRemover, CountVectorizer, Imputer, IDF
from pyspark.ml.feature import StringIndexer, VectorAssembler
# StopWordsRemover uses the English stop-word list by default, so no explicit load is needed.

# 0. Extract tokens from title
title_tokenizer = RegexTokenizer(inputCol='title', outputCol='title_words',
                                 pattern='\\W', toLowercase=True)
# 1. Remove stop words from title
title_sw_remover = StopWordsRemover(inputCol='title_words', outputCol='title_sw_removed')
# 2. Compute term frequency from title
title_count_vectorizer = CountVectorizer(inputCol='title_sw_removed', outputCol='tf_title')
# 3. Compute term frequency-inverse document frequency from title
title_tfidf = IDF(inputCol='tf_title', outputCol='tf_idf_title')
# 4. Extract tokens from text
text_tokenizer = RegexTokenizer(inputCol='text', outputCol='text_words',
                                pattern='\\W', toLowercase=True)
# 5. Remove stop words from text
text_sw_remover = StopWordsRemover(inputCol='text_words', outputCol='text_sw_removed')
# 6. Compute term frequency from text
text_count_vectorizer = CountVectorizer(inputCol='text_sw_removed', outputCol='tf_text')
# 7. Compute term frequency-inverse document frequency from text
text_tfidf = IDF(inputCol='tf_text', outputCol='tf_idf_text')
# 8. Index the subject column with StringIndexer
subject_str_indexer = StringIndexer(inputCol='subject', outputCol='subject_idx')
# 9. Assemble all features into a single vector
vec_assembler = VectorAssembler(inputCols=['tf_idf_title', 'tf_idf_text', 'subject_idx'], outputCol='features')
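
To make stages 0 to 7 concrete, here is a minimal pure-Python sketch of the same idea: split on non-word characters, drop stop words, count terms, and scale the counts by a smoothed inverse document frequency, log((N + 1) / (df + 1)), which is the formula given in the Spark ML documentation. The corpus, the stop-word list, and the helper names are all hypothetical. It illustrates the computation and is not a replacement for the Spark stages.

```python
import math
import re
from collections import Counter

# Tiny illustrative corpus and stop-word list (both hypothetical, not from the dataset).
docs = [
    "The senate passed the budget bill",
    "SHOCKING: the budget is a hoax!",
    "Senate vote on the bill delayed",
]
stop_words = {"the", "is", "a", "on"}

def tokenize(text):
    # Lowercase, split on non-word characters, and drop empty tokens,
    # similar in spirit to RegexTokenizer(pattern='\\W', toLowercase=True).
    return [t for t in re.split(r"\W", text.lower(), flags=re.ASCII) if t]

tokens = [[t for t in tokenize(d) if t not in stop_words] for d in docs]

# Document frequency: number of documents that contain each term.
doc_freq = Counter(term for doc in tokens for term in set(doc))
n_docs = len(tokens)

def idf(term):
    # Smoothed IDF: log((N + 1) / (df + 1)).
    return math.log((n_docs + 1) / (doc_freq[term] + 1))

def tf_idf(doc_tokens):
    # Raw term counts (the kind of value CountVectorizer produces) scaled by IDF.
    counts = Counter(doc_tokens)
    return {term: count * idf(term) for term, count in counts.items()}

vectors = [tf_idf(doc) for doc in tokens]
for doc, vec in zip(docs, vectors):
    print(doc, "->", {t: round(w, 3) for t, w in vec.items()})
```

Terms that appear in several documents, such as "senate" here, receive a lower weight than terms that appear in only one, which is the property the classifier relies on.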

In [30]:
from pyspark.ml.classification import RandomForestClassifier
# 10. Random forest classifier
rf = RandomForestClassifier(featuresCol='features', labelCol='fake', predictionCol='fake_predict',
                            maxDepth=7, numTrees=20)

In [31]:
from pyspark.ml import Pipeline
rf_pipe = Pipeline(stages=[
    title_tokenizer,         # 0
    title_sw_remover,        # 1
    title_count_vectorizer,  # 2
    title_tfidf,             # 3
    text_tokenizer,          # 4
    text_sw_remover,         # 5
    text_count_vectorizer,   # 6
    text_tfidf,              # 7
    subject_str_indexer,     # 8
    vec_assembler,           # 9
    rf,                      # 10 model
])

In [32]:
train, test = data.randomSplit([0.8, 0.2])


In [33]:
rf_model = rf_pipe.fit(train)


24/12/15 17:44:16 WARN DAGScheduler: Broadcasting large task binary with size 1207.6 KiB
24/12/15 17:44:18 WARN DAGScheduler: Broadcasting large task binary with size 1208.6 KiB
24/12/15 17:44:23 WARN DAGScheduler: Broadcasting large task binary with size 10.6 MiB
24/12/15 17:44:23 WARN DAGScheduler: Broadcasting large task binary with size 10.6 MiB
24/12/15 17:44:26 WARN DAGScheduler: Broadcasting large task binary with size 11.8 MiB
24/12/15 17:44:40 WARN DAGScheduler: Broadcasting large task binary with size 13.4 MiB
24/12/15 17:44:42 WARN MemoryStore: Not enough space to cache rdd_655_6 in memory! (computed 199.7 MiB so far)
24/12/15 17:44:42 WARN BlockManager: Persisting block rdd_655_6 to disk instead.
24/12/15 17:44:42 WARN MemoryStore: Not enough space to cache rdd_655_5 in memory! (computed 199.7 MiB so far)
24/12/15 17:44:42 WARN BlockManager: Persisting block rdd_655_5 to disk instead.
24/12/15 17:44:42 WARN MemoryStore: Not enough space to cache rdd_655_7 in memory! (comput

In [34]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

accuracy = MulticlassClassificationEvaluator(labelCol='fake', predictionCol='fake_predict', metricName='accuracy')
f1 = MulticlassClassificationEvaluator(labelCol='fake', predictionCol='fake_predict', metricName='f1')
areaUnderROC = BinaryClassificationEvaluator(labelCol='fake', metricName='areaUnderROC')

def classification_evaluator(data_result):
    # Show the confusion matrix (rows: predicted label, columns: true label), then the metrics.
    data_result.crosstab(col1='fake_predict', col2='fake').show()
    print('accuracy:', accuracy.evaluate(data_result))
    print('f1:', f1.evaluate(data_result))
    print('areaUnderROC:', areaUnderROC.evaluate(data_result))

In [35]:
# Predict on the training set
rf_train_result = rf_model.transform(train)

In [36]:
classification_evaluator(rf_train_result)

24/12/15 17:45:36 WARN DAGScheduler: Broadcasting large task binary with size 10.8 MiB

+-----------------+-----+-----+
|fake_predict_fake|    0|    1|
+-----------------+-----+-----+
|              1.0|  433|17119|
|              0.0|16598|  627|
+-----------------+-----+-----+

accuracy: 0.9700106386037551
f1: 0.9690793761914358
areaUnderROC: 0.9945007189369528


In [37]:
# Predict on the test set
rf_test_result = rf_model.transform(test)

In [38]:
classification_evaluator(rf_test_result)

24/12/15 17:45:52 WARN DAGScheduler: Broadcasting large task binary with size 10.8 MiB

+-----------------+----+----+
|fake_predict_fake|   0|   1|
+-----------------+----+----+
|              1.0|  97|4430|
|              0.0|4177| 155|
+-----------------+----+----+

accuracy: 0.9668739400791407
f1: 0.9683937540859505
areaUnderROC: 0.994842927030133
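
As a cross-check, accuracy and F1 can be recomputed directly from the crosstab counts. The helper below is a hypothetical pure-Python sketch. It reports a support-weighted F1 because the `f1` metric of `MulticlassClassificationEvaluator` is a weighted F-measure. Recomputed this way, the values come out close to the evaluator output but not identical (roughly 0.9695 versus the reported 0.9700 accuracy on the training split). One possible explanation, offered as an assumption rather than a finding, is that `data` is shuffled with an unseeded `F.rand()`, split without a seed, and not cached, so separate actions may not see exactly the same rows.

```python
def metrics_from_crosstab(tn, fp, fn, tp):
    """Accuracy, per-class F1, and support-weighted F1 from confusion-matrix counts."""
    total = tn + fp + fn + tp
    accuracy = (tp + tn) / total

    def f1(true_pos, false_pos, false_neg):
        precision = true_pos / (true_pos + false_pos)
        recall = true_pos / (true_pos + false_neg)
        return 2 * precision * recall / (precision + recall)

    f1_fake = f1(tp, fp, fn)  # positive class: fake = 1
    f1_true = f1(tn, fn, fp)  # negative class: fake = 0
    # Weight each class F1 by its number of true instances.
    weighted_f1 = (f1_fake * (tp + fn) + f1_true * (tn + fp)) / total
    return {
        "accuracy": accuracy,
        "f1_fake": f1_fake,
        "f1_true": f1_true,
        "weighted_f1": weighted_f1,
    }

# Counts read off the crosstab outputs (rows: fake_predict, columns: fake).
train_metrics = metrics_from_crosstab(tn=16598, fp=433, fn=627, tp=17119)
test_metrics = metrics_from_crosstab(tn=4177, fp=97, fn=155, tp=4430)

for name, m in [("train", train_metrics), ("test", test_metrics)]:
    print(name, {k: round(v, 4) for k, v in m.items()})
```

If reproducible numbers matter, passing a `seed` to `randomSplit` and caching the split DataFrames are the usual first steps to try.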
