## DIC Lab3 --part2--Text Classification Using pyspark and DataFrames approach

#### Name1: Rajiv Ranjan Name2: Pradeep Aitha

#### Firstly initialize the findspark package so that we could run the spark using python from the desktop or home location

In [1]:
import findspark
findspark.init()

##### Now we need to initialize the sparkcontext and sqlcontext


In [2]:
from pyspark.sql import SQLContext
from pyspark import SparkContext
sc =SparkContext()
sqlContext = SQLContext(sc)

##### Sqlcontext is used to read the input file
##### It is to be noted that the collected articles were firstly saved in a csv file using the write_csv.py script

In [3]:
data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('articles.csv')

##### As we see the data is two column format first column is article and the second category is category of that article

In [4]:
data.columns

['article', 'category']

##### Check the top 5 of the RDD

In [5]:
data.show(5)

+--------------------+--------+
|             article|category|
+--------------------+--------+
|AdvertisementSupp...|   Music|
|AdvertisementSupp...|   Music|
|AdvertisementSupp...|   Music|
|AdvertisementSupp...|   Music|
|AdvertisementSupp...|   Music|
+--------------------+--------+
only showing top 5 rows



##### Now we can check the schema of the dataset

In [6]:
data.printSchema()

root
 |-- article: string (nullable = true)
 |-- category: string (nullable = true)



##### Check the count of various categories

In [7]:
from pyspark.sql.functions import col

data.groupBy("Category").count().orderBy(col("count").desc()).show()

+--------+-----+
|Category|count|
+--------+-----+
|  Sports|   50|
|   Music|   50|
|Business|   50|
|Politics|   50|
+--------+-----+



##       We create our Model pipeline now
###        It includes 3 steps:
####        1) Tokenization (with Regular Expression)
####        2) Remove Stop Words
####        3) Count vectors (“document-term vectors”)

In [8]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

#First step of Model pipeline regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="article", outputCol="words", pattern="\\W")

# Our custom made stop words 
stopwords = ["http","https","amp","rt","t","c","the"] # standard stop words
stopwords += ["AdvertisementSupported","Advertisement","Aug.","25","2015"]
stopwords += ['a', 'about', 'above', 'across', 'after', 'afterwards']
stopwords += ['again', 'against', 'all', 'almost', 'alone', 'along']
stopwords += ['already', 'also', 'although', 'always', 'am', 'among']
stopwords += ['amongst', 'amoungst', 'amount', 'an', 'and', 'another']
stopwords += ['any', 'anyhow', 'anyone', 'anything', 'anyway', 'anywhere']
stopwords += ['are', 'around', 'as', 'at', 'back', 'be', 'became']
stopwords += ['because', 'become', 'becomes', 'becoming', 'been']
stopwords += ['before', 'beforehand', 'behind', 'being', 'below']
stopwords += ['beside', 'besides', 'between', 'beyond', 'bill', 'both']
stopwords += ['bottom', 'but', 'by', 'call', 'can', 'cannot', 'cant']
stopwords += ['co', 'computer', 'con', 'could', 'couldnt', 'cry', 'de']
stopwords += ['describe', 'detail', 'did', 'do', 'done', 'down', 'due']
stopwords += ['during', 'each', 'eg', 'eight', 'either', 'eleven', 'else']
stopwords += ['elsewhere', 'empty', 'enough', 'etc', 'even', 'ever']
stopwords += ['every', 'everyone', 'everything', 'everywhere', 'except']
stopwords += ['few', 'fifteen', 'fifty', 'fill', 'find', 'fire', 'first']
stopwords += ['five', 'for', 'former', 'formerly', 'forty', 'found']
stopwords += ['four', 'from', 'front', 'full', 'further', 'get', 'give']
stopwords += ['go', 'had', 'has', 'hasnt', 'have', 'he', 'hence', 'her']
stopwords += ['here', 'hereafter', 'hereby', 'herein', 'hereupon', 'hers']
stopwords += ['herself', 'him', 'himself', 'his', 'how', 'however']
stopwords += ['hundred', 'i', 'ie', 'if', 'in', 'inc', 'indeed']
stopwords += ['interest', 'into', 'is', 'it', 'its', 'itself', 'keep']
stopwords += ['last', 'latter', 'latterly', 'least', 'less', 'ltd', 'made']
stopwords += ['many', 'may', 'me', 'meanwhile', 'might', 'mill', 'mine']
stopwords += ['more', 'moreover', 'most', 'mostly', 'move', 'much']
stopwords += ['must', 'my', 'myself', 'name', 'namely', 'neither', 'never']
stopwords += ['nevertheless', 'next', 'nine', 'no', 'nobody', 'none']
stopwords += ['noone', 'nor', 'not', 'nothing', 'now', 'nowhere', 'of']
stopwords += ['off', 'often', 'on','once', 'one', 'only', 'onto', 'or']
stopwords += ['other', 'others', 'otherwise', 'our', 'ours', 'ourselves']
stopwords += ['out', 'over', 'own', 'part', 'per', 'perhaps', 'please']
stopwords += ['put', 'rather', 're', 's', 'same', 'see', 'seem', 'seemed']
stopwords += ['seeming', 'seems', 'serious', 'several', 'she', 'should']
stopwords += ['show', 'side', 'since', 'sincere', 'six', 'sixty', 'so']
stopwords += ['some', 'somehow', 'someone', 'something', 'sometime']
stopwords += ['sometimes', 'somewhere', 'still', 'such', 'system', 'take']
stopwords += ['ten', 'than', 'that', 'the', 'their', 'them', 'themselves']
stopwords += ['then', 'thence', 'there', 'thereafter', 'thereby']
stopwords += ['therefore', 'therein', 'thereupon', 'these', 'they']
stopwords += ['thick', 'thin', 'third', 'this', 'those', 'though', 'three']
stopwords += ['three', 'through', 'throughout', 'thru', 'thus', 'to']
stopwords += ['together', 'too', 'top', 'toward', 'towards', 'twelve']
stopwords += ['twenty', 'two', 'un', 'under', 'until', 'up', 'upon']
stopwords += ['us', 'very', 'via', 'was', 'we', 'well', 'were', 'what']
stopwords += ['whatever', 'when', 'whence', 'whenever', 'where']
stopwords += ['whereafter', 'whereas', 'whereby', 'wherein', 'whereupon']
stopwords += ['wherever', 'whether', 'which', 'while', 'whither', 'who']
stopwords += ['whoever', 'whole', 'whom', 'whose', 'why', 'will', 'with']
stopwords += ['within', 'without', 'would', 'yet', 'you', 'your']
stopwords += ['yours', 'yourself', 'yourselves']

stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(stopwords)

#Finally using Bag of words approach for words counting and frequency determination
# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=3)

##### Using String Indexer because the category field in our case is a string which needs fto be converted into proper numerical value for the classifiaction to happen


In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "article", outputCol = "label")
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)

+--------------------+--------+--------------------+--------------------+--------------------+-----+
|             article|category|               words|            filtered|            features|label|
+--------------------+--------+--------------------+--------------------+--------------------+-----+
|AdvertisementSupp...|   Music|[advertisementsup...|[bythe, library, ...|(4188,[11,17,19,2...| 22.0|
|AdvertisementSupp...|   Music|[advertisementsup...|[bycapping, celeb...|(4188,[1,2,4,5,7,...| 20.0|
|AdvertisementSupp...|   Music|[advertisementsup...|[bythat, decisive...|(4188,[1,2,8,13,2...|  5.0|
|AdvertisementSupp...|   Music|[advertisementsup...|[byop, ed, contri...|(4188,[4,6,9,12,1...|  8.0|
|AdvertisementSupp...|   Music|[advertisementsup...|[byit, rich, hist...|(4188,[2,8,13,19,...| 19.0|
+--------------------+--------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



##### We now partition our dataset into Training & Test sets

In [10]:
(trainingData_and_testData, validateData) = dataset.randomSplit([0.95, 0.05], seed = 29)
(trainingData, testData) = trainingData_and_testData.randomSplit([0.8, 0.2])
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))
print("Validate Dataset Count: " + str(validateData.count()))

Training Dataset Count: 152
Test Dataset Count: 37
Validate Dataset Count: 11


##### Here we are using our Dataframe approach hence we will use the ML packages and not mllib packages which are used with RDDS 

##### Model Training and Evaluation Logistic Regression using Count Vector Features
##### Our model will make predictions and score on the test set; we then look at the top 5 predictions with the highest probability.

In [11]:
lr = LogisticRegression(maxIter=50, regParam=0.2, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0).select("article", "category") \
    .orderBy("category", ascending=False) \
    .show(n = 5, truncate = 25)

+-------------------------+--------+
|                  article|category|
+-------------------------+--------+
|AdvertisementThey had ...|  Sports|
|AdvertisementThey had ...|  Sports|
+-------------------------+--------+



##### With Multiclass classification we get one of the prediction metrics which is accuracy. Let's see the accuracy of the model

In [12]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.9369369369369367

#### We use crossvalidation to find the accuracy with the best model. This is the process of tuning the hyperparameters 

In [33]:

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed = 100)

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


paramGrid = (ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.3, 0.5]).addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]).build())

cv = CrossValidator(estimator=lr,estimatorParamMaps=paramGrid,evaluator=evaluator, numFolds=5)

cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)
# Evaluate best model
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)


0.9430769230769234

##### Classification Using second method. This time we will use Naive bayes approach.

In [14]:
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)

predictions = model.transform(testData)
predictions.filter(predictions['prediction'] == 0).select("article","category").orderBy("probability", ascending=False).show(n = 10, truncate = 30)

+------------------------------+--------+
|                       article|category|
+------------------------------+--------+
|AdvertisementThey had the b...|  Sports|
|AdvertisementThey had the b...|  Sports|
+------------------------------+--------+



###### Accuracy of naive bayes on the test data

In [15]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.7435897435897437

### Predict New data

### 1st Method: Entirely new data collected from the Nytimes api

##### Create a model on training and test data

In [16]:
final_lr_model = lr.fit(dataset)

In [34]:
naive_model = nb.fit(dataset)

##### Load a new data set which was collected from the nytimes

In [39]:
new_data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('unknown.csv')

##### See the new data

In [40]:
new_data.show()

+--------------------+--------+
|             article|category|
+--------------------+--------+
|This summer, an e...|   Music|
|�Three Billboards...|Business|
|SEOUL, South Kore...|  Sports|
|The Tet offensive...|Politics|
+--------------------+--------+



###### Pass it to the pipeline so that it is ready for the classification

In [41]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "article", outputCol = "label")
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(new_data)
dataset = pipelineFit.transform(new_data)
dataset.show(5)

+--------------------+--------+--------------------+--------------------+--------------------+-----+
|             article|category|               words|            filtered|            features|label|
+--------------------+--------+--------------------+--------------------+--------------------+-----+
|This summer, an e...|   Music|[this, summer, an...|[summer, engineer...|(21,[0,1,2,4,5,7,...|  1.0|
|�Three Billboards...|Business|[three, billboard...|[billboards, outs...|(21,[0,1,2,3,4,5,...|  3.0|
|SEOUL, South Kore...|  Sports|[seoul, south, ko...|[seoul, south, ko...|(21,[0,1,3,4,5,6,...|  2.0|
|The Tet offensive...|Politics|[the, tet, offens...|[tet, offensive, ...|(21,[0,2,3,6,7,8,...|  0.0|
+--------------------+--------+--------------------+--------------------+--------------------+-----+



##### Use this data to run on the model. Do the classification

In [20]:
test_new_data=final_lr_model.transform(dataset)

In [35]:
test_new_data_nb = naive_model.transform(dataset)

##### See the  Accuracy rate for LR approach

In [30]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(test_new_data)

0.748307587892347

##### See the accuracy for the naive bayes approach

In [36]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(test_new_data_nb)


0.608303957309347

### Predict new data using the validate data which has been sepaarted out in beginning 

#### 2nd Method using unused and unseen validate data

In [24]:
test_new_data1=final_lr_model.transform(validateData)

In [37]:
test_new_data2=naive_model.transform(validateData)

In [25]:
test_new_data1

DataFrame[article: string, category: string, words: array<string>, filtered: array<string>, features: vector, label: double, rawPrediction: vector, probability: vector, prediction: double]

#### Accuracy for the LR method

In [29]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(test_new_data1)

0.9449897096823037

##### Accuracy for naive bayes

In [38]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(test_new_data2)


0.9029305092093457