# Text Classification using MLlib

# Goal

The aim is to explore Spark's machine learning library, **MLlib**: how to build models with it, and how it compares with scikit-learn, highlighting the main similarities and differences between these two powerful machine learning libraries.


We all know how handy scikit-learn's API is; it is so easy to use that it can be described in four little words:

## IMPORT - INSTANTIATE - FIT - PREDICT

Most pipelines and algorithms in scikit-learn revolve around these four steps. Because of this simplicity, Spark's MLlib was designed with scikit-learn's structure in mind (the Spark documentation credits scikit-learn as the main inspiration for its Pipeline concept), so scikit-learn users can pick up the Spark ML API easily when a workflow needs to scale to big data.

So we will see how simple it is to move from scikit-learn to Spark's MLlib when training and using machine learning workflows on larger datasets.
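Both libraries share the estimator idea, with one structural difference worth seeing early: a scikit-learn estimator learns in place (`fit` returns `self`, then `predict` returns labels), while a Spark ML `Estimator.fit` returns a separate fitted `Model` whose `transform` appends a `prediction` column. The sketch below mimics both conventions in plain Python with a hypothetical majority-class classifier; the class names are illustrative and belong to neither library.

```python
from collections import Counter


class MajorityClassifier:
    """Hypothetical scikit-learn-style estimator: fit() learns in place and returns self."""

    def fit(self, X, y):
        # remember the most frequent label seen during training
        self.majority_ = Counter(y).most_common(1)[0][0]
        return self

    def predict(self, X):
        # one predicted label per input row
        return [self.majority_ for _ in X]


class MajorityModel:
    """Hypothetical Spark-style fitted Model (a Transformer)."""

    def __init__(self, majority):
        self.majority = majority

    def transform(self, rows):
        # return new rows with an extra 'prediction' field; input rows are left untouched
        return [{**row, "prediction": self.majority} for row in rows]


class MajorityEstimator:
    """Hypothetical Spark-style Estimator: fit() returns a new Model object."""

    def __init__(self, labelCol="label"):
        self.labelCol = labelCol

    def fit(self, rows):
        counts = Counter(row[self.labelCol] for row in rows)
        return MajorityModel(counts.most_common(1)[0][0])


# scikit-learn style: IMPORT - INSTANTIATE - FIT - PREDICT
clf = MajorityClassifier().fit(["a", "b", "c"], ["ham", "ham", "spam"])
print(clf.predict(["d"]))  # ['ham']

# Spark style: INSTANTIATE - FIT (returns a Model) - TRANSFORM
train = [{"text": "a", "label": "ham"}, {"text": "b", "label": "ham"}, {"text": "c", "label": "spam"}]
model = MajorityEstimator().fit(train)
print(model.transform([{"text": "d"}]))  # [{'text': 'd', 'prediction': 'ham'}]
```

Keeping the fitted state in a separate `Model` object is what lets Spark chain fitted stages into a `PipelineModel`.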

Let's get started!

#### Dataset


##### SMS Spam Collection Data Set 

The SMS Spam Collection is a public set of SMS messages labeled as `spam` or `ham`, collected for mobile phone spam research. The file contains 5,574 SMS messages.

In [3]:
%matplotlib inline
import matplotlib.pyplot as plt
import pandas as pd
import sklearn
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.svm import SVC, LinearSVC
from sklearn.metrics import classification_report, f1_score, accuracy_score, confusion_matrix
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV, StratifiedKFold, cross_val_score, train_test_split

## Step 1: Load data and look around

A collection of texts is also called a *corpus*. Let's print the first five messages in this SMS corpus:


In [4]:
sms_df = pd.read_csv('../data/SMSSpamCollection.csv')

sms_df.head()

|   | category | text |
|---|----------|------|
| 0 | ham | Go until jurong point, crazy.. Available only ... |
| 1 | ham | Ok lar... Joking wif u oni... |
| 2 | spam | Free entry in 2 a wkly comp to win FA Cup fina... |
| 3 | ham | U dun say so early hor... U c already then say... |
| 4 | ham | Nah I don't think he goes to usf, he lives aro... |


This corpus will be our labeled training set. Using these ham/spam examples, we'll **train a machine learning model to learn to discriminate between ham/spam automatically**. Then, with a trained model, we'll be able to **classify arbitrary unlabeled messages as ham or spam**.

![supervised ML](../img/supervised_ML.png 'supervised ML')

In [5]:
sms_df['category'].value_counts()

ham     4827
spam     747
Name: category, dtype: int64
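The classes are imbalanced, which matters when reading the accuracy figures later: a trivial classifier that always answers `ham` is already right on most messages. The arithmetic, using the counts above:

```python
# class counts from value_counts() above
counts = {"ham": 4827, "spam": 747}
total = sum(counts.values())

# accuracy of always predicting the majority class
baseline_accuracy = counts["ham"] / total
spam_share = counts["spam"] / total

print(total)                        # 5574
print(round(baseline_accuracy, 3))  # 0.866
print(round(spam_share, 3))         # 0.134
```

So a model has to beat roughly 86.6% accuracy to be useful, and how much of the minority `spam` class it catches deserves separate attention.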

**Spark way:**

In [6]:
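from pyspark.sql import SQLContext
# `sc` is the SparkContext that the pyspark notebook kernel provides
sqlContext = SQLContext(sc)

# Spark 2.0+ has a built-in CSV reader, so the external com.databricks.spark.csv package is not needed
data = sqlContext.read.csv('../data/SMSSpamCollection.csv', header=True, inferSchema=True)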

In [7]:
data.columns

['category', 'text']

In [8]:
data.show(n=5)

+--------+--------------------+
|category|                text|
+--------+--------------------+
|     ham|Go until jurong p...|
|     ham|Ok lar... Joking ...|
|    spam|Free entry in 2 a...|
|     ham|U dun say so earl...|
|     ham|Nah I don't think...|
+--------+--------------------+
only showing top 5 rows



In [9]:
data.printSchema()

root
 |-- category: string (nullable = true)
 |-- text: string (nullable = true)



In [10]:
# count messages per category
data.groupBy("category") \
    .count() \
    .show()

+--------+-----+
|category|count|
+--------+-----+
|     ham| 4827|
|    spam|  747|
+--------+-----+



## Step 2: Data Preprocessing and Splitting

In [11]:
# convert label to a numerical variable
sms_df['label_num'] = sms_df.category.map({'ham':0, 'spam':1})
sms_df.head(5)

|   | category | text | label_num |
|---|----------|------|-----------|
| 0 | ham | Go until jurong point, crazy.. Available only ... | 0 |
| 1 | ham | Ok lar... Joking wif u oni... | 0 |
| 2 | spam | Free entry in 2 a wkly comp to win FA Cup fina... | 1 |
| 3 | ham | U dun say so early hor... U c already then say... | 0 |
| 4 | ham | Nah I don't think he goes to usf, he lives aro... | 0 |


In [12]:
# define X (message text) and y (numeric label) for use with CountVectorizer
X = sms_df.text
y = sms_df.label_num
print(X.shape)
print(y.shape)

(5574,)
(5574,)


In [13]:
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=1)
print(X_train.shape)
print(X_test.shape)
print(y_train.shape)
print(y_test.shape)

(4180,)
(1394,)
(4180,)
(1394,)
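`train_test_split` holds out 25% of the rows by default. A small sketch of the size arithmetic, assuming scikit-learn's rounding rule (test size rounded up, train size rounded down), reproduces the shapes above:

```python
import math

n_samples = 5574
test_size = 0.25  # train_test_split default when neither test_size nor train_size is given

n_test = math.ceil(test_size * n_samples)          # 1393.5 -> 1394
n_train = math.floor((1 - test_size) * n_samples)  # 4180.5 -> 4180

print(n_train, n_test)  # 4180 1394
```

Spark's `randomSplit([0.7, 0.3], seed=100)`, used later, works differently: it assigns rows randomly, so its counts (3964 and 1610) only approximate the 70/30 weights instead of hitting them exactly.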


## Step 3: Vectorizing our dataset

### 3a. Using `CountVectorizer`

**sklearn**

In [14]:
import nltk
from nltk.corpus import stopwords

nltk.download('stopwords')  # one-time download of the NLTK stopword lists
stopwords_list = stopwords.words('english')

In [16]:
# instantiate the vectorizer
vect = CountVectorizer(stop_words=stopwords_list)

# learn training data vocabulary, then use it to create a document-term matrix
vect.fit(X_train)
X_train_dtm = vect.transform(X_train)

> Equivalently, `fit` and `transform` can be combined into a single step:
>
> `X_train_dtm = vect.fit_transform(X_train)`

In [17]:
X_train_dtm

<4180x7334 sparse matrix of type '<class 'numpy.int64'>'
	with 35435 stored elements in Compressed Sparse Row format>

In [18]:
# transform testing data (using fitted vocabulary) into a document-term matrix
X_test_dtm = vect.transform(X_test)
X_test_dtm

<1394x7334 sparse matrix of type '<class 'numpy.int64'>'
	with 10648 stored elements in Compressed Sparse Row format>

### Visualising the document-term matrix

In [27]:
docs = ['The quick brown fox jumps over the lazy dog', 'I have dog called robert']

In [28]:
vect_d = CountVectorizer(stop_words=stopwords_list)

In [34]:
dtm = vect_d.fit_transform(docs)

In [30]:
vect_d.get_feature_names_out().tolist()  # get_feature_names() was removed in scikit-learn 1.2

['brown', 'called', 'dog', 'fox', 'jumps', 'lazy', 'quick', 'robert']

In [35]:
dtm.toarray()

array([[1, 0, 1, 1, 1, 1, 1, 0],
       [0, 1, 1, 0, 0, 0, 0, 1]], dtype=int64)

In [36]:
pd.DataFrame(dtm.toarray(), columns=vect_d.get_feature_names_out())

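|   | brown | called | dog | fox | jumps | lazy | quick | robert |
|---|-------|--------|-----|-----|-------|------|-------|--------|
| 0 | 1 | 0 | 1 | 1 | 1 | 1 | 1 | 0 |
| 1 | 0 | 1 | 1 | 0 | 0 | 0 | 0 | 1 |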
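To see what `CountVectorizer` did, here is a plain-Python sketch that rebuilds the same document-term matrix. It assumes scikit-learn's default tokenization (lowercase, then keep tokens of two or more word characters, i.e. the default `token_pattern`) and uses a small hand-picked subset of the NLTK English stopwords, just enough for these two sentences.

```python
import re

docs = ['The quick brown fox jumps over the lazy dog', 'I have dog called robert']

# subset of NLTK's English stopword list that occurs in the two documents (assumption for the demo)
stop_words = {"the", "over", "i", "have"}

# scikit-learn's default token_pattern keeps tokens of 2+ word characters
token_re = re.compile(r"(?u)\b\w\w+\b")


def tokenize(doc):
    # lowercase, tokenize, then drop stopwords (the order CountVectorizer applies them)
    return [t for t in token_re.findall(doc.lower()) if t not in stop_words]


tokenized = [tokenize(d) for d in docs]

# vocabulary: sorted unique terms, each mapped to a column index
vocab = sorted({t for toks in tokenized for t in toks})
index = {term: j for j, term in enumerate(vocab)}

# dense document-term matrix: one row per document, one count per vocabulary term
dtm = [[0] * len(vocab) for _ in docs]
for i, toks in enumerate(tokenized):
    for t in toks:
        dtm[i][index[t]] += 1

print(vocab)  # ['brown', 'called', 'dog', 'fox', 'jumps', 'lazy', 'quick', 'robert']
print(dtm)    # [[1, 0, 1, 1, 1, 1, 1, 0], [0, 1, 1, 0, 0, 0, 0, 1]]
```

scikit-learn stores the same information as a sparse matrix because almost every entry is zero: the SMS training matrix has 35,435 non-zeros out of 4180 x 7334 cells.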


**pyspark**

In [37]:
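from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
# alias avoids shadowing scikit-learn's CountVectorizer imported earlier
from pyspark.ml.feature import CountVectorizer as SparkCountVectorizer

# regular expression tokenizer: lowercases and splits on non-word characters
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")

# remove the same NLTK stopwords used with scikit-learn
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(stopwords_list)

# bag of words count: at most 10,000 terms, each present in at least 5 documents
countVectors = SparkCountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)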
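Unlike scikit-learn's token pattern, `RegexTokenizer` with `pattern="\\W"` *splits* on every non-word character (its default `gaps=True`) after lowercasing, so apostrophes break words apart: `don't` becomes `don` and `t`, as the `words` column of `dataset.show()` shows. A plain-Python sketch of the tokenizer and `StopWordsRemover`, assuming a small subset of the NLTK stopwords (which do contain `don` and `t`):

```python
import re

# visible prefix of one SMS from the table above
text = "Nah I don't think he goes to usf,"

# subset of NLTK's English stopwords relevant to this message (assumption for the demo)
stop_words = {"i", "don", "t", "he", "to"}


def regex_tokenize(s, pattern=r"\W", min_token_length=1):
    # like RegexTokenizer(gaps=True, toLowercase=True): lowercase, split on the pattern,
    # and drop tokens shorter than min_token_length (which also removes empty strings)
    return [tok for tok in re.split(pattern, s.lower()) if len(tok) >= min_token_length]


def remove_stopwords(tokens):
    # like StopWordsRemover with its default caseSensitive=False
    return [tok for tok in tokens if tok.lower() not in stop_words]


words = regex_tokenize(text)
filtered = remove_stopwords(words)
print(words)     # ['nah', 'i', 'don', 't', 'think', 'he', 'goes', 'to', 'usf']
print(filtered)  # ['nah', 'think', 'goes', 'usf']
```

`minDF=5` then keeps only terms that occur in at least five messages, which is largely why the Spark feature vectors have 1714 columns rather than the 7334 scikit-learn found.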

In [38]:
from pyspark.ml.feature import StringIndexer
label_stringIdx = StringIndexer(inputCol = "category", outputCol = "label")
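`StringIndexer` plays the role of the manual `map({'ham':0, 'spam':1})` used with pandas. By default (`stringOrderType="frequencyDesc"`) it gives index 0 to the most frequent label, so `ham` becomes 0.0 and `spam` 1.0, matching the manual mapping. A plain-Python sketch of that ordering, assuming ties are broken alphabetically as the Spark documentation describes for recent versions:

```python
from collections import Counter


def string_index(labels):
    """Map each distinct label to a float index, most frequent label first (ties: alphabetical)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: float(i) for i, lab in enumerate(ordered)}


labels = ["ham"] * 4827 + ["spam"] * 747  # class counts from the dataset
mapping = string_index(labels)
print(mapping)  # {'ham': 0.0, 'spam': 1.0}
```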

In [39]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

# Fit the pipeline on the full dataset (before any train/test split), then transform it
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

In [43]:
dataset.show()

+--------+--------------------+--------------------+--------------------+--------------------+-----+
|category|                text|               words|            filtered|            features|label|
+--------+--------------------+--------------------+--------------------+--------------------+-----+
|     ham|Go until jurong p...|[go, until, juron...|[go, jurong, poin...|(1714,[10,14,35,5...|  0.0|
|     ham|Ok lar... Joking ...|[ok, lar, joking,...|[ok, lar, joking,...|(1714,[0,8,247,36...|  0.0|
|    spam|Free entry in 2 a...|[free, entry, in,...|[free, entry, 2, ...|(1714,[2,9,21,22,...|  1.0|
|     ham|U dun say so earl...|[u, dun, say, so,...|[u, dun, say, ear...|(1714,[0,53,78,79...|  0.0|
|     ham|Nah I don't think...|[nah, i, don, t, ...|[nah, think, goes...|(1714,[49,131,361...|  0.0|
|    spam|FreeMsg Hey there...|[freemsg, hey, th...|[freemsg, hey, da...|(1714,[8,13,19,24...|  1.0|
|     ham|Even my brother i...|[even, my, brothe...|[even, brother, l...|(1714,[13,123,279.

In [40]:
# Randomly split into training and test sets; fix the seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 3964
Test Dataset Count: 1610


## Notice the differences:

In scikit-learn the flow is:

1. Label encoding
2. Train-test split
3. Tokenization and stopword removal inside `CountVectorizer`, with `fit` on the training set and `transform` on both sets

Spark MLlib's flow, on the other hand, is:

1. `RegexTokenizer` and `StopWordsRemover`
2. `CountVectorizer` (plus `StringIndexer` for the label), with `fit` and `transform` on the full dataset
3. Train-test split
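The Spark order has a side effect: because the pipeline is fitted on all of `data` before `randomSplit`, vocabulary filtering such as `minDF=5` (and, for the TF-IDF pipeline, the IDF weights) is computed with the test messages included. The toy sketch below, with hypothetical documents and a simplified `min_df` vocabulary builder, shows a term entering the vocabulary only because of a test document:

```python
def build_vocab(docs, min_df):
    """Keep terms that appear in at least min_df documents (document frequency, not raw count)."""
    df = {}
    for doc in docs:
        for term in set(doc.split()):
            df[term] = df.get(term, 0) + 1
    return sorted(t for t, n in df.items() if n >= min_df)


# hypothetical tokenized messages
train_docs = ["win cash now", "call me later", "win a prize"]
test_docs = ["free cash prize"]

# fit on everything (what the Spark cells do) vs. fit on the training split only
vocab_all = build_vocab(train_docs + test_docs, min_df=2)
vocab_train = build_vocab(train_docs, min_df=2)

print(vocab_all)    # ['cash', 'prize', 'win']
print(vocab_train)  # ['win']
```

In Spark the leak-free order mirrors scikit-learn: `randomSplit` the raw `data` first, call `pipeline.fit` on the training part only, and then `transform` both parts with the fitted `PipelineModel`.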

## Logistic Regression using Count Vector Features

**sklearn**

In [41]:
from sklearn.linear_model import LogisticRegression

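# solver='liblinear' was the default in the scikit-learn version that produced the outputs shown here
lr_sklearn = LogisticRegression(solver='liblinear')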

In [42]:
%time lr_sklearn.fit(X_train_dtm, y_train)

CPU times: user 23.9 ms, sys: 3.86 ms, total: 27.8 ms
Wall time: 30.8 ms


LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
          intercept_scaling=1, max_iter=100, multi_class='ovr', n_jobs=1,
          penalty='l2', random_state=None, solver='liblinear', tol=0.0001,
          verbose=0, warm_start=False)

In [43]:
# make class predictions for X_test_dtm
y_pred_class = lr_sklearn.predict(X_test_dtm)

In [44]:
# calculate accuracy of class predictions
from sklearn import metrics
metrics.accuracy_score(y_test, y_pred_class)

0.9856527977044476

In [45]:
# print the confusion matrix
metrics.confusion_matrix(y_test, y_pred_class)

array([[1210,    1],
       [  19,  164]])
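scikit-learn's `confusion_matrix` puts true classes in rows and predicted classes in columns, in sorted label order, so `ham` comes before `spam`. Read that way, the matrix above gives the headline accuracy and the per-class figures directly; note that 19 of the 183 spam messages were missed:

```python
# confusion matrix from the cell above: rows = true (ham, spam), columns = predicted (ham, spam)
cm = [[1210, 1],
      [19, 164]]
tn, fp = cm[0]
fn, tp = cm[1]

accuracy = (tn + tp) / (tn + fp + fn + tp)
spam_precision = tp / (tp + fp)  # share of messages flagged as spam that really are spam
spam_recall = tp / (tp + fn)     # share of real spam that was caught

print(accuracy)                  # 0.9856527977044476, same as accuracy_score above
print(round(spam_precision, 3))  # 0.994
print(round(spam_recall, 3))     # 0.896
```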

**pyspark**

In [46]:
from pyspark.ml.classification import LogisticRegression

# Build the model
lr = LogisticRegression(maxIter=100, regParam=0.001, elasticNetParam=0)

# Train model with Training Data
lrModel = lr.fit(trainingData)

In [47]:
predictions = lrModel.transform(testData)

predictions.filter(predictions['prediction'] == 0) \
    .select("text","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+--------+------------------------------+-----+----------+
|                          text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|The last thing i ever wante...|     ham|[0.9999999999999682,3.17461...|  0.0|       0.0|
|No i'm not. I can't give yo...|     ham|[0.9999999923256403,7.67435...|  0.0|       0.0|
|K, wen ur free come to my h...|     ham|[0.9999999627400645,3.72599...|  0.0|       0.0|
|For me the love should star...|     ham|[0.9999999539665888,4.60334...|  0.0|       0.0|
|Oh... Haha... Den we shld h...|     ham|[0.9999998825435645,1.17456...|  0.0|       0.0|
|Heart is empty without love...|     ham|[0.9999998445649958,1.55435...|  0.0|       0.0|
|K, fyi I'm back in my paren...|     ham|[0.9999998395913674,1.60408...|  0.0|       0.0|
|Thanks again for your reply...|     ham|[0.9999997954104055,2.04589...|  0.0|       0.0|
|He neva g

In [49]:
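from pyspark.ml.evaluation import BinaryClassificationEvaluator
# default metric is areaUnderROC; scoring the hard 0/1 "prediction" column reduces it to balanced accuracy
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")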
evaluator.evaluate(predictions)

0.9420289855072463
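`BinaryClassificationEvaluator` reports the area under the ROC curve by default, not accuracy. When it is fed hard 0/1 predictions through `rawPredictionCol="prediction"`, the ROC curve has a single interior point and its area collapses to balanced accuracy, the mean of the per-class recalls. That is one reason this 0.94 should not be compared directly with the scikit-learn accuracy of 0.986. A plain-Python check, using the scikit-learn confusion-matrix counts as example data:

```python
def roc_auc(y_true, scores):
    """ROC AUC as P(random positive scores higher than random negative), ties counting 1/2."""
    pos = [s for y, s in zip(y_true, scores) if y == 1]
    neg = [s for y, s in zip(y_true, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# rebuild labels/predictions from the confusion matrix [[1210, 1], [19, 164]]
tn, fp, fn, tp = 1210, 1, 19, 164
y_true = [0] * (tn + fp) + [1] * (fn + tp)
y_pred = [0] * tn + [1] * fp + [0] * fn + [1] * tp

auc_from_hard_predictions = roc_auc(y_true, y_pred)
balanced_accuracy = (tp / (tp + fn) + tn / (tn + fp)) / 2

print(round(auc_from_hard_predictions, 6), round(balanced_accuracy, 6))  # identical values
```

For numbers comparable with scikit-learn, `MulticlassClassificationEvaluator(metricName="accuracy")` reports accuracy, and leaving `rawPredictionCol` at its default (`"rawPrediction"`) gives a genuine threshold-free AUC.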

## 3b. Logistic Regression using TF-IDF Features

**sklearn**

In [50]:
# instantiate the vectorizer
tfidf_vect = TfidfVectorizer(stop_words=stopwords_list)

# learn training data vocabulary, then use it to create a document-term matrix
tfidf_vect.fit(X_train)
X_train_dtm_tfidf = tfidf_vect.transform(X_train)

In [51]:
# transform testing data (using fitted vocabulary) into a document-term matrix
X_test_dtm_tfidf = tfidf_vect.transform(X_test)
X_test_dtm_tfidf

<1394x7334 sparse matrix of type '<class 'numpy.float64'>'
	with 10648 stored elements in Compressed Sparse Row format>
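The two libraries weight terms slightly differently. With its defaults (`smooth_idf=True`, `norm='l2'`), scikit-learn's `TfidfVectorizer` uses idf(t) = ln((1 + n) / (1 + df(t))) + 1 and then L2-normalizes each row, whereas Spark's `IDF` uses idf(t) = ln((m + 1) / (df(t) + 1)) with no +1 and no normalization, and sets the weight to 0 for terms found in fewer than `minDocFreq` documents. A plain-Python sketch of both formulas on the two toy documents from the document-term-matrix example:

```python
import math

n_docs = 2
# document frequencies for the toy vocabulary built from
# ['The quick brown fox jumps over the lazy dog', 'I have dog called robert']
df = {"brown": 1, "called": 1, "dog": 2, "fox": 1, "jumps": 1, "lazy": 1, "quick": 1, "robert": 1}


def sklearn_idf(term):
    # TfidfVectorizer default (smooth_idf=True)
    return math.log((1 + n_docs) / (1 + df[term])) + 1


def spark_idf(term, min_doc_freq=0):
    # pyspark.ml.feature.IDF; terms below minDocFreq get weight 0
    if df[term] < min_doc_freq:
        return 0.0
    return math.log((n_docs + 1) / (df[term] + 1))


print(sklearn_idf("dog"), spark_idf("dog"))                      # 1.0 0.0
print(round(sklearn_idf("fox"), 4), round(spark_idf("fox"), 4))  # 1.4055 0.4055
print(spark_idf("fox", min_doc_freq=2))                          # 0.0
```

So in Spark a term present in every document gets weight zero, while scikit-learn keeps it at its minimum idf of 1.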

In [52]:
%time lr_sklearn.fit(X_train_dtm_tfidf, y_train)

CPU times: user 17.1 ms, sys: 1.74 ms, total: 18.8 ms
Wall time: 17.4 ms


LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
          intercept_scaling=1, max_iter=100, multi_class='ovr', n_jobs=1,
          penalty='l2', random_state=None, solver='liblinear', tol=0.0001,
          verbose=0, warm_start=False)

In [53]:
# make class predictions for X_test_dtm_tfidf
y_pred_class_tfidf = lr_sklearn.predict(X_test_dtm_tfidf)

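In [54]:
# calculate accuracy of the TF-IDF model's class predictions
# (the output shown was recorded when this cell scored y_pred_class, the count-vector predictions)
from sklearn import metrics
metrics.accuracy_score(y_test, y_pred_class_tfidf)

0.9856527977044476

In [55]:
# print the confusion matrix for the TF-IDF predictions (output also recorded with y_pred_class)
metrics.confusion_matrix(y_test, y_pred_class_tfidf)

array([[1210,    1],
       [  19,  164]])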

**pyspark**

In [56]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF

# Add HashingTF and IDF to transformation
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)  # terms in fewer than 5 documents get an IDF weight of 0

# Redo Pipeline
pipeline_tf = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])
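`HashingTF` skips building a vocabulary: each term is hashed straight to one of `numFeatures` columns, so no pass over the data is needed to collect terms, but unrelated terms can collide in the same column and indices cannot be mapped back to words. A plain-Python sketch of the idea follows; it uses `zlib.crc32` as a stand-in hash, whereas Spark uses MurmurHash3, so the column indices will not match Spark's.

```python
import zlib
from collections import Counter


def hashing_tf(tokens, num_features=10000):
    """Sparse term-frequency vector {column: count} using the hashing trick."""
    # stand-in hash (Spark uses MurmurHash3); the modulo folds each hash into num_features buckets
    return dict(Counter(zlib.crc32(tok.encode("utf-8")) % num_features for tok in tokens))


tokens = ["hurry", "answer", "simple", "questions", "winner", "prize", "prize"]
vec = hashing_tf(tokens)
print(sum(vec.values()))                 # 7: every token lands in some bucket
print(all(0 <= j < 10000 for j in vec))  # True

# with very few buckets, collisions are unavoidable
tiny = hashing_tf(tokens, num_features=4)
print(len(tiny) <= 4)  # True
```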

In [57]:
pipelineFit_tf = pipeline_tf.fit(data)
dataset = pipelineFit_tf.transform(data)

# Randomly split into training and test sets; fix the seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)

# Build the model
lr = LogisticRegression(maxIter=100, regParam=0.001, elasticNetParam=0)

# Train model with Training Data
lrModel = lr.fit(trainingData)

In [58]:
predictions = lrModel.transform(testData)

predictions.filter(predictions['prediction'] == 0) \
    .select("text","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+--------+------------------------------+-----+----------+
|                          text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|The last thing i ever wante...|     ham|[0.9999999999446623,5.53378...|  0.0|       0.0|
|No i'm not. I can't give yo...|     ham|[0.9999999839674525,1.60325...|  0.0|       0.0|
|He neva grumble but i sad l...|     ham|[0.9999999185998198,8.14001...|  0.0|       0.0|
|THING R GOOD THANX GOT EXAM...|     ham|[0.9999999143905409,8.56094...|  0.0|       0.0|
|For me the love should star...|     ham|[0.9999998708332496,1.29166...|  0.0|       0.0|
|Although i told u dat i'm i...|     ham|[0.9999997789977655,2.21002...|  0.0|       0.0|
|Thanks again for your reply...|     ham|[0.9999997672482266,2.32751...|  0.0|       0.0|
|Wen ur lovable bcums angry ...|     ham|[0.9999997132583697,2.86741...|  0.0|       0.0|
|Heart is 

In [59]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
evaluator.evaluate(predictions)

0.9431159420289855

# Custom text

#### Example of how to create a spark dataframe

```python
from pyspark.sql import Row
l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
schemaPeople = sqlContext.createDataFrame(people)

print(type(schemaPeople))
#  pyspark.sql.dataframe.DataFrame
```


In [60]:
from pyspark.sql import Row
# two new messages to classify (plain strings, one per row)
ll = ['Hurry up! Answer simple questions and WINNER will get $900 prize reward! To claim call us. Valid 12 hours only.', 'Hey, How are you? Long time no see']
rdds = sc.parallelize(ll)
tx = rdds.map(lambda x: Row(text=x))
schematxt = sqlContext.createDataFrame(tx)

In [61]:
schematxt.show()

+--------------------+
|                text|
+--------------------+
|Hurry up! Answer ...|
|Hey, How are you?...|
+--------------------+



## Calculating features for test sample data

In [62]:
test_new_dataset = pipelineFit_tf.transform(schematxt)
test_new_dataset.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|               words|            filtered|         rawFeatures|            features|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|Hurry up! Answer ...|[hurry, up, answe...|[hurry, answer, s...|(10000,[1,721,727...|(10000,[1,721,727...|
|Hey, How are you?...|[hey, how, are, y...|[hey, long, time,...|(10000,[7515,8157...|(10000,[7515,8157...|
+--------------------+--------------------+--------------------+--------------------+--------------------+



## Predicting on calculated features of test sample data

In [63]:
test_pred = lrModel.transform(test_new_dataset)

In [64]:
test_pred.select("text","probability","prediction").show()

+--------------------+--------------------+----------+
|                text|         probability|prediction|
+--------------------+--------------------+----------+
|Hurry up! Answer ...|[0.18778340849998...|       1.0|
|Hey, How are you?...|[0.99950414795125...|       0.0|
+--------------------+--------------------+----------+
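For binary logistic regression the `probability` vector is [P(label 0), P(label 1)], and with Spark's default `threshold=0.5` the `prediction` is 1.0 whenever P(label 1) exceeds 0.5. Since `StringIndexer` mapped `ham` to 0.0 and `spam` to 1.0, here is a sketch of turning the first row back into a category (the probability is the truncated value displayed above):

```python
# first element of the displayed probability vector for the "Hurry up! ..." message (truncated in the output)
p_ham = 0.18778340849998
p_spam = 1.0 - p_ham  # the two class probabilities sum to 1

threshold = 0.5           # Spark LogisticRegression default
labels = ["ham", "spam"]  # StringIndexer order: index 0.0 -> ham, 1.0 -> spam

prediction = 1.0 if p_spam > threshold else 0.0
print(round(p_spam, 3), prediction, labels[int(prediction)])  # 0.812 1.0 spam
```

Inside a pipeline, Spark's `IndexToString` transformer, given the fitted indexer's labels, can perform the same index-to-category mapping.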



# Exercise

## Build a model using the Naive Bayes algorithm and predict on the sample text

In [65]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(smoothing=1)
nb_model = nb.fit(trainingData)
predictions = nb_model.transform(testData)

In [66]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
evaluator.evaluate(predictions)

0.9641304347826087
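`NaiveBayes` defaults to the multinomial model, and `smoothing=1` is Laplace (add-one) smoothing of the per-class term counts, so a word never seen with a class does not zero out that class's probability. In the Spark cell the "counts" are the TF-IDF feature values, which the multinomial model accepts because they are non-negative. A compact plain-Python sketch of training and scoring a multinomial Naive Bayes model on hypothetical toy messages:

```python
import math
from collections import Counter, defaultdict


def train_multinomial_nb(docs, labels, smoothing=1.0):
    """Return log priors and smoothed log likelihoods for a bag-of-words multinomial NB."""
    vocab = {t for doc in docs for t in doc}
    class_counts = Counter(labels)
    term_counts = defaultdict(Counter)
    for doc, lab in zip(docs, labels):
        term_counts[lab].update(doc)

    log_prior = {c: math.log(n / len(labels)) for c, n in class_counts.items()}
    log_lik = {}
    for c in class_counts:
        total = sum(term_counts[c].values()) + smoothing * len(vocab)
        # additive smoothing: every vocabulary term gets `smoothing` pseudo-counts per class
        log_lik[c] = {t: math.log((term_counts[c][t] + smoothing) / total) for t in vocab}
    return log_prior, log_lik


def predict_nb(model, doc):
    log_prior, log_lik = model
    scores = {
        c: log_prior[c] + sum(log_lik[c][t] for t in doc if t in log_lik[c])  # unknown terms ignored
        for c in log_prior
    }
    return max(scores, key=scores.get)


# hypothetical tokenized messages
docs = [["win", "cash", "prize"], ["free", "prize", "call"], ["see", "you", "later"], ["call", "you", "later"]]
labels = ["spam", "spam", "ham", "ham"]
model = train_multinomial_nb(docs, labels, smoothing=1.0)

print(predict_nb(model, ["free", "cash"]))        # spam
print(predict_nb(model, ["see", "you", "soon"]))  # ham
```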

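## Prediction on the sample dataset

In [67]:
# score the sample messages with the Naive Bayes model
# (the output shown was recorded from test_pred, i.e. the logistic regression predictions)
test_pred_nb = nb_model.transform(test_new_dataset)
test_pred_nb.select("text","probability","prediction").show()

+--------------------+--------------------+----------+
|                text|         probability|prediction|
+--------------------+--------------------+----------+
|Hurry up! Answer ...|[0.18778340849998...|       1.0|
|Hey, How are you?...|[0.99950414795125...|       0.0|
+--------------------+--------------------+----------+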



Naive Bayes scores higher than logistic regression on the same TF-IDF test split (0.964 versus 0.943 with the same evaluator).

## Cross-validating the Logistic Regression model

In [69]:
# rebuild the count-vector features and the same 70/30 split
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)

# Build the model
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net mixing (0 = ridge/L2, 1 = lasso/L1)
#            .addGrid(lr.maxIter, [10, 20, 50]) # number of iterations
#            .addGrid(hashingTF.numFeatures, [1000, 10000]) # only meaningful if the estimator is a Pipeline containing hashingTF
             .build())

# evaluator used both to select parameters and to score the final model
# (areaUnderROC on hard 0/1 predictions, as in the earlier cells)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5)

# Run cross validation; fitting one model per parameter combination and fold takes a while
cvModel = cv.fit(trainingData)

# cvModel holds the best parameters refitted on all of trainingData; score it on the held-out test set
predictions = cvModel.transform(testData)
evaluator.evaluate(predictions)

0.9206521739130434
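`ParamGridBuilder` takes the Cartesian product of the listed values, and `CrossValidator` fits one model per parameter combination per fold, then refits the best combination on the full training set. A plain-Python sketch of that bookkeeping for the 3 x 3 grid and 5 folds above; the round-robin fold assignment is a simple stand-in, while Spark assigns folds randomly:

```python
from itertools import product

reg_params = [0.1, 0.3, 0.5]
elastic_net_params = [0.0, 0.1, 0.2]
num_folds = 5

# Cartesian product, like ParamGridBuilder().addGrid(...).addGrid(...).build()
param_grid = [{"regParam": r, "elasticNetParam": e} for r, e in product(reg_params, elastic_net_params)]

# round-robin fold assignment as a stand-in for Spark's random k-fold split
rows = list(range(20))  # hypothetical row ids
folds = [[i for i in rows if i % num_folds == k] for k in range(num_folds)]

fits = 0
for params in param_grid:
    for k in range(num_folds):
        validation = folds[k]
        training = [i for i in rows if i not in validation]
        assert not set(training) & set(validation)  # each fold is held out exactly once
        fits += 1  # one LogisticRegression fit per (params, fold)

print(len(param_grid), fits, fits + 1)  # 9 45 46  (+1 for the final refit of the best params)
```

The grid also only considers `regParam` values of 0.1 to 0.5, far stronger regularization than the `regParam=0.001` used earlier, which may help explain why the cross-validated score (0.92) is lower.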