In [1]:
%%time
import pandas as pd
import os, sys, time, json, re, string, datetime

from pyspark import SparkContext, SparkConf, StorageLevel, keyword_only

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.param.shared import HasInputCol, HasInputCols, HasOutputCol, HasOutputCols, Param
from pyspark.ml.feature import OneHotEncoder, HashingTF, IDF, Tokenizer, RegexTokenizer, NGram, CountVectorizer
from pyspark.ml.feature import StopWordsRemover, VectorAssembler, PCA, OneHotEncoderEstimator,StringIndexer

from pyspark.ml.classification import LogisticRegression, NaiveBayes, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.classification import GBTClassifier, MultilayerPerceptronClassifier

from pyspark.ml import Pipeline, Transformer

from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from platform import python_version
print(python_version())

3.6.10
CPU times: user 62.7 ms, sys: 12.3 ms, total: 75 ms
Wall time: 73.4 ms


In [2]:
%%time
sc

CPU times: user 4 µs, sys: 1e+03 ns, total: 5 µs
Wall time: 6.91 µs


In [3]:
%%time
spark = SparkSession.builder \
        .appName("fakenews") \
        .config("spark.master", "yarn") \
        .config("spark.submit.deployMode", "cluster") \
        .config("spark.driver.memory", "25g") \
        .config("spark.executor.instances", "5") \
        .config("spark.executor.cores", "4") \
        .config("spark.executor.memory", "25g") \
        .getOrCreate()

CPU times: user 6.55 ms, sys: 0 ns, total: 6.55 ms
Wall time: 9.81 ms


In [4]:
%%time
fakenews_path="gs://dataproc-6ca41800-27b4-47d5-abee-55c011dfa389-asia-southeast1/data/fake-news/"
fakenews_data_path = fakenews_path + "two_million_rows_news_cleaned_2018_02_13_pyspark.csv"
df_news = spark.read.format("com.databricks.spark.csv") \
                    .option("header", "true") \
                    .option("delimiter", '#') \
                    .load(fakenews_data_path)

#remove empty content which will cause problem when transform the text
df_news = df_news.filter(df_news.type != "")
df_news = df_news.filter(df_news.content != "")
df_news = df_news.filter(df_news.domain != "")
df_news = df_news.filter(df_news.title != "")
df_news = df_news.filter(df_news.authors != "")
df_news = df_news.dropDuplicates(['type', 'content', 'title', 'authors'])

CPU times: user 12.6 ms, sys: 3.71 ms, total: 16.3 ms
Wall time: 6.32 s


In [5]:
# only keep type and content
df_news = df_news.select('type', 'content', 'domain', 'title','authors')
split_authors_col = F.split(df_news['authors'], ', ')
df_news = df_news.withColumn('author1', split_authors_col.getItem(0))
df_news = df_news.withColumn('author2', split_authors_col.getItem(1))

# add binary label
df_news = df_news.withColumn("label", F.when(F.col("type") == 'fake', 1).otherwise(0))

df_news_fake = df_news.filter(df_news.type == 'fake')
df_news_nonfake = df_news.filter(df_news.type != 'fake')
df_news = df_news_fake.union(df_news_nonfake)

# split the dataset
df_train, df_test = df_news.randomSplit([0.8, 0.2], seed=666)
param_tuning = False

In [6]:
%%time
# customized transformer class to manually extract some counting based text features
class ReviewContentTransformer(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol="content", outputCol="content_features"):
        super(ReviewContentTransformer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)


    def _transform(self, dataset):
        
        def f(s):
            uppercase_count = 0
            char_count = 0
            for c in s:                
                if c in string.ascii_uppercase:
                    uppercase_count += 1
                    char_count += 1
                elif c in string.ascii_lowercase:
                    char_count += 1
            
            text_len = len(s)
            return Vectors.dense(text_len, char_count, 
                                 uppercase_count, uppercase_count / (char_count + 1e-10))

        return dataset.withColumn(self.getOutputCol(), 
                                  F.udf(f, VectorUDT())(dataset[self.getInputCol()]))

CPU times: user 99 µs, sys: 10 µs, total: 109 µs
Wall time: 115 µs


In [7]:
%%time
# customized transformer class to manually extract some counting based word features
class ReviewWordsTransformer(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol="content", outputCol="content_features"):
        super(ReviewWordsTransformer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)


    def _transform(self, dataset):
        
        def f(words):    
            word_count = len(words)
            unique_word_count = len(set(words))
            upper_words = []
            for w in words:
                if w.isupper():
                    upper_words.append(w)
            upper_word_count = len(set(upper_words))
            unique_upper_word_count = len(upper_words)
            return Vectors.dense(word_count, unique_word_count, unique_word_count / (word_count + 1e-10),
                                 upper_word_count, upper_word_count / (word_count + 1e-10), 
                                 unique_upper_word_count, unique_upper_word_count / (upper_word_count + 1e-10))

        return dataset.withColumn(self.getOutputCol(), 
                                  F.udf(f, VectorUDT())(dataset[self.getInputCol()]))

