# Building a Spam Filtering Engine Using Spark MLlib

SOURCE:
````     
     https://acadgild.com/blog/building-spam-filtering-engine-using-spark-mllib
````

## What is Spam Filtering?
* Spam filtering is the process of detecting unwanted or unsolicited 
  email or text messages and keeping them out of the user’s inbox. 
* Spam filtering applications rely on text filters. 
* Text filters use algorithms to detect which words 
  and phrases occur most often in spam emails.

* Now, let us build a spam filtering application using logistic regression.
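
To make the idea of a text filter concrete before turning to Spark, here is a minimal plain-Python sketch that counts word occurrences in two tiny corpora. The messages and the `word_counts` helper are hypothetical and only illustrate the idea; they are not part of the notebook's data or code.

```python
from collections import Counter

# Tiny hypothetical corpora, used only for illustration.
spam_examples = [
    "win a prize now",
    "claim your prize money now",
]
ham_examples = [
    "see you at the meeting",
    "send me the report now",
]

def word_counts(messages):
    """Count how often each lowercase word occurs across all messages."""
    counts = Counter()
    for message in messages:
        counts.update(message.lower().split())
    return counts

spam_counts = word_counts(spam_examples)
ham_counts = word_counts(ham_examples)

# Words seen in spam but never in ham, most frequent first (ties broken alphabetically).
spam_only = sorted(
    (word for word in spam_counts if word not in ham_counts),
    key=lambda word: (-spam_counts[word], word),
)
print(spam_only)
```

A word such as "now" appears in both corpora, so it is not a useful signal on its own; a learned model weighs such evidence instead of relying on a fixed word list.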

In [1]:
# Data
# $ pwd
# /Users/mparsian/work_in_progress/data/emails

# $ ls -1
# query.txt
# training_emails_nospam.txt (normal emails, which are NOT spam)
# training_emails_spam.txt (spam emails)


In [2]:
# check SparkSession
spark

In [3]:
spam_mails = spark.sparkContext.textFile('data/emails/training_emails_spam.txt')

In [4]:
spam_mails.count()

20

In [5]:
spam_mails.collect()

["Samsung Galaxy End of YearPromo You have 1 week remaining to retrieve your won prize for the Samsung Galaxy Xmas Promo 'C' draw category winning prize of Seven Hundred and Fifty Thousand Euros each and a Samsung Galaxy S6 EDGE. Winning Ticket Number:WIN-707-COS.  We advise you to keep this winning notification confidential and away from public notice to avoid double claim/mistransfer or impersonation until after remittance/payment to you.",
 "We've picked out 10 new matches for you. Meet them now and then check out all the singles in your area! you might win a prize too",
 'For claim fill in the attached claim application form completely and follow instructions carefully.',
 'Dear sir, I am a Prince in a far kingdom you have not heard of.  I want to send you money via wire transfer so please ...',
 'Get Viagra real cheap!  Send money right away to ...',
 'Oh my gosh you can be really strong too with these drugs found in the rainforest. Get them cheap right now ...',
 'YOUR COMPUTER H

In [6]:
spam_mails.take(2)

["Samsung Galaxy End of YearPromo You have 1 week remaining to retrieve your won prize for the Samsung Galaxy Xmas Promo 'C' draw category winning prize of Seven Hundred and Fifty Thousand Euros each and a Samsung Galaxy S6 EDGE. Winning Ticket Number:WIN-707-COS.  We advise you to keep this winning notification confidential and away from public notice to avoid double claim/mistransfer or impersonation until after remittance/payment to you.",
 "We've picked out 10 new matches for you. Meet them now and then check out all the singles in your area! you might win a prize too"]

In [7]:
# non-spam emails:
ham_mails = spark.sparkContext.textFile('data/emails/training_emails_nospam.txt')

In [8]:
ham_mails.count()

20

In [9]:
ham_mails.take(3)

['Dear Spark Learner, Thanks so much for attending the Spark Summit 2014!  Check out videos of talks from the summit at ...',
 'Hi Mom, Apologies for being late about emailing and forgetting to send you the package.  I hope you and bro have been ...',
 'Wow, hey Fred, just heard about the Spark petabyte sort.  I think we need to take time to try it out immediately ...']

## Feature hashing

In [10]:
# In machine learning, feature hashing, also known as the hashing trick, 
# is a fast and space-efficient way of vectorizing features, i.e. turning 
# arbitrary features into indices in a vector or matrix. It works by applying 
# a hash function to the features and using their hash values as indices directly, 
# rather than looking the indices up in an associative array.

# This can be done as follows
from pyspark.mllib.feature import HashingTF

# Maps a sequence of terms to their term frequencies using the hashing trick.
features = HashingTF(numFeatures = 1000)

In [11]:
features

<pyspark.mllib.feature.HashingTF at 0x10c7e2ac8>

In [12]:
# HashingTF creates feature vectors by hashing each term (here, the 
# space-separated words of an email) to an index in a length-1000 
# term-frequency vector that can be passed to an MLlib algorithm.
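
The hashing trick itself can be sketched in a few lines of plain Python. This is an illustration under stated assumptions, not MLlib's implementation: the `hashing_tf` function is hypothetical, and `zlib.crc32` is used as a stand-in hash only because it is reproducible across runs.

```python
import zlib

def hashing_tf(terms, num_features=1000):
    """Map a list of terms to a sparse {index: count} term-frequency dict."""
    vector = {}
    for term in terms:
        # crc32 is a stand-in hash chosen for reproducibility; it is not
        # necessarily the hash function MLlib uses internally.
        index = zlib.crc32(term.encode("utf-8")) % num_features
        vector[index] = vector.get(index, 0.0) + 1.0
    return vector

vec = hashing_tf("get cheap drugs get them cheap".split(" "))
print(vec)
```

Two different terms can land on the same index (a collision), in which case their counts are added together. A larger `num_features` makes collisions less likely at the cost of longer vectors.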


In [13]:
# Now we apply the feature transformer to both datasets, 
# turning every email into a term-frequency vector. 
# This can be done as follows

features_spam = spam_mails.map(lambda mail : features.transform(mail.split(" ")))
features_ham = ham_mails.map(lambda mail : features.transform(mail.split(" ")))

In [14]:
features_spam

PythonRDD[8] at RDD at PythonRDD.scala:53

In [15]:
features_spam.count()

20

In [16]:
features_spam.take(1)

[SparseVector(1000, {0: 1.0, 9: 1.0, 28: 3.0, 31: 3.0, 34: 1.0, 69: 1.0, 70: 2.0, 80: 2.0, 94: 1.0, 101: 3.0, 147: 1.0, 155: 1.0, 169: 2.0, 174: 1.0, 184: 1.0, 216: 1.0, 224: 1.0, 240: 2.0, 241: 1.0, 242: 1.0, 268: 1.0, 289: 1.0, 300: 1.0, 301: 1.0, 314: 1.0, 317: 1.0, 321: 1.0, 350: 1.0, 365: 4.0, 437: 1.0, 484: 1.0, 589: 1.0, 590: 1.0, 599: 1.0, 614: 1.0, 733: 1.0, 759: 1.0, 772: 1.0, 778: 1.0, 787: 1.0, 798: 1.0, 799: 1.0, 821: 1.0, 827: 1.0, 864: 1.0, 882: 1.0, 887: 1.0, 903: 1.0, 912: 1.0, 919: 1.0, 922: 1.0, 925: 2.0, 932: 1.0, 940: 1.0, 941: 1.0})]

In [17]:
features_ham

PythonRDD[11] at RDD at PythonRDD.scala:53

In [18]:
features_ham.count()

20

In [19]:
features_ham.take(1)

[SparseVector(1000, {0: 1.0, 6: 1.0, 69: 1.0, 80: 1.0, 150: 1.0, 161: 1.0, 162: 2.0, 317: 2.0, 370: 1.0, 403: 1.0, 423: 1.0, 521: 1.0, 604: 1.0, 651: 1.0, 743: 1.0, 770: 1.0, 809: 1.0, 827: 1.0, 831: 1.0, 872: 1.0, 971: 1.0})]

## Spam filtering is a kind of supervised learning
## Create Labels for spam and non-spam emails


In [20]:
# As spam filtering is a kind of supervised learning, 
# we need to provide labeled data to the application. 
# Labeled data typically consists of a bag of multidimensional 
# feature vectors. A labeled point is a local vector, 
# either dense or sparse, associated with a label/response. 
# In MLlib, labeled points are used in supervised learning algorithms. 
# We use a double to store a label, so we can use labeled points in 
# both regression and classification.

from pyspark.mllib.regression import LabeledPoint

positive_data = features_spam.map(lambda features : LabeledPoint(1, features))
negative_data = features_ham.map(lambda features :  LabeledPoint(0, features))
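
The printed form of a labeled point, `(size, [indices], [values])`, is a sparse encoding: only the non-zero positions are stored. The sketch below shows how such an encoding expands to a dense vector. `SparseExample` and `to_dense` are hypothetical names for illustration and are not MLlib classes.

```python
from collections import namedtuple

# Hypothetical stand-in for a labeled sparse example; not MLlib's LabeledPoint.
SparseExample = namedtuple("SparseExample", ["label", "size", "indices", "values"])

def to_dense(example):
    """Expand the sparse (size, indices, values) form into a full list."""
    dense = [0.0] * example.size
    for index, value in zip(example.indices, example.values):
        dense[index] = value
    return dense

example = SparseExample(label=1.0, size=8, indices=[0, 3, 5], values=[1.0, 2.0, 1.0])
print(example.label, to_dense(example))
```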

In [21]:
positive_data.take(1)

[LabeledPoint(1.0, (1000,[0,9,28,31,34,69,70,80,94,101,147,155,169,174,184,216,224,240,241,242,268,289,300,301,314,317,321,350,365,437,484,589,590,599,614,733,759,772,778,787,798,799,821,827,864,882,887,903,912,919,922,925,932,940,941],[1.0,1.0,3.0,3.0,1.0,1.0,2.0,2.0,1.0,3.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,4.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0]))]

In [22]:
negative_data.take(1)

[LabeledPoint(0.0, (1000,[0,6,69,80,150,161,162,317,370,403,423,521,604,651,743,770,809,827,831,872,971],[1.0,1.0,1.0,1.0,1.0,1.0,2.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))]

## Create Training and Test Data

In [23]:
# Now we create the training data for our application. 
# Training data will be about 60% of the total data, so we first 
# combine the spam and ham datasets and then split the result 
# into training and test sets as follows.

data = positive_data.union(negative_data)
data.cache()
data.count()

40

In [24]:
training, test = data.randomSplit([0.6, 0.4], seed=12345)
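
A weighted random split assigns each element to a part by an independent random draw, so the resulting sizes only approximate the requested proportions. The plain-Python sketch below illustrates that idea; `random_split` is a hypothetical helper, and it will not reproduce the exact partitions Spark produces for the same seed.

```python
import random

def random_split(items, weights, seed):
    """Assign each item to one part by an independent weighted random draw."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative boundaries of the normalized weights, e.g. [0.6, 1.0].
    bounds = []
    acc = 0.0
    for weight in weights:
        acc += weight / total
        bounds.append(acc)
    parts = [[] for _ in weights]
    for item in items:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            if draw < bound:
                parts[i].append(item)
                break
        else:
            # Guard against floating-point rounding in the last boundary.
            parts[-1].append(item)
    return parts

train_part, test_part = random_split(list(range(40)), [0.6, 0.4], seed=12345)
print(len(train_part), len(test_part))
```

Every item ends up in exactly one part, but with only 40 items the sizes can drift noticeably from an exact 60/40 division.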

In [25]:
training.count()

23

In [26]:
test.count()

17

## Create a Logistic Regression Model

In [27]:
# Let’s create a logistic regression learner which uses stochastic gradient descent (SGD).

from pyspark.mllib.classification import LogisticRegressionWithSGD
logistic_learner = LogisticRegressionWithSGD()

# Train the model on the training data.

model = logistic_learner.train(training)
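
For intuition about what the learner does, here is a minimal plain-Python sketch of logistic regression trained with SGD on sparse `{index: value}` feature dicts. It is an illustration only: the function names, step size, epoch count, and toy data are assumptions, and it does not mirror MLlib's defaults or internals.

```python
import math
import random

def sigmoid(z):
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)

def train_logistic_sgd(examples, num_features, epochs=200, step=0.5, seed=0):
    """Fit weights (no intercept) with one gradient step per example."""
    rng = random.Random(seed)
    weights = [0.0] * num_features
    examples = list(examples)
    for _ in range(epochs):
        rng.shuffle(examples)
        for label, features in examples:
            z = sum(weights[i] * v for i, v in features.items())
            error = sigmoid(z) - label  # gradient of the log loss w.r.t. z
            for i, v in features.items():
                weights[i] -= step * error * v
    return weights

def predict(weights, features):
    """Return 1 when the predicted probability exceeds 0.5, else 0."""
    z = sum(weights[i] * v for i, v in features.items())
    return 1 if sigmoid(z) > 0.5 else 0

# Hypothetical toy data: (label, sparse features).
toy = [
    (1, {0: 2.0, 1: 1.0}),
    (1, {0: 1.0}),
    (0, {2: 1.0, 3: 1.0}),
    (0, {3: 2.0}),
]
weights = train_logistic_sgd(toy, num_features=4)
print([predict(weights, features) for _, features in toy])
```

Features that appear only in label-1 examples receive positive weights and features that appear only in label-0 examples receive negative weights, which is what separates the two classes here.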

In [28]:
# Test the trained model.
# Pair each predicted label with the true label for every test example.

prediction_label = test.map(lambda x : (model.predict(x.features),x.label))

In [29]:
prediction_label.count()

17

In [30]:
prediction_label.collect()

[(1, 1.0),
 (1, 1.0),
 (0, 1.0),
 (1, 1.0),
 (0, 1.0),
 (0, 1.0),
 (0, 1.0),
 (1, 1.0),
 (1, 1.0),
 (0, 0.0),
 (0, 0.0),
 (0, 0.0),
 (0, 0.0),
 (0, 0.0),
 (0, 0.0),
 (0, 0.0),
 (0, 0.0)]

## Calculate Accuracy of Model

In [31]:
# Calculate the accuracy of the model. 
# Accuracy is the fraction of test examples whose 
# predicted label matches the true label, so the 
# denominator is the size of the test set:

accuracy = 1.0 * prediction_label.filter(lambda x : x[0] == x[1]).count() / test.count()
accuracy

0.7647058823529411
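
Accuracy alone does not show which kind of mistake the model makes. As a sketch, the confusion counts, precision, and recall can be computed in plain Python from the `(prediction, label)` pairs printed by `prediction_label.collect()` above; the pairs are copied from that output, and the variable names are illustrative.

```python
# (prediction, label) pairs copied from the logistic regression output above.
pairs = (
    [(1, 1.0), (1, 1.0), (0, 1.0), (1, 1.0), (0, 1.0),
     (0, 1.0), (0, 1.0), (1, 1.0), (1, 1.0)]
    + [(0, 0.0)] * 8
)

tp = sum(1 for p, y in pairs if p == 1 and y == 1.0)  # spam caught
fp = sum(1 for p, y in pairs if p == 1 and y == 0.0)  # ham flagged as spam
fn = sum(1 for p, y in pairs if p == 0 and y == 1.0)  # spam missed
tn = sum(1 for p, y in pairs if p == 0 and y == 0.0)  # ham passed through

precision = tp / (tp + fp) if tp + fp else 0.0
recall = tp / (tp + fn) if tp + fn else 0.0
print(tp, fp, fn, tn)
print(precision, recall)
```

On this test set the model never flags a normal email as spam, but it misses four of the nine spam emails.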

## Naive Bayes Model

In [32]:
# In machine learning, naïve Bayes classifiers are a 
# family of simple "probabilistic classifiers" based on 
# applying Bayes' theorem with strong independence 
# assumptions between the features. They are among 
# the simplest Bayesian network models.

from pyspark.mllib.classification import NaiveBayes

# The second argument (1.0) is the additive smoothing parameter.
model = NaiveBayes.train(training, 1.0)

In [33]:
model

<pyspark.mllib.classification.NaiveBayesModel at 0x10d1e3c88>

In [34]:
prediction_label = test.map(lambda x : (model.predict(x.features),x.label))

In [35]:
prediction_label.collect()

[(1.0, 1.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (0.0, 1.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (1.0, 0.0),
 (1.0, 0.0),
 (0.0, 0.0),
 (1.0, 0.0),
 (0.0, 0.0)]
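
The notebook stops before scoring the naive Bayes predictions. As a sketch, the accuracy on the test set can be computed in plain Python from the pairs printed above; the pairs are copied from that output, and the variable names are illustrative.

```python
# (prediction, label) pairs copied from the naive Bayes output above.
pairs = [
    (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (0.0, 1.0),
    (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0),
    (0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (1.0, 0.0), (1.0, 0.0),
    (0.0, 0.0), (1.0, 0.0), (0.0, 0.0),
]

correct = sum(1 for p, y in pairs if p == y)
nb_accuracy = correct / len(pairs)

missed_spam = sum(1 for p, y in pairs if p == 0.0 and y == 1.0)
flagged_ham = sum(1 for p, y in pairs if p == 1.0 and y == 0.0)
print(correct, len(pairs), nb_accuracy)
print(missed_spam, flagged_ham)
```

Naive Bayes also matches 13 of the 17 test labels, but its errors differ from those of logistic regression: it misses only one spam email while flagging three normal emails as spam. On the RDD itself, the same accuracy could be computed with `prediction_label.filter(lambda x: x[0] == x[1]).count() / test.count()`.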