## Important packages

In [2]:
# NLP
import nltk
from nltk.stem import PorterStemmer
from nltk.corpus import stopwords


# Pandas is required to read the data.
# For some reason pyspark can't read the csv file correctly
# So we have to read using pandas and then convert to spark DF
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt


# PySpark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.ml.feature import IDF, Tokenizer, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, CountVectorizer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql.functions import when, col, regexp_replace, concat, lit, length
from pyspark.sql.types import FloatType, DoubleType
from pyspark.ml.classification import NaiveBayesModel, NaiveBayes
from pyspark.mllib.evaluation import BinaryClassificationMetrics

In [3]:
def evaluate(df, labelCol = "label", predCol = "prediction"):
    TP = df.filter((col(labelCol) == 0) & (col(predCol) == 0)).count()
    FN = df.filter((col(labelCol) == 1) & (col(predCol) == 0)).count()
    FP = df.filter((col(labelCol) == 0) & (col(predCol) == 1)).count()
    TN = df.filter((col(labelCol) == 1) & (col(predCol) == 1)).count()

    precision = (TP)/(TP+FP)
    recall = (TP)/(TP+FN)
    print("Accuracy: %.3f" % float((TP+TN)/(TP+TN+FP+FN)))
    print("Recall: %.3f" % float(recall))
    print("Precision: %.3f" % float(precision))
    print("F1 Score: %.3f" % float(2*(precision * recall)/(precision +recall)))

    (df
        .crosstab('label','prediction')
        .withColumnRenamed("label_prediction", "label\prediction")
        .orderBy("label\prediction", asceding = False)
        .show()
    )

    return ([[TP, FP], [FN, TN]], precision, recall)

## Spark Session \& Reading Dataset

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("my_app_name") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()


In [4]:
spark

In [5]:
# Load fake news data from CSV into a DataFrame
data_path = r"C:\Users\sures\Downloads\project\project\news.csv"
spark_df = spark.read.csv(data_path, header=True, inferSchema=True)


In [6]:
# Remove unimportant rows of the df

spark_df = spark_df.filter((spark_df.label == 'FAKE') | (spark_df.label == 'REAL'))

In [7]:
from pyspark.ml.feature import StringIndexer

# Assuming 'label' is the name of the column containing the labels
string_indexer = StringIndexer(inputCol='label', outputCol='encoded_label')
spark_df = string_indexer.fit(spark_df).transform(spark_df)


In [8]:
spark_df.count()

1546

In [8]:
spark_df.show()

+--------------------+--------------------+--------------------+-----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-------------+
|                 _c0|               title|                text|label| _c4| _c5| _c6| _c7| _c8| _c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|_c25|_c26|_c27

In [9]:
# import libraries for text cleaning

from pyspark.sql.functions import isnan, when, count, col

In [10]:
# Cheking for null values

spark_df.select([count(when(isnan(col), col)).alias(col) for col in spark_df.columns]).show()

+---+-----+----+-----+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-------------+
|_c0|title|text|label|_c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|_c25|_c26|_c27|_c28|_c29|_c30|_c31|_c32|_c33|_c34|_c35|_c36|_c37|_c38|_c39|_c40|_c41|_c42|_c43|_c44|_c45|_c46|_c47|_c48|_c

## Pre-processing

### Cleaning Dataset

In [11]:
# Deleting all rows that are missing text
# df_rmv_nan_text = spark_df.filter(col("text") != "NaN")
df_rmv_nan_text = spark_df.filter(length(col("text")) > 60)

# There are a lot of NaN in the dataset.
# Those are Null values in pandas that were
# Converted to NaN string in spark df.
# Since it is a string, it will not be recognized by na() methods
# So, we have to manually change their value:
df_no_nan = (df_rmv_nan_text
             .withColumn("title", when(col("title") == "NaN", " ")
                                            .otherwise(col("title")))
             )


## NOTE: Later on we will use Tokenizer from PySpark MLlib. This tokenizer
##       takes care of converting all characters to lowercase, so it is
##       not required in this step.

# Remove non-character from title and text
df_clean = (df_no_nan

                 ## Removing any non-character from title
                .withColumn("title",
                            regexp_replace(
                                col('title'),
                                r'[^\w\’ ]',''))

                ## Removing any non-character from text
                .withColumn("text",
                            regexp_replace(
                                col('text'),
                                r'[^\w\’ ]',''))

                ## Replacing 2 or more whitespaces with 1 whitespace
                .withColumn("text",
                            regexp_replace(
                                col('text'),
                                r'[ ]{2,}',' '))

                ## Replacing 2 or more whitespaces with 1 whitespace
                .withColumn("title",
                            regexp_replace(
                                col('text'),
                                r'[ ]{2,}',' '))
                )


