# Spam Detection

In [13]:
#################### Initialize ###########################

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('nlp').getOrCreate()
import time

# Feature Engineering
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
                                Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer)
from pyspark.sql.functions import length

# Models
from pyspark.ml.classification import GBTClassifier,RandomForestClassifier, NaiveBayes, LogisticRegression

# Pipeline
from pyspark.ml import Pipeline

# Evaluators
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [2]:
#################### Data ##################################

# Bring in Data
data = spark.read.csv("smsspamcollection/SMSSpamCollection",inferSchema=True,sep='\t')
data = data.withColumnRenamed('_c0','class').withColumnRenamed('_c1','text')
data.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)



### Feature Engineering

In [3]:
# Add Length Column
data = data.withColumn('length',length(data['text']))

In [4]:
data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [5]:
# To avoid Data Leakage Need to split the test train before the transformations
(training,testing) = data.randomSplit([0.7,0.3])

In [11]:
# Setup Transformations
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
ham_spam_to_num = StringIndexer(inputCol='class',outputCol='label')

# Vectorize
clean_up = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')

# Build Pipeline
data_prep_pipe = Pipeline(stages=[ham_spam_to_num,tokenizer,stopremove,count_vec,idf,clean_up])

# Call Pipeline for training and testing

#To prevent data leakage, transform the test data on the learned documents from training. 
#This is like the real world where only have access to the training data.
cleaner = data_prep_pipe.fit(training)
training_cleaner = cleaner.transform(training)
testing_cleaner = cleaner.transform(testing)

# Select Clean Data
train_clean_data = training_cleaner.select(['label','features'])
test_clean_data = testing_cleaner.select(['label','features'])

### Model Building

#### Naive Bayes

In [12]:
# Initialize the Model
start_time = time.time()
nb = NaiveBayes()

# Fit model
naive_model = nb.fit(train_clean_data)

# Evaluate the model
test_results = naive_model.transform(test_clean_data)
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting spam was: {}".format(acc))
print("-- Execution time: %s seconds ---" % (time.time() - start_time))

Accuracy of model at predicting spam was: 0.9786533824327943
-- Execution time: 1.2675139904022217 seconds ---


#### Logistic Regression

In [14]:
start_time = time.time()
# Setup Model
log_reg = LogisticRegression()
log_model = log_reg.fit(train_clean_data)

# Evaluate the model
test_results = log_model.transform(test_clean_data)
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting spam was: {}".format(acc))
print("-- Execution time: %s seconds ---" % (time.time() - start_time))

Accuracy of model at predicting spam was: 0.9586711361583795
-- Execution time: 4.155614137649536 seconds ---


#### Gradient Boost

In [None]:
start_time = time.time()
gbt = GBTClassifier()
gbt_model = gbt.fit(train_clean_data)

# Evaluate the model
test_results = gbt_model.transform(test_clean_data)
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting spam was: {}".format(acc))
print("-- Execution time: %s seconds ---" % (time.time() - start_time))

#### Random Forest

In [None]:
start_time = time.time()
rfc = RandomForestClassifier()

# Train model.  This also runs the indexers.
rfc_model = rfc.fit(training)

# Evaluate the model
test_results = rfc_model.transform(testing)
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting spam was: {}".format(acc))
print("-- Execution time: %s seconds ---" % (time.time() - start_time))