CPU times: user 114 µs, sys: 0 ns, total: 114 µs
Wall time: 119 µs


In [8]:
%%time
# show model prediction performance on the given dataset
def eval_model_perf(fitted_model, dataset, label_col="label", prediction_col="prediction", probability_col="probability"):
    pred_dataset = fitted_model.transform(dataset)
    eval_dataset = pred_dataset.select(label_col, prediction_col, probability_col)
    # model performance evaluation
    metricNames = ["accuracy", "f1"]
    model_eval = MulticlassClassificationEvaluator(predictionCol=prediction_col, labelCol=label_col)
    for m in metricNames:
        val = model_eval.evaluate(eval_dataset, {model_eval.metricName: m})
        print(m, " = ", val)
    roc_eval = BinaryClassificationEvaluator(rawPredictionCol=probability_col, labelCol=label_col, metricName="areaUnderROC")
    print("AUC =", roc_eval.evaluate(eval_dataset))    
    return pred_dataset

# show CV param tunning result
def show_cv_results(cv_model):
    for result, param in sorted(zip(cv_model.avgMetrics, cv_model.getEstimatorParamMaps()), reverse=True, key=lambda x: x[0]):
        print(result, " | ", param)

CPU times: user 7 µs, sys: 1 µs, total: 8 µs
Wall time: 11.9 µs


In [10]:
%%time
def build_data_preproc_model_with_pca(vocab_size=5000):
    preproc_steps = [
        RegexTokenizer(inputCol="content", outputCol="all_words", pattern=r"\W"),
        StopWordsRemover(inputCol="all_words", outputCol="words"),
        CountVectorizer(inputCol="words", outputCol="tf_features", vocabSize=vocab_size),
        IDF(inputCol="tf_features", outputCol="tfidf_features"),
        PCA(inputCol="tfidf_features", outputCol="pca_features", k=100),
        
        ReviewContentTransformer(inputCol="content", outputCol="content_features"),
        ReviewWordsTransformer(inputCol="words", outputCol="word_features"),
        
        RegexTokenizer(inputCol="title", outputCol="all_title_words", pattern=r"\W"),
        StopWordsRemover(inputCol="all_title_words", outputCol="title_words"),
        CountVectorizer(inputCol="title_words", outputCol="title_tf_features", vocabSize=100),
        IDF(inputCol="title_tf_features", outputCol="title_tfidf_features"),
        PCA(inputCol="title_tfidf_features", outputCol="title_pca_features", k=100),        

        StringIndexer(inputCol="domain", outputCol="domain_indexed", handleInvalid='keep'),
        OneHotEncoder(inputCol="domain_indexed", outputCol="domain_feature"),
        
        StringIndexer(inputCol="author1", outputCol="author1_indexed", handleInvalid='keep'),
        OneHotEncoder(inputCol="author1_indexed", outputCol="author1_feature"),
        
        StringIndexer(inputCol="author2", outputCol="author2_indexed", handleInvalid='keep'),        
        OneHotEncoder(inputCol="author2_indexed", outputCol="author2_feature"),
        
        VectorAssembler(inputCols=["pca_features", "title_pca_features", "title_tfidf_features", 
                                   "content_features", "word_features", "domain_feature","author1_feature", "author2_feature"], 
                        outputCol="features")
    ]
    return Pipeline(stages=preproc_steps)

def build_data_preproc_model_without_pca(vocab_size=5000):
    preproc_steps = [
        RegexTokenizer(inputCol="content", outputCol="all_words", pattern=r"\W"),
        StopWordsRemover(inputCol="all_words", outputCol="words"),
        
        StringIndexer(inputCol="domain", outputCol="domain_indexed", handleInvalid='keep'),
        OneHotEncoder(inputCol="domain_indexed", outputCol="domain_feature"),
        
        StringIndexer(inputCol="author1", outputCol="author1_indexed", handleInvalid='keep'),
        OneHotEncoder(inputCol="author1_indexed", outputCol="author1_feature"),
        
        StringIndexer(inputCol="author2", outputCol="author2_indexed", handleInvalid='keep'),        
        OneHotEncoder(inputCol="author2_indexed", outputCol="author2_feature"),
        
        ReviewContentTransformer(inputCol="content", outputCol="content_features"),
        ReviewWordsTransformer(inputCol="words", outputCol="word_features"),
        
        VectorAssembler(inputCols=["tf_features", "content_features", "word_features", 
                                   "domain_feature","author1_feature", "author2_feature"], 
                        outputCol="features")
    ]
    return Pipeline(stages=preproc_steps)

CPU times: user 7 µs, sys: 0 ns, total: 7 µs
Wall time: 12.2 µs


In [None]:
%%time
print("**********Run Models with PCA Features**********")
# generate the features to be used for model training
preproc_model = build_data_preproc_model_with_pca(2000).fit(df_train)
df_train_pca = preproc_model.transform(df_train).select("label", "features")
df_test_pca = preproc_model.transform(df_test).select("label", "features")

**********Run Models with PCA Features**********
[Row(label=1, features=SparseVector(4920, {0: -1.545, 1: -10.1689, 2: -4.6902, 3: 1.731, 4: -3.6121, 5: 2.793, 6: -2.1007, 7: 0.9937, 8: -1.9308, 9: 2.6694, 10: 0.6508, 11: 0.7024, 12: 0.5505, 13: -3.1285, 14: -2.071, 15: 0.6434, 16: 1.8526, 17: 1.1357, 18: -2.4302, 19: -1.8551, 20: -0.1209, 21: 0.1592, 22: 0.391, 23: -1.0441, 24: -0.0244, 25: -2.1715, 26: -4.2558, 27: 0.0521, 28: 0.0537, 29: 0.9504, 30: 0.2269, 31: -1.1771, 32: -0.7934, 33: 0.4121, 34: -0.5933, 35: 2.1907, 36: -0.419, 37: -0.187, 38: -0.499, 39: -0.8866, 40: -0.7528, 41: -1.1466, 42: 0.9674, 43: 0.8339, 44: 0.4442, 45: -0.7517, 46: -0.006, 47: -1.0215, 48: -0.339, 49: 0.5213, 50: -1.6145, 51: -1.5744, 52: -0.9032, 53: -0.8733, 54: -0.396, 55: -0.4403, 56: -0.7994, 57: 0.5004, 58: -2.1218, 59: 0.5531, 60: 1.3719, 61: 2.4516, 62: -0.4075, 63: 1.6629, 64: 0.2582, 65: 0.0971, 66: 1.035, 67: -0.5552, 68: 1.4545, 69: 1.6198, 70: 1.99, 71: 0.3013, 72: 0.8973, 73: 0.6923, 74: -

In [None]:
%%time
lr_model = LogisticRegression(featuresCol='features', 
                              labelCol='label', 
                              predictionCol='prediction', 
                              probabilityCol='probability', 
                              rawPredictionCol='rawPrediction',
                              family='binomial', 
                              fitIntercept=True, 
                              threshold=0.5, 
                              standardization=False, 
                              maxIter=200, 
                              regParam=0.005, 
                              elasticNetParam=0, 
                              tol=1e-06, 
                              aggregationDepth=2)

lr_model = lr_model.fit(df_train_pca)
eval_model_perf(lr_model, df_test_pca)  

In [None]:
%%time
dt_model = DecisionTreeClassifier(featuresCol='features', 
                                  labelCol='label', 
                                  predictionCol='prediction', 
                                  probabilityCol='probability', 
                                  rawPredictionCol='rawPrediction', 
                                  maxDepth=10, maxBins=32, 
                                  minInstancesPerNode=1, 
                                  minInfoGain=0.0, 
                                  maxMemoryInMB=2048, 
                                  cacheNodeIds=True, 
                                  checkpointInterval=10,
                                  impurity='gini', 
                                  seed=666)

dt_model = dt_model.fit(df_train_pca)
eval_model_perf(dt_model, df_test_pca)

In [None]:
%%time
rf_model = RandomForestClassifier(featuresCol='features', 
                                  labelCol='label', 
                                  predictionCol='prediction', 
                                  probabilityCol='probability', 
                                  rawPredictionCol='rawPrediction',
                                  maxDepth=10, 
                                  maxBins=32, 
                                  minInstancesPerNode=1, 
                                  minInfoGain=0.0, 
                                  maxMemoryInMB=2048, 
                                  cacheNodeIds=True, 
                                  checkpointInterval=10, 
                                  impurity='gini', 
                                  numTrees=200, 
                                  featureSubsetStrategy='auto', 
                                  seed=666, 
                                  subsamplingRate=0.8)

rf_model = rf_model.fit(df_train_pca)
eval_model_perf(rf_model, df_test_pca)

In [None]:
%%time
gbt_model = GBTClassifier(featuresCol='features', 
                         labelCol='label', 
                         maxIter=250)

gbt_model = gbt_model.fit(df_train_pca)
eval_model_perf(gbt_model, df_test_pca)

In [None]:
%%time
mp_model = MultilayerPerceptronClassifier(featuresCol='features', 
                                          labelCol='label', 
                                          predictionCol='prediction', 
                                          layers=[4, 5, 4, 3],  
                                          maxIter=100, 
                                          blockSize=128, 
                                          seed=1234)    

    

mp_model = mp_model.fit(df_train_pca)
eval_model_perf(mp_model, df_test_pca)

In [None]:
%%time
print("**********Run Models without PCA Features**********")
# generate the features to be used for model training
preproc_model = build_data_preproc_model_without_pca(3000).fit(df_train)
df_train_wo_pca = preproc_model.transform(df_train).select("label", "features")
df_test_wo_pca = preproc_model.transform(df_test).select("label", "features")

**********Run Models without PCA Features**********
[Row(label=1, features=SparseVector(4645, {0: 4268.0, 1: 3398.0, 2: 126.0, 3: 0.0371, 4: 406.0, 5: 270.0, 6: 0.665, 94: 1.0}))]
[Row(label=1, features=SparseVector(4645, {0: 16668.0, 1: 13355.0, 2: 443.0, 3: 0.0332, 4: 1545.0, 5: 895.0, 6: 0.5793, 94: 1.0}))]
accuracy  =  0.9776730339866038
f1  =  0.9816465297766493
AUC = 0.9871456507711381
CPU times: user 333 ms, sys: 91.1 ms, total: 424 ms
Wall time: 4min 14s


DataFrame[label: int, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [None]:
%%time
lr_model = LogisticRegression(featuresCol='features', 
                              labelCol='label', 
                              predictionCol='prediction', 
                              probabilityCol='probability', 
                              rawPredictionCol='rawPrediction',
                              family='binomial', 
                              fitIntercept=True, 
                              threshold=0.5, 
                              standardization=False, 
                              maxIter=200, 
                              regParam=0.005, 
                              elasticNetParam=0, 
                              tol=1e-06, 
                              aggregationDepth=2)

lr_model = lr_model.fit(df_train_wo_pca)    
eval_model_perf(lr_model, df_test_wo_pca)

In [None]:
%%time
dt_model = DecisionTreeClassifier(featuresCol='features', 
                                  labelCol='label', 
                                  predictionCol='prediction', 
                                  probabilityCol='probability', 
                                  rawPredictionCol='rawPrediction', 
                                  maxDepth=10, maxBins=32, 
                                  minInstancesPerNode=1, 
                                  minInfoGain=0.0, 
                                  maxMemoryInMB=2048, 
                                  cacheNodeIds=True, 
                                  checkpointInterval=10,
                                  impurity='gini', 
                                  seed=666)

dt_model = dt_model.fit(df_train_wo_pca)
eval_model_perf(dt_model, df_test_wo_pca)

In [None]:
%%time
rf_model = RandomForestClassifier(featuresCol='features', 
                                  labelCol='label', 
                                  predictionCol='prediction', 
                                  probabilityCol='probability', 
                                  rawPredictionCol='rawPrediction',
                                  maxDepth=10, 
                                  maxBins=32, 
                                  minInstancesPerNode=1, 
                                  minInfoGain=0.0, 
                                  maxMemoryInMB=2048, 
                                  cacheNodeIds=True, 
                                  checkpointInterval=10, 
                                  impurity='gini', 
                                  numTrees=200, 
                                  featureSubsetStrategy='auto', 
                                  seed=666, 
                                  subsamplingRate=0.8)

rf_model = rf_model.fit(df_train_wo_pca)
eval_model_perf(rf_model, df_test_wo_pca)

In [None]:
%%time
gbt_model = GBTClassifier(featuresCol='features', 
                         labelCol='label', 
                         maxIter=250)

gbt_model = gbt_model.fit(df_train_wo_pca)
eval_model_perf(gbt_model, df_test_wo_pca)

In [None]:
%%time
mp_model = MultilayerPerceptronClassifier(featuresCol='features', 
                                          labelCol='label', 
                                          predictionCol='prediction', 
                                          layers=[4, 5, 4, 3],  
                                          maxIter=100, 
                                          blockSize=128, 
                                          seed=1234)

mp_model = mp_model.fit(df_train_wo_pca)
eval_model_perf(mp_model, df_test_wo_pca)

In [None]:
%%time
nb_model = NaiveBayes(featuresCol='features', 
                              labelCol='label', 
                              predictionCol='prediction', 
                              probabilityCol='probability', 
                              rawPredictionCol='rawPrediction', 
                              smoothing=1, 
                              modelType='multinomial')

nb_model = nb_model.fit(df_train_wo_pca)
eval_model_perf(nb_model, df_test_wo_pca)