# Concatenation of title and text when title doesn't appear in text
df_combined = (df_clean
                    .withColumn('full_text',
                                  when(col("text").contains(
                                                    concat(col("title"))),
                                                    col("text"))

                                  .otherwise(concat(col("title"),
                                                    lit(" "),
                                                    col("text"))))
                    .select(["full_text","encoded_label"])
                    .withColumn("label", col("encoded_label").cast(DoubleType()))
                    .dropDuplicates()
                )


# Clean memory
del df_rmv_nan_text, df_no_nan, df_clean

# Sanity Check
print(df_combined.count())
df_combined.show(7)

906
+--------------------+-------------+-----+
|           full_text|encoded_label|label|
+--------------------+-------------+-----+
|Trump Raises Conc...|          0.0|  0.0|
|The Manhattan bil...|          1.0|  1.0|
| she explains why...|          0.0|  0.0|
|Fluoridation of p...|          0.0|  0.0|
|YouTube censoring...|          0.0|  0.0|
|We obviously spok...|          1.0|  1.0|
|Home VIDEO TREY G...|          0.0|  0.0|
+--------------------+-------------+-----+
only showing top 7 rows



### Check Class Balance

Still balanced!

In [12]:
df_combined.groupby("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0|  547|
|  1.0|  359|
+-----+-----+



### Stopwords

In [13]:
try:
    stopwords_ls = stopwords.words('english')
except:
    nltk.download("stopwords")
    stopwords_ls = stopwords.words('english')

# Sanity Check
stopwords_ls[:10]

['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', "you're"]

### Stemmer Class

In [9]:


from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable


class Stemmer(Transformer,
                 HasInputCol,
                 HasOutputCol,
                 DefaultParamsReadable,
                 DefaultParamsWritable):

    @keyword_only
    def __init__(self, inputCol = "input", outputCol = "output"):
        super(Stemmer, self).__init__()
        kwargs = self._input_kwargs
        self.set_params(**kwargs)

    @keyword_only
    def set_params(self, inputCol = "input", outputCol = "output"):
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def get_input_col(self):
        return self.getOrDefault(self.inputCol)

    def get_output_col(self):
        return self.getOrDefault(self.outputCol)

    def _transform(self, df):

        # Input and output column
        input_col = self.get_input_col()
        output_col = self.get_output_col()

        # Initialize stemmer from nltk package
        ps = PorterStemmer()

        # User Defined Function: stemming every word in the input column
        transform_udf = F.udf(lambda x: [ps.stem(word) for word in x], ArrayType(StringType(), False))

        # Return the new df with the new column
        return df.withColumn(output_col, transform_udf(input_col))

# Sanity check
# words = Tokenizer(inputCol="text", outputCol="words").transform(spark_df)
# test = Stem(inputCol = "words", outputCol = "test").transform(words)
# test.select(["words", "test"]).show(4)

## Machine Learning

### Split Data

In [17]:
# Split data to train and test
train, test = df_combined.randomSplit([0.7,0.3], seed=2)

In [10]:
from pyspark.ml import Pipeline # pipeline to transform data
from pyspark.sql import SparkSession # to initiate spark
from pyspark.ml.feature import RegexTokenizer # tokenizer
from pyspark.ml.feature import HashingTF, IDF # vectorizer
from pyspark.ml.feature import StopWordsRemover # to remove stop words
from pyspark.sql.functions import concat_ws, col # to concatinate cols
from pyspark.ml.classification import LogisticRegression # ml model
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # for hyperparameter tuning
from pyspark.ml.evaluation import MulticlassClassificationEvaluator # to evaluate the model
from pyspark.mllib.evaluation import MulticlassMetrics # # performance metrics

In [19]:
# convert sentences to list of words
tokenizer = RegexTokenizer(inputCol="full_text", outputCol="words", pattern="\\W")

train_df = tokenizer.transform(train)
train_df.select(['label','full_text', 'words']).show(5)

+-----+--------------------+--------------------+
|label|           full_text|               words|
+-----+--------------------+--------------------+
|  0.0| 2006 at 203 am L...|[2006, at, 203, a...|
|  0.0| 2016 JohnGHendy ...|[2016, johnghendy...|
|  1.0| 2016 The poll ha...|[2016, the, poll,...|
|  1.0| 28 states and th...|[28, states, and,...|
|  0.0| Allan served a t...|[allan, served, a...|
+-----+--------------------+--------------------+
only showing top 5 rows



In [20]:
# convert sentences to list of words
tokenizer = RegexTokenizer(inputCol="full_text", outputCol="words", pattern="\\W")

test_df = tokenizer.transform(test)
test_df.select(['label','full_text', 'words']).show(5)

+-----+--------------------+--------------------+
|label|           full_text|               words|
+-----+--------------------+--------------------+
|  1.0| 32 patients were...|[32, patients, we...|
|  0.0| Brian Dobson is ...|[brian, dobson, i...|
|  1.0| Charlie Hebdo ha...|[charlie, hebdo, ...|
|  0.0| Churkin stated T...|[churkin, stated,...|
|  1.0| Cruz was on cons...|[cruz, was, on, c...|
+-----+--------------------+--------------------+
only showing top 5 rows



In [21]:
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filter")

train_df= stopwords_remover.transform(train_df)

train_df.select(['label','full_text', 'words', 'filter']).show(5)

+-----+--------------------+--------------------+--------------------+
|label|           full_text|               words|              filter|
+-----+--------------------+--------------------+--------------------+
|  0.0| 2006 at 203 am L...|[2006, at, 203, a...|[2006, 203, link,...|
|  0.0| 2016 JohnGHendy ...|[2016, johnghendy...|[2016, johnghendy...|
|  1.0| 2016 The poll ha...|[2016, the, poll,...|[2016, poll, marg...|
|  1.0| 28 states and th...|[28, states, and,...|[28, states, dist...|
|  0.0| Allan served a t...|[allan, served, a...|[allan, served, t...|
+-----+--------------------+--------------------+--------------------+
only showing top 5 rows



In [22]:
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filter")

test_df = stopwords_remover.transform(test_df)

test_df.select(['label','full_text', 'words', 'filter']).show(5)

+-----+--------------------+--------------------+--------------------+
|label|           full_text|               words|              filter|
+-----+--------------------+--------------------+--------------------+
|  1.0| 32 patients were...|[32, patients, we...|[32, patients, di...|
|  0.0| Brian Dobson is ...|[brian, dobson, i...|[brian, dobson, w...|
|  1.0| Charlie Hebdo ha...|[charlie, hebdo, ...|[charlie, hebdo, ...|
|  0.0| Churkin stated T...|[churkin, stated,...|[churkin, stated,...|
|  1.0| Cruz was on cons...|[cruz, was, on, c...|[cruz, constant, ...|
+-----+--------------------+--------------------+--------------------+
only showing top 5 rows



In [23]:
# Calculate term frequency in each article
hashing_tf = HashingTF(inputCol="filter", outputCol="raw_features", numFeatures=10000)
featurized_data = hashing_tf.transform(train_df)

# TF-IDF vectorization of articles
idf = IDF(inputCol="raw_features", outputCol="features")
idf_vectorizer = idf.fit(featurized_data)
train_df = idf_vectorizer.transform(featurized_data)

train_df.select("label",'full_text', 'words', 'filter', "features").show()

+-----+--------------------+--------------------+--------------------+--------------------+
|label|           full_text|               words|              filter|            features|
+-----+--------------------+--------------------+--------------------+--------------------+
|  0.0| 2006 at 203 am L...|[2006, at, 203, a...|[2006, 203, link,...|(10000,[168,3469,...|
|  0.0| 2016 JohnGHendy ...|[2016, johnghendy...|[2016, johnghendy...|(10000,[263,671,8...|
|  1.0| 2016 The poll ha...|[2016, the, poll,...|[2016, poll, marg...|(10000,[42,808,83...|
|  1.0| 28 states and th...|[28, states, and,...|[28, states, dist...|(10000,[120,132,1...|
|  0.0| Allan served a t...|[allan, served, a...|[allan, served, t...|(10000,[379,407,6...|
|  0.0| American voters ...|[american, voters...|[american, voters...|(10000,[3712,5633...|
|  0.0| Bill Clinton is ...|[bill, clinton, i...|[bill, clinton, s...|(10000,[47,157,16...|
|  1.0| Brockway said of...|[brockway, said, ...|[brockway, said, ...|(10000,[85

In [12]:
# Calculate term frequency in each article
hashing_tf = HashingTF(inputCol="filter", outputCol="raw_features", numFeatures=10000)
featurized_data_test = hashing_tf.transform(test_df)

# TF-IDF vectorization of articles
idf = IDF(inputCol="raw_features", outputCol="features")
idf_vectorizer = idf.fit(featurized_data_test)
test_df = idf_vectorizer.transform(featurized_data)

test_df.select("label",'full_text', 'words', 'filter', "features").show()

NameError: name 'test_df' is not defined

In [25]:
train=train_df.select("label","features")
test=test_df.select("label","features")

In [26]:
train.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(10000,[168,3469,...|
|  0.0|(10000,[263,671,8...|
|  1.0|(10000,[42,808,83...|
|  1.0|(10000,[120,132,1...|
|  0.0|(10000,[379,407,6...|
+-----+--------------------+
only showing top 5 rows



In [27]:
test.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(10000,[168,3469,...|
|  0.0|(10000,[263,671,8...|
|  1.0|(10000,[42,808,83...|
|  1.0|(10000,[120,132,1...|
|  0.0|(10000,[379,407,6...|
+-----+--------------------+
only showing top 5 rows



In [11]:
train =train.withColumn('label', train['label'].cast('double'))


NameError: name 'train' is not defined

In [29]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['features'], outputCol='dense_features')
train = assembler.transform(train)


In [30]:
train.show(5)


+-----+--------------------+--------------------+
|label|            features|      dense_features|
+-----+--------------------+--------------------+
|  0.0|(10000,[168,3469,...|(10000,[168,3469,...|
|  0.0|(10000,[263,671,8...|(10000,[263,671,8...|
|  1.0|(10000,[42,808,83...|(10000,[42,808,83...|
|  1.0|(10000,[120,132,1...|(10000,[120,132,1...|
|  0.0|(10000,[379,407,6...|(10000,[379,407,6...|
+-----+--------------------+--------------------+
only showing top 5 rows



In [31]:
train.drop("features")

DataFrame[label: double, dense_features: vector]

In [32]:
from xgboost.spark import SparkXGBClassifier

In [33]:
model=SparkXGBClassifier(label_col="label").fit(train)

2024-05-12 13:47:09,044 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 1 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2024-05-12 13:47:18,151 INFO XGBoost-PySpark: _fit Finished xgboost training!


In [34]:
predict_df=model.transform(test)

In [35]:
predict_df.show()

+-----+--------------------+--------------------+----------+--------------------+
|label|            features|       rawPrediction|prediction|         probability|
+-----+--------------------+--------------------+----------+--------------------+
|  0.0|(10000,[168,3469,...|[-0.6931549906730...|       1.0|[0.33333158493041...|
|  0.0|(10000,[263,671,8...|[2.95089197158813...|       0.0|[0.95030564069747...|
|  1.0|(10000,[42,808,83...|[-0.8122609853744...|       1.0|[0.30740892887115...|
|  1.0|(10000,[120,132,1...|[-0.6931549906730...|       1.0|[0.33333158493041...|
|  0.0|(10000,[379,407,6...|[1.32028520107269...|       0.0|[0.78922915458679...|
|  0.0|(10000,[3712,5633...|[0.72081977128982...|       0.0|[0.67278754711151...|
|  0.0|(10000,[47,157,16...|[2.24892854690551...|       0.0|[0.90455806255340...|
|  1.0|(10000,[855,1241,...|[-0.3804846704006...|       1.0|[0.40601003170013...|
|  0.0|(10000,[551,695,1...|[-1.2882757186889...|       1.0|[0.21614480018615...|
|  1.0|(10000,[1

In [36]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [37]:
# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol="label")
auc = evaluator.evaluate(predict_df)

# Print the AUC score
print("AUC:", auc)

AUC: 0.9494047619047623


GBT


In [38]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["features"], outputCol="features_vector")
from pyspark.ml.classification import GBTClassifier

In [39]:
gbt = GBTClassifier(labelCol="label", featuresCol="features_vector", maxIter=10)

# Create a pipeline
pipeline = Pipeline(stages=[assembler, gbt])

# Train the GBTClassifier model
model_gbt = pipeline.fit(train_df)

# Make predictions on the testing data
predictions = model_gbt.transform(test_df)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol="label")
auc = evaluator.evaluate(predictions)

# Print the AUC score
print("AUC:", auc)

AUC: 0.9442197610751611
