In this part, we engineer features from the email headers and body, randomly split the dataset into train and test sets, and train a Multinomial Naive Bayes model on the train set. We then use the test set to calculate the accuracy of its predictions.

### If on Google Colab

Execute the cells below only if running on Google Colab. They install the needed packages and download the "structured.xlsx" file and a Spark distribution from Google Drive.

In [1]:
!pip install PyDrive
!pip install xlrd

Collecting xlrd
  Downloading https://files.pythonhosted.org/packages/07/e6/e95c4eec6221bfd8528bcc4ea252a850bffcc4be88ebc367e23a1a84b0bb/xlrd-1.1.0-py2.py3-none-any.whl (108kB)
    100% |████████████████████████████████| 112kB 4.2MB/s 
Installing collected packages: xlrd
Successfully installed xlrd-1.1.0


In [0]:
import os
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

In [0]:
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [0]:
!rm -rf structured.xlsx
download = drive.CreateFile({'id': '1oh_fic0-1N1xh4OlTvGMQ5BTKjOmUYKi'})
download.GetContentFile('structured.xlsx')

In [0]:
download3 = drive.CreateFile({'id': '1iFVW0RxqL1VNrIOrfJXrDkPHF8ewXH49'})
download3.GetContentFile('spark-2.3.1-bin-hadoop2.7.tgz')

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

Set environment variables for Java and Spark

In [0]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

### If not on Google Colab

If running locally, run from here.

In [1]:
import pandas as pd
import numpy as np
import os
from time import time
from bs4 import BeautifulSoup
import math

We read the "structured.xlsx" file into a pandas dataframe.

In [2]:
def conv(content):
    '''
    Convert a cell value to str, keeping missing values as np.nan.
    '''
    # NaN is the only value that is not equal to itself
    if content != content:
        return np.nan
    return str(content)
    

In [3]:
converters = {'Spam':conv, 'Body':conv, 'Subject':conv, 'From':conv, 'To':conv, 'X-UIDL':conv, 'Message-Id':conv, 'Sender':conv}

In [4]:
df_final = pd.read_excel('structured.xlsx', sheet_name='Sheet1', index_col=None, converters=converters )

## Engineering features from Email Headers

Email headers show the route an email has taken before arriving at its recipient. They contain important information such as the sender, recipient, message ID, date and time, and subject.

There are two reasons why spammers malform email headers.<br/>
    1. They try to conceal their identity and the real source of the email.<br/>
    2. They try to conceal the fact that the email was part of a mass mailing effort. 

FEATURE1: Spammers sometimes put all recipients in the Bcc field and reuse the 'From' address in the 'To' field. We create a new column 'Feature1', where 1 indicates that the 'From' field is the same as the 'To' field and 0 indicates otherwise.

In [5]:
# Feature1 -> From same as To. If yes -> 1, else 0
df_final['Feature1'] = 0
df_final.loc[df_final['From'] == df_final['To'], 'Feature1'] = 1
df_final.head()

Unnamed: 0,From,To,Message-Id,Subject,Body,Spam,X-UIDL,Sender,Feature1
0,aj881c <aj881c@ix.netcom.com>\n,<bagpipes@acadia.net>\n,<19943672.886214@relay.comanche.denmark.eu> M...,2-1\n,email marketing works!!\n\nbull's eye gold is ...,Spam,,,0
1,iwbp@mailcity.com\n,members@your.net\n,<>\n,"Exclusive Internet Business, 1st Time Offered...",>>>this is the most exciting breakthrough ever...,Spam,,,0
2,am74rt <am74rt@worldnet.att.net>\n,<badams@eastky.com>\n,<19943672.886214@relay.comanche.denmark.eu> T...,2-17\n,email marketing works!!\n\nbull's eye gold is ...,Spam,,,0
3,"""D.Reynolds"" <subwiz1@friendlyserver.com>\n",,<199802161222.EAA24869@net1.aoci.com>\n,ADV: FREE DOWNLOAD:Register your web site to ...,free download.register your web site to over 7...,Spam,,,0
4,carlover@goplay.com\n,carlovers@america.com\n,<>\n,AUTOMOBILE OPPORTUNITY\n,do you love cars?\n\nwant your own business?\n...,Spam,,,0
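The comparison above is made on the raw header strings, so a 'From' value with a display name and a 'To' value without one would not match even when the address is the same. A minimal sketch of a normalized comparison, assuming one wants to compare bare addresses case-insensitively; `same_address` is a hypothetical helper that is not part of the notebook, and it relies only on the standard library's `email.utils.parseaddr`.

```python
from email.utils import parseaddr

def same_address(from_header, to_header):
    """Return 1 if both headers carry the same bare address, else 0.

    Hypothetical helper for illustration; not part of the notebook.
    """
    # parseaddr returns (display_name, address); keep only the address.
    from_addr = parseaddr(str(from_header).strip())[1].lower()
    to_addr = parseaddr(str(to_header).strip())[1].lower()
    # Require an '@' so that missing values (NaN, empty strings) never match.
    return int("@" in from_addr and from_addr == to_addr)

# Same address, different display name and letter case.
print(same_address("aj881c <aj881c@ix.netcom.com>\n", "<AJ881C@ix.netcom.com>\n"))
# Different addresses.
print(same_address("iwbp@mailcity.com\n", "members@your.net\n"))
```

If this variant were preferred, it could be applied row by row, for example with `df_final.apply(lambda r: same_address(r['From'], r['To']), axis=1)`.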


FEATURE2: Because spammers send out emails by filling the 'Bcc' field, they sometimes leave the 'To' field empty or fill it with an invalid string. We create a new column 'Feature2', where 1 indicates an invalid or NaN 'To' field and 0 indicates otherwise.

The method defined below splits the passed string on the ',' character to get individual email addresses, 
which are stripped of newline characters. It also handles address strings like 
`"Tomas Jacobs" <RickyAmes@aol.com>`. A regex then checks that each address is well formed. Despite its name, the method returns 1 when any address is invalid and 0 when all are valid.

In [6]:
import re
def isValidEmailFormat(emails):
    """Split the passed string on ',' to get individual email addresses and strip
    newline characters from each. Also handles address strings like
    "Tomas Jacobs" <RickyAmes@aol.com>. A regex then checks the format of each address.
    Returns 1 if any address is invalid, 0 otherwise."""
    for email in str(emails).split(','):
        if(email.isspace() or len(email) == 0):
            continue
        
        # strip new line chars
        email = re.sub(r'(\n+)', r' ', str(email)).strip()
        #print(email)
        
        # handle both "Tomas Jacobs" <RickyAmes@aol.com> or <RickyAmes@aol.com>
        if(re.match(r"(.+)<(.+)>|<(.+)>", email)):
            email = email[email.find("<")+1:-1]
            #print(0, email)
            
        if(len(email) > 7):
            # escapes restored: literal '[' / ']' around the domain and a literal '.' before the TLD
            if(re.match(r"^.+@(\[?)[a-zA-Z0-9\-.]+\.([a-zA-Z]{2,3}|[0-9]{1,3})(\]?)$", email) is not None):
                #print('continue')
                continue
            else:
                #print('1')
                return 1
        else:
            return 1
        
    return 0

In [7]:
# Feature2 -> is the To column na or invalid ? 1->invalid, 0->valid
df_final['Feature2'] = 0
df_final['Feature2'] = df_final['To'].map(isValidEmailFormat)
df_final.loc[df_final['To'].isna(),'Feature2'] = 1
df_final[['Feature2','To']].head()

Unnamed: 0,Feature2,To
0,0,<bagpipes@acadia.net>\n
1,0,members@your.net\n
2,0,<badams@eastky.com>\n
3,1,
4,0,carlovers@america.com\n
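Splitting on ',' breaks down when a display name itself contains a comma. A small sketch of an alternative way to separate the addresses, assuming the standard library's `email.utils.getaddresses` is acceptable here; `split_addresses` is a hypothetical helper, and the sample header is made up for illustration.

```python
from email.utils import getaddresses

def split_addresses(header_value):
    """Return the bare addresses found in a To-style header value.

    Hypothetical helper for illustration; not part of the notebook.
    """
    # Collapse newlines and repeated spaces before parsing.
    flat = " ".join(str(header_value).split())
    # getaddresses yields (display_name, address) pairs; drop empty addresses.
    return [addr for _name, addr in getaddresses([flat]) if addr]

header = '"Jacobs, Tomas" <RickyAmes@aol.com>,\n members@your.net\n'
print(split_addresses(header))   # two addresses
print(len(header.split(",")))    # a plain split yields three pieces
```

Each returned address could then be passed through the same format regex used above.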


In [8]:
#df_final[df_final['To'].notna() & df_final['Feature2'] == 1][['To','Feature2']]

In [9]:
import re
def isValidMessageID(mid):
    '''
    checks for valid domain in message id where 0 is valid and 1 is invalid
    '''
    for email in str(mid).split('\n'):
        if(email.isspace() or len(email) == 0 or email.find('@') < 0):
            continue
            
        email = email.strip()
        #print(email)
        
        if(re.match(r"(.+)?<<(.+)@(.+)>>(.+)?", email)):
            email = email[email.find("<<")+1:email.rfind(">>")]
            #print(0, email)
        
        if(re.match(r"(.+)?<(.+)@(.+)>(.+)?", email)):
            email = email[email.find("<")+1:email.rfind(">")]
            #print(0, email)
            
        if(len(email) > 7):
            # escapes restored: literal '[' / ']' around the domain and a literal '.' before the TLD
            if(re.match(r"^.+@(\[?)[a-zA-Z0-9\-.]+\.([a-zA-Z]{2,3}|[0-9]{1,3})(\]?)$", email) is not None):
                return 0
            else:
                continue
        else:
            continue
        
    return 1

FEATURE3: The 'Message-Id' header identifies where an email originated, and in spam it is often missing or malformed. Message-Ids have the form xxx@domain.com. The method above checks the format of the Message-Id. We create a new column 'Feature3', where 1 indicates a malformed or missing Message-Id and 0 indicates otherwise. 

In [10]:
# Feature3 -> is the Message-Id column na or invalid ? 1->invalid, 0->valid
df_final['Feature3'] = 0
df_final['Feature3'] = df_final['Message-Id'].map(isValidMessageID)
df_final.loc[df_final['Message-Id'].isna(),'Feature3'] = 1
df_final[['Feature3','Message-Id']].head()

Unnamed: 0,Feature3,Message-Id
0,0,<19943672.886214@relay.comanche.denmark.eu> M...
1,1,<>\n
2,0,<19943672.886214@relay.comanche.denmark.eu> T...
3,0,<199802161222.EAA24869@net1.aoci.com>\n
4,1,<>\n


Displaying Message-Id values that are malformed.

In [11]:
df_final.loc[df_final['Feature3'] == 1]['Message-Id'].head()

1                           <>\n
4                           <>\n
5       Mach10 1.1 fxpromo.com\n
10     <199803250408.UAA03361>\n
13                           NaN
Name: Message-Id, dtype: object
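The malformed values listed above share one trait: none contains a `local@domain` pair inside angle brackets. A simplified sketch of that check, under the assumption that a single regex search is enough; it is stricter than `isValidMessageID` (which also accepts IDs without angle brackets), and `message_id_flag` is a hypothetical name.

```python
import re

# local-part@domain inside angle brackets; the domain must contain a dot.
MESSAGE_ID_RE = re.compile(r"<[^<>@\s]+@[^<>@\s]+\.[^<>@\s]+>")

def message_id_flag(value):
    """Return 0 when a well-formed <local@domain> is present, else 1.

    Hypothetical, simplified stand-in for the notebook's isValidMessageID.
    """
    if value is None or value != value:  # None or NaN
        return 1
    return 0 if MESSAGE_ID_RE.search(str(value)) else 1

samples = [
    "<>\n",
    "Mach10 1.1 fxpromo.com\n",
    "<199803250408.UAA03361>\n",
    "<199802161222.EAA24869@net1.aoci.com>\n",
    float("nan"),
]
for sample in samples:
    print(repr(sample), message_id_flag(sample))
```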

FEATURE4: The 'X-UIDL' header is intended to stop the recipient's mail server from downloading multiple copies of the mail once the mail is received. Normally, X-UIDL is stripped once the mail is received. Spammers intentionally add the X-UIDL header so that mail servers download multiple copies of the mail, increasing the chances of it being read. We create a new column 'Feature4', where 1 indicates that X-UIDL is not empty and 0 indicates otherwise.

In [12]:
# Feature4, is the X-UIDL header not empty or na ? 1->not empty, 0->empty 
df_final['Feature4'] = 0
df_final.loc[df_final['X-UIDL'].notna(), 'Feature4'] = 1
df_final.loc[df_final['X-UIDL'].isna(), 'Feature4'] = 0
df_final[['Feature4', 'X-UIDL']].head()

Unnamed: 0,Feature4,X-UIDL
0,0,
1,0,
2,0,
3,0,
4,0,


Displaying rows where X-UIDL is not empty.

In [13]:
df_final.loc[df_final['Feature4']==1]['X-UIDL'].head()

9      c89dd4e061ba173523703cf25c3133a2\n
11     763cf6e5123c1287a83f12d7e99c60c9\n
16                  10293287_192832.222\n
22     f2c3e4bf7654f32bfd17a6c54dc32f1d\n
24     11111111111111111111111111111111\n
Name: X-UIDL, dtype: object

FEATURE5: Now, we process the body to extract features that distinguish spam from ham. Spammers typically use certain words (e.g. free, limited offer, click here) to catch the attention of their recipients. Overuse of capitals and punctuation marks is also a marked characteristic of spam. In addition, spammers intentionally misspell words (e.g. w4rning for warning) to bypass spam filters, so a quantity like 'percent mis-spelt words' may make a good feature for detecting spam. 
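The capitals and punctuation signals mentioned above are not computed in this notebook. A minimal sketch of how they could be measured, assuming they are taken from the raw body before any cleaning; `punctuation_and_caps` is a hypothetical helper and the sample sentence is made up. The sample bodies shown earlier appear to be lower-cased already, so the capitals share may carry little information for this particular dataset.

```python
import string

def punctuation_and_caps(raw_body):
    """Return (punctuation count, share of letters that are upper case).

    Hypothetical helper for illustration; not part of the notebook.
    """
    text = str(raw_body)
    # Count ASCII punctuation characters.
    punct = sum(ch in string.punctuation for ch in text)
    letters = [ch for ch in text if ch.isalpha()]
    # Guard against bodies that contain no letters at all.
    caps_share = sum(ch.isupper() for ch in letters) / len(letters) if letters else 0.0
    return punct, caps_share

print(punctuation_and_caps("FREE download!!! Click here NOW."))
```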

We will try to leverage the fact that spammers use certain words often in their emails. First, we create a pyspark dataframe from our pandas dataframe. 

In [14]:
import findspark
findspark.init()
import pyspark
from pyspark.ml.feature import HashingTF, IDF, IDFModel, Tokenizer, RegexTokenizer, StopWordsRemover
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [15]:
# Now, we create a Spark, Pandas dataframe of features from df_final
# Features include TFIDF vector, vector of misspellings count per email, punctuation count per email 

In [16]:
import re
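# Clean a string of punctuation and newline characters.
def processString(body):
    # drop apostrophes so that "bull's" becomes "bulls"
    body = body.replace("'", "")
    # replace every remaining punctuation character or underscore with a space
    body = re.sub(r"[^\w\s]|_", " ", body)
    # replace every whitespace character (including newlines) with a single space;
    # runs of spaces are not collapsed here
    body = re.sub(r"\s", " ", body)
    return body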

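Right now, the email body is in its raw form. We process it to remove punctuation and to replace newlines and other whitespace characters with spaces. Runs of spaces are left in place, since the tokenizer applied later discards the empty tokens they produce.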

In [17]:
# Feature 5 - TFIDF
body = df_final['Body']
body = body.map(processString, na_action='ignore')
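To make the effect of the cleaning step concrete, here is a self-contained sketch on a string taken from the first sample body. `process_string` mirrors the notebook's `processString`, while `collapse_whitespace` is a hypothetical extra step, shown only as an assumption about how repeated spaces could be squeezed and the ends trimmed if that were wanted.

```python
import re

def process_string(body):
    """Mirror of the notebook's cleaning step."""
    body = body.replace("'", "")
    body = re.sub(r"[^\w\s]|_", " ", body)
    return re.sub(r"\s", " ", body)

def collapse_whitespace(text):
    """Hypothetical extra step: squeeze runs of whitespace and trim the ends."""
    return " ".join(text.split())

raw = "email marketing works!!\n\nbull's eye gold"
cleaned = process_string(raw)
print(repr(cleaned))                       # punctuation and newlines become spaces
print(repr(collapse_whitespace(cleaned)))  # single spaces only
```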

Convert to pandas dataframe

In [18]:
pddf = pd.DataFrame(body, columns=['Body'])
pddf['RawFeature1'] = df_final['Feature1']
pddf['RawFeature2'] = df_final['Feature2']
pddf['RawFeature3'] = df_final['Feature3']
pddf['RawFeature4'] = df_final['Feature4']
pddf['Spam'] = df_final['Spam']
pddf.head()

Unnamed: 0,Body,RawFeature1,RawFeature2,RawFeature3,RawFeature4,Spam
0,email marketing works bulls eye gold is the...,0,0,0,0,Spam
1,this is the most exciting breakthrough ever...,0,0,1,0,Spam
2,email marketing works bulls eye gold is the...,0,0,0,0,Spam
3,free download register your web site to over 7...,0,1,0,0,Spam
4,do you love cars want your own business th...,0,0,1,0,Spam


Before creating the Spark dataframe, we replace all NaNs with empty strings. Our Spark dataframe now contains the body, the individual features 1-4 and the label (spam or ham). 

In [19]:
pddf.fillna("", inplace=True)
df = spark.createDataFrame(pddf)
df.show(3)

+--------------------+-----------+-----------+-----------+-----------+----+
|                Body|RawFeature1|RawFeature2|RawFeature3|RawFeature4|Spam|
+--------------------+-----------+-----------+-----------+-----------+----+
|email marketing w...|          0|          0|          0|          0|Spam|
|   this is the mo...|          0|          0|          1|          0|Spam|
|email marketing w...|          0|          0|          0|          0|Spam|
+--------------------+-----------+-----------+-----------+-----------+----+
only showing top 3 rows



Using Spark's StringIndexer to index the label column. By default, StringIndexer assigns indices in order of descending label frequency, so here 0.0 indicates spam and 1.0 indicates ham.

In [20]:
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol='Spam', outputCol='label')
df = stringIndexer.fit(df).transform(df)
df.show(3)

+--------------------+-----------+-----------+-----------+-----------+----+-----+
|                Body|RawFeature1|RawFeature2|RawFeature3|RawFeature4|Spam|label|
+--------------------+-----------+-----------+-----------+-----------+----+-----+
|email marketing w...|          0|          0|          0|          0|Spam|  0.0|
|   this is the mo...|          0|          0|          1|          0|Spam|  0.0|
|email marketing w...|          0|          0|          0|          0|Spam|  0.0|
+--------------------+-----------+-----------+-----------+-----------+----+-----+
only showing top 3 rows



Using RegexTokenizer to tokenize the Body. This splits the body into tokens on non-word characters (`\W`).

In [21]:
regexTokenizer = RegexTokenizer(inputCol='Body', outputCol='Body_Tokens', pattern='\\W')

In [22]:
df_tokenized = regexTokenizer.transform(df)
df_tokenized['Body_Tokens']

Column<b'Body_Tokens'>

In [23]:
df_tokenized.show(3)

+--------------------+-----------+-----------+-----------+-----------+----+-----+--------------------+
|                Body|RawFeature1|RawFeature2|RawFeature3|RawFeature4|Spam|label|         Body_Tokens|
+--------------------+-----------+-----------+-----------+-----------+----+-----+--------------------+
|email marketing w...|          0|          0|          0|          0|Spam|  0.0|[email, marketing...|
|   this is the mo...|          0|          0|          1|          0|Spam|  0.0|[this, is, the, m...|
|email marketing w...|          0|          0|          0|          0|Spam|  0.0|[email, marketing...|
+--------------------+-----------+-----------+-----------+-----------+----+-----+--------------------+
only showing top 3 rows
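For readers without a Spark session at hand, the tokenization step can be approximated in plain Python. This is a sketch under the assumption that RegexTokenizer runs with its default settings (split on the pattern, lower-case the input, drop tokens shorter than one character); `regex_tokenize` is a hypothetical stand-in, not Spark's implementation.

```python
import re

def regex_tokenize(text, pattern=r"\W", min_token_length=1):
    """Split on `pattern` after lower-casing and drop too-short tokens.

    Hypothetical plain-Python approximation of the tokenization step.
    """
    pieces = re.split(pattern, text.lower())
    # Consecutive delimiters produce empty strings, which are filtered out here.
    return [tok for tok in pieces if len(tok) >= min_token_length]

print(regex_tokenize("email marketing works    bulls eye gold"))
```

The filter on token length is why the runs of spaces left in the cleaned body do not produce empty tokens.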



Using StopWordsRemover to filter out common stop words (e.g. 'this', 'is', 'the') that carry little signal for classification.

In [24]:
stopWordsRemover = StopWordsRemover(inputCol="Body_Tokens", outputCol="Body_Tokens2")
df_tokenized = stopWordsRemover.transform(df_tokenized)
df_tokenized.show(5)

+--------------------+-----------+-----------+-----------+-----------+----+-----+--------------------+--------------------+
|                Body|RawFeature1|RawFeature2|RawFeature3|RawFeature4|Spam|label|         Body_Tokens|        Body_Tokens2|
+--------------------+-----------+-----------+-----------+-----------+----+-----+--------------------+--------------------+
|email marketing w...|          0|          0|          0|          0|Spam|  0.0|[email, marketing...|[email, marketing...|
|   this is the mo...|          0|          0|          1|          0|Spam|  0.0|[this, is, the, m...|[exciting, breakt...|
|email marketing w...|          0|          0|          0|          0|Spam|  0.0|[email, marketing...|[email, marketing...|
|free download reg...|          0|          1|          0|          0|Spam|  0.0|[free, download, ...|[free, download, ...|
|do you love cars ...|          0|          0|          1|          0|Spam|  0.0|[do, you, love, c...|[love, cars, want...|
+-------

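TF-IDF in Spark is split into two steps: HashingTF, followed by IDF. We apply HashingTF to create term frequencies from the "Body_Tokens2" column into the "TermFreqs" column. With numFeatures=20, every term is hashed into one of only 20 buckets, so many distinct words share a bucket.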

In [25]:
hashingTF = HashingTF(inputCol="Body_Tokens2", outputCol="TermFreqs", numFeatures=20)
df3 = hashingTF.transform(df_tokenized)

In [26]:
df3.columns

['Body',
 'RawFeature1',
 'RawFeature2',
 'RawFeature3',
 'RawFeature4',
 'Spam',
 'label',
 'Body_Tokens',
 'Body_Tokens2',
 'TermFreqs']
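The idea behind HashingTF can be sketched in a few lines of plain Python: each token is hashed to a bucket index and the bucket's count is incremented. This is an illustration only. It uses `zlib.crc32` as a stand-in hash, whereas Spark uses its own hash function, so the bucket indices will not match Spark's; `hashing_tf` is a hypothetical name.

```python
import zlib

def hashing_tf(tokens, num_features=20):
    """Map tokens to a fixed-length term-frequency vector via the hashing trick.

    Illustrative stand-in; the hash function differs from the one Spark uses.
    """
    vec = [0.0] * num_features
    for tok in tokens:
        # Hash the token and fold it into the range [0, num_features).
        idx = zlib.crc32(tok.encode("utf-8")) % num_features
        vec[idx] += 1.0
    return vec

vec = hashing_tf(["free", "download", "register", "free"])
print(vec)
print(sum(vec))  # equals the number of tokens
```

Because the vector length is fixed, a vocabulary with more than `num_features` distinct words must produce collisions; a larger `num_features` reduces them at the cost of a wider vector.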

Here, we create an IDF model and fit it over "TermFreqs" column. A new column "RawFeature5" is created that contains the TF-IDF vector for every email row. 

In [27]:
idfModel = IDF(inputCol="TermFreqs", outputCol="RawFeature5").fit(df3)
df4 = idfModel.transform(df3)
df4.show(5)

+--------------------+-----------+-----------+-----------+-----------+----+-----+--------------------+--------------------+--------------------+--------------------+
|                Body|RawFeature1|RawFeature2|RawFeature3|RawFeature4|Spam|label|         Body_Tokens|        Body_Tokens2|           TermFreqs|         RawFeature5|
+--------------------+-----------+-----------+-----------+-----------+----+-----+--------------------+--------------------+--------------------+--------------------+
|email marketing w...|          0|          0|          0|          0|Spam|  0.0|[email, marketing...|[email, marketing...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|
|   this is the mo...|          0|          0|          1|          0|Spam|  0.0|[this, is, the, m...|[exciting, breakt...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|
|email marketing w...|          0|          0|          0|          0|Spam|  0.0|[email, marketing...|[email, marketing...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|
|fre
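The IDF step rescales each term-frequency column by how rare the term is across documents. A small sketch, assuming the smoothed formula documented for Spark's IDF, idf = log((m + 1) / (df + 1)), where m is the number of documents and df the number of documents containing the term; `idf_weights` and `tfidf` are hypothetical helper names and the toy vectors are made up.

```python
import math

def idf_weights(tf_vectors):
    """Compute one IDF weight per column of the term-frequency vectors."""
    m = len(tf_vectors)
    n = len(tf_vectors[0])
    # Document frequency: in how many documents is column j non-zero?
    doc_freq = [sum(1 for vec in tf_vectors if vec[j] > 0) for j in range(n)]
    return [math.log((m + 1) / (df + 1)) for df in doc_freq]

def tfidf(tf_vectors):
    """Scale every term frequency by its column's IDF weight."""
    weights = idf_weights(tf_vectors)
    return [[tf * w for tf, w in zip(vec, weights)] for vec in tf_vectors]

tf_vectors = [[1.0, 0.0, 2.0], [1.0, 0.0, 1.0], [3.0, 0.0, 0.0]]
print(idf_weights(tf_vectors))
print(tfidf(tf_vectors))
```

A column that is non-zero in every document gets weight log(1) = 0, so terms (or hash buckets) that occur everywhere contribute nothing to the final vector.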

Spark VectorAssembler is a transformer that combines a list of raw features into a single feature vector.

In [28]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols=['RawFeature1', 'RawFeature2', 'RawFeature3', 'RawFeature4','RawFeature5'], outputCol='Features')

In [29]:
df5 = vectorAssembler.transform(df4)
df5.show(2)

+--------------------+-----------+-----------+-----------+-----------+----+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|                Body|RawFeature1|RawFeature2|RawFeature3|RawFeature4|Spam|label|         Body_Tokens|        Body_Tokens2|           TermFreqs|         RawFeature5|            Features|
+--------------------+-----------+-----------+-----------+-----------+----+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|email marketing w...|          0|          0|          0|          0|Spam|  0.0|[email, marketing...|[email, marketing...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|[0.0,0.0,0.0,0.0,...|
|   this is the mo...|          0|          0|          1|          0|Spam|  0.0|[this, is, the, m...|[exciting, breakt...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|[0.0,0.0,1.0,0.0,...|
+--------------------+-----------+-----------+-----------+-------

In [30]:
# Keeping only the columns that we need ('RawFeature1' is not dropped here; it is also part of 'Features')
df6 = df5.drop('Body', 'RawFeature2', 'RawFeature3','RawFeature4','Body_Tokens','Body_Tokens2','TermFreqs','RawFeature5')
df6.columns

['RawFeature1', 'Spam', 'label', 'Features']

Splitting the Spark dataframe randomly into roughly 75% train and 25% test dataframes, using 24 as the random seed.

In [32]:
df6_train, df6_test = df6.randomSplit([3.0, 1.0], 24)
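The weights `[3.0, 1.0]` are normalized to 0.75 and 0.25, and each row is assigned to a split by an independent random draw, so the resulting sizes are only approximately 75% and 25%. A plain-Python sketch of that idea, assuming a simple uniform draw per row; `random_split` is a hypothetical illustration and does not reproduce Spark's sampler or its exact assignment for a given seed.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one part according to normalized weights."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.75, 1.0] for weights [3.0, 1.0].
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    parts = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last part also catches any floating-point leftover.
            if draw < upper or i == len(bounds) - 1:
                parts[i].append(row)
                break
    return parts

train, test = random_split(list(range(1000)), [3.0, 1.0], seed=24)
print(len(train), len(test))
```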

Using Multinomial Naive Bayes model and training it over train dataset.

In [33]:
from pyspark.ml.classification import NaiveBayes

In [34]:
nb = NaiveBayes(smoothing=1.0, modelType='multinomial', featuresCol='Features')

In [35]:
NBModel = nb.fit(df6_train)
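To show what the model learns, here is a compact sketch of multinomial Naive Bayes with additive smoothing in plain Python. It is an illustration under simplifying assumptions (dense list inputs, unsmoothed class priors) and not Spark's implementation; `train_multinomial_nb` and `predict` are hypothetical names and the toy data is made up.

```python
import math

def train_multinomial_nb(features, labels, smoothing=1.0):
    """Return (log_priors, log_likelihoods), both keyed by class label."""
    n_features = len(features[0])
    log_priors, log_likelihoods = {}, {}
    for c in sorted(set(labels)):
        rows = [x for x, y in zip(features, labels) if y == c]
        log_priors[c] = math.log(len(rows) / len(labels))
        # Total weight of each feature within class c.
        totals = [sum(row[j] for row in rows) for j in range(n_features)]
        # Additive smoothing keeps unseen features from getting probability 0.
        denom = sum(totals) + smoothing * n_features
        log_likelihoods[c] = [math.log((t + smoothing) / denom) for t in totals]
    return log_priors, log_likelihoods

def predict(x, log_priors, log_likelihoods):
    """Pick the class with the highest log posterior score."""
    scores = {
        c: log_priors[c] + sum(v * lw for v, lw in zip(x, log_likelihoods[c]))
        for c in log_priors
    }
    return max(scores, key=scores.get)

X = [[3, 0, 1], [2, 0, 0], [0, 2, 1], [0, 3, 0]]
y = [0.0, 0.0, 1.0, 1.0]
priors, likelihoods = train_multinomial_nb(X, y)
print(predict([1, 0, 0], priors, likelihoods))
print(predict([0, 2, 0], priors, likelihoods))
```

The multinomial variant requires non-negative feature values, which holds for the 0/1 header features and the TF-IDF weights used here.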

Testing the trained model over test dataset.

In [36]:
predictions = NBModel.transform(df6_test)

Calculating the accuracy of our model.

In [37]:
correct = predictions[predictions['label'] == predictions['prediction']]
incorrect = predictions[predictions['label'] != predictions['prediction']]

In [38]:
print('Correct predictions: ', correct.count())
print('Incorrect predictions: ', incorrect.count())

Correct predictions:  898
Incorrect predictions:  328


In [39]:
print('Accuracy in %: ', (correct.count() * 100.) / (correct.count() + incorrect.count()))

Accuracy in %:  73.2463295269168
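The accuracy figure follows directly from the two counts printed above. The sketch below repeats that arithmetic and, as an assumption about what might be useful next, tallies (label, prediction) pairs so that errors can be inspected per class; `accuracy_percent` and `confusion_counts` are hypothetical helpers, and the pairs passed to `confusion_counts` are made-up toy values, not results from the notebook.

```python
from collections import Counter

def accuracy_percent(correct, incorrect):
    """Accuracy as a percentage of all predictions."""
    return 100.0 * correct / (correct + incorrect)

def confusion_counts(pairs):
    """Count how often each (label, prediction) combination occurs."""
    return Counter(pairs)

# Counts reported by the notebook: 898 correct, 328 incorrect.
print(accuracy_percent(898, 328))

# Toy (label, prediction) pairs, for illustration only.
toy_pairs = [(0.0, 0.0), (0.0, 1.0), (1.0, 1.0), (0.0, 0.0)]
print(confusion_counts(toy_pairs))
```

With a Spark dataframe, the same per-class tally could be obtained with `predictions.groupBy('label', 'prediction').count()`.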


Another feature that is still a work in progress is 'percent mis-spelt words'. We can use the pattern.en package's suggest method to check whether a word is misspelt. Applied to all tokenized words of an email and divided by the number of words, this gives 'percent mis-spelt words'. 

## Feature 6 Engineering -> Percent Misspellings -> Work In Progress

In [40]:
"""
df_tokenized = df_tokenized.sample(0.005)
df_tokenized.show()"""

'\ndf_tokenized = df_tokenized.sample(0.005)\ndf_tokenized.show()'

In [41]:
"""df_tokenized.count()"""

'df_tokenized.count()'

In [43]:
"""# Feature5 -> Percent of misspellings in mail body
from pattern.en import spelling
spelling.suggest('wrng')"""

"# Feature5 -> Percent of misspellings in mail body\nfrom pattern.en import spelling\nspelling.suggest('wrng')"

In [None]:
"""from functools import reduce
from pattern.en import spelling
percentSpelling = []
def getPercentMisspelled(wordList):
    print(wordList)
    f = lambda x,y: int(x)+1 if spelling.suggest(y)[0][0] != y else int(x)
    e = float(reduce(f, wordList, 0)/len(wordList))
    percentSpelling.append(e)"""

In [None]:
#r1 = df_tokenized.first()['Body2']

In [None]:
#getPercentMisspelled(r1)

In [None]:
"""df_tokenized"""

In [None]:
"""from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
#myudf = udf(getPercentMisspelled, FloatType())"""

In [None]:
#c = myudf(df_tokenized.Body2)

In [None]:
#df_tokenized = df_tokenized.withColumn('Feature5', c)
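The commented-out cells above outline the intended computation. A minimal sketch of the same idea in plain Python, assuming a simple set lookup in place of pattern.en's `suggest`; `percent_misspelled` and the toy `KNOWN_WORDS` set are hypothetical. Unlike the draft above, this version returns the value (which a Spark UDF would need) and guards against empty token lists.

```python
def percent_misspelled(tokens, known_words):
    """Share (0-100) of tokens that are absent from `known_words`.

    Hypothetical helper; a real dictionary or spell checker would replace the set.
    """
    if not tokens:
        # Avoid dividing by zero for empty bodies.
        return 0.0
    misspelled = sum(1 for tok in tokens if tok.lower() not in known_words)
    return 100.0 * misspelled / len(tokens)

# Toy dictionary, for illustration only.
KNOWN_WORDS = {"this", "is", "a", "warning", "free", "offer"}

print(percent_misspelled(["this", "is", "a", "w4rning"], KNOWN_WORDS))
print(percent_misspelled([], KNOWN_WORDS))
```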