# NLP Using PySpark

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, StringIndexer, IDF
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import length
from pyspark.ml.feature import VectorAssembler


## Spam filter using a Naive Bayes classifier


In [4]:
# Start (or reuse) the Spark session that every later cell relies on.
spark = (
    SparkSession.builder
    .appName("SpamFilter")
    .getOrCreate()
)

In [6]:
# Load the tab-separated, headerless SMS file; column types are inferred.
data = spark.read.csv(
    "/content/SMSSpamCollection",
    sep="\t",
    inferSchema=True,
    header=False,
)

In [7]:
# Show the column names and types of the freshly loaded frame.
data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



In [8]:
# Replace the auto-generated column names with meaningful ones.
data = (
    data
    .withColumnRenamed("_c0", "class")
    .withColumnRenamed("_c1", "text")
)

In [9]:
# Preview the first 10 rows twice: first with truncated text, then in full.
for truncate_text in (True, False):
    data.show(10, truncate=truncate_text)

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
+-----+--------------------+
only showing top 10 rows

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|class|text                                                                                                                                                            |
+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham  |Go until jurong point, crazy.. Available only in bugis

## Data Preparation and Cleaning

In [10]:
# Add the character length of each message as a numeric column.
data = data.withColumn("length", length(col("text")))
data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [11]:
# Average message length per class (ham vs. spam).
avg_length = (
    data.groupBy("class")
    .agg({"length": "avg"})  # dict form names the result column "avg(length)"
    .withColumnRenamed("avg(length)", "Avg_Length")
)
avg_length.show()

+-----+-----------------+
|class|       Avg_Length|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



## Feature Transformations

In [12]:
# Text feature stages, chained through their column names:
# text -> token_text -> stop_tokens -> c_vec -> tf_idf
tokenizer = Tokenizer(
    inputCol="text",
    outputCol="token_text",
)
stop_words_remover = StopWordsRemover(
    inputCol="token_text",
    outputCol="stop_tokens",
)
count_vec = CountVectorizer(
    inputCol="stop_tokens",
    outputCol="c_vec",
)
idf = IDF(
    inputCol="c_vec",
    outputCol="tf_idf",
)

In [16]:
# Encode the string class as a numeric label column.
indexer = StringIndexer(
    inputCol="class",
    outputCol="label",
)
# Combine the TF-IDF vector and the message length into one feature vector.
assembler = VectorAssembler(
    inputCols=["tf_idf", "length"],
    outputCol="features",
)

## The Model


In [21]:
# Naive Bayes classifier; the column names below are the library defaults,
# spelled out to show which pipeline outputs it consumes.
nb = NaiveBayes(featuresCol="features", labelCol="label")


## Pipeline


In [23]:
# Assemble the stages in execution order: text features, label indexing,
# feature assembly, then the classifier.
pipeline_stages = [
    tokenizer,
    stop_words_remover,
    count_vec,
    idf,
    indexer,
    assembler,
    nb,
]
pipeline = Pipeline(stages=pipeline_stages)

In [24]:
# Split roughly 70/30 into training and test sets. The seed is pinned so that
# Restart & Run All yields the same split and therefore the same metrics;
# without it every run trains and evaluates on different rows.
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

In [25]:
# Fit all pipeline stages on the training split only.
model = pipeline.fit(train_data)

In [26]:
# Run the fitted pipeline over the held-out test split.
predictions = model.transform(test_data)

### Print the schema of the prediction dataframe

In [21]:
# List every column of the predictions frame with its type.
predictions.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)
 |-- token_text: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- stop_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- c_vec: vector (nullable = true)
 |-- tf_idf: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



## Model Evaluation

In [27]:
# Score the test-set predictions against the indexed labels using F1.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
f1_score = evaluator.evaluate(predictions)
print("f1_score is:", f1_score)

f1_score is: 0.9826402170280536


In [28]:

# Predict on a hand-written SMS after evaluation. The "length" column is
# added because the fitted pipeline's assembler takes it as an input.
custom_sms = (
    spark.createDataFrame([(0, "Hello son dinner is ready")], ["id", "text"])
    .withColumn("length", length("text"))
)
custom_predictions = model.transform(custom_sms)
custom_predictions.select("text", "prediction").show()

+--------------------+----------+
|                text|prediction|
+--------------------+----------+
|Hello son dinner ...|       0.0|
+--------------------+----------+



In [29]:
# Predict on several hand-written messages at once.
sample_rows = [
    (0, "Hello son dinner is ready"),
    (1, "free access send visa"),
    (2, "meeting is cancelled"),
]
custom_sms = (
    spark.createDataFrame(sample_rows, ["id", "text"])
    .withColumn("length", length("text"))
)
custom_predictions = model.transform(custom_sms)
custom_predictions.select("text", "prediction").show()

+--------------------+----------+
|                text|prediction|
+--------------------+----------+
|Hello son dinner ...|       0.0|
|free access send ...|       1.0|
|meeting is cancelled|       0.0|
+--------------------+----------+

