In [13]:
import csv
import os
import sys
import random
import string
import numpy as np
import pandas as pd
import pyspark
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from nltk.corpus import stopwords  
from nltk.tokenize import word_tokenize, sent_tokenize
from nltk.tokenize import RegexpTokenizer
from sklearn.metrics import f1_score
from sklearn.model_selection import KFold
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from pyspark.sql.types import StructType, StructField, StringType, FloatType, ArrayType
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover
from pyspark.ml import Pipeline
from pyspark.mllib.tree import RandomForest
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.sql import SQLContext
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc, size, max, abs

In [14]:
# Initialize a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [37]:
# Label the data sets, remove unncessary fields, sample and save the samples
fake = pd.read_csv("../data/Fake.csv")
fake['label'] = 'Fake'
fake = fake.drop(columns=["date", "subject"])
fake = fake.sample(21000)
true = pd.read_csv("../data/True.csv")
true['label'] = 'True'
true = true.drop(columns=["date", "subject"])
true = true.sample(20000)
# fake.to_csv("../data/Sample2_Fake.csv", index=False, header=None)
# true.to_csv("../data/Sample2_True.csv", index=False, header=None)

# Extrac the stop words and punctuations and save them to a file
punc = list(string.punctuation) + ['“','”','‘', '’','...','']
stop_punc = stopwords.words('english') + punc
# with open('../data/stop_punc.txt', 'w') as file:
#     file.write(str(stop_punc))

In [39]:
# Tokenizing articles and removing stop words from the article
fake_rdd = spark.read.csv("../data/Sample2_Fake.csv").rdd
fake_rdd = fake_rdd.filter(lambda x: x[0] is not None and x[1] is not None).map(lambda x: (x[0] + ' ' + x[1], x[2])).filter(lambda x: x[1]=='Fake')
fake_rdd = fake_rdd.map(lambda x: Row(article=x[0], label=x[1]))
fake_df = spark.createDataFrame(fake_rdd)
tokenizer = Tokenizer(inputCol="article", outputCol="words")
fake_df = tokenizer.transform(fake_df)
fake_rdd = fake_df.rdd
fake_rdd = fake_rdd.map(lambda x: (x[0], [i for i in x[2] if i not in stop_punc], x[1]))
num_fake = fake_rdd.count()
true_rdd = spark.read.csv("../data/Sample2_True.csv").rdd
true_rdd = true_rdd.filter(lambda x: x[0] is not None and x[1] is not None).map(lambda x: (x[0] + ' ' + x[1], x[2])).filter(lambda x: x[1]=='True')
true_rdd = true_rdd.map(lambda x:Row(article=x[0], label=x[1]))
true_df = spark.createDataFrame(true_rdd)
true_df = tokenizer.transform(true_df)
true_rdd = true_df.rdd.map(lambda x: (x[0], [i for i in x[2] if i not in stop_punc], x[1]))
num_true = true_rdd.count()

# Feature extraction using TFIDF
fake_rdd = fake_rdd.map(lambda x: (x[1], x[2])).map(lambda x: Row(words=x[0], label=x[1]))
fake_df = spark.createDataFrame(fake_rdd)
hashingTF = HashingTF(inputCol='words', outputCol='rawFeatures', numFeatures=20)
fake_df = hashingTF.transform(fake_df)
idf = IDF(inputCol='rawFeatures', outputCol='features')
idfModel = idf.fit(fake_df)
fake_df = idfModel.transform(fake_df)
fake_rdd = fake_df.rdd.map(lambda x: (x[3], x[1])).map(lambda x: ([np.take(x[0], i) for i in range(np.size(x[0]))], x[1]))

true_rdd = true_rdd.map(lambda x: (x[1], x[2])).map(lambda x: Row(words=x[0], label=x[1]))
true_df = spark.createDataFrame(true_rdd)
hashingTF = HashingTF(inputCol='words', outputCol='rawFeatures', numFeatures=20)
true_df = hashingTF.transform(true_df)
idf = IDF(inputCol='rawFeatures', outputCol='features')
idfModel = idf.fit(true_df)
true_df = idfModel.transform(true_df)
true_rdd = true_df.rdd.map(lambda x: (x[3], x[1])).map(lambda x: ([np.take(x[0], i) for i in range(np.size(x[0]))], x[1]))

# Spliting the data set into training and test set using kfold cv
kf = KFold(n_splits=5)
fake = fake_rdd.collect()
true = true_rdd.collect()
fake_data = []
for train_index, test_index in kf.split(fake):
    train = []
    test = []
    for i in train_index:
        train.append(fake[i])
    for i in test_index:
        test.append(fake[i])
    fake_data.append((train, test))
true_data = []
for train_index, test_index in kf.split(true):
    train = []
    test = []
    for i in train_index:
        train.append(true[i])
    for i in test_index:
        test.append(true[i])
    true_data.append((train, test))
data = []
for i in range(len(fake_data)):
    true_data[i][0].extend(fake_data[i][0])
    true_data[i][1].extend(fake_data[i][1])
data = true_data

# Training and testing using KNN classifier
knn = KNeighborsClassifier(n_neighbors=5)
accuracy = []
predict = []
f1 = []
for i in range(len(data)):
    knn.fit([j[0] for j in data[i][0]], [j[1] for j in data[i][0]])
    accuracy.append(knn.score([j[0] for j in data[i][1]], [j[1] for j in data[i][1]]))
    predict.append(knn.predict([j[0] for j in data[i][1]]))
