In [None]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.1.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:6 http://security.ubuntu.com/ubuntu bionic-security InRelease
Hit:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
Hit:9 http://archive.ubuntu.com/ubuntu bionic-backports InRelease
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Hit:12 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Reading package lists... Done


In [None]:
# Run findspark.init() before touching pyspark so this cell does not rely on
# an earlier cell having already made the package importable.
import findspark
findspark.init()

from pyspark.ml.feature import Tokenizer
from pyspark.sql import SparkSession

In [None]:
from google.colab import drive

# Expose Google Drive inside the Colab VM; the CSV is read from it later.
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [None]:
# Create the session named "Tokens", or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("Tokens")
    .getOrCreate()
)

In [None]:
# Tab-separated file whose first row holds the column names.
# No schema is supplied, so every column (including `label`) is read as a string.
news_spark = (
    spark.read
    .options(sep='\t', header=True)
    .csv("/content/drive/My Drive/pyspark_fake_cleaned.csv")
)

In [None]:
# Preview the first 50 rows (long cell values are truncated by show()).
news_spark.show(n=50)

+---+--------------------+--------------------+-----+
|_c0|               title|                text|label|
+---+--------------------+--------------------+-----+
|  0|LAW ENFORCEMENT O...|No comment is exp...|    1|
|  1|                NONE|Did they post the...|    1|
|  2|UNBELIEVABLE! OBA...| Now, most of the...|    1|
|  3|Bobby Jindal, rai...|A dozen political...|    0|
|  4|SATAN 2: Russia u...|The RS-28 Sarmat ...|    1|
|  5|About Time! Chris...|All we can say on...|    1|
|  6|DR BEN CARSON TAR...|DR. BEN CARSON TE...|    1|
|  7|HOUSE INTEL CHAIR...|                NONE|    1|
|  8|Sports Bar Owner ...|The owner of the ...|    1|
|  9|Latest Pipeline L...|FILE – In this Se...|    1|
| 10| GOP Senator Just...|"The most punchab...|    1|
| 11|May Brexit offer ...|BRUSSELS (Reuters...|    0|
| 12|Schumer calls on ...|WASHINGTON (Reute...|    0|
| 13|WATCH: HILARIOUS ...|After watching th...|    1|
| 14|No Change Expecte...|As more and more ...|    0|
| 15|Billionaire Odebr...|RI

In [None]:
# Tokenizers for the two free-text columns; each writes its token array
# to a new "Tok_*" column when transform() is called.
tok_title = Tokenizer().setInputCol("title").setOutputCol("Tok_title")
tok_text = Tokenizer().setInputCol("text").setOutputCol("Tok_text")

In [None]:
def word_list_length(word_list):
    """Return the number of entries in ``word_list``.

    Args:
        word_list: Any sized collection (the notebook passes token arrays).

    Returns:
        int: ``len(word_list)``.
    """
    token_total = len(word_list)
    return token_total
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
# Wrap the plain-Python counter as a Spark UDF that yields an integer column
count_tokens = udf(f=word_list_length, returnType=IntegerType())

In [None]:
# Tokenize title then text, attach a token count for each, and drop the
# raw string columns now that their token arrays exist.
tokenized_df = (
    tok_text.transform(tok_title.transform(news_spark))
    .withColumn("title tokens", count_tokens(col("Tok_title")))
    .withColumn("text tokens", count_tokens(col("Tok_text")))
    .drop("title", "text")
)
tokenized_df.show(n=2)

+---+-----+--------------------+--------------------+------------+-----------+
|_c0|label|           Tok_title|            Tok_text|title tokens|text tokens|
+---+-----+--------------------+--------------------+------------+-----------+
|  0|    1|[law, enforcement...|[no, comment, is,...|          18|        941|
|  1|    1|              [none]|[did, they, post,...|           1|          8|
+---+-----+--------------------+--------------------+------------+-----------+
only showing top 2 rows



In [None]:
from pyspark.ml.feature import StopWordsRemover

# One remover per token column, using StopWordsRemover's default word list.
stop_title = StopWordsRemover(inputCol="Tok_title", outputCol="stop_title")
stop_text = StopWordsRemover(inputCol="Tok_text", outputCol="stop_text")

# Apply the title remover first, then the text remover, on the tokenized frame.
filtered = stop_text.transform(stop_title.transform(tokenized_df))
filtered.show()

+---+-----+--------------------+--------------------+------------+-----------+--------------------+--------------------+
|_c0|label|           Tok_title|            Tok_text|title tokens|text tokens|          stop_title|           stop_text|
+---+-----+--------------------+--------------------+------------+-----------+--------------------+--------------------+
|  0|    1|[law, enforcement...|[no, comment, is,...|          18|        941|[law, enforcement...|[comment, expecte...|
|  1|    1|              [none]|[did, they, post,...|           1|          8|              [none]|[post, votes, hil...|
|  2|    1|[unbelievable!, o...|[, now,, most, of...|          18|         38|[unbelievable!, o...|[, now,, demonstr...|
|  3|    0|[bobby, jindal,, ...|[a, dozen, politi...|          16|       1290|[bobby, jindal,, ...|[dozen, political...|
|  4|    1|[satan, 2:, russi...|[the, rs-28, sarm...|          16|        329|[satan, 2:, russi...|[rs-28, sarmat, m...|
|  5|    1|[about, time!, ch...|

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

# Hash the stop-word-filtered tokens into term-frequency vectors
# (HashingTF's default dimensionality is used for both columns).
hashingTF1 = HashingTF(inputCol="stop_title", outputCol="hash_title")
hashingTF2 = HashingTF(inputCol="stop_text", outputCol="hash_text")

# Title vectors are added first, then text vectors.
hashed_df = hashingTF2.transform(hashingTF1.transform(filtered))

hashed_df.show()

+---+-----+--------------------+--------------------+------------+-----------+--------------------+--------------------+--------------------+--------------------+
|_c0|label|           Tok_title|            Tok_text|title tokens|text tokens|          stop_title|           stop_text|          hash_title|           hash_text|
+---+-----+--------------------+--------------------+------------+-----------+--------------------+--------------------+--------------------+--------------------+
|  0|    1|[law, enforcement...|[no, comment, is,...|          18|        941|[law, enforcement...|[comment, expecte...|(262144,[19684,22...|(262144,[619,992,...|
|  1|    1|              [none]|[did, they, post,...|           1|          8|              [none]|[post, votes, hil...|(262144,[110743],...|(262144,[84100,10...|
|  2|    1|[unbelievable!, o...|[, now,, most, of...|          18|         38|[unbelievable!, o...|[, now,, demonstr...|(262144,[17893,31...|(262144,[3564,538...|
|  3|    0|[bobby, jin

In [None]:
# NOTE(review): both IDF models are fit on the full dataset; the train/test
# split only happens in a later cell.

# --- Title: fit IDF weights on the hashed title vectors and apply them ---
idf1 = IDF(inputCol="hash_title", outputCol="idf_title")
rescaledData = idf1.fit(hashed_df).transform(hashed_df)
rescaledData.select("hash_title", "hash_text", "idf_title").show()

# --- Text: same procedure on the frame that already carries idf_title ---
idf2 = IDF(inputCol="hash_text", outputCol="idf_text")
# After this cell `idfModel` refers to the text model.
idfModel = idf2.fit(rescaledData)
rescaledData = idfModel.transform(rescaledData)
rescaledData.select("hash_title", "hash_text", "idf_title", "idf_text").show()

+--------------------+--------------------+--------------------+
|          hash_title|           hash_text|           idf_title|
+--------------------+--------------------+--------------------+
|(262144,[19684,22...|(262144,[619,992,...|(262144,[19684,22...|
|(262144,[110743],...|(262144,[84100,10...|(262144,[110743],...|
|(262144,[17893,31...|(262144,[3564,538...|(262144,[17893,31...|
|(262144,[54679,11...|(262144,[511,1546...|(262144,[54679,11...|
|(262144,[3571,171...|(262144,[161,921,...|(262144,[3571,171...|
|(262144,[30367,72...|(262144,[2162,227...|(262144,[30367,72...|
|(262144,[36217,83...|(262144,[29066,95...|(262144,[36217,83...|
|(262144,[20326,31...|(262144,[110743],...|(262144,[20326,31...|
|(262144,[31895,66...|(262144,[960,6957...|(262144,[31895,66...|
|(262144,[41421,59...|(262144,[1546,160...|(262144,[41421,59...|
|(262144,[47685,92...|(262144,[751,1512...|(262144,[47685,92...|
|(262144,[18923,38...|(262144,[115,1772...|(262144,[18923,38...|
|(262144,[36525,44...|(26

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# The model input is the text TF-IDF vector followed by the text token count;
# the title-derived columns are not part of `features`.
clean = VectorAssembler(
    outputCol='features',
    inputCols=['idf_text', 'text tokens'],
)

In [None]:
# Append the assembled `features` vector column to the TF-IDF frame.
cleaned = clean.transform(dataset=rescaledData)

In [None]:

# Inspect only the two columns the classifier will consume.
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    1|(262145,[619,992,...|
|    1|(262145,[84100,10...|
|    1|(262145,[3564,538...|
|    0|(262145,[511,1546...|
|    1|(262145,[161,921,...|
|    1|(262145,[2162,227...|
|    1|(262145,[29066,95...|
|    1|(262145,[110743,2...|
|    1|(262145,[960,6957...|
|    1|(262145,[1546,160...|
|    1|(262145,[751,1512...|
|    0|(262145,[115,1772...|
|    0|(262145,[1096,153...|
|    1|(262145,[2437,523...|
|    0|(262145,[1115,253...|
|    0|(262145,[2015,230...|
|    1|(262145,[1619,392...|
|    0|(262145,[2622,594...|
|    1|(262145,[1519,154...|
|    0|(262145,[3148,356...|
+-----+--------------------+
only showing top 20 rows



In [None]:
# List the distinct label values present in the data.
(
    cleaned
    .select("label")
    .distinct()
    .collect()
)

[Row(label='0'), Row(label='1')]

In [None]:
# `label` was read from the CSV as a string; replace it with a numeric
# column of the same name.
cleaned = cleaned.withColumn("label", cleaned.label.cast('numeric'))

In [None]:
# Keep just the target and the assembled feature vector for modelling.
cleaned = cleaned.select(["label", "features"])

In [None]:
# 70/30 train/test split; the fixed seed keeps the split repeatable.
training, testing = cleaned.randomSplit(weights=[0.7, 0.3], seed=21)

In [None]:
from pyspark.ml.classification import NaiveBayes

# Naive Bayes with library defaults, which read the `features` and `label`
# columns; fit on the training split only.
nb = NaiveBayes()
predictor = nb.fit(dataset=training)

In [None]:
# Score the held-out split; transform() appends rawPrediction, probability
# and prediction columns (see the output below).
test_results = predictor.transform(dataset=testing)
test_results.show(n=5)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|    0|(262145,[4,666,12...|[-39327.190298613...|           [1.0,0.0]|       0.0|
|    0|(262145,[6,154,38...|[-10563.451320627...|[1.0,2.0315050853...|       0.0|
|    0|(262145,[6,619,64...|[-34415.988250408...|[1.0,2.3592609958...|       0.0|
|    0|(262145,[6,619,15...|[-37983.562046146...|           [1.0,0.0]|       0.0|
|    0|(262145,[6,921,15...|[-23047.776539665...|           [1.0,0.0]|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Accuracy = share of test rows whose hard `prediction` equals `label`.
# BinaryClassificationEvaluator only reports areaUnderROC / areaUnderPR, so it
# cannot produce the accuracy figure this cell prints; the multiclass
# evaluator with metricName='accuracy' does, and works for two classes.
acc_eval = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='accuracy',
)
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting fake news was : %f" % acc)

Accuracy of model at predicting fake news was : 0.938895


In [None]:
# Word frequencies across all titles, computed with pandas.
# `news_df` is not created by any earlier cell in this notebook, so it is
# built here from the Spark DataFrame; only `title` is collected to the driver.
news_df = news_spark.select("title").toPandas()
news_df["title"].str.split(expand=True).stack().value_counts()