# Document Type Classification using PySpark

#### The Project is based on Distributed Big Data Technology.

The objective of this project is to create a Machine Learning(ML) model by performing data intensive computing using Pyspark which involves handling data acquired from NewYorkTimes API.

#### Document Classification :
Document Classification analogous to general classification of instances, deals with assigning labels to documents. The documents can be of different length, structure(can be written by different authors using variety of writing styles) and source.
Here multi-class classification is used to classify the document types into the respective categories they belong to. Different types of models/classifiers are defined to predict the categories of the articles and their accuracy are studied.

One of the main goals of this project is to optimize the handling and processing of huge data and to observe it using Spark environment .

#### Apache Spark:
Apache Spark is a Big Data framework which operates on distributed data collections. It furnishes in-memory computations for improved and quicker data processing over MapReduce. It is a cluster-computing framework which is designed for faster data computations

## Setting Spark

In [1]:
import findspark
findspark.init('C:/extras/spark')

## Creating Spark session

In [2]:
from pyspark.sql import SparkSession
spark=SparkSession.builder\
                          .master("local")\
                          .appName('DOC_classifier')\
                          .getOrCreate()
sc=spark.sparkContext

## Importing necessary Libraries 

In [3]:
import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import nltk
from nltk.corpus import stopwords
import pandas as pd

## Setting SQL Context 

A SQLContext can be used create DataFrame, register DataFrame as tables, execute SQL over tables, cache tables, and read parquet files.

In [4]:
sqlContext = SQLContext(sc)

## Loading Data

#### Data consists of text files based on following class:
#### 1) Fashion 2) Technology 3) Science 4) Movie

In [5]:
fas_df = spark.read.text('Data/Fashion/*')
fas_df = fas_df.withColumn("category",lit("Fashion"))

tech_df = spark.read.text('Data/Technology/*')
tech_df = tech_df.withColumn("category",lit("Technology"))

sci_df = spark.read.text('Data/Science/*')
sci_df = sci_df.withColumn("category",lit("science"))

mov_df = spark.read.text('Data/Movie/*')
mov_df = mov_df.withColumn("category",lit("Movie"))


merge_df1 = fas_df.union(tech_df)
merge_df2 = merge_df1.union(sci_df)
merge_df3 = merge_df2.union(mov_df)

#### Final Dataset

In [6]:
data = merge_df3.select([column for column in merge_df3.columns])
data.show(5)

+--------------------+--------+
|               value|category|
+--------------------+--------+
|   Sections SEARC...| Fashion|
|   Sections SEARC...| Fashion|
|   Sections SEARC...| Fashion|
|   Sections SEARC...| Fashion|
|   Sections SEARC...| Fashion|
+--------------------+--------+
only showing top 5 rows



#### Final Unknown Dataset

In [7]:
Fas_udf = spark.read.text('Data/unknown/Fashion/*')
Fas_udf = Fas_udf.withColumn("category",lit("Fashion"))

science_udf = spark.read.text('Data/unknown/science/*')
science_udf = science_udf.withColumn("category",lit("science"))

tech_udf = spark.read.text('Data/unknown/technology/*')
tech_udf = tech_udf.withColumn("category",lit("technology"))

movie_udf = spark.read.text('Data/unknown/Movie/*')
movie_udf = movie_udf.withColumn("category",lit("Movie"))

merge_udf1 = Fas_udf.union(science_udf)
merge_udf2 = merge_udf1.union(tech_udf)
merge_udf3 = merge_udf2.union(movie_udf)

unknown_data = merge_udf3.select([column for column in merge_udf3.columns])
unknown_data.show(5)

+--------------------+--------+
|               value|category|
+--------------------+--------+
|   Sections SEARC...| Fashion|
|   Sections SEARC...| Fashion|
|   Sections SEARC...| Fashion|
|   Sections SEARC...| Fashion|
|   Sections SEARC...| Fashion|
+--------------------+--------+
only showing top 5 rows



## Setting Metric 

In [8]:
data_m = {'metric':['Train_Accuracy', 'Test_Accuracy','Unknown_Accuracy']}
metric_df=pd.DataFrame(data_m)

In [9]:
metric_df.set_index(['metric'])

Train_Accuracy
Test_Accuracy
Unknown_Accuracy


## Data Manipulation using Pyspark libraries 

### RegexTokenizer  

A regex based tokenizer that extracts tokens either by using the provided regex pattern (in Java dialect) to split the text (default) or repeatedly matching the regex (if gaps is false). Optional parameters also allow filtering tokens using a minimal length. It returns an array of strings that can be empty.

In [10]:
regexTokenizer = RegexTokenizer(inputCol="value", outputCol="words", pattern="\\W")

### Downloading Stopwords

In [11]:
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\prajw\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

### StopWordsRemover