for i in range(len(predict)):
    f1.append(f1_score([j[1] for j in data[i][1]], predict[i].tolist(), pos_label="True"))
average_f1 = sum(f1)/len(f1)

# Result
print('Classification of', num_fake + num_true, 'datapoints containing')
print(num_fake, 'fake datapoints')
print(num_true, 'true datapoints')
print('Using 5NN classifier and 5fold cross validation resulted in average f1 score of', average_f1)

Classification of  39855  datapoints containing
19939  fake datapoints
19916  true datapoints
Using 5NN classifier and 5fold cross validation resulted in average f1 score of 0.9777375436010749


In [41]:
# Tokenizing articles and removing stop words from the article
fake_rdd = spark.read.csv("../data/Sample2_Fake.csv").rdd
fake_rdd = fake_rdd.filter(lambda x: x[0] is not None and x[1] is not None).map(lambda x: (x[0] + ' ' + x[1], x[2])).filter(lambda x: x[1]=='Fake')
fake_rdd = fake_rdd.map(lambda x: Row(article=x[0], label=x[1]))
fake_df = spark.createDataFrame(fake_rdd)
tokenizer = Tokenizer(inputCol="article", outputCol="words")
fake_df = tokenizer.transform(fake_df)
fake_rdd = fake_df.rdd
fake_rdd = fake_rdd.map(lambda x: (x[0], [i for i in x[2] if i not in stop_punc], x[1]))
num_fake = fake_rdd.count()
true_rdd = spark.read.csv("../data/Sample2_True.csv").rdd
true_rdd = true_rdd.filter(lambda x: x[0] is not None and x[1] is not None).map(lambda x: (x[0] + ' ' + x[1], x[2])).filter(lambda x: x[1]=='True')
true_rdd = true_rdd.map(lambda x:Row(article=x[0], label=x[1]))
true_df = spark.createDataFrame(true_rdd)
true_df = tokenizer.transform(true_df)
true_rdd = true_df.rdd.map(lambda x: (x[0], [i for i in x[2] if i not in stop_punc], x[1]))
num_true = true_rdd.count()

# Feature extraction using TFIDF
fake_rdd = fake_rdd.map(lambda x: (x[1], x[2])).map(lambda x: Row(words=x[0], label=x[1]))
fake_df = spark.createDataFrame(fake_rdd)
hashingTF = HashingTF(inputCol='words', outputCol='rawFeatures', numFeatures=20)
fake_df = hashingTF.transform(fake_df)
idf = IDF(inputCol='rawFeatures', outputCol='features')
idfModel = idf.fit(fake_df)
fake_df = idfModel.transform(fake_df)
fake_rdd = fake_df.rdd.map(lambda x: (x[3], x[1])).map(lambda x: ([np.take(x[0], i) for i in range(np.size(x[0]))], x[1]))

true_rdd = true_rdd.map(lambda x: (x[1], x[2])).map(lambda x: Row(words=x[0], label=x[1]))
true_df = spark.createDataFrame(true_rdd)
hashingTF = HashingTF(inputCol='words', outputCol='rawFeatures', numFeatures=20)
true_df = hashingTF.transform(true_df)
idf = IDF(inputCol='rawFeatures', outputCol='features')
idfModel = idf.fit(true_df)
true_df = idfModel.transform(true_df)
true_rdd = true_df.rdd.map(lambda x: (x[3], x[1])).map(lambda x: ([np.take(x[0], i) for i in range(np.size(x[0]))], x[1]))

# Spliting the data set into training and test set using kfold cv
kf = KFold(n_splits=5)
fake = fake_rdd.collect()
true = true_rdd.collect()
fake_data = []
for train_index, test_index in kf.split(fake):
    train = []
    test = []
    for i in train_index:
        train.append(fake[i])
    for i in test_index:
        test.append(fake[i])
    fake_data.append((train, test))
true_data = []
for train_index, test_index in kf.split(true):
    train = []
    test = []
    for i in train_index:
        train.append(true[i])
    for i in test_index:
        test.append(true[i])
    true_data.append((train, test))
data = []
for i in range(len(fake_data)):
    true_data[i][0].extend(fake_data[i][0])
    true_data[i][1].extend(fake_data[i][1])
data = true_data

# Training and testing using random forest classifier 
num_trees = 40
max_depth = 20
rf = RandomForestClassifier(n_estimators=num_trees, max_depth=max_depth)
accuracy = []
predict = []
f1 = []
for i in range(len(data)):
    rf.fit([j[0] for j in data[i][0]], [j[1] for j in data[i][0]])
    accuracy.append(rf.score([j[0] for j in data[i][1]], [j[1] for j in data[i][1]]))
    predict.append(rf.predict([j[0] for j in data[i][1]]))
for i in range(len(predict)):
    f1.append(f1_score([j[1] for j in data[i][1]], predict[i].tolist(), pos_label="True"))
average_f1 = sum(f1)/len(f1)

# Result
print('Classification of', num_fake + num_true, 'datapoints containing: ')
print(num_fake, 'fake datapoints')
print(num_true, 'true datapoints')
print('Using random forest classifier with', num_trees, 'trees, max depth of', max_depth, 'and 5fold cross validation resulted in average f1 score of', average_f1)

Classification of 39855 datapoints containing: 
19939 fake datapoints
19916 true datapoints
Using random forest classifier with 40 trees, max depth of 20 and 5fold cross validation resulted in average f1 score of 0.9999748964478474
