# NLP Using PySpark

## Objective:
- The objective of this project is to build a <b>spam filter using a NaiveBayes classifier</b>.
- The model must achieve an <b>f1_score > 0.9</b>.
- We'll use the SMS Spam Collection dataset from the UCI Machine Learning Repository: https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection
- Data is also provided for you in the assignment (you do not have to download it).

### Create a spark session and import the required libraries

In [None]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Project").getOrCreate()

### Read the readme file to learn more about the data

### Read the data into a DataFrame

In [None]:
data = spark.read.csv("SMSSpamCollection", inferSchema="true", sep='\t')

### Print the schema

In [None]:
data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



### Rename the first column to 'class' and second column to 'text'

In [None]:
data = data.selectExpr("_c0 as class", "_c1 as text")
data.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)



### Show the first 10 rows from the dataframe

In [None]:
data.show(10)

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
+-----+--------------------+
only showing top 10 rows



## Clean and Prepare the Data

### Create a new feature column that contains the length of the text column

In [None]:
from pyspark.sql.functions import length
data = data.withColumn('length',length(data['text']))

### Show the new dataframe

In [None]:
data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



### Get the average text length for each class 

In [None]:
from pyspark.sql.functions import mean
data.groupby('class').agg(mean('length').alias('length average')).show()

+-----+-----------------+
|class|   length average|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



## Feature Transformations

### In this part, you transform your raw text into a TF-IDF representation:

### Perform the following steps to obtain TF-IDF:
1. Import the required transformers/estimators for the subsequent steps.
2. Create a <b>Tokenizer</b> from the text column.
3. Create a <b>StopWordsRemover</b> to remove the <b>stop words</b> from the column obtained from the <b>Tokenizer</b>.
4. Create a <b>CountVectorizer</b> after removing the <b>stop words</b>.
5. Create the <b>TF-IDF</b> from the <b>CountVectorizer</b>.
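
To make the last two steps concrete, here is a minimal pure-Python sketch of what a count-then-IDF transformation computes. It assumes the smoothed formula documented for Spark MLlib's IDF, idf(t) = log((N + 1) / (df(t) + 1)), where N is the number of documents and df(t) is the number of documents containing term t. The toy documents and the helper names `idf_weights` and `tf_idf` are hypothetical and are not part of the assignment.

```python
import math
from collections import Counter

# Toy, already-tokenized documents (hypothetical, not taken from the SMS dataset).
docs = [
    ["free", "entry", "win", "free"],
    ["ok", "see", "you", "later"],
    ["win", "cash", "now"],
]

def idf_weights(documents):
    """Smoothed IDF per term: log((N + 1) / (df + 1))."""
    n_docs = len(documents)
    # Count each term once per document to get its document frequency.
    doc_freq = Counter(term for doc in documents for term in set(doc))
    return {t: math.log((n_docs + 1) / (df + 1)) for t, df in doc_freq.items()}

def tf_idf(doc, idf):
    """Raw term counts scaled by the IDF weight of each term."""
    counts = Counter(doc)
    return {t: c * idf[t] for t, c in counts.items()}

idf = idf_weights(docs)
vec = tf_idf(docs[0], idf)
print(vec)
```

In this toy corpus "win" appears in two of three documents, so it receives a smaller weight than "free", which appears in only one.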

In [None]:
from pyspark.ml.feature import (
    CountVectorizer,
    IDF,
    StringIndexer,
    StopWordsRemover,
    Tokenizer,
    VectorAssembler,
)

In [None]:
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizer_text")

In [None]:
stopWordsRemove = StopWordsRemover(inputCol='tokenizer_text',outputCol='text_without_stopWords')
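
As a rough illustration of what these two stages do to a message, the pure-Python sketch below approximates the behavior: lowercase the text, split on every single whitespace character, then drop stop words. The stop-word set here is a tiny stand-in (an assumption; Spark ships a much longer default list), the sample string is made up, and edge cases such as trailing whitespace may behave differently in Spark.

```python
import re

# Tiny stand-in stop-word set (assumption; not Spark's default list).
STOP_WORDS = {"and", "the", "a", "to"}

def tokenize(text):
    """Lowercase, then split on every single whitespace character."""
    return re.split(r"\s", text.lower())

def remove_stop_words(tokens):
    """Drop tokens found in STOP_WORDS; empty tokens are not stop words, so they stay."""
    return [t for t in tokens if t not in STOP_WORDS]

tokens = tokenize(" and  picking the various")
filtered = remove_stop_words(tokens)
print(tokens)    # ['', 'and', '', 'picking', 'the', 'various']
print(filtered)  # ['', '', 'picking', 'various']
```

Splitting on single whitespace characters is why a leading space or a double space produces empty-string tokens, which matches the empty entries visible in the `tokenizer_text` column of the prediction output.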

In [None]:
count_vector = CountVectorizer(inputCol='text_without_stopWords',outputCol='count_vec')

- Convert the <b>class column</b> to a numeric index using <b>StringIndexer</b>.
- Create the feature column from the <b>TF-IDF</b> and <b>length</b> columns.

In [None]:
indexer = StringIndexer(inputCol='class', outputCol='label')
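
The sketch below mimics, in plain Python, how a frequency-based indexer assigns labels. It assumes the default ordering, where the most frequent value receives index 0.0. The helper name `fit_string_index` and the toy class list are hypothetical.

```python
from collections import Counter

def fit_string_index(labels):
    """Map each label to a float index, most frequent label first."""
    ordered = [label for label, _ in Counter(labels).most_common()]
    return {label: float(i) for i, label in enumerate(ordered)}

# Toy class column (hypothetical); "ham" is the majority class, as in the dataset.
classes = ["ham", "ham", "spam", "ham", "ham", "spam"]
mapping = fit_string_index(classes)
print(mapping)  # {'ham': 0.0, 'spam': 1.0}
```

Under that assumption, "ham" maps to 0.0 and "spam" to 1.0, which is consistent with the `label` values shown for ham rows in the prediction output.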

In [None]:
idf = IDF(inputCol="count_vec", outputCol="tf_idf")

In [None]:
vectorAssembler = VectorAssembler(inputCols=['tf_idf','length'], outputCol='features')
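
Conceptually, assembling `tf_idf` and `length` concatenates the two inputs into one vector, so the result has one more dimension than the TF-IDF vector. The plain-Python sketch below shows this with a sparse vector written as `(size, indices, values)`. The helper `assemble` and the numeric values are hypothetical; only the sizes 10824 and 10825 come from the prediction output in this notebook.

```python
def assemble(sparse_vec, scalar):
    """Append one scalar to a sparse vector given as (size, indices, values)."""
    size, indices, values = sparse_vec
    # The scalar occupies the new last position, whose index equals the old size.
    return (size + 1, indices + [size], values + [float(scalar)])

# Hypothetical TF-IDF vector over a 10824-term vocabulary, plus a message length of 41.
tf_idf_vec = (10824, [3, 1336], [0.5, 6.2])
features = assemble(tf_idf_vec, 41)
print(features)  # (10825, [3, 1336, 10824], [0.5, 6.2, 41.0])
```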

## The Model
- Create a <b>NaiveBayes</b> classifier with the default parameters.

In [None]:
from pyspark.ml.classification import NaiveBayes

In [None]:
nb = NaiveBayes()
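
For intuition about what the classifier learns, here is a minimal textbook sketch of a multinomial Naive Bayes model with additive smoothing of 1.0, written in plain Python. It is an illustration under stated assumptions, not Spark's implementation: the function names `train_nb` and `predict_nb` and the toy rows are hypothetical, and details such as how priors are smoothed may differ in Spark.

```python
import math
from collections import Counter, defaultdict

def train_nb(rows, smoothing=1.0):
    """rows: list of (label, {term: count}). Returns log priors and log likelihoods."""
    vocab = {t for _, feats in rows for t in feats}
    label_counts = Counter(label for label, _ in rows)
    term_totals = defaultdict(Counter)
    for label, feats in rows:
        term_totals[label].update(feats)
    log_prior = {c: math.log(n / len(rows)) for c, n in label_counts.items()}
    log_like = {}
    for c in label_counts:
        # Additive smoothing keeps unseen (class, term) pairs from getting zero probability.
        denom = sum(term_totals[c].values()) + smoothing * len(vocab)
        log_like[c] = {t: math.log((term_totals[c][t] + smoothing) / denom) for t in vocab}
    return log_prior, log_like

def predict_nb(feats, log_prior, log_like):
    """Pick the class with the highest log posterior; terms outside the vocabulary are ignored."""
    scores = {
        c: log_prior[c] + sum(n * log_like[c][t] for t, n in feats.items() if t in log_like[c])
        for c in log_prior
    }
    return max(scores, key=scores.get)

# Toy training rows (hypothetical term counts, not from the SMS dataset).
rows = [
    ("spam", {"free": 2, "win": 1}),
    ("spam", {"win": 1, "cash": 1}),
    ("ham", {"see": 1, "you": 1}),
    ("ham", {"ok": 1, "later": 1, "you": 1}),
]
log_prior, log_like = train_nb(rows)
pred_spam = predict_nb({"free": 1, "cash": 1}, log_prior, log_like)
pred_ham = predict_nb({"you": 1, "later": 1}, log_prior, log_like)
print(pred_spam, pred_ham)  # spam ham
```

Working in log space avoids numerical underflow when many per-term probabilities are multiplied together.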

## Pipeline
### Create a pipeline that contains all the steps, from the Tokenizer to the NaiveBayes classifier.

In [None]:
from pyspark.ml import Pipeline

In [None]:
pipeline = Pipeline(stages=[tokenizer, indexer, stopWordsRemove, count_vector, idf, vectorAssembler, nb])

### Split your data into training and test sets with ratios of 0.7 and 0.3, respectively.

In [None]:
(training,testing) = data.randomSplit([0.7, 0.3])
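
A weighted random split of this kind assigns rows probabilistically, so the resulting sizes are only approximately 70% and 30%, and they change between runs unless a seed is fixed. The plain-Python sketch below illustrates the idea with one uniform draw per row. It is a conceptual model rather than Spark's exact sampling code, and the helper `random_split` is hypothetical.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by drawing one uniform number per row."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total  # normalize weights into cumulative upper bounds
        bounds.append(acc)
    bounds[-1] = 1.0  # guard against floating-point drift
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, upper in enumerate(bounds):
            if x < upper:
                splits[i].append(row)
                break
    return splits

rows = list(range(1000))
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test))
```

In PySpark, `randomSplit` accepts an optional `seed` argument, which plays the same role when a reproducible split is needed.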

### Fit your Pipeline model to the training data

In [None]:
model = pipeline.fit(training)

### Perform predictions on the test DataFrame

In [None]:
prediction = model.transform(testing)
prediction.show()

+-----+--------------------+------+--------------------+-----+----------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|class|                text|length|      tokenizer_text|label|text_without_stopWords|           count_vec|              tf_idf|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+------+--------------------+-----+----------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  ham| &lt;DECIMAL&gt; ...|   132|[, &lt;decimal&gt...|  0.0|  [, &lt;decimal&gt...|(10824,[3,71,107,...|(10824,[3,71,107,...|(10825,[3,71,107,...|[-691.47588728510...|[1.0,1.9593202765...|       0.0|
|  ham| and  picking the...|    41|[, and, , picking...|  0.0|  [, , picking, var...|(10824,[3,1336],[...|(10824,[3,1336],[...|(10825,[3,1336,10...|[-120.98704346268...|[1.0,8.8681084061...|  

### Print the schema of the prediction dataframe

In [None]:
prediction.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)
 |-- tokenizer_text: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- label: double (nullable = false)
 |-- text_without_stopWords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- count_vec: vector (nullable = true)
 |-- tf_idf: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



## Model Evaluation
- Use <b>MulticlassClassificationEvaluator</b> to calculate the <b>f1_score</b>.

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
evaluator = MulticlassClassificationEvaluator(metricName='f1')
f1_score = evaluator.evaluate(prediction)

print("f1_score is: {}".format(f1_score))

f1_score is: 0.9761584458866029
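
To clarify what the reported number means: with `metricName='f1'`, the evaluator returns a weighted F1, that is, the per-class F1 scores averaged with weights proportional to how often each class occurs in the true labels. The plain-Python sketch below computes that quantity on a toy example; the helper `weighted_f1` and the label lists are hypothetical.

```python
from collections import Counter

def weighted_f1(labels, predictions):
    """Per-class F1, averaged with weights proportional to each class's true count."""
    support = Counter(labels)
    total = len(labels)
    score = 0.0
    for c, n in support.items():
        tp = sum(1 for y, p in zip(labels, predictions) if y == c and p == c)
        fp = sum(1 for y, p in zip(labels, predictions) if y != c and p == c)
        fn = n - tp
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        score += (n / total) * f1
    return score

# Toy labels (hypothetical): 0.0 stands for ham, 1.0 for spam.
y_true = [0.0, 0.0, 0.0, 0.0, 1.0, 1.0]
y_pred = [0.0, 0.0, 0.0, 1.0, 1.0, 1.0]
result = weighted_f1(y_true, y_pred)
print(result)
```

Here class 0.0 has an F1 of 6/7 and class 1.0 an F1 of 0.8, so the weighted result is (4/6)(6/7) + (2/6)(0.8), roughly 0.838. Because ham dominates the dataset, the weighted score leans toward performance on ham, so it is worth checking spam precision and recall separately as well.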
