Connect to Spark and read in the data from the CSV file (using its Amazon Web Services S3 URL).

In [None]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.2'
spark_version = 'spark-3.1.2' #<enter version>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com] [1 InRelease 14.2 kB/88.7 kB 16%] [Waitin                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/u

In [None]:
# Create the SparkSession used by the rest of the notebook, or reuse one that
# is already running.
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("Hashing1")
    .getOrCreate()
)

In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

In [None]:
# Read in data from the S3 bucket
from pyspark import SparkFiles
url ="https://butlerunit22.s3.us-east-2.amazonaws.com/all_articles_final.csv"
# addFile fetches the remote file for this Spark job; SparkFiles.get then
# resolves the local path of the downloaded copy by its file name.
spark.sparkContext.addFile(url)
news_import_df = spark.read.csv(SparkFiles.get("all_articles_final.csv"), sep=",", header=True)

# Show DataFrame (the original line `news_import_df =.show()` was a syntax error)
news_import_df.show()

+---+----+---------+--------------------+--------------------+-------+-----------------+
|_c0|type|news_type|               title|                text|subject|             date|
+---+----+---------+--------------------+--------------------+-------+-----------------+
|  0|   0|     fake| Donald Trump Sen...|Donald Trump just...|   News|December 31, 2017|
|  1|   0|     fake| Drunk Bragging T...|House Intelligenc...|   News|December 31, 2017|
|  2|   0|     fake| Sheriff David Cl...|On Friday, it was...|   News|December 30, 2017|
|  3|   0|     fake| Trump Is So Obse...|On Christmas day,...|   News|December 29, 2017|
|  4|   0|     fake| Pope Francis Jus...|Pope Francis used...|   News|December 25, 2017|
|  5|   0|     fake| Racist Alabama C...|The number of cas...|   News|December 25, 2017|
|  6|   0|     fake| Fresh Off The Go...|Donald Trump spen...|   News|December 23, 2017|
|  7|   0|     fake| Trump Said Some ...|In the wake of ye...|   News|December 23, 2017|
|  8|   0|     fake| 

In [None]:
# Keep a single copy of every fully identical row.
news_import_df = news_import_df.distinct()

# Grouped-by-label handle (not used by any later cell shown in this notebook).
mf = news_import_df.groupBy('news_type')
# Number of rows left after de-duplication.
news_import_df.count()


44230

In [None]:
# Keep only rows whose label is exactly 'fake' or 'true'; any other value in
# news_type is discarded (the recorded run drops 4 rows here: 44230 -> 44226).
cleaned = news_import_df.where(news_import_df['news_type'].isin('fake', 'true'))
cleaned.count()

44226

In [None]:
# Rows per label after cleaning, as a quick class-balance check.
mfc = cleaned.groupBy('news_type')
mfc.count().show()

+---------+-----+
|news_type|count|
+---------+-----+
|     fake|22838|
|     true|21388|
+---------+-----+



In [None]:
# Export into pandas: toPandas() collects every row of the Spark DataFrame
# into the driver's memory as a single pandas DataFrame.
import pandas as pd

pd_df = cleaned.toPandas()


In [None]:
from sklearn import preprocessing
# LabelEncoder maps each distinct string label to an integer code.
label_encoder = preprocessing.LabelEncoder()
# Encode the labels in column 'news_type' into a new integer column 'news_code'
# (in the recorded output below, 'fake' is encoded as 0).
pd_df['news_code']= label_encoder.fit_transform(pd_df['news_type']) 
print(pd_df.head())

   _c0 type news_type  ... subject              date news_code
0  203    0      fake  ...    News  October 20, 2017         0
1  226    0      fake  ...    News  October 12, 2017         0
2  461    0      fake  ...    News   August 31, 2017         0
3  617    0      fake  ...    News    August 9, 2017         0
4  683    0      fake  ...    News    August 2, 2017         0

[5 rows x 8 columns]


In [None]:
# Model inputs: the article body text is the feature, the integer-encoded
# news_type is the target.
data, labels = pd_df["text"], pd_df['news_code']

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.naive_bayes import MultinomialNB
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from sklearn.model_selection import train_test_split, GridSearchCV

# Text classification pipeline: token counts -> TF-IDF weighting -> multinomial
# Naive Bayes. The step names are the prefixes used in the parameter grid.
text_clf = Pipeline(
    steps=[
        ('vect', CountVectorizer()),
        ('tfidf', TfidfTransformer()),
        ('clf', MultinomialNB()),
    ]
)

# Hyper-parameter grid, keyed as <step>__<parameter>: 3 * 2 * 2 * 3 = 36 candidates.
tuned_parameters = dict(
    vect__ngram_range=[(1, 1), (1, 2), (2, 2)],
    tfidf__use_idf=(True, False),
    tfidf__norm=('l1', 'l2'),
    clf__alpha=[1, 1e-1, 1e-2],
)

# Hold out 33% of the articles for the final evaluation; the fixed seed keeps
# the split reproducible.
x_train, x_test, y_train, y_test = train_test_split(
    data,
    labels,
    test_size=0.33,
    random_state=42,
)

In [None]:
from sklearn.metrics import classification_report

# Exhaustive grid search over tuned_parameters with 10-fold cross-validation,
# using the pipeline's default scoring. verbose=3 logs every individual fit.
# NOTE(review): the recorded run took about 175 minutes with the default
# sequential execution; consider n_jobs=-1 if the runtime has enough memory.
clf = GridSearchCV(text_clf, tuned_parameters, cv=10, verbose=3)

clf.fit(x_train, y_train)

# After fitting, predict() delegates to the best pipeline refit on the whole
# training split. print() renders the report as a table instead of an escaped
# string repr full of "\n".
print(classification_report(y_test, clf.predict(x_test), digits=4))

Fitting 10 folds for each of 36 candidates, totalling 360 fits
[CV] clf__alpha=1, tfidf__norm=l1, tfidf__use_idf=True, vect__ngram_range=(1, 1) 


[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.


[CV]  clf__alpha=1, tfidf__norm=l1, tfidf__use_idf=True, vect__ngram_range=(1, 1), score=0.908, total=   9.6s
[CV] clf__alpha=1, tfidf__norm=l1, tfidf__use_idf=True, vect__ngram_range=(1, 1) 


[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:    9.6s remaining:    0.0s


[CV]  clf__alpha=1, tfidf__norm=l1, tfidf__use_idf=True, vect__ngram_range=(1, 1), score=0.918, total=   9.6s
[CV] clf__alpha=1, tfidf__norm=l1, tfidf__use_idf=True, vect__ngram_range=(1, 1) 


[Parallel(n_jobs=1)]: Done   2 out of   2 | elapsed:   19.1s remaining:    0.0s


[CV]  clf__alpha=1, tfidf__norm=l1, tfidf__use_idf=True, vect__ngram_range=(1, 1), score=0.903, total=   9.6s
[CV] clf__alpha=1, tfidf__norm=l1, tfidf__use_idf=True, vect__ngram_range=(1, 1) 
[CV]  clf__alpha=1, tfidf__norm=l1, tfidf__use_idf=True, vect__ngram_range=(1, 1), score=0.915, total=   9.7s
[CV] clf__alpha=1, tfidf__norm=l1, tfidf__use_idf=True, vect__ngram_range=(1, 1) 
[CV]  clf__alpha=1, tfidf__norm=l1, tfidf__use_idf=True, vect__ngram_range=(1, 1), score=0.912, total=   9.6s
[CV] clf__alpha=1, tfidf__norm=l1, tfidf__use_idf=True, vect__ngram_range=(1, 1) 
[CV]  clf__alpha=1, tfidf__norm=l1, tfidf__use_idf=True, vect__ngram_range=(1, 1), score=0.904, total=   9.6s
[CV] clf__alpha=1, tfidf__norm=l1, tfidf__use_idf=True, vect__ngram_range=(1, 1) 
[CV]  clf__alpha=1, tfidf__norm=l1, tfidf__use_idf=True, vect__ngram_range=(1, 1), score=0.915, total=   9.5s
[CV] clf__alpha=1, tfidf__norm=l1, tfidf__use_idf=True, vect__ngram_range=(1, 1) 
[CV]  clf__alpha=1, tfidf__norm=l1, tfid

[Parallel(n_jobs=1)]: Done 360 out of 360 | elapsed: 175.4min finished


'              precision    recall  f1-score   support\n\n           0     0.9679    0.9796    0.9737      7541\n           1     0.9779    0.9653    0.9715      7054\n\n    accuracy                         0.9727     14595\n   macro avg     0.9729    0.9724    0.9726     14595\nweighted avg     0.9727    0.9727    0.9727     14595\n'

In [None]:
# Predict the held-out test articles with the fitted grid search.
predictions = clf.predict(x_test)

In [None]:
# Report on the held-out set with readable class names; the list order has to
# follow the encoded labels (0 -> "fake", 1 -> "true").
print(
    classification_report(
        y_test,
        predictions,
        digits=4,
        target_names=["fake", "true"],
    )
)

              precision    recall  f1-score   support

        fake     0.9679    0.9796    0.9737      7541
        true     0.9779    0.9653    0.9715      7054

    accuracy                         0.9727     14595
   macro avg     0.9729    0.9724    0.9726     14595
weighted avg     0.9727    0.9727    0.9727     14595



In [None]:
# Pipeline refit with the best-scoring parameter combination.
clf.best_estimator_

Pipeline(memory=None,
         steps=[('vect',
                 CountVectorizer(analyzer='word', binary=False,
                                 decode_error='strict',
                                 dtype=<class 'numpy.int64'>, encoding='utf-8',
                                 input='content', lowercase=True, max_df=1.0,
                                 max_features=None, min_df=1,
                                 ngram_range=(2, 2), preprocessor=None,
                                 stop_words=None, strip_accents=None,
                                 token_pattern='(?u)\\b\\w\\w+\\b',
                                 tokenizer=None, vocabulary=None)),
                ('tfidf',
                 TfidfTransformer(norm='l2', smooth_idf=True,
                                  sublinear_tf=False, use_idf=False)),
                ('clf',
                 MultinomialNB(alpha=0.01, class_prior=None, fit_prior=True))],
         verbose=False)

In [None]:
# Parameter combination that achieved the best mean cross-validation score.
clf.best_params_


{'clf__alpha': 0.01,
 'tfidf__norm': 'l2',
 'tfidf__use_idf': False,
 'vect__ngram_range': (2, 2)}

In [None]:
# Mean cross-validated score of the best parameter combination.
clf.best_score_

0.9695253834630708

In [None]:
import joblib

In [None]:
# Persist the entire fitted GridSearchCV object (not only the best pipeline)
# to the current working directory.
joblib.dump(clf, "class_model.sav")

['class_model.sav']

In [None]:
# List the installed package versions of this runtime.
!pip list


In [None]:
# Inspect the working directory, e.g. to check the size of the saved model file.
!ls -lh

total 811M
-rw-r--r--  1 root root 169M Aug 21 19:12 class_model.sav
drwxr-xr-x  1 root root 4.0K Aug 13 13:35 sample_data
drwxr-xr-x 13 1000 1000 4.0K May 24 05:00 spark-3.1.2-bin-hadoop2.7
-rw-r--r--  1 root root 215M May 24 05:01 spark-3.1.2-bin-hadoop2.7.tgz
-rw-r--r--  1 root root 215M May 24 05:01 spark-3.1.2-bin-hadoop2.7.tgz.1
-rw-r--r--  1 root root 215M May 24 05:01 spark-3.1.2-bin-hadoop2.7.tgz.2


For the connection in app.py for the project, cloudpickle.dump needs to be used in place of joblib.dump.

In [None]:
import joblib

In [None]:
model = joblib.load('/content/drive/MyDrive/Colab Notebooks/class_model.sav')

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import cloudpickle

In [None]:
# Re-serialize the loaded model with cloudpickle; the context manager closes
# the output file even if dumping fails.
with open("pickle_model.sav", "wb") as model_file:
    cloudpickle.dump(model, model_file)