# News Article Classification

*This notebook demonstrates news article classification in Spark using MLlib (the DataFrame-based `pyspark.ml` API).*

1. The articles were downloaded from The New York Times using the NYT API from Python. The code for that is available in another notebook.
2. The downloaded data is in the **news_data** folder, with one subfolder per category, named after the category.
3. The articles are in plain-text format.

##  Importing Libraries

- In this step we import all the required libraries from pyspark.

In [70]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF,StopWordsRemover,IDF,Tokenizer
from pyspark.ml.classification import LogisticRegression,RandomForestClassifier,NaiveBayes
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


## Spark Context

- Here we would instantiate the Spark context. The line below is commented out because `sc` is already created when I launch pyspark from the command line. If it is not, create it with the commented line.
- Then I create a SQL context, which is used later to create the DataFrame.

In [71]:
#When I launch pyspark from cmd, sc is already created, so I don't have to create a SparkContext.
#sc = SparkContext("local[4]","news_analysis")
sqlContext = SQLContext(sc)


## Reading data and Converting to Dataframes

- I read the files with `wholeTextFiles`, which creates a pair RDD of (filepath, text).
- Then I define a schema for the files read, with the columns news_id, news_category and news_text.
- path[0] holds the file path. I split it on "/" and use the rightmost part as the id (the name of the text file) and the second-rightmost part as the category (the name of the subfolder).
- path[1] holds the text of the file.
- Using the DataFrame API, I create the DataFrame.
- A sample of the data is shown as the output.
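
The path handling described in the list above can be tried in plain Python, without Spark. This is a minimal sketch: the helper name `split_news_path` and the sample file URI are hypothetical and only illustrate the shape of one (filepath, text) pair.

```python
def split_news_path(path_and_text):
    """Turn a (filepath, text) pair into (news_id, news_category, news_text)."""
    filepath, text = path_and_text
    parts = filepath.split("/")
    # Rightmost component is the file name; the one before it is the category folder.
    return parts[-1], parts[-2], text


# Hypothetical pair, shaped like one element of the RDD returned by wholeTextFiles
sample = ("file:/home/user/News_Data/Business/1000.txt", "From the outset, ...")
row = split_news_path(sample)
print(row)
```

Because the category comes purely from the folder name, a file placed in the wrong subfolder would silently receive the wrong label.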

In [72]:
#Reading the path and data and splitting for creating dataframe
#https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/SparkContext.html#wholeTextFiles-java.lang.String-int-
news_data = sc.wholeTextFiles("News_Data/*")
news_schema = StructType([
                    StructField("news_id" , StringType(), True),
                    StructField("news_category" , StringType(), True),
                    StructField("news_text" , StringType(), True)
                    ])
 
news_split = news_data.map(lambda path :  (path[0].split("/")[-1], path[0].split("/")[-2],path[1]))
news_dataframe = sqlContext.createDataFrame(news_split,news_schema)
news_dataframe.sample(False,0.1).show(30)

+--------+-------------+--------------------+
| news_id|news_category|           news_text|
+--------+-------------+--------------------+
|1000.txt|     Business|From the outset, ...|
|1108.txt|     Business|Most presidents w...|
|1195.txt|     Business|The Trump adminis...|
|1207.txt|     Business|LONDON � If an au...|
|  50.txt|     Business|WASHINGTON � Pres...|
| 513.txt|     Business|Each week, Farhad...|
| 522.txt|     Business|Delta Air Lines s...|
| 673.txt|     Business|FRANKFURT � Polit...|
| 698.txt|     Business|TOKYO � Getting h...|
| 700.txt|       Movies|Handsome cinemato...|
| 767.txt|       Movies|This year�s Acade...|
| 863.txt|       Movies|LOS ANGELES � Opr...|
| 907.txt|       Movies|On Wednesday, tho...|
|1037.txt|     Politics|WASHINGTON � Pres...|
|1071.txt|     Politics|WASHINGTON � When...|
| 314.txt|     Politics|This is the first...|
| 323.txt|     Politics|ROME � It happene...|
| 325.txt|     Politics|Gov. Andrew M. Cu...|
| 411.txt|     Politics|WASHINGTON

## Encoding Labels
Here I use `StringIndexer` to encode the categorical labels as numeric labels. A sample of the indexed data is shown as the output.
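
By default, `StringIndexer` orders labels by descending frequency, so the most frequent category receives index 0.0. The plain-Python sketch below mimics that ordering; the function name `index_labels`, the label counts, and the alphabetical tie-break are assumptions made for illustration.

```python
from collections import Counter


def index_labels(labels):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending count; ties are broken alphabetically (an assumption of this sketch).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical category counts, chosen only to illustrate the ordering
labels = ["Business"] * 5 + ["Politics"] * 4 + ["Movies"] * 3
mapping = index_labels(labels)
print(mapping)
```

The indices carry no ordinal meaning; they are only identifiers that the classifiers need in numeric form.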

In [73]:

label_stringIdx = StringIndexer(inputCol = "news_category", outputCol = "label")
indexed = label_stringIdx.fit(news_dataframe).transform(news_dataframe)
indexed.sample(False,0.1).show(30)

+--------+-------------+--------------------+-----+
| news_id|news_category|           news_text|label|
+--------+-------------+--------------------+-----+
|1004.txt|     Business|One appointee wen...|  0.0|
|1042.txt|     Business|FRANKFURT � Polit...|  0.0|
|1122.txt|     Business|A new chapter ope...|  0.0|
|1162.txt|     Business|The Abraaj Group ...|  0.0|
|1202.txt|     Business|On a Thursday eve...|  0.0|
| 478.txt|     Business|JERUSALEM � Seven...|  0.0|
| 522.txt|     Business|Delta Air Lines s...|  0.0|
| 575.txt|     Business|A possible merger...|  0.0|
| 701.txt|     Business|For weeks, users ...|  0.0|
| 531.txt|       Movies|With accusations ...|  2.0|
| 704.txt|       Movies|LOS ANGELES � �Th...|  2.0|
| 707.txt|       Movies|The Oscars turn 9...|  2.0|
| 757.txt|       Movies|Since sexual hara...|  2.0|
| 930.txt|       Movies|Say hello to this...|  2.0|
| 962.txt|       Movies|The �we� in �What...|  2.0|
|1016.txt|     Politics|WASHINGTON � Gary...|  1.0|
|1028.txt|  

## Cleaning and Feature Extraction
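- Here we use `Tokenizer` to split each article into lowercase words (tokens).
- `StopWordsRemover` drops common words that carry little information for classification.
- `HashingTF` maps each word to one of a fixed number of buckets (1000 here) and counts occurrences, which yields fixed-length term-frequency vectors without building a vocabulary.
- `IDF` then rescales those counts to produce the TF-IDF features. For more information on TF-IDF, see
https://spark.apache.org/docs/2.2.0/mllib-feature-extraction.html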
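
To make the feature-extraction steps concrete, here is a small plain-Python sketch of hashed term frequencies followed by IDF weighting. It is an illustration under stated assumptions, not Spark's implementation: `zlib.crc32` stands in for the MurmurHash3 function that `HashingTF` uses, the stop-word list and documents are made up, and the IDF formula is the smoothed form log((m + 1) / (df + 1)) from the Spark documentation.

```python
import math
import zlib
from collections import Counter


def hashed_tf(tokens, num_features):
    """Term-frequency vector where each token is hashed into a fixed number of buckets."""
    vector = [0.0] * num_features
    for token, count in Counter(tokens).items():
        # crc32 is a stand-in hash for this sketch; Spark's HashingTF uses MurmurHash3.
        bucket = zlib.crc32(token.encode("utf-8")) % num_features
        vector[bucket] += count
    return vector


def idf_weights(tf_vectors):
    """Smoothed inverse document frequency per bucket: log((m + 1) / (df + 1))."""
    m = len(tf_vectors)
    num_features = len(tf_vectors[0])
    doc_freq = [sum(1 for v in tf_vectors if v[j] > 0) for j in range(num_features)]
    return [math.log((m + 1) / (df + 1)) for df in doc_freq]


STOP_WORDS = {"the", "a", "of", "in"}  # tiny illustrative list, not Spark's default list

docs = ["The price of oil rose", "The film won an award", "Oil exports rose in March"]
# Lowercase, split on whitespace, then drop stop words
tokenized = [[w for w in d.lower().split() if w not in STOP_WORDS] for d in docs]
tf_vectors = [hashed_tf(tokens, num_features=16) for tokens in tokenized]
idf_vec = idf_weights(tf_vectors)
tfidf = [[t * w for t, w in zip(vec, idf_vec)] for vec in tf_vectors]
print(tfidf[0])
```

A small `num_features` makes collisions between unrelated words more likely, which is one reason the number of buckets is worth tuning.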

In [74]:
#The code below is adapted from the pipeline example in the PySpark example programs and
#https://spark.apache.org/docs/1.6.1/ml-features.html#tf-idf-hashingtf-and-idf

train_set, test_set = indexed.randomSplit([0.8,0.2])
tokenizer = Tokenizer(inputCol="news_text", outputCol="news_words")
remover = StopWordsRemover(inputCol="news_words", outputCol="filtered_words",caseSensitive=False)
hashingTF = HashingTF(inputCol="filtered_words",outputCol="hashed_features",numFeatures=1000)
idf = IDF(inputCol="hashed_features",outputCol="features")


## 1. Multi-Class Classification using Decision Tree Classifier
- Here I instantiate a Decision Tree Classifier and create a pipeline with it.
- Then I train the model on the training set.
- I test the model on the test set; sample output is shown below.

In [75]:
#Decision Tree Classifier
#https://spark.apache.org/docs/1.5.0/ml-decision-tree.html
DC = DecisionTreeClassifier(labelCol = "label",featuresCol = "features")
DC_pipeline = Pipeline(stages=[tokenizer, remover,hashingTF, idf,DC])
DC_model = DC_pipeline.fit(train_set)
DC_predictions = DC_model.transform(test_set)
DC_predictions.select("news_id","news_category","prediction","label").show(10)


+--------+-------------+----------+-----+
| news_id|news_category|prediction|label|
+--------+-------------+----------+-----+
|1006.txt|     Politics|       1.0|  1.0|
|1014.txt|     Business|       0.0|  0.0|
|1016.txt|     Politics|       1.0|  1.0|
|1026.txt|     Politics|       2.0|  1.0|
|1034.txt|     Politics|       0.0|  1.0|
|1037.txt|     Politics|       1.0|  1.0|
|1051.txt|     Politics|       3.0|  1.0|
|1071.txt|     Politics|       1.0|  1.0|
|1115.txt|     Business|       0.0|  0.0|
|1164.txt|     Business|       0.0|  0.0|
+--------+-------------+----------+-----+
only showing top 10 rows



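## Prediction Accuracy using Decision Tree Classifier
We use an evaluator to score the model's predictions. Note that `MulticlassClassificationEvaluator` reports the weighted F1 score by default; pass `metricName="accuracy"` to get plain accuracy.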
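
`MulticlassClassificationEvaluator` uses the weighted F1 metric unless `metricName` is set, and that value can differ from plain accuracy. The plain-Python sketch below computes both on the ten (label, prediction) rows of the Decision Tree output shown above. The function names are hypothetical, and the weighting (each class's F1 weighted by its share of the true labels) is my reading of how the weighted measure is defined.

```python
from collections import Counter


def accuracy(labels, predictions):
    """Fraction of rows where the prediction equals the label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


def weighted_f1(labels, predictions):
    """Per-class F1, averaged with weights equal to each class's share of the true labels."""
    support = Counter(labels)
    total = 0.0
    for cls, n in support.items():
        tp = sum(1 for y, p in zip(labels, predictions) if y == cls and p == cls)
        predicted = sum(1 for p in predictions if p == cls)
        precision = tp / predicted if predicted else 0.0
        recall = tp / n
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += f1 * n / len(labels)
    return total


# label and prediction columns of the ten Decision Tree rows shown above
labels = [1.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0]
predictions = [1.0, 0.0, 1.0, 2.0, 0.0, 1.0, 3.0, 1.0, 0.0, 0.0]

acc = accuracy(labels, predictions)
wf1 = weighted_f1(labels, predictions)
print(acc, wf1)
```

On these ten rows the two metrics do not coincide, so it matters which one is being reported when models are compared.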

In [76]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(DC_predictions)

0.6723382376206296

## 2. Multi-Class Classification using Random Forest Classifier
- Here I instantiate a Random Forest Classifier and create a pipeline with it.
- Then I train the model on the training set.
- I test the model on the test set; sample output is shown below.

In [77]:
#https://spark.apache.org/docs/2.1.0/ml-classification-regression.html
RF = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)
RF_pipeline = Pipeline(stages=[tokenizer, remover,hashingTF, idf,RF])
RF_model = RF_pipeline.fit(train_set)
RF_predictions = RF_model.transform(test_set)
RF_predictions.select("news_id","news_category","prediction","label").show(20)

+--------+-------------+----------+-----+
| news_id|news_category|prediction|label|
+--------+-------------+----------+-----+
|1006.txt|     Politics|       1.0|  1.0|
|1014.txt|     Business|       0.0|  0.0|
|1016.txt|     Politics|       1.0|  1.0|
|1026.txt|     Politics|       1.0|  1.0|
|1034.txt|     Politics|       1.0|  1.0|
|1037.txt|     Politics|       1.0|  1.0|
|1051.txt|     Politics|       1.0|  1.0|
|1071.txt|     Politics|       1.0|  1.0|
|1115.txt|     Business|       1.0|  0.0|
|1164.txt|     Business|       0.0|  0.0|
|1206.txt|     Business|       0.0|  0.0|
|   2.txt|     Business|       0.0|  0.0|
| 281.txt|       Movies|       2.0|  2.0|
| 288.txt|     Politics|       2.0|  1.0|
| 313.txt|     Politics|       1.0|  1.0|
| 473.txt|     Business|       0.0|  0.0|
| 481.txt|     Business|       0.0|  0.0|
|  50.txt|     Business|       0.0|  0.0|
| 511.txt|     Business|       0.0|  0.0|
| 610.txt|     Business|       0.0|  0.0|
+--------+-------------+----------

## Prediction Accuracy using Random Forest Classifier
We use the evaluator defined earlier to score the model's predictions (weighted F1 by default).

In [78]:
evaluator.evaluate(RF_predictions)

0.7335654479565111

## 3. Multi-Class Classification using Naive Bayes
- Here I instantiate Naive Bayes and create a pipeline with it.
- Then I train the model on the training set.
- I test the model on the test set; sample output is shown below.

In [79]:
#https://spark.apache.org/docs/2.1.0/ml-classification-regression.html
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
nb_pipeline = Pipeline(stages=[tokenizer, remover,hashingTF, idf,nb])
nb_model = nb_pipeline.fit(train_set)
nb_predictions = nb_model.transform(test_set)
nb_predictions.select("news_id","news_category","prediction","label").show(20)

+--------+-------------+----------+-----+
| news_id|news_category|prediction|label|
+--------+-------------+----------+-----+
|1006.txt|     Politics|       1.0|  1.0|
|1014.txt|     Business|       1.0|  0.0|
|1016.txt|     Politics|       1.0|  1.0|
|1026.txt|     Politics|       1.0|  1.0|
|1034.txt|     Politics|       0.0|  1.0|
|1037.txt|     Politics|       1.0|  1.0|
|1051.txt|     Politics|       1.0|  1.0|
|1071.txt|     Politics|       1.0|  1.0|
|1115.txt|     Business|       0.0|  0.0|
|1164.txt|     Business|       0.0|  0.0|
|1206.txt|     Business|       0.0|  0.0|
|   2.txt|     Business|       0.0|  0.0|
| 281.txt|       Movies|       2.0|  2.0|
| 288.txt|     Politics|       1.0|  1.0|
| 313.txt|     Politics|       1.0|  1.0|
| 473.txt|     Business|       0.0|  0.0|
| 481.txt|     Business|       0.0|  0.0|
|  50.txt|     Business|       1.0|  0.0|
| 511.txt|     Business|       0.0|  0.0|
| 610.txt|     Business|       0.0|  0.0|
+--------+-------------+----------

## Prediction Accuracy using Naive Bayes
We use the evaluator defined earlier to score the model's predictions (weighted F1 by default).

In [80]:
evaluator.evaluate(nb_predictions)

0.7476621417797887

## 4. Multi-Class Classification using Logistic Regression
- Here I instantiate Logistic Regression and create a pipeline with it.
- Then I train the model on the training set.
- I test the model on the test set; sample output is shown below.

In [81]:
#Logistic Regression. This code is adapted from the logistic regression summary example in the Spark examples
LR = LogisticRegression(maxIter=10)
LR_pipeline = Pipeline(stages=[tokenizer, remover,hashingTF, idf,LR])
LR_model = LR_pipeline.fit(train_set)
LR_predictions = LR_model.transform(test_set)
LR_predictions.select("news_id","news_category","probability","prediction","label").show(20)

+--------+-------------+--------------------+----------+-----+
| news_id|news_category|         probability|prediction|label|
+--------+-------------+--------------------+----------+-----+
|1006.txt|     Politics|[2.87055638133401...|       1.0|  1.0|
|1014.txt|     Business|[0.94630158747331...|       0.0|  0.0|
|1016.txt|     Politics|[2.87055638133401...|       1.0|  1.0|
|1026.txt|     Politics|[5.65882720967919...|       1.0|  1.0|
|1034.txt|     Politics|[0.47128466297770...|       0.0|  1.0|
|1037.txt|     Politics|[7.19995614450264...|       1.0|  1.0|
|1051.txt|     Politics|[1.82821913164354...|       1.0|  1.0|
|1071.txt|     Politics|[7.70170280532057...|       1.0|  1.0|
|1115.txt|     Business|[0.99537068551977...|       0.0|  0.0|
|1164.txt|     Business|[0.96529871014097...|       0.0|  0.0|
|1206.txt|     Business|[0.98851652259368...|       0.0|  0.0|
|   2.txt|     Business|[0.99994490550686...|       0.0|  0.0|
| 281.txt|       Movies|[7.76175080894985...|       2.0

## Prediction Accuracy using Logistic Regression
We use the evaluator defined earlier to score the model's predictions (weighted F1 by default).

In [82]:
evaluator.evaluate(LR_predictions)

0.8239795918367346

### Logistic Regression is the best of the four models, with the highest score (0.824). In the next step I tune two models, the Decision Tree Classifier and Logistic Regression, with cross-validation to try to improve their scores.

## Parameter tuning for  Decision Tree Classifier

In [83]:
DC_paramGrid = (ParamGridBuilder()
                .addGrid(hashingTF.numFeatures, [100, 1000, 10000])
                .addGrid(DC.maxDepth, [1, 2, 6, 10])
                .addGrid(DC.maxBins, [20, 40, 80])
                .build())
crossval = CrossValidator(estimator=DC_pipeline, estimatorParamMaps=DC_paramGrid,
                          evaluator=evaluator, numFolds=10)
DC_cvModel = crossval.fit(train_set)
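
The cost of this search grows quickly with the grid. The short sketch below counts the work involved, using the candidate values from the Decision Tree grid above; it only enumerates combinations in plain Python and does not call Spark.

```python
from itertools import product

# Candidate values copied from the Decision Tree grid above
grid = {
    "numFeatures": [100, 1000, 10000],
    "maxDepth": [1, 2, 6, 10],
    "maxBins": [20, 40, 80],
}
num_folds = 10

# Every combination of one value per parameter, i.e. the Cartesian product of the grid
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]

# Each combination is trained once per fold during the search
fits_during_search = len(param_maps) * num_folds
print(len(param_maps), fits_during_search)
```

After the search, `CrossValidator` refits the pipeline once more on the full training set with the best combination, so reducing the grid or the number of folds is the main way to shorten the run.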

## Prediction Accuracy for Tuned Decision Tree Classifier
As the score below shows, the tuned model (0.751) performs better on the test set than the untuned model (0.672), so we will use the tuned model to classify the unknown test data.

In [84]:
DC_cv_predictions = DC_cvModel.transform(test_set)
evaluator.evaluate(DC_cv_predictions)

0.7509109872175966

## Parameter tuning for  Logistic Regression

In [85]:
# taken from crossvalidation example from spark
paramGrid = (ParamGridBuilder()
             .addGrid(hashingTF.numFeatures, [100, 1000, 10000])
             .addGrid(LR.regParam, [0.1, 0.3, 0.5])
             .addGrid(LR.elasticNetParam, [0.1, 0.2, 0.4])
             .build())
crossval = CrossValidator(estimator=LR_pipeline, estimatorParamMaps=paramGrid,
                          evaluator=evaluator, numFolds=10)
cvModel = crossval.fit(train_set)



## Prediction Accuracy for Tuned Logistic Regression
As the score below shows, the tuned model (0.879) performs better on the test set than the untuned model (0.824), so we will use the tuned model to classify the unknown test data.

In [86]:
cv_predictions = cvModel.transform(test_set)
evaluator.evaluate(cv_predictions)

0.8793422682000509

*Tuned Logistic Regression is the best of the models for news classification. I will use the tuned Decision Tree Classifier as a second classifier to compare against it.*

# Testing Some Unknown Data for Grading Purposes
- This part can be used by the TA to test the model.
- Here I use the two tuned models, i.e. the tuned Decision Tree and the tuned Logistic Regression.
- The articles that can be used for testing are placed in the test_data folder, inside the Unknown subfolder.
- Note that the articles in test_data were not used for training or testing while building the model.
- If the TA wants to verify with their own data, please place it in the test_data folder in the same way.

  

## Loading the Test Data

In [87]:
#For testing purpose
from pyspark.sql.types import *
test_data = sc.wholeTextFiles("test_data/*")
test_schema = StructType([
                    StructField("news_id" , StringType(), True),
                    StructField("news_category" , StringType(), True),
                    StructField("news_text" , StringType(), True)
                    ])
 
t = test_data.map(lambda path :  (path[0].split("/")[-1], path[0].split("/")[-2],path[1]))
test_dataframe = sqlContext.createDataFrame(t,test_schema)
label1_stringIdx = StringIndexer(inputCol = "news_category", outputCol = "label")
indexed1 = label1_stringIdx.fit(test_dataframe).transform(test_dataframe)
indexed1.show()

+--------------+-------------+--------------------+-----+
|       news_id|news_category|           news_text|label|
+--------------+-------------+--------------------+-----+
| Business1.txt|      Unknown|LOS ANGELES � In ...|  0.0|
| Business2.txt|      Unknown|The working world...|  0.0|
| Business3.txt|      Unknown|WASHINGTON � Afte...|  0.0|
| Business4.txt|      Unknown|With its $11.9 bi...|  0.0|
| Business5.txt|      Unknown|LONDON � During h...|  0.0|
|   Movies1.txt|      Unknown|The title of �Mai...|  0.0|
|   Movies2.txt|      Unknown|�The Leisure Seek...|  0.0|
|   Movies3.txt|      Unknown|Here�s a look at ...|  0.0|
|   Movies4.txt|      Unknown|Finding an opport...|  0.0|
|   Movies5.txt|      Unknown|Here�s a look at ...|  0.0|
| Politics1.txt|      Unknown|For Gov. Andrew M...|  0.0|
|Politics15.txt|      Unknown|PLAINVILLE, Conn....|  0.0|
| Politics2.txt|      Unknown|SEOUL, South Kore...|  0.0|
| Politics3.txt|      Unknown|WASHINGTON � Befo...|  0.0|
| Politics4.tx

## Classifying Test Data using Cross-validated Decision Tree Classifier
- To verify, compare the category in the file name with the predicted label (0.0 = Business, 1.0 = Politics, 2.0 = Movies, per the StringIndexer output above).

In [91]:
test_predictions = DC_cvModel.transform(indexed1)
test_predictions.select("news_id","news_category","probability","prediction").show()

+--------------+-------------+--------------------+----------+
|       news_id|news_category|         probability|prediction|
+--------------+-------------+--------------------+----------+
| Business1.txt|      Unknown|[0.90196078431372...|       0.0|
| Business2.txt|      Unknown|   [0.0,0.0,0.0,1.0]|       3.0|
| Business3.txt|      Unknown|   [0.0,1.0,0.0,0.0]|       1.0|
| Business4.txt|      Unknown|   [0.0,0.0,1.0,0.0]|       2.0|
| Business5.txt|      Unknown|[0.90196078431372...|       0.0|
|   Movies1.txt|      Unknown|   [0.0,0.0,1.0,0.0]|       2.0|
|   Movies2.txt|      Unknown|   [0.0,0.0,1.0,0.0]|       2.0|
|   Movies3.txt|      Unknown|   [0.0,0.0,1.0,0.0]|       2.0|
|   Movies4.txt|      Unknown|   [0.0,0.0,1.0,0.0]|       2.0|
|   Movies5.txt|      Unknown|   [0.0,0.0,1.0,0.0]|       2.0|
| Politics1.txt|      Unknown|   [0.0,1.0,0.0,0.0]|       1.0|
|Politics15.txt|      Unknown|[0.90196078431372...|       0.0|
| Politics2.txt|      Unknown|   [0.0,1.0,0.0,0.0]|    

## Classifying Test Data using Cross-validated Logistic Regression model
- To verify, compare the category in the file name with the predicted label (0.0 = Business, 1.0 = Politics, 2.0 = Movies, per the StringIndexer output above).
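
The comparison can also be done programmatically. The sketch below is plain Python with hypothetical helper names. It assumes the index-to-category mapping visible in the StringIndexer output earlier in the notebook (only three categories are visible there, so any other index is treated as unknown), and it uses the (news_id, prediction) pairs from the visible rows of the Logistic Regression output in this section.

```python
import re

# Label indices as they appear in the StringIndexer output earlier in the notebook.
# Only the categories visible there are listed; other indices are treated as unknown.
INDEX_TO_CATEGORY = {0.0: "Business", 1.0: "Politics", 2.0: "Movies"}


def expected_category(news_id):
    """Strip the trailing number and extension, e.g. 'Politics15.txt' -> 'Politics'."""
    return re.sub(r"\d*\.txt$", "", news_id)


def count_matches(rows):
    """Count rows whose predicted category equals the category in the file name."""
    return sum(
        1 for news_id, prediction in rows
        if INDEX_TO_CATEGORY.get(prediction) == expected_category(news_id)
    )


# (news_id, prediction) pairs copied from the visible rows of the Logistic Regression output
rows = [
    ("Business1.txt", 0.0), ("Business2.txt", 2.0), ("Business3.txt", 1.0),
    ("Business4.txt", 0.0), ("Business5.txt", 0.0), ("Movies1.txt", 2.0),
    ("Movies2.txt", 2.0), ("Movies3.txt", 2.0), ("Movies4.txt", 2.0),
    ("Movies5.txt", 2.0), ("Politics1.txt", 1.0), ("Politics15.txt", 1.0),
]
matches = count_matches(rows)
print(f"{matches} of {len(rows)} visible rows match their file name")
```

This check relies on the test files being named after their true category, which holds for the files supplied here but would need adapting for differently named files.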

In [90]:
test_predictions = cvModel.transform(indexed1)
test_predictions.select("news_id","news_category","probability","prediction").show()

+--------------+-------------+--------------------+----------+
|       news_id|news_category|         probability|prediction|
+--------------+-------------+--------------------+----------+
| Business1.txt|      Unknown|[0.63437357374768...|       0.0|
| Business2.txt|      Unknown|[0.32044585695464...|       2.0|
| Business3.txt|      Unknown|[0.24021041446877...|       1.0|
| Business4.txt|      Unknown|[0.30522460503989...|       0.0|
| Business5.txt|      Unknown|[0.30520737583808...|       0.0|
|   Movies1.txt|      Unknown|[0.28281169688546...|       2.0|
|   Movies2.txt|      Unknown|[0.23598544692892...|       2.0|
|   Movies3.txt|      Unknown|[0.04742457198801...|       2.0|
|   Movies4.txt|      Unknown|[0.23669207009264...|       2.0|
|   Movies5.txt|      Unknown|[0.04742457198801...|       2.0|
| Politics1.txt|      Unknown|[0.22250325027191...|       1.0|
|Politics15.txt|      Unknown|[0.15856301869865...|       1.0|
| Politics2.txt|      Unknown|[0.06159611122567...|    

## Conclusion

As you can see, Logistic Regression gives a good classification. There are still cases that are not predicted correctly, and this can be improved:
- Since the input dataset contains only 100 articles per category, more data would likely improve the results.
- Some articles belong to more than one category, e.g. something like "Trump increasing tariffs" falls under both Politics and Business, so either prediction is acceptable.
- Since the data was collected over a particular period, it covers only the topics of that period. A broader dataset would overcome this.

For the given news classification problem and dataset, tuned Logistic Regression is the best model. We have also verified the model on an unknown dataset that was not used for training or testing the model.
Thus I have created a pipelined model and completed the five main stages required by this assignment: collecting and cleaning, feature extraction, multiclass classification, testing, and documentation.

# AUTHOR : Sachin Kumar Kuppayya