In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

import re
import math
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.sql.types import  StringType, DoubleType, IntegerType, ArrayType


In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
def clean_text(c):
    c = F.lower(c)
    c = F.regexp_replace(c, "^rt ", "")
    c = F.regexp_replace(c, "(https?\://)\S+", "")
    c = F.regexp_replace(c, "[^a-zA-Z0-9\\s]", "")
    #c = split(c, "\\s+") tokenization...
    return c



In [4]:
# Read the files
posTXT = spark.read.option("header", "true").option("inferSchema", "true").csv("True.csv")
negTXT = spark.read.option("header", "true").option("inferSchema", "true").csv("Fake.csv")

### Pre-processing of data

In [5]:
# Some elements in text column are empty, so we will drop them
# Clean the text using above utility function

posTXT = posTXT.select("text").dropna().withColumn("text", clean_text(F.col("text")))
negTXT = negTXT.select("text").dropna().withColumn("text", clean_text(F.col("text")))

In [6]:
# Concatenate both dataframes while adding a label column for marking true/fake
full_df = posTXT.withColumn("label", F.lit(1)).union(negTXT.withColumn("label", F.lit(0))).dropDuplicates().withColumn("document_id", F.monotonically_increasing_id())


In [7]:
# Tokenize the words
tokenizer = Tokenizer(inputCol="text", outputCol="vector")
vector_df = tokenizer.transform(full_df)

vector_df = vector_df.withColumn("vector", F.expr("filter(vector, x -> x != '')"))
# vector_df.show(5)


In [8]:
from pyspark.ml.feature import StopWordsRemover

# Define a list of stop words or use default list
remover = StopWordsRemover()
stopwords = remover.getStopWords() 

# Specify input/output columns
remover.setInputCol("vector")
remover.setOutputCol("vector_no_stopw")

# Transform existing dataframe with the StopWordsRemover
vector_no_stopw_df = remover.transform(vector_df).drop("vector")

# Display
# vector_no_stopw_df.printSchema()
# vector_no_stopw_df.show()

In [9]:
# hashingTF = HashingTF(inputCol="vector_no_stopw", outputCol="rawFeatures", numFeatures=32)
# featurizedData = hashingTF.transform(vector_no_stopw_df) 
# featurizedData.show(5)

### Q2: TF-IDF implementation

In [10]:
# default vocab size is 262144, removing the vocabsize will automatically take maximum size
vocab_size = 40 

# Calculating term frequency
cv = CountVectorizer(inputCol="vector_no_stopw", outputCol="rawFeatures", minTF=2.0, minDF=2.0, vocabSize=vocab_size)
cv_model = cv.fit(vector_no_stopw_df)
featurizedData = cv_model.transform(vector_no_stopw_df)
# featurizedData.show(5)

In [11]:
# Calculating inverse data frequency from term frequency
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=2)
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData) 
# rescaledData.show(5)

In [12]:
# total counts of words
import numpy as np

total_counts = rescaledData.select('rawFeatures').rdd\
                    .map(lambda row: row['rawFeatures'].toArray())\
                    .reduce(lambda x,y: [x[i]+y[i] for i in range(len(y))])

# Vocabulary and it's count
vocabList = cv_model.vocabulary
d = {'vocabList':vocabList,'counts':total_counts}

spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys())).na.drop().show()

+----------+--------+
| vocabList|  counts|
+----------+--------+
|      said|112723.0|
|     trump| 99400.0|
|        us| 46437.0|
| president| 34159.0|
|    people| 26758.0|
|       one| 19924.0|
|   reuters| 11521.0|
|     state| 21064.0|
|      also| 16649.0|
|       new| 18978.0|
|    donald| 12242.0|
|    states| 17676.0|
|     house| 18282.0|
|government| 17579.0|
|republican| 17307.0|
|    united| 14919.0|
|      told| 11427.0|
|   clinton| 17686.0|
|     obama| 15648.0|
|     white| 14028.0|
+----------+--------+
only showing top 20 rows



In [13]:

def termsIdx2Term(vocabulary):
    def termsIdx2Term(termIndices):
        return [vocabulary[int(index)] for index in termIndices]
    return F.udf(termsIdx2Term, ArrayType(StringType()))

# Helper functions to extract indices and values from SparseMatrix type columns
indices_udf = F.udf(lambda vector: vector.indices.tolist(), ArrayType(IntegerType()))
values_udf = F.udf(lambda vector: vector.toArray().tolist(), ArrayType(DoubleType()))

# TFIDF is the product of tf, idf
tf_idf_udf = F.udf(lambda x_vec, y_vec: [x*y for x, y in zip(x_vec, y_vec)], ArrayType(DoubleType()))

In [14]:
# Extract the TF and TF-IDF values

tf_idf = rescaledData.withColumn('indices', indices_udf(F.col('rawFeatures')))\
           .withColumn('tf_values', values_udf(F.col('rawFeatures')))\
           .withColumn('idf_values', values_udf(F.col('features')))\
           .withColumn("terms", termsIdx2Term(vocabList)("indices"))\
           .withColumn("tfidf_values", tf_idf_udf(F.col("tf_values"), F.col("idf_values")))


In [15]:

main_df = tf_idf\
    .withColumn("word_index", F.array([F.lit(x) for x in vocabList]))\
    .withColumn("tmp", F.arrays_zip("word_index", "tf_values", "idf_values", "tfidf_values"))\
    .withColumn("tmp", F.explode("tmp"))\
    .select("document_id", "text", "label", F.col("tmp.word_index"), F.col("tmp.tf_values"), F.col("tmp.idf_values"), F.col("tmp.tfidf_values"))

# Print this if you want to see TF, IDF values.

# main_df.show()

### Q3: We do see an interesting connection between rare words and high tf-idf values

In [16]:
# This is the final output with sorted tf-idf values

main_df.select("label", "document_id", "text", "word_index", "tfidf_values").orderBy("tfidf_values", ascending=False).show()

+-----+-------------+--------------------+----------+------------------+
|label|  document_id|                text|word_index|      tfidf_values|
+-----+-------------+--------------------+----------+------------------+
|    0| 438086664346|wow what a list o...|     first|13768.321835306568|
|    0| 824633721023|patrick henningse...|   clinton|10505.923791244139|
|    0|1640677507272|fundamental trans...|     years|  9614.74156045434|
|    0|  17179869362|one of the ancill...|     state| 9223.714743325312|
|    0| 360777253053|thanks to obama s...|     state| 8962.062427137105|
|    0|1322849927357|shawn helton 21st...|   clinton| 8395.574085702672|
|    0| 438086664346|wow what a list o...| president| 7123.014543631126|
|    0| 429496729731|it s thursday jul...|     obama| 6716.152034085382|
|    0| 867583393971| patrick hennings...|   clinton|  5556.85225322004|
|    0| 867583393970| in response to t...|      news| 5499.028624532774|
|    0| 678604832916|hillary won t be ...|   clinto

### Q5: Use-case

A social network is interested in producing an engine for filtering fake news. For this purpose they want to run your algorithm on every post that is published. The goal is to verify the correctness of the news in the post as quickly as possible and publish it if it is correct and archived otherwise. Describe the technological difficulties of this type of system, especially regarding the storage of unstructured data and real-time information processing.