# Text Classification using MLlib

**Problem Statement:**

Using a dataset of SMS messages, predict whether a given message is spam or not.

**Read the dataset into a Spark DataFrame**

In [10]:
from pyspark.sql import SparkSession

# Reuse the running session if there is one (e.g. in a pyspark notebook), otherwise create it
spark = SparkSession \
    .builder \
    .appName("Python Spark dataframe basic example") \
    .getOrCreate()

data = spark.read.csv('../data/SMSSpamCollection.csv', header=True)

**Print out the column names**

In [11]:
data.columns

['category', 'text']

**Let's have a look at our DataFrame**

In [12]:
data.show()

+--------+--------------------+
|category|                text|
+--------+--------------------+
|     ham|Go until jurong p...|
|     ham|Ok lar... Joking ...|
|    spam|Free entry in 2 a...|
|     ham|U dun say so earl...|
|     ham|Nah I don't think...|
|    spam|FreeMsg Hey there...|
|     ham|Even my brother i...|
|     ham|As per your reque...|
|    spam|WINNER!! As a val...|
|    spam|Had your mobile 1...|
|     ham|I'm gonna be home...|
|    spam|SIX chances to wi...|
|    spam|URGENT! You have ...|
|     ham|I've been searchi...|
|     ham|I HAVE A DATE ON ...|
|    spam|XXXMobileMovieClu...|
|     ham|Oh k...i'm watchi...|
|     ham|Eh u remember how...|
|     ham| Fine if that's t...|
|    spam|England v Macedon...|
+--------+--------------------+
only showing top 20 rows



**Print out the schema to see datatype of each column**

In [13]:
data.printSchema()

root
 |-- category: string (nullable = true)
 |-- text: string (nullable = true)



**Count the number of messages in each category in our dataset**

In [14]:
# number of messages per category
data.groupBy("category") \
    .count() \
    .show()

+--------+-----+
|category|count|
+--------+-----+
|     ham| 4827|
|    spam|  747|
+--------+-----+



# Cross-checking with pandas

In [15]:
import pandas as pd

df = pd.read_csv('../data/SMSSpamCollection.csv')


In [16]:
df['category'].value_counts()

ham     4827
spam     747
Name: category, dtype: int64

**Next, perform three tasks:**

1. Prepare the `text` column by tokenizing each message into words
2. Remove stop words from the tokenized column
3. Finally, build bag-of-words features with `CountVectorizer`


Use the official Spark MLlib guide to solve the exercise. Link:

[Spark MLlib guide](http://spark.apache.org/docs/1.6.2/ml-features.html#vectorassembler)
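
Before reaching for MLlib, it can help to picture the three steps in plain Python. The sketch below is only a conceptual illustration and not the MLlib API: the tiny stop-word list and the helper names (`tokenize`, `remove_stop_words`, `bag_of_words`) are made up for this example, and `RegexTokenizer`, `StopWordsRemover` and `CountVectorizer` have their own defaults and options.

```python
import re
from collections import Counter

# Hypothetical, deliberately tiny stop-word list for illustration only.
STOP_WORDS = {"a", "in", "to", "the", "is", "you", "are", "how"}

def tokenize(text):
    """Lowercase the text and split it on runs of non-word characters."""
    return [tok for tok in re.split(r"\W+", text.lower()) if tok]

def remove_stop_words(tokens):
    """Drop tokens that appear in the stop-word list."""
    return [tok for tok in tokens if tok not in STOP_WORDS]

def bag_of_words(docs):
    """Build a sorted vocabulary and one term-count vector per document."""
    vocab = sorted({tok for doc in docs for tok in doc})
    vectors = []
    for doc in docs:
        counts = Counter(doc)
        vectors.append([counts.get(term, 0) for term in vocab])
    return vocab, vectors

messages = ["Free entry in a prize draw", "How are you? Free tonight?"]
filtered = [remove_stop_words(tokenize(m)) for m in messages]
vocab, vectors = bag_of_words(filtered)
print(vocab)    # ['draw', 'entry', 'free', 'prize', 'tonight']
print(vectors)  # [[1, 1, 1, 1, 0], [0, 0, 1, 0, 1]]
```

Each position in a count vector corresponds to one vocabulary term, which is the same idea behind the `features` column you will build with the MLlib stages.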

In [17]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from nltk.corpus import stopwords

# regular expression tokenizer
#regexTokenizer = RegexTokenizer()

# stop words
#add_stopwords = stopwords.words('english')

#stopwordsRemover = StopWordsRemover().setStopWords(add_stopwords)

# bag of words count
#countVectors = CountVectorizer()

Next, apply `StringIndexer` to the `category` column to create the label indices

In [18]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
# String Indexer - A label indexer that maps a string column of labels to an ML column of label indices.

# label_stringIdx = StringIndexer()
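
By default `StringIndexer` orders labels by descending frequency, so the most frequent label gets index 0. The plain-Python sketch below mimics that ordering with the class counts from the `groupBy` output above; `fit_label_index` is a hypothetical helper, and its alphabetical tie-break is an assumption of this sketch.

```python
from collections import Counter

def fit_label_index(labels):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending count; ties are broken alphabetically (sketch assumption).
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: float(i) for i, lab in enumerate(ordered)}

# Class counts taken from the groupBy output earlier in this notebook.
labels = ["ham"] * 4827 + ["spam"] * 747
mapping = fit_label_index(labels)
print(mapping)  # {'ham': 0.0, 'spam': 1.0}
```

With this ordering, a `label` or `prediction` of 1.0 stands for spam and 0.0 for ham.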

Chain all the steps performed above using the Spark Pipeline API
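
The idea behind a pipeline can be sketched without Spark: stages that only transform data are applied directly, while stages that learn something are fitted first and replaced by their fitted model. All class and function names below are hypothetical stand-ins, not the MLlib `Pipeline` classes.

```python
class Lowercase:
    """Transformer: nothing to learn, it only transforms."""
    def transform(self, docs):
        return [doc.lower() for doc in docs]

class VocabularyIndexer:
    """Estimator: fit() learns a vocabulary and returns a fitted model."""
    def fit(self, docs):
        vocab = sorted({tok for doc in docs for tok in doc.split()})
        return VocabularyModel({tok: i for i, tok in enumerate(vocab)})

class VocabularyModel:
    """Fitted transformer: maps known tokens to their vocabulary index."""
    def __init__(self, index):
        self.index = index
    def transform(self, docs):
        return [[self.index[t] for t in doc.split() if t in self.index] for doc in docs]

def fit_pipeline(stages, docs):
    """Fit stages in order, feeding each one the previous stage's output."""
    fitted = []
    for stage in stages:
        model = stage.fit(docs) if hasattr(stage, "fit") else stage
        docs = model.transform(docs)
        fitted.append(model)
    return fitted

def transform_pipeline(fitted, docs):
    """Apply every fitted stage in order to new data."""
    for model in fitted:
        docs = model.transform(docs)
    return docs

fitted = fit_pipeline([Lowercase(), VocabularyIndexer()], ["Free prize", "call me"])
print(transform_pipeline(fitted, ["FREE call today"]))  # [[1, 0]]
```

The fitted stages can be reused on unseen text, which is why the same fitted pipeline is later applied to the custom messages.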

In [19]:
from pyspark.ml import Pipeline

# A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.

# pipeline = Pipeline(stages=[])

# Fit the pipeline to training documents.
# pipelineFit = pipeline.fit()
# dataset = pipelineFit.transform()

**Use `show()` to have a look at your transformed DataFrame**

In [20]:
# dataset.show()

**Next, split the dataset into training and test data**

**Also, count the number of rows in each dataset**

In [22]:
### Randomly split data into training and test sets. set seed for reproducibility

# (trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)

# print("Training Dataset Count: " + str(trainingData.count()))
# print("Test Dataset Count: " + str(testData.count()))
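
Keep in mind that a random split assigns rows by random draws, so the two counts will only be approximately 70% and 30% of the data. The sketch below illustrates that idea in plain Python; `random_split` is a hypothetical helper, not Spark's `randomSplit`, and the row list is a stand-in for the DataFrame.

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))  # stand-in for DataFrame rows
train_a, test_a = random_split(rows, 0.7, seed=100)
train_b, test_b = random_split(rows, 0.7, seed=100)

print(len(train_a), len(test_a))  # roughly 700 and 300, not exactly
print(train_a == train_b)         # same seed, same split
```

Fixing the seed is what makes the split reproducible from one run to the next.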

# Logistic Regression using Count Vector Features

**Apply logistic regression to the generated count vector features and train the model**

In [23]:
# Build the model

# lr = LogisticRegression()

# Train model with Training Data

# lrModel = lr.fit()

**Perform prediction using the test dataset**

In [24]:
# predictions = lrModel.transform()

# predictions.filter(predictions['prediction'] == 0) \
#     .select("text","category","probability","label","prediction") \
#     .orderBy("probability", ascending=False) \
#     .show(n = 10, truncate = 30)

**Evaluate the predictions using `MulticlassClassificationEvaluator`**

Hint: import it with
`from pyspark.ml.evaluation import MulticlassClassificationEvaluator`

In [25]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# evaluator = MulticlassClassificationEvaluator()
# evaluator.evaluate()
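
Note that `MulticlassClassificationEvaluator` reports the F1 score by default (`metricName="f1"`); pass `metricName="accuracy"` if you want plain accuracy. The two numbers can differ on an imbalanced dataset like this one. The sketch below computes both by hand on hypothetical labels and predictions, assuming the F1 variant is the per-class F1 averaged by class frequency.

```python
from collections import Counter

def accuracy(labels, preds):
    """Fraction of predictions that match the true label."""
    return sum(l == p for l, p in zip(labels, preds)) / len(labels)

def weighted_f1(labels, preds):
    """Per-class F1, averaged with weights equal to each class's share of the labels."""
    support = Counter(labels)
    total = 0.0
    for cls, n in support.items():
        tp = sum(l == cls and p == cls for l, p in zip(labels, preds))
        fp = sum(l != cls and p == cls for l, p in zip(labels, preds))
        fn = sum(l == cls and p != cls for l, p in zip(labels, preds))
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += f1 * n / len(labels)
    return total

# Hypothetical labels and predictions (0.0 = ham, 1.0 = spam).
labels = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0]
preds = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0]
print(accuracy(labels, preds))     # 0.875
print(weighted_f1(labels, preds))  # about 0.859
```

Here one missed spam message costs more in F1 than in accuracy, because the spam class has only two examples.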

# Logistic Regression using TF-IDF Features

**Now, apply logistic regression using TF-IDF features to see if we can improve the accuracy further**
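
TF-IDF down-weights terms that occur in many messages. The MLlib guide gives the smoothed formula `idf = log((m + 1) / (df + 1))`, where `m` is the number of documents and `df` the number of documents containing the term. The plain-Python sketch below applies that formula to a toy corpus; it keys the weights by term instead of hashing terms to indices as `HashingTF` does, and the helper names are hypothetical.

```python
import math
from collections import Counter

def idf_weights(docs, min_doc_freq=0):
    """IDF per term: log((m + 1) / (df + 1)), or 0.0 below min_doc_freq."""
    m = len(docs)
    doc_freq = Counter(term for doc in docs for term in set(doc))
    return {
        term: math.log((m + 1) / (df + 1)) if df >= min_doc_freq else 0.0
        for term, df in doc_freq.items()
    }

def tf_idf(doc, idf):
    """Multiply raw term counts by the IDF weight of each term."""
    return {term: count * idf.get(term, 0.0) for term, count in Counter(doc).items()}

# Hypothetical toy corpus of already tokenized messages.
docs = [
    ["free", "prize", "call"],
    ["call", "me", "later"],
    ["free", "call", "now"],
]
idf = idf_weights(docs)
print(idf["call"])   # 0.0, because "call" appears in every document
print(idf["prize"])  # log(4 / 2), the rarest term gets the largest weight
print(tf_idf(["free", "free", "call"], idf))
```

Raising `min_doc_freq` zeroes out the weight of terms that appear in too few documents, which is the role of `minDocFreq` in the cell that follows.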

In [31]:
from pyspark.ml.feature import HashingTF, IDF

# Add HashingTF and IDF to the transformation
# hashingTF = HashingTF(inputCol="", outputCol="", numFeatures=10000)
# idf = IDF(inputCol="", outputCol="", minDocFreq=5)  # minDocFreq: ignore terms that appear in fewer than 5 documents

# # Redo Pipeline
# pipeline = Pipeline(stages=[])

In [32]:
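# Fit the redone pipeline (complete and uncomment the pipeline definition in the previous cell first)
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)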

### Randomly split data into training and test sets. set seed for reproducibility
# (trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)

# # Build the model
# lr = LogisticRegression()

# # Train model with Training Data
# lrModel = lr.fit(trainingData)

**Perform predictions on the test data using this model**

In [27]:
# predictions = lrModel.transform()

# predictions.filter(predictions['prediction'] == 0) \
#     .select("text","category","probability","label","prediction") \
#     .orderBy("probability", ascending=False) \
#     .show(n = 10, truncate = 30)

**Evaluate your model using MulticlassClassificationEvaluator**

In [28]:
# evaluator = MulticlassClassificationEvaluator(predictionCol="")
# evaluator.evaluate(predictions)

# Custom text

**Give a custom message to your trained model and check the results. Remember to pass your text to the model as a DataFrame.**

#### Example of how to create a Spark DataFrame

```python
from pyspark.sql import Row
l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
# Use the SparkSession (`spark`) to build the RDD and the DataFrame
rdd = spark.sparkContext.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
schemaPeople = spark.createDataFrame(people)

print(type(schemaPeople))
#  pyspark.sql.dataframe.DataFrame
```


In [29]:
from pyspark.sql import Row
ll = ['Hurry up! Answer simple questions and WINNER will get $900 prize reward! To claim call us. Valid 12 hours only.',
      'Hey, How are you? Long time no see']
rdds = spark.sparkContext.parallelize(ll)
tx = rdds.map(lambda x: Row(text=x))
schematxt = spark.createDataFrame(tx)

In [30]:
schematxt.show()

+--------------------+
|                text|
+--------------------+
|Hurry up! Answer ...|
|Hey, How are you?...|
+--------------------+



## Calculating features for test sample data

In [37]:
# test_new_dataset = pipelineFit.transform()
# test_new_dataset.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|               words|            filtered|         rawFeatures|            features|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|Hurry up! Answer ...|[hurry, up, answe...|[hurry, answer, s...|(10000,[1,721,727...|(10000,[1,721,727...|
|Hey, How are you?...|[hey, how, are, y...|[hey, long, time,...|(10000,[7515,8157...|(10000,[7515,8157...|
+--------------------+--------------------+--------------------+--------------------+--------------------+



## Predicting on calculated features of test sample data

In [38]:
# test_pred = lrModel.transform(test_new_dataset)

# test_pred.filter(test_pred['prediction'] == 0) \
#     .select("text","probability","prediction") \
#     .orderBy("probability", ascending=False)

In [39]:
# test_pred.select("text","probability","prediction").show()

+--------------------+--------------------+----------+
|                text|         probability|prediction|
+--------------------+--------------------+----------+
|Hurry up! Answer ...|[0.40530136808716...|       1.0|
|Hey, How are you?...|[0.96833775344934...|       0.0|
+--------------------+--------------------+----------+
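
To read this table: `probability` holds one entry per class in label-index order, and `prediction` is the index of the largest entry. The sketch below reproduces that rule in plain Python. It assumes two classes whose probabilities sum to 1, uses the first entries rounded from the table above, and `predict_from_probability` is a hypothetical helper.

```python
def predict_from_probability(probability):
    """Return the index of the largest class probability as a float label."""
    best = max(range(len(probability)), key=lambda i: probability[i])
    return float(best)

# First entries rounded from the table above; second entries assume two classes summing to 1.
hurry = [0.405, 1 - 0.405]
hey = [0.968, 1 - 0.968]

print(predict_from_probability(hurry))  # 1.0
print(predict_from_probability(hey))    # 0.0
```

If the label indexer assigned 0.0 to ham and 1.0 to spam, the first message is flagged as spam and the second as ham.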



# Exercise

# Build Model using Naive Bayes algorithm

### Predict on sample text

In [31]:
from pyspark.ml.classification import NaiveBayes
# nb = NaiveBayes()
# model = nb.fit()
# predictions = model.transform()
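
If you want to see what the model computes, here is a minimal plain-Python sketch of multinomial Naive Bayes with additive smoothing of 1.0, which to my knowledge matches the default `modelType` and `smoothing` of MLlib's `NaiveBayes`. The helper names and the toy training set are hypothetical, and the sketch works on token lists rather than feature vectors.

```python
import math
from collections import Counter, defaultdict

def train_nb(docs, labels, smoothing=1.0):
    """Estimate a log prior and smoothed log likelihoods for each class."""
    vocab = {tok for doc in docs for tok in doc}
    class_counts = Counter(labels)
    term_counts = defaultdict(Counter)
    for doc, label in zip(docs, labels):
        term_counts[label].update(doc)
    model = {}
    for label, n_docs in class_counts.items():
        total = sum(term_counts[label].values()) + smoothing * len(vocab)
        log_prior = math.log(n_docs / len(docs))
        log_like = {t: math.log((term_counts[label][t] + smoothing) / total) for t in vocab}
        model[label] = (log_prior, log_like)
    return model

def predict_nb(model, doc):
    """Pick the class with the highest log posterior; unseen terms are skipped."""
    def score(label):
        log_prior, log_like = model[label]
        return log_prior + sum(log_like[t] for t in doc if t in log_like)
    return max(model, key=score)

# Hypothetical toy training set of tokenized messages.
docs = [["win", "prize", "now"], ["free", "prize"], ["see", "you", "later"], ["call", "you", "now"]]
labels = ["spam", "spam", "ham", "ham"]
model = train_nb(docs, labels)

print(predict_nb(model, ["free", "prize", "now"]))  # spam
print(predict_nb(model, ["see", "you"]))            # ham
```

Smoothing keeps a term that never occurred in one class from forcing that class's probability to zero.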

In [32]:
# evaluator = MulticlassClassificationEvaluator()
# evaluator.evaluate()

## Prediction on sample dataset

In [33]:
# test_pred.select("").show()

As you can see, Naive Bayes performed well and is able to identify the message as spam! Congrats!

### You could also try Random Forest and see how well it performs.