# NLP Using PySpark

## Objective:
- The objective of this project is to create a <b>spam filter using a NaiveBayes classifier</b>.
- The target is an <b>F1 score > 0.9</b> on held-out data.
- The dataset is the SMS Spam Collection from the UCI Machine Learning Repository: https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection

### Creating a Spark session and importing the required libraries

In [2]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as pyf
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [3]:
spark = SparkSession.builder.getOrCreate()

### Reading the data into a DataFrame

In [5]:
df = spark.read.csv('SMSSpamCollection', sep='\t')

In [6]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



In [7]:
new_df = df.withColumnRenamed('_c0', 'class').withColumnRenamed('_c1', 'text')

In [8]:
new_df.show(10)

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
+-----+--------------------+
only showing top 10 rows



## Clean and Prepare the Data

### Creating a new feature column that contains the length of the text column

In [10]:
df_len = new_df.withColumn('length', pyf.length(pyf.col('text')))

In [11]:
df_len.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



### Getting the average text length for each class

In [12]:
df_len.groupBy('class').agg(pyf.avg('length').alias('Avg. Length')).show()

+-----+-----------------+
|class|      Avg. Length|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



## Feature Transformations

### In this part, the raw text is transformed into TF-IDF features.

### The following steps produce the TF-IDF vectors:
1. Creating a <b>Tokenizer</b> that splits the text column into tokens.
2. Creating a <b>StopWordsRemover</b> to remove the <b>stop words</b> from the column produced by the <b>Tokenizer</b>.
3. Creating a <b>CountVectorizer</b> that counts the remaining tokens after the <b>stop words</b> are removed.
4. Creating an <b>IDF</b> stage that rescales the <b>CountVectorizer</b> counts into <b>TF-IDF</b> weights.

In [13]:
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
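
To make the four stages above more concrete, here is a minimal pure-Python sketch of what they compute on a toy corpus. The three sample messages and the tiny stop-word set are hypothetical, and the formula log((m + 1) / (df + 1)) is the smoothed IDF documented for Spark's `IDF`. The real stages also build a vocabulary index and emit sparse vectors, which this sketch leaves out.

```python
import math
from collections import Counter

# Hypothetical toy corpus and stop-word list, for illustration only.
docs = ["Free entry in a prize draw", "Ok see you in a bit", "Free prize call now"]
stop_words = {"in", "a", "you"}

def tokenize(text):
    # Like Tokenizer: lowercase the text, then split on whitespace.
    return text.lower().split()

# Tokenizer followed by StopWordsRemover.
tokens = [[t for t in tokenize(d) if t not in stop_words] for d in docs]

# CountVectorizer-style raw term counts per document.
term_counts = [Counter(doc) for doc in tokens]

# Document frequency: in how many documents each term appears.
m = len(docs)
doc_freq = Counter(term for doc in tokens for term in set(doc))

# Smoothed inverse document frequency.
idf = {term: math.log((m + 1) / (df + 1)) for term, df in doc_freq.items()}

# TF-IDF weight = term count * IDF.
tf_idf = [{term: n * idf[term] for term, n in counts.items()} for counts in term_counts]

print(tf_idf[0])
```

Terms that occur in many messages (here `free` and `prize`) receive a smaller weight than terms that occur in only one message.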

- Converting the <b>class column</b> to a numeric label index using <b>StringIndexer</b>.
- Creating a single feature column from the <b>TF-IDF</b> and <b>length</b> columns using <b>VectorAssembler</b>.

In [14]:
hamSpamNum = StringIndexer(inputCol='class',outputCol='label')
vecAssembler = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')
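
As a rough illustration of these two stages, the sketch below mimics them in plain Python. It assumes `StringIndexer`'s default ordering (most frequent value gets index 0, ties broken alphabetically), so the majority class would become label 0.0. The sample class list, the vector values, and the helper `assemble` are hypothetical.

```python
from collections import Counter

# Hypothetical sample of the class column.
classes = ["ham", "ham", "spam", "ham", "spam", "ham"]

# StringIndexer-style mapping: most frequent value first, ties alphabetical.
counts = Counter(classes)
ordered = sorted(counts, key=lambda c: (-counts[c], c))
label_of = {c: float(i) for i, c in enumerate(ordered)}

def assemble(tf_idf_vector, length):
    # VectorAssembler-style concatenation: append the scalar column
    # to the end of the vector column.
    return list(tf_idf_vector) + [float(length)]

features = assemble([0.0, 0.69, 0.29], 29)

print(label_of)
print(features)
```

The message length therefore ends up as one extra dimension appended to the TF-IDF vector.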

## The Model
- Creating a <b>NaiveBayes</b> classifier with the default parameters.

In [15]:
nbModel = NaiveBayes(featuresCol='features', labelCol='label')
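
With the default parameters, Spark's `NaiveBayes` is a multinomial model with additive smoothing of 1.0, and it expects non-negative feature values, which TF-IDF weights and text lengths satisfy. The following is a minimal sketch of multinomial Naive Bayes in plain Python, not Spark's implementation. The function names and the toy data are hypothetical, and smoothing the class prior the same way as the likelihoods is an assumption of this sketch.

```python
import math

def train_multinomial_nb(X, y, smoothing=1.0):
    """Fit log priors and log likelihoods from non-negative feature vectors."""
    classes = sorted(set(y))
    n_features = len(X[0])
    model = {}
    for c in classes:
        rows = [x for x, label in zip(X, y) if label == c]
        # Total weight of each feature within class c.
        feature_totals = [sum(col) for col in zip(*rows)]
        total = sum(feature_totals)
        log_prior = math.log(
            (len(rows) + smoothing) / (len(X) + smoothing * len(classes))
        )
        log_likelihood = [
            math.log((t + smoothing) / (total + smoothing * n_features))
            for t in feature_totals
        ]
        model[c] = (log_prior, log_likelihood)
    return model

def predict(model, x):
    # Score each class by log prior + sum of feature value * log likelihood.
    scores = {
        c: log_prior + sum(xi * ll for xi, ll in zip(x, log_likelihood))
        for c, (log_prior, log_likelihood) in model.items()
    }
    return max(scores, key=scores.get)

# Hypothetical toy data: three features, two classes.
X = [[2, 0, 1], [1, 0, 0], [0, 3, 1], [0, 2, 0]]
y = [0, 0, 1, 1]
model = train_multinomial_nb(X, y)
print(predict(model, [1, 0, 0]), predict(model, [0, 2, 0]))
```

The smoothing term keeps a feature that never appears in one class from driving that class's probability to zero.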

## Pipeline
### Creating a pipeline that contains all the steps, from the Tokenizer to the NaiveBayes classifier.

In [16]:
pipeline = Pipeline(stages=[tokenizer, stopremove, count_vec, idf, hamSpamNum, vecAssembler, nbModel])

### Splitting the data into training and test sets with ratios of 0.7 and 0.3, respectively.

In [17]:
trainDF, testDF = df_len.randomSplit([.7,.3],seed=42)
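
Note that `randomSplit` assigns rows to the splits by random sampling, so the resulting sizes are only approximately 70% and 30%. The sketch below illustrates that idea in plain Python. The helper `random_split` is hypothetical and does not reproduce Spark's per-partition sampling or its exact row assignment for a given seed.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative boundaries of the normalized weights, e.g. [0.7, 1.0].
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    parts = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split catches any floating-point rounding leftovers.
            if u < bound or i == len(bounds) - 1:
                parts[i].append(row)
                break
    return parts

train, test = random_split(list(range(1000)), [0.7, 0.3], seed=42)
print(len(train), len(test))
```

Fixing the seed makes the split repeatable, which helps when comparing metrics across runs.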

### Fitting the pipeline to the training data

In [18]:
pipelineModel = pipeline.fit(trainDF)

### Performing predictions on the test DataFrame

In [19]:
predDF = pipelineModel.transform(testDF)

### Printing the schema of the prediction DataFrame

In [20]:
predDF.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)
 |-- token_text: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- stop_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- c_vec: vector (nullable = true)
 |-- tf_idf: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



## Model Evaluation
- Using <b>MulticlassClassificationEvaluator</b> to calculate the <b>f1_score</b>.

In [21]:
classificationEvaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')

In [22]:
f1 = classificationEvaluator.evaluate(predDF)

In [23]:
print(f"f1_score is {f1}")

f1_score is 0.9727502290227267
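
For reference, the `f1` metric of `MulticlassClassificationEvaluator` is a weighted F1: the per-class F1 scores are averaged with weights proportional to how often each class occurs among the true labels. The following plain-Python sketch shows that computation on a hypothetical set of labels and predictions. The function `weighted_f1` is illustrative and is not part of PySpark.

```python
def weighted_f1(labels, preds):
    """Average per-class F1, weighted by each class's share of the true labels."""
    total = len(labels)
    score = 0.0
    for c in sorted(set(labels)):
        pairs = list(zip(labels, preds))
        tp = sum(1 for l, p in pairs if l == c and p == c)
        fp = sum(1 for l, p in pairs if l != c and p == c)
        fn = sum(1 for l, p in pairs if l == c and p != c)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = (
            2 * precision * recall / (precision + recall)
            if precision + recall
            else 0.0
        )
        # tp + fn is the number of true instances of class c.
        score += f1 * (tp + fn) / total
    return score

# Hypothetical labels and predictions.
labels = [0, 0, 0, 1]
preds = [0, 0, 1, 1]
print(weighted_f1(labels, preds))
```

Because of this weighting, the majority class contributes most of the score, so it can be worth inspecting the per-class precision and recall for the spam class as well.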