A feature transformer that filters out stop words from input. Since 3.0.0, StopWordsRemover can filter out multiple columns at once by setting the inputCols parameter. Note that when both the inputCol and inputCols parameters are set, an Exception will be thrown.

In [12]:
add_stopwords=nltk.corpus.stopwords.words('english')
add_stopwords_1 = ["nytimes","com","sense","day","common","business","todays","said","food","review","sunday","letters","politics","events","terms","services","years","contributors","companies","listings","applications","tax","trump","president","contributing","make","think","woman","federal","called","system","found","american","sale","headline","arts","times","subscriptions","choices","privacy","take","jobs","books","account","accounts","television","nyc","writers","multimedia","journeys","editorials","photography","automobiles","paper","city","tool","sports","weddings","columnists","contribution","even","nyt","obituary","state","travel","advertise","pm","street","go","corrections","saturday","company","dance","states","real","movies","estate","percent","music","tech","living","science","fashion","please","opinion","art","new","york","time","u","wa","reading","ha","video","image","photo","credit","edition","magazine","oped","could","crossword","mr","term","feedback","index","get","also","b","help","year","health","united","education","week","think","guide","event","two","first","subscription","service","cut","is","nytimescom","section","sections","Sections","Home","home","Search","search","Skip","skip","content","navigation","View","view","mobile","version","Subscribe","subscribe","Now","now","Log","log","In","in","setting","settings","Site","site","Loading","loading","article","next","previous","Advertisement","ad","advertisement","Supported","supported","by","Share","share","Page","page","Continue","continue","main","story","newsletter","Sign","Up","Manage","email","preferences","Not","you","opt","out","contact","us","anytime","thank","subscribing","see","more","email"] 
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered1").setStopWords(add_stopwords)
stopwordsRemover1 = StopWordsRemover(inputCol="filtered1", outputCol="filtered").setStopWords(add_stopwords_1)


# StringIndexer, HashingTF and IDF

StringIndexer :A label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values.

Hashingtf:Maps a sequence of terms to their term frequencies using the hashing trick. Currently we use Austin Appleby’s MurmurHash 3 algorithm (MurmurHash3_x86_32) to calculate the hash code value for the term object. Since a simple modulo is used to transform the hash function to a column index, it is advisable to use a power of two as the numFeatures parameter; otherwise the features will not be mapped evenly to the columns.

IDF: he standard formulation is used: idf = log((m + 1) / (d(t) + 1)), where m is the total number of documents and d(t) is the number of documents that contain term t.This implementation supports filtering out terms which do not appear in a minimum number of documents (controlled by the variable minDocFreq). For terms that are not in at least minDocFreq documents, the IDF is found as 0, resulting in TF-IDFs of 0.

In [13]:
label_stringIdx = StringIndexer(inputCol = "category", outputCol = "label")
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=1000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms

Pipeline :A simple pipeline, which acts as an estimator. A Pipeline consists of a sequence of stages, each of which is either an Estimator or a Transformer. When Pipeline.fit() is called, the stages are executed in order. If a stage is an Estimator, its Estimator.fit() method will be called on the input dataset to fit a model. Then the model, which is a transformer, will be used to transform the dataset as the input to the next stage. If a stage is a Transformer, its Transformer.transform() method will be called to produce the dataset for the next stage. The fitted model from a Pipeline is a PipelineModel, which consists of fitted models and transformers, corresponding to the pipeline stages. If stages is an empty list, the pipeline acts as an identity transformer.

In [14]:
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover,stopwordsRemover1, hashingTF, idf, label_stringIdx])

## Data Transformation and splitting

In [15]:
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed = 100)


## Logistic Rgression 

Logistic regression is a popular method to predict a categorical response. It is a special case of Generalized Linear models that predicts the probability of the outcomes. In spark.ml logistic regression can be used to predict a binary outcome by using binomial logistic regression, or it can be used to predict a multiclass outcome by using multinomial logistic regression. Use the family parameter to select between these two algorithms, or leave it unset and Spark will infer the correct variant.

Multinomial logistic regression can be used for binary classification by setting the family param to “multinomial”. It will produce two sets of coefficients and two intercepts.


In [16]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

### Performance on train data

In [17]:
predictions_train = lrModel.transform(trainingData)
predictions_train.filter(predictions_train['prediction'] == 0) \
    .select("value","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)


+------------------------------+--------+------------------------------+-----+----------+
|                         value|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|   Sections SEARCH Skip to ...|   Movie|[0.9822394802109057,0.00235...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|   Movie|[0.9811567547470802,0.00302...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|   Movie|[0.9770537920390813,0.00631...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|   Movie|[0.9763162809849949,0.01115...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|   Movie|[0.9762569248827576,0.00562...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|   Movie|[0.970554468408183,0.009335...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|   Movie|[0.9679333738863134,0.00870...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|   Movie|[0.9669405158721934,0.00738...|  0.0|       0.0|
|   Sectio

### Train Accuracy 

In [18]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
train_accuracy= evaluator.evaluate(predictions_train)*100
print("-------Accuracy of train data using logistic_regression-----: " + str(evaluator.evaluate(predictions_train)*100)+"%")

-------Accuracy of train data using logistic_regression-----: 98.76222962433778%


### Performance on Test Data and Unknown Data

In [19]:
predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("value","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)


+------------------------------+--------+------------------------------+-----+----------+
|                         value|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|   Sections SEARCH Skip to ...|   Movie|[0.9754632623041882,0.01057...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|   Movie|[0.9431747402417925,0.02006...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|   Movie|[0.9172397458630736,0.02471...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|   Movie|[0.9053576601908154,0.02005...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|   Movie|[0.8962780254409616,0.02195...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|   Movie|[0.8507710146790355,0.01021...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|   Movie|[0.8454293725021643,0.05217...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|   Movie|[0.8197670154180591,0.01462...|  0.0|       0.0|
|   Sectio

In [20]:
pipelineFit2 = pipeline.fit(unknown_data)
unknown_dataset = pipelineFit2.transform(unknown_data)

In [21]:
predictions2 = lrModel.transform(unknown_dataset)
predictions2.filter(predictions2['prediction'] == 0) \
    .select("value","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)
predictions2.show(3)

+-----+--------+-----------+-----+----------+
|value|category|probability|label|prediction|
+-----+--------+-----------+-----+----------+
+-----+--------+-----------+-----+----------+

+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|               value|category|               words|           filtered1|            filtered|         rawFeatures|            features|label|       rawPrediction|         probability|prediction|
+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|   Sections SEARC...| Fashion|[sections, search...|[sections, search...|[today, industry,...|(1000,[10,12,43,5...|(1000,[10,12,43,5...|  3.0|[0.01492090532267...|[0.21386014488012...|       1.0|
|   Sections SEARC...| Fashion|

### Test and Unknown Accuracy

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("-------Accuracy of test data using logistic_regression-----: " + str(evaluator.evaluate(predictions)*100)+"%")
test_accuracy= evaluator.evaluate(predictions)*100

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("-------Accuracy of unknown data using logistic_regression-----: " + str(evaluator.evaluate(predictions2)*100)+"%")
u_accuracy= evaluator.evaluate(predictions2)*100


-------Accuracy of test data using logistic_regression-----: 64.6386141204794%


In [None]:
metric_df['Logistic Regression'] = [train_accuracy,test_accuracy,u_accuracy]

##  Naive Bayes classifiers

Naive Bayes classifiers are a family of simple probabilistic, multiclass classifiers based on applying Bayes’ theorem with strong (naive) independence assumptions between every pair of features.

Naive Bayes can be trained very efficiently. With a single pass over the training data, it computes the conditional probability distribution of each feature given each label. For prediction, it applies Bayes’ theorem to compute the conditional probability distribution of each label given an observation.

In [None]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)

### Performace on Train Data 

In [None]:
predictions_train = model.transform(trainingData)
predictions_train.filter(predictions_train['prediction'] == 0) \
    .select("value","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

### Train Accuracy

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
train_accuracy= evaluator.evaluate(predictions_train)*100
print("-------Accuracy of train data using Naive Bayes Classifier-----: " + str(evaluator.evaluate(predictions_train)*100)+"%")

### Performance on Test Data and Unknown Data

In [None]:
predictions3 = model.transform(testData)
predictions3.filter(predictions3['prediction'] == 0) \
    .select("value","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)
predictions3.show(10)



In [None]:
predictions4 = model.transform(unknown_dataset)
predictions4.filter(predictions4['prediction'] == 0) \
    .select("value","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)
predictions4.show(5)

### Test and Unknown Accuracy

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("-------Accuracy of test data using naive_bayes-----: " + str(evaluator.evaluate(predictions3)*100)+"%")
test_accuracy= evaluator.evaluate(predictions3)*100
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("-------Accuracy of unknown data using naive_bayes-----: " + str(evaluator.evaluate(predictions4)*100)+"%")
u_accuracy= evaluator.evaluate(predictions4)*100

In [None]:
metric_df['Naive Bayes'] = [train_accuracy,test_accuracy,u_accuracy]

##  Decision tree classifier

Decision trees and their ensembles are popular methods for the machine learning tasks of classification and regression. Decision trees are widely used since they are easy to interpret, handle categorical features, extend to the multiclass classification setting, do not require feature scaling, and are able to capture non-linearities and feature interactions. Tree ensemble algorithms such as random forests and boosting are among the top performers for classification and regression tasks.

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
pipelineFit_dt = pipeline.fit(data)
dataset = pipelineFit_dt.transform(data)
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed = 100)
dt = DecisionTreeClassifier(impurity="gini")
dtModel = dt.fit(trainingData)

### Performance on Train data

In [None]:
predictions_dt = dtModel.transform(trainingData)
predictions_dt.filter(predictions_dt['prediction'] == 0) \
    .select("value","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

### Train Accuracy

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("-------Accuracy of train data using Decision Tree-----: " + str(evaluator.evaluate(predictions_dt)*100)+"%")
train_accuracy= evaluator.evaluate(predictions_dt)*100

### Performace on Test and Unknown Data

In [None]:
predictions = dtModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("value","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

In [None]:
pipelineFit2 = pipeline.fit(unknown_data)
unknown_dataset = pipelineFit2.transform(unknown_data)
predictions2 = dtModel.transform(unknown_dataset)
predictions2.filter(predictions2['prediction'] == 0) \
    .select("value","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

### Test and Unknown Accuracy

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("-------Accuracy of test data using Decision Tree-----: " + str(evaluator.evaluate(predictions)*100)+"%")
test_accuracy= evaluator.evaluate(predictions)*100
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("-------Accuracy of unknown data using Decision Tree-----: " + str(evaluator.evaluate(predictions2)*100)+"%")
u_accuracy= evaluator.evaluate(predictions2)*100

In [None]:
metric_df['Decision Tree'] = [train_accuracy,test_accuracy,u_accuracy]

## Random forest classifier 

Random forests are ensembles of decision trees. Random forests combine many decision trees in order to reduce the risk of overfitting. The spark.ml implementation supports random forests for binary and multiclass classification and for regression, using both continuous and categorical features.

In [None]:
from pyspark.ml.classification import RandomForestClassifier
pipelineFit_rf = pipeline.fit(data)
dataset = pipelineFit_rf.transform(data)
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed = 100)
rf = RandomForestClassifier(numTrees=50)
rfModel = rf.fit(trainingData)

### Performance on Train Data

In [None]:

predictions_rf = rfModel.transform(trainingData)
predictions_rf.filter(predictions_rf['prediction'] == 0) \
    .select("value","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

### Train Accuracy

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("-------Accuracy of train data using Random Forest-----: " + str(evaluator.evaluate(predictions_rf)*100)+"%")
train_accuracy= evaluator.evaluate(predictions_rf)*100


### Performance on Test and Unknown Data

In [None]:
predictions = rfModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("value","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

In [None]:

pipelineFit2 = pipeline.fit(unknown_data)
unknown_dataset = pipelineFit2.transform(unknown_data)
predictions2 = rfModel.transform(unknown_dataset)
predictions2.filter(predictions2['prediction'] == 0) \
    .select("value","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

### Test and Unknown Accuracy 

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("-------Accuracy of test data using Random Forest-----: " + str(evaluator.evaluate(predictions)*100)+"%")
test_accuracy= evaluator.evaluate(predictions)*100
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("-------Accuracy of unknown data using Random Forest-----: " + str(evaluator.evaluate(predictions2)*100)+"%")
u_accuracy= evaluator.evaluate(predictions2)*100

In [None]:
metric_df['Random Forest'] = [train_accuracy,test_accuracy,u_accuracy]

##  One-vs-Rest

OneVsRest is an example of a machine learning reduction for performing multiclass classification given a base classifier that can perform binary classification efficiently. It is also known as “One-vs-All".

In [None]:
from pyspark.ml.classification import OneVsRest
pipelineFit_svc = pipeline.fit(data)
dataset = pipelineFit_svc.transform(data)
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed = 100)
ovr = OneVsRest(classifier=lr)
ovrModel = ovr.fit(trainingData)

### Performance on Train Data

In [None]:
predictions_svc = ovrModel.transform(trainingData)

### Train Accuracy

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
train_accuracy= evaluator.evaluate(predictions_svc)*100
print("-------Accuracy of train data using One-vs-Rest-----: " + str(train_accuracy)+"%")


### Performance on Test and Unknown Data

In [None]:
predictions = ovrModel.transform(testData)

In [None]:
pipelineFit2 = pipeline.fit(unknown_data)
unknown_dataset = pipelineFit2.transform(unknown_data)
predictions2 = ovrModel.transform(unknown_dataset)

### Test and Unknown Accuracy

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
test_accuracy= evaluator.evaluate(predictions)*100
print("-------Accuracy of test data using One-vs-Rest-----: " + str(test_accuracy)+"%")

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
u_accuracy= evaluator.evaluate(predictions2)*100
print("-------Accuracy of unknown data using One-vs-Rest-----: " + str(u_accuracy)+"%")


In [None]:
metric_df['One-vs-Rest'] = [train_accuracy,test_accuracy,u_accuracy]

## Metric Display

In [None]:
metric_df