In [0]:
from pyspark.sql.types import *
import pandas as pd 
from pyspark.ml.feature import StringIndexer
import pyspark.sql.functions as f
import matplotlib.pyplot as plt
from pyspark.sql.functions import count
from pyspark.ml.feature import Tokenizer,HashingTF, IDF, StringIndexer, SQLTransformer,IndexToString
from pyspark.ml.classification import LogisticRegression,LinearSVC,NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import concat, concat_ws, lit, col, trim


In [0]:
# Load the raw news-articles CSV (header row present, multiLine parsing enabled)
# and preview the first five rows.
data = (
    spark.read.format('csv')
    .options(header='true', multiLine='true')
    .load('/FileStore/tables/news_articles-2.csv')
)
data.show(5)

### Data Preprocessing

First, we analyse the main features, visualize them and draw some conclusions; after that, we choose appropriate models for our text classification.

In [0]:
# Count the missing values of every column in a single aggregation (one Spark job)
# instead of running a separate filter/count job per column.
# `when(cond, 1)` without an `otherwise` yields NULL for rows that do not match,
# and `count` ignores NULLs, so each aggregate is the number of NULL cells in
# that column. The loop variable is named `c` so it does not shadow the imported
# `col` function.
null_counts = data.select(
    [f.count(f.when(data[c].isNull(), 1)).alias(c) for c in data.columns]
).first()

# Mapping of column name -> number of NULL values.
Dict_Null = null_counts.asDict()
Dict_Null

In [0]:
# Total number of rows in the loaded data.
data.count()

In [0]:
# Drop the 'published' and 'main_img_url' columns; no later cell in this notebook reads them.
data = data.drop('published','main_img_url')

In [0]:
# Preview the first five rows after the column drop.
data.show(5)

In [0]:
# To analyse the article types, list the distinct values of the 'type' column.

[row[0] for row in data.select('type').distinct().collect()]

Junksci stands for junk science, which is also categorised as fake. Satire and state are likewise categories under which untrustworthy or false news falls.

In [0]:
# Remove every row that contains at least one null, so the rest of the analysis
# runs on complete records only.

data = data.dropna()

In [0]:
# Re-check the per-column NULL counts after dropping incomplete rows; every value
# is expected to be 0.
# All columns are counted in a single aggregation (one Spark job) instead of one
# filter/count job per column: `when(cond, 1)` without an `otherwise` yields NULL
# for non-matching rows and `count` ignores NULLs. The loop variable is named `c`
# so it does not shadow the imported `col` function.
remaining_null_counts = data.select(
    [f.count(f.when(data[c].isNull(), 1)).alias(c) for c in data.columns]
).first()

# Mapping of column name -> number of NULL values still present.
Dict_no_null = remaining_null_counts.asDict()
Dict_no_null

In [0]:
# Encode the string label (Fake / Real) as a numeric index stored in a new
# column 'label1'.
# NOTE(review): with StringIndexer's default ordering the more frequent label
# should receive index 0.0 - presumably 'Fake' given the label counts shown in
# the EDA section; confirm against the preview below.

indexer = StringIndexer(inputCol="label", outputCol="label1")
indexer_model = indexer.fit(data)
indexed = indexer_model.transform(data)
indexed.show(5)

In [0]:
# Keep only the stop-word-free text and title, the author and the numeric label,
# then rename 'label1' to 'label' (the column name the classifiers and the
# evaluator read).

data2 = (
    indexed
    .select("text_without_stopwords", 'title_without_stopwords', 'author', 'label1')
    .withColumnRenamed("label1", "label")
)
data2.show(5)

In [0]:
# Merge text, title and author (space-separated) into a single 'text' column and
# drop the three source columns, leaving exactly two columns: 'label' and 'text'.
text_parts = [
    col('text_without_stopwords'),
    lit(" "),
    col('title_without_stopwords'),
    lit(" "),
    col('author'),
]
data3 = (
    data2
    .withColumn('text', concat(*text_parts))
    .drop('text_without_stopwords', 'title_without_stopwords', 'author')
)
data3.show(5)

### EDA(Exploratory data analysis)

In [0]:
# Count the articles per 'type' to see which kind of news is most common.
data1 = data.groupBy('type').count()

# Show the aggregated counts as the cell output. The result has one row per
# distinct type, so collecting it to pandas on the driver is cheap.
data1.toPandas()

In [0]:
# The data is unbalanced across types; the most frequent type is 'bs'.

display(data1)

type,count
conspiracy,430
bias,389
hate,244
junksci,102
fake,15
state,121
satire,146
bs,598


In [0]:
# Proportions of the news labels: fake articles outnumber real ones.
# The aggregate is rendered directly with display(); no intermediate pandas
# conversion is needed, which avoids an extra Spark job whose result would be
# thrown away.
data_label = data.groupBy('label').count()
display(data_label)

label,count
Real,754
Fake,1291


In [0]:
# Cross-tabulate the label against 'hasImage' to see whether having an image
# relates to reliability. The aggregate is rendered directly with display(); no
# intermediate pandas conversion is needed.
data_image = data.groupBy('label','hasImage').count()
display(data_image)

# Viewed as a 100% stacked bar plot, the percentage of articles that have an image
# is higher among real news than among fake news, so a first assumption is that
# having an image may be correlated with news reliability.
# NOTE(review): the original note says hasImage will be included in the next steps,
# but the modelling cells of this notebook only use the text, title and author
# columns - confirm whether hasImage was meant to become a feature.

label,hasImage,count
Fake,1,951
Real,0,125
Fake,0,340
Real,1,629


### Model training and Selection

In [0]:
# Split the prepared data into roughly 70% training and 30% test rows; the fixed
# seed keeps the split repeatable.

train, test = data3.randomSplit(weights=[0.7, 0.3], seed=43)

In [0]:
# Preview the first five rows of the held-out test split.
test.show(5)

In [0]:
# One pipeline per classifier. All three share the same featurisation stages:
# tokenisation -> hashed term frequencies -> IDF weighting; only the final
# estimator differs.

# Shared feature-extraction stages ('text' -> 'words' -> 'rawFeatures' -> 'features').
tokenizer = Tokenizer(inputCol='text', outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=1000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)
feature_stages = [tokenizer, hashingTF, idf]

# Candidate classifiers.
lr = LogisticRegression(maxIter=18, regParam=0.3, elasticNetParam=0.0)
nb = NaiveBayes(smoothing=1)
svc = LinearSVC(maxIter=22, regParam=0.3)

# Full pipelines: shared featurisation followed by one classifier each.
logistic_pipeline = Pipeline(stages=feature_stages + [lr])
nb_pipeline = Pipeline(stages=feature_stages + [nb])
svc_pipeline = Pipeline(stages=feature_stages + [svc])



In [0]:
# Fit every pipeline on the training split, then score the test split with each
# fitted model. The order is always: logistic regression, naive Bayes, linear SVC.

pipeline_model_lr, pipeline_model_nb, pipeline_model_svc = [
    pipeline.fit(train)
    for pipeline in (logistic_pipeline, nb_pipeline, svc_pipeline)
]

predictions_lr, predictions_nb, predictions_svc = [
    fitted.transform(test)
    for fitted in (pipeline_model_lr, pipeline_model_nb, pipeline_model_svc)
]

In [0]:
# Evaluation helpers: each public function returns one metric of a predictions
# DataFrame, rounded to 5 decimal places.

def _evaluate_metric(prediction, metric_name):
  """Evaluate `prediction` for a single metric and round the result.

  Args:
    prediction: predictions DataFrame; its "label" and "prediction" columns
      are read by the evaluator.
    metric_name: value passed as `metricName` to MulticlassClassificationEvaluator.

  Returns:
    The metric value rounded to 5 decimal places.
  """
  evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName=metric_name)
  return round(evaluator.evaluate(prediction), 5)

def accuracy(prediction):
  """Return the accuracy of `prediction`, rounded to 5 decimal places."""
  return _evaluate_metric(prediction, "accuracy")

def Precision(prediction):
  """Return the weighted precision of `prediction`, rounded to 5 decimal places."""
  return _evaluate_metric(prediction, "weightedPrecision")

def Recall(prediction):
  """Return the weighted recall of `prediction`, rounded to 5 decimal places.

  NOTE(review): recall weighted by class frequency is mathematically equal to
  overall accuracy, so this value is expected to duplicate accuracy() - confirm
  whether a per-class recall was intended instead.
  """
  return _evaluate_metric(prediction, "weightedRecall")

In [0]:
# Prediction DataFrames to be scored, in the order: logistic regression,
# naive Bayes, linear SVC. Despite its name, `models` holds the transformed
# test sets, not the fitted models.

models = [
    predictions_lr,
    predictions_nb,
    predictions_svc,
]
def metrics(x):
  """Return one [accuracy, precision, recall] list per predictions DataFrame in `x`.

  The output keeps the order of `x`; an empty input gives an empty list.
  """
  return [[accuracy(pred), Precision(pred), Recall(pred)] for pred in x]
  

In [0]:
# Build one row per model: the model name followed by its three metrics,
# i.e. [name, accuracy, precision, recall].

lst = metrics(models)
name = [['logistic'],['naive bayes'],['svc']]
a = [label + scores for label, scores in zip(name, lst)]

print(a)

In [0]:
# Tabulate the per-model metrics as a Spark DataFrame for side-by-side comparison.
# The notebook's reading of this table is that logistic regression is the best model.
name = ['logistic','naive bayes','svc']
columns = ['models',"accuracy", "precision", "recall"]

dataframe = spark.createDataFrame(data=a, schema=columns)
dataframe.show(5)

### Conclusion

Based on our results, we can see that news can be unreliable, so the source of a news item should always be rechecked. Our visualizations show that most of the articles in the data are labelled Fake. We fitted three candidate models: LogisticRegression, LinearSVC and NaiveBayes. The accuracy is not very high; there may be other factors that affect the detection process but are not included in our data. <br>
Overall, the two models with the highest performance on our data are LogisticRegression and LinearSVC.