# NLP Using PySpark

## Objective:
- The objective of this project is to build a <b>spam filter using a NaiveBayes classifier</b>.
- We'll use the SMS Spam Collection dataset from the UCI Machine Learning Repository: https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection

### Create a spark session and import the required libraries

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=d6f47490b634c7ac6b0a6d564e3e8e15d17bb3d8a591a682d4af0d6c8f4f28c5
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("SpamFilter") \
    .getOrCreate()

In [17]:
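# Import only the functions used below; a wildcard import would shadow Python built-ins such as sum and max
from pyspark.sql.functions import avg, length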

In [20]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.pipeline import Pipeline

In [22]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

In [25]:
from pyspark.ml.classification import NaiveBayes

In [34]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

### Read the data into a DataFrame

In [5]:
# The file is tab-separated and has no header row, so Spark names the columns _c0 and _c1
df = spark.read.csv("/content/drive/MyDrive/SpamFilter/SMSSpamCollection",
                    sep="\t",
                    inferSchema=True)

### Print the schema

In [7]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



### Rename the first column to 'class' and the second column to 'text'

In [9]:
df2 = df.withColumnRenamed("_c0", "class").withColumnRenamed("_c1", "text")
df2.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)



### Show the first 10 rows of the DataFrame
- Show them once with truncate=True and once with truncate=False

In [11]:
df2.show(10, truncate=True)

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
+-----+--------------------+
only showing top 10 rows



In [12]:
df2.show(10, truncate=False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|class|text                                                                                                                                                            |
+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                                 |
|ham  |Ok lar... Joking wif u oni...                                                                                                                                   |
|spam |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075o

## Clean and Prepare the Data

### Create a new feature column containing the length of the text column

In [15]:
df3 = df2.withColumn("text_length", length(df2["text"]))

### Show the new DataFrame

In [16]:
df3.show(10)

+-----+--------------------+-----------+
|class|                text|text_length|
+-----+--------------------+-----------+
|  ham|Go until jurong p...|        111|
|  ham|Ok lar... Joking ...|         29|
| spam|Free entry in 2 a...|        155|
|  ham|U dun say so earl...|         49|
|  ham|Nah I don't think...|         61|
| spam|FreeMsg Hey there...|        147|
|  ham|Even my brother i...|         77|
|  ham|As per your reque...|        160|
| spam|WINNER!! As a val...|        157|
| spam|Had your mobile 1...|        154|
+-----+--------------------+-----------+
only showing top 10 rows



### Get the average text length for each class (give the average-length column an alias)


In [19]:
avg_text_length = df3.groupBy("class").agg(avg(length(df3["text"])).alias("Avg. Length"))
avg_text_length.show()


+-----+-----------------+
|class|      Avg. Length|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



## Feature Transformations

### In this part you transform your raw text into a TF-IDF model:

### Perform the following steps to obtain TF-IDF:
1. Import the required transformers/estimators for the subsequent steps.
2. Create a <b>Tokenizer</b> from the text column.
3. Create a <b>StopWordsRemover</b> to remove the <b>stop words</b> from the column obtained from the <b>Tokenizer</b>.
4. Create a <b>CountVectorizer</b> after removing the <b>stop words</b>.
5. Create the <b>TF-IDF</b> from the <b>CountVectorizer</b>.

In [21]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
count_vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="raw_features")
idf = IDF(inputCol="raw_features", outputCol="tfidf_features")

In [None]:
# Create a pipeline combining all the steps
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, count_vectorizer, idf])

# Fit the pipeline to the DataFrame
pipeline_model = pipeline.fit(df3)

# Transform the DataFrame to obtain the TF-IDF features
tfidf_df = pipeline_model.transform(df3)

# Show the TF-IDF features DataFrame
tfidf_df.select("class", "tfidf_features").show(truncate=False)
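
To make the arithmetic behind these stages concrete, here is a minimal pure-Python sketch of the same four steps on three toy messages. It assumes the IDF formula documented for Spark ML, idf(t) = log((N + 1) / (df(t) + 1)), and a tiny stop-word list. The messages, the stop-word list and the variable names are hypothetical and not part of the notebook.

```python
import math
from collections import Counter

# Toy corpus and stop-word list (both hypothetical, for illustration only).
docs = ["Free entry win a prize", "Ok see you at home", "Win a free prize now"]
stop_words = {"a", "at", "you"}

# 1. Tokenizer: lowercase, then split on whitespace.
tokens = [d.lower().split() for d in docs]

# 2. StopWordsRemover: drop tokens found in the stop-word list.
filtered = [[w for w in doc if w not in stop_words] for doc in tokens]

# 3. CountVectorizer: raw term counts per document.
term_counts = [Counter(doc) for doc in filtered]

# 4. IDF: log((N + 1) / (df + 1)), where df is the number of documents containing the term.
n_docs = len(filtered)
doc_freq = Counter(term for doc in filtered for term in set(doc))
idf = {t: math.log((n_docs + 1) / (df + 1)) for t, df in doc_freq.items()}

# TF-IDF weight = term count * IDF weight.
tfidf = [{t: c * idf[t] for t, c in counts.items()} for counts in term_counts]

for doc, weights in zip(docs, tfidf):
    print(doc, {t: round(w, 3) for t, w in weights.items()})
```

Terms that occur in more documents (here "free", "win" and "prize") receive a lower weight than terms that occur in only one.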

- Convert the <b>class column</b> to a numeric index using <b>StringIndexer</b>.
- Create a feature column from the <b>TF-IDF</b> and <b>length</b> columns.

In [24]:
string_indexer = StringIndexer(inputCol="class", outputCol="label")
feature_assembler = VectorAssembler(inputCols=["tfidf_features", "text_length"], outputCol="features")
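
As a rough illustration of what these two stages produce, the plain-Python sketch below mimics them on toy values. It relies on StringIndexer's default ordering (labels sorted by descending frequency, so the most frequent label gets index 0.0). The toy labels and feature values are hypothetical.

```python
from collections import Counter

# Hypothetical toy labels; ham is the majority class, as in the SMS dataset.
labels = ["ham", "spam", "ham", "ham", "spam", "ham"]

# StringIndexer's default ordering is by descending frequency,
# so the most frequent label receives index 0.0.
ordered = [label for label, _ in Counter(labels).most_common()]
label_to_index = {label: float(i) for i, label in enumerate(ordered)}
print(label_to_index)  # {'ham': 0.0, 'spam': 1.0}

# VectorAssembler concatenates its input columns into one vector:
# here a (toy) TF-IDF vector followed by the message length.
tfidf_features = [0.0, 0.69, 0.29]
text_length = 29
features = tfidf_features + [float(text_length)]
print(features)  # [0.0, 0.69, 0.29, 29.0]
```

Because ham is the more frequent class in this dataset, label 0.0 corresponds to ham and 1.0 to spam.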

## The Model
- Create a <b>NaiveBayes</b> classifier with the default parameters.

In [26]:
naive_bayes = NaiveBayes()
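
With default parameters, Spark's NaiveBayes uses the multinomial model with smoothing = 1.0. The sketch below shows the idea behind that model in plain Python: smoothed per-class term likelihoods combined with a class prior in log space. It is a simplified illustration on hypothetical toy counts, not Spark's implementation; for instance, the prior here is left unsmoothed.

```python
import math

# Toy training data: per-message term counts over a 3-word vocabulary
# ["free", "win", "home"], with labels 0 = ham, 1 = spam (all hypothetical).
X = [[0, 0, 2], [0, 1, 1], [2, 1, 0], [1, 2, 0]]
y = [0, 0, 1, 1]
smoothing = 1.0  # Laplace smoothing, matching the Spark default value

def train(X, y, smoothing):
    """Return log priors and smoothed log likelihoods per class."""
    n_features = len(X[0])
    log_prior, log_likelihood = {}, {}
    for c in sorted(set(y)):
        rows = [x for x, label in zip(X, y) if label == c]
        log_prior[c] = math.log(len(rows) / len(X))
        totals = [sum(col) for col in zip(*rows)]  # term totals within class c
        denom = sum(totals) + smoothing * n_features
        log_likelihood[c] = [math.log((t + smoothing) / denom) for t in totals]
    return log_prior, log_likelihood

def predict(x, log_prior, log_likelihood):
    """Pick the class with the highest log posterior score."""
    scores = {c: log_prior[c] + sum(xi * w for xi, w in zip(x, log_likelihood[c]))
              for c in log_prior}
    return max(scores, key=scores.get)

log_prior, log_likelihood = train(X, y, smoothing)
print(predict([1, 1, 0], log_prior, log_likelihood))  # 1 (spam)
print(predict([0, 0, 1], log_prior, log_likelihood))  # 0 (ham)
```

The multinomial model expects non-negative feature values, which holds for both the TF-IDF weights and the text length used in this notebook.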


## Pipeline
### Create a pipeline containing all the steps, from the Tokenizer to the NaiveBayes classifier.

In [27]:
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, count_vectorizer, idf, string_indexer, feature_assembler, naive_bayes])

### Split your data into train and test sets with ratios 0.7 and 0.3, respectively.

In [28]:
train_data, test_data = df3.randomSplit([0.7, 0.3], seed=42)

print(f"Training Data Count: {train_data.count()}")
print(f"Test Data Count: {test_data.count()}")


Training Data Count: 3981
Test Data Count: 1593
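
The counts above are close to, but not exactly, 70/30: 3981 of 5574 rows is about 71.4%. This is expected, because `randomSplit` assigns rows by random sampling rather than cutting at an exact row count. The sketch below imitates that behaviour in plain Python, assuming one independent random draw per row. It illustrates the idea only and will not reproduce Spark's exact split.

```python
import random

# Counts reported by the cell above.
train_count, test_count = 3981, 1593
total = train_count + test_count
print(total, round(train_count / total, 3))  # 5574 0.714

# Imitate a 0.7 / 0.3 split: each row gets an independent uniform draw
# and goes to the training set when the draw falls below 0.7.
rng = random.Random(42)  # fixed seed so the split is reproducible
train, test = [], []
for row in range(total):
    (train if rng.random() < 0.7 else test).append(row)

# The sizes land near 0.7 * total and 0.3 * total, but rarely exactly on them.
print(len(train), len(test))
```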


### Fit your Pipeline model to the training data

In [29]:
pipeline_model = pipeline.fit(train_data)

### Perform predictions on the test DataFrame

In [30]:
predictions = pipeline_model.transform(test_data)

### Print the schema of the predictions DataFrame


In [33]:
predictions.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- text_length: integer (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- raw_features: vector (nullable = true)
 |-- tfidf_features: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



## Model Evaluation
- Use <b>MulticlassClassificationEvaluator</b> to calculate the <b>f1_score</b>.

In [35]:
evaluator = MulticlassClassificationEvaluator(metricName="f1")
f1_score = evaluator.evaluate(predictions)
print("F1 Score:", f1_score)


F1 Score: 0.9727502290227267
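
With `metricName="f1"`, the evaluator reports a weighted F1: the F1 of each class, averaged with weights proportional to how many true labels each class has. The plain-Python sketch below computes that quantity on hypothetical toy labels and predictions. It is meant to show the calculation, not to reproduce the score above.

```python
from collections import Counter

# Hypothetical labels and predictions (0.0 = ham, 1.0 = spam).
labels      = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0]
predictions = [0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 0.0]

def f1_for_class(c, labels, predictions):
    """F1 for one class, treating that class as the positive one."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for l, p in pairs if l == c and p == c)
    fp = sum(1 for l, p in pairs if l != c and p == c)
    fn = sum(1 for l, p in pairs if l == c and p != c)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)

# Weighted F1: per-class F1 weighted by each class's share of the true labels.
counts = Counter(labels)
weighted_f1 = sum(counts[c] / len(labels) * f1_for_class(c, labels, predictions)
                  for c in counts)
print(round(weighted_f1, 4))  # 0.8
```

Because ham messages dominate the dataset, the weighted score is driven mostly by performance on ham, so it is worth checking the spam-class F1 separately when spam recall matters.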
