
The Apache SpamAssassin project maintains a nice collection of old e-mail messages that we can use. The messages date from the early 2000s, and scammers are probably much smarter now, so please don’t use this in any production environment :)

Download the data set using the following code:

In [4]:
from os import makedirs, path, rename, rmdir
from tarfile import open as open_tar
from urllib import request, parse


def download_corpus(dataset_dir: str = 'data2'):
    base_url = 'https://spamassassin.apache.org'
    corpus_path = 'old/publiccorpus'
    files = {
        '20021010_easy_ham.tar.bz2': 'ham',
        '20021010_spam.tar.bz2': 'spam',
    }

    downloads_dir = path.join(dataset_dir, 'downloads')
    ham_dir = path.join(dataset_dir, 'ham')
    spam_dir = path.join(dataset_dir, 'spam')


    makedirs(downloads_dir, exist_ok=True)
    makedirs(ham_dir, exist_ok=True)
    makedirs(spam_dir, exist_ok=True)

    for file, spam_or_ham in files.items():
        # download file
        url = parse.urljoin(base_url, f'{corpus_path}/{file}')
        tar_filename = path.join(downloads_dir, file)
        request.urlretrieve(url, tar_filename)

        # list e-mails in compressed file
        emails = []
        with open_tar(tar_filename) as tar:
            tar.extractall(path=downloads_dir)
            for tarinfo in tar:
                if len(tarinfo.name.split('/')) > 1:
                    emails.append(tarinfo.name)

        # move e-mails to ham or spam dir
        for email in emails:
            directory, filename = email.split('/')
            directory = path.join(downloads_dir, directory)
            rename(path.join(directory, filename),
                   path.join(dataset_dir, spam_or_ham, filename))

        rmdir(directory)


download_corpus()

The two archives downloaded above give us a corpus of 2551 hams and 501 spams.

In [5]:
from glob import glob
from os import path
ham_dir = path.join('data2', 'ham')
spam_dir = path.join('data2', 'spam')
print('hams:', len(glob(f'{ham_dir}/*')))  # hams: 2551
print('spams:', len(glob(f'{spam_dir}/*')))  # spams: 501

hams: 2551
spams: 501


**Parsing messages**

If you open any of these files, you will see they are very hard to read because they are in MIME format. Python’s standard library includes the email package, which helps us extract only the parts we care about, namely the subject and the body.

Let’s create a class to represent a message. This class will hold the subject, the body, and will have a method to retrieve a clean string containing only letters.

In [6]:
from re import sub
class SimpleEmail:
    def __init__(self, subject: str, body: str):
        self.subject = subject
        self.body = body

    @property
    def clean(self):
        sanitizer = '[^A-Za-z]+'
        clean = sub(sanitizer, ' ', f'{self.subject} {self.body}')
        clean = clean.lower()
        return sub(r'\s+', ' ', clean)  # raw string avoids an invalid escape

    def __str__(self):
        subject = f'subject: {self.subject}'
        body_first_line = self.body.split('\n')[0]
        body = f'body: {body_first_line}...'
        return f'{subject}\n{body}'
    def __repr__(self):
        return self.__str__()
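
To see what the `clean` property produces, here is a minimal standalone sketch of the same two substitutions. The function name `clean_text` and the sample subject and body are made up for illustration.

```python
import re


def clean_text(subject: str, body: str) -> str:
    # Replace every run of non-letters with a single space, then lowercase.
    text = re.sub(r'[^A-Za-z]+', ' ', f'{subject} {body}')
    return re.sub(r'\s+', ' ', text.lower())


cleaned = clean_text('RE: 50% OFF!!!', 'Visit http://example.com today.')
print(repr(cleaned))  # 're off visit http example com today '
```

Note the trailing space: splitting on `' '` later yields an empty token for it, which the `len(word) > 1` filter used further down discards along with single letters.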

When we first started, we thought these messages would be too heavy to load into memory all at once, so we built this iterator for reading messages from a directory one at a time. In the end, the messages are not that heavy…

In [7]:
from email import message_from_file
from glob import glob
class EmailIterator:
    def __init__(self, directory: str):
        self._files = glob(f'{directory}/*')
        self._pos = 0

    def __iter__(self):
        self._pos = -1
        return self

    def __next__(self):
        if self._pos < len(self._files) - 1:
            self._pos += 1
            return self.parse_email(self._files[self._pos])
        raise StopIteration()

    @staticmethod
    def parse_email(filename: str) -> SimpleEmail:
        with open(filename,
                  encoding='utf-8',
                  errors='replace') as fp:
            message = message_from_file(fp)

        subject = None
        for item in message.raw_items():
            if item[0] == 'Subject':
                subject = item[1]

        if message.is_multipart():
            body = []
            for b in message.get_payload():
                body.append(str(b))
            body = '\n'.join(body)
        else:
            body = message.get_payload()

        return SimpleEmail(subject, body)
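
As an alternative sketch (not the approach used in this notebook), the same lazy reading can be written as a generator function, and the body can be limited to the `text/plain` parts by walking the MIME tree. The names `extract_subject_and_body` and `iter_emails`, and the sample message, are hypothetical; results on the real corpus may differ from the class above because HTML parts and part headers are dropped.

```python
from email import message_from_string
from glob import glob
from os import path


def extract_subject_and_body(raw: str):
    """Return (subject, body), keeping only the text/plain parts."""
    message = message_from_string(raw)
    subject = message.get('Subject', '')  # '' when the header is missing
    chunks = []
    for part in message.walk():  # yields the message itself if not multipart
        if part.get_content_type() != 'text/plain':
            continue
        payload = part.get_payload(decode=True)  # bytes, transfer-decoded
        if payload is None:
            continue
        charset = part.get_content_charset() or 'utf-8'
        try:
            chunks.append(payload.decode(charset, errors='replace'))
        except LookupError:  # unknown charset name in the message
            chunks.append(payload.decode('utf-8', errors='replace'))
    return subject, '\n'.join(chunks)


def iter_emails(directory: str):
    """Lazily yield (subject, body) for each file in directory."""
    for filename in sorted(glob(path.join(directory, '*'))):
        with open(filename, encoding='utf-8', errors='replace') as fp:
            yield extract_subject_and_body(fp.read())


raw = (
    'Subject: Hello\n'
    'MIME-Version: 1.0\n'
    'Content-Type: multipart/alternative; boundary="XYZ"\n'
    '\n'
    '--XYZ\n'
    'Content-Type: text/plain\n'
    '\n'
    'plain body\n'
    '--XYZ\n'
    'Content-Type: text/html\n'
    '\n'
    '<p>html body</p>\n'
    '--XYZ--\n'
)
subject, body = extract_subject_and_body(raw)
print(subject, '|', body.strip())
```

Because `iter_emails` is a generator, only one file is open and parsed at a time, which is what the iterator class was originally meant to achieve.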

**Pre-processing**

Let’s load everything into memory; the corpus is small enough that we will be fine.

In [8]:
import numpy as np
ham_emails = EmailIterator('data2/ham')
spam_emails = EmailIterator('data2/spam')
hams = np.array([email.clean for email in ham_emails])
spams = np.array([email.clean for email in spam_emails])

Since we have an unbalanced data set (2551 hams and 501 spams), we need to take care when splitting it into training and test sets. We can use Scikit-Learn’s StratifiedShuffleSplit for that. It makes sure that hams and spams appear in the same proportion in both the training and test sets.

In [9]:
from sklearn.model_selection import StratifiedShuffleSplit
split = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
emails = np.concatenate((hams, spams))
labels = np.concatenate((np.zeros(hams.size), np.ones(spams.size)))
for train_index, test_index in split.split(emails, labels):
    emails_train, labels_train = \
        emails[train_index], labels[train_index]
    emails_test, labels_test = \
        emails[test_index], labels[test_index]
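
To illustrate what stratification means, here is a pure-Python sketch that samples the test fraction from each class separately. It is not Scikit-Learn’s implementation, and the function name `stratified_split` is made up; the class sizes are the ones printed earlier. Rounding may give a test set that differs by a message or so from the one Scikit-Learn produces.

```python
import random


def stratified_split(labels, test_size=0.2, seed=42):
    """Return (train_idx, test_idx), sampling test_size from each class."""
    rng = random.Random(seed)
    by_class = {}
    for i, label in enumerate(labels):
        by_class.setdefault(label, []).append(i)
    train_idx, test_idx = [], []
    for indices in by_class.values():
        rng.shuffle(indices)
        n_test = round(len(indices) * test_size)
        test_idx.extend(indices[:n_test])
        train_idx.extend(indices[n_test:])
    return train_idx, test_idx


def spam_share(labels, indices):
    """Fraction of the selected messages that are spam (label 1)."""
    return sum(labels[i] for i in indices) / len(indices)


toy_labels = [0] * 2551 + [1] * 501  # 0 = ham, 1 = spam
toy_train, toy_test = stratified_split(toy_labels)
print(round(spam_share(toy_labels, toy_train), 3),
      round(spam_share(toy_labels, toy_test), 3))  # 0.164 0.164
```

Both subsets keep roughly the 16% spam share of the full corpus, which a plain random split does not guarantee.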

Using the messages in the training set, we build a dictionary with the occurrences of each word across all messages.

In [10]:
from collections import defaultdict
dictionary = defaultdict(int)
for email in emails_train:
    for word in email.split(' '):
        dictionary[word] += 1

And then we keep only the 1000 most frequent words (you can experiment with this number). Also, notice that we ignore single letters (len(word) > 1).

In [11]:
top = 1000
descending_dictionary = sorted(dictionary.items(),
                               key=lambda v: v[1],
                               reverse=True)
dictionary = [
    word for (word, occur) in descending_dictionary
    if len(word) > 1
][:top]
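
The counting and selection steps can also be sketched with `collections.Counter` from the standard library. The helper name `build_vocabulary` and the three sample texts are hypothetical, and `str.split()` is used here instead of `split(' ')`, so empty tokens never enter the counts.

```python
from collections import Counter


def build_vocabulary(texts, top=1000):
    """Return the `top` most frequent words longer than one letter."""
    counts = Counter(word for text in texts for word in text.split())
    # most_common() sorts by descending count; filter before truncating.
    return [word for word, _ in counts.most_common() if len(word) > 1][:top]


sample_texts = ['a free free offer', 'free offer now', 'a a a a']
print(build_vocabulary(sample_texts, top=2))  # ['free', 'offer']
```

The single letter "a" is the most frequent token in the sample, yet it is filtered out before the top words are taken.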

The idea now is that we will encode each message (subject + body) into an array where each index indicates how many times a given word appears there. For instance, if our dictionary was only:

In [None]:
["yes", "no", "have", "write", "script", "myself", "to"]

And a certain message is “I would prefer not to have to write a script myself”, it would be encoded as:

In [None]:
[0, 0, 1, 1, 1, 1, 2]

Which means:

In [None]:
[
    0,  # zero occurrence(s) of word yes
    0,  # zero occurrence(s) of word no
    1,  # one  occurrence(s) of word have
    1,  # one  occurrence(s) of word write
    1,  # one  occurrence(s) of word script
    1,  # one  occurrence(s) of word myself
    2   # two  occurrence(s) of word to
]

You could also use “1” or “0” to record only whether a word occurs or not. The following function encodes a given message using the approach just described.

In [12]:
def encode_email(email: str,
                 dictionary_: np.ndarray,
                 binary: bool = False) -> np.ndarray:
    encoded = np.zeros(dictionary_.size)
    words = email.split(' ')

    for word in words:
        index = np.where(dictionary_ == word)[0]
        if index.size == 1:  # we ignore unknown words
            if binary:
                encoded[index[0]] = 1
            else:
                encoded[index[0]] += 1
    return encoded
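
The toy example from earlier can be checked with a small pure-Python sketch. It uses a word-to-index mapping instead of `np.where`, which is an assumption of this sketch rather than what the function above does; the name `encode_text` is made up.

```python
def encode_text(text, vocabulary, binary=False):
    """Count (or flag, if binary) occurrences of each vocabulary word."""
    index = {word: i for i, word in enumerate(vocabulary)}
    encoded = [0] * len(vocabulary)
    for word in text.lower().split():
        i = index.get(word)
        if i is None:  # unknown words are ignored
            continue
        encoded[i] = 1 if binary else encoded[i] + 1
    return encoded


toy_vocabulary = ['yes', 'no', 'have', 'write', 'script', 'myself', 'to']
toy_message = 'I would prefer not to have to write a script myself'
print(encode_text(toy_message, toy_vocabulary))               # [0, 0, 1, 1, 1, 1, 2]
print(encode_text(toy_message, toy_vocabulary, binary=True))  # [0, 0, 1, 1, 1, 1, 1]
```

The mapping makes each lookup constant-time, whereas `np.where` scans the whole dictionary for every word; with 1000 words and a few thousand messages either approach is fast enough.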


And then we encode our messages.

In [15]:
from functools import partial
dictionary = np.array(dictionary)
_encode_email = partial(encode_email, dictionary_=dictionary)
encoded_train = np.array(list(map(_encode_email, emails_train)))
encoded_test = np.array(list(map(_encode_email, emails_test)))
print(encoded_train)

[[ 6.  2.  2. ...  0.  0.  0.]
 [ 3.  2.  1. ...  0.  0.  0.]
 [ 5.  2.  2. ...  0.  0.  0.]
 ...
 [13.  5.  4. ...  0.  0.  0.]
 [12. 13.  6. ...  0.  0.  0.]
 [10. 12.  9. ...  0.  0.  0.]]


**Perceptron training**

In [43]:
from sklearn.linear_model import Perceptron
from sklearn.metrics import (accuracy_score, f1_score,
                             precision_score, recall_score)
from sklearn.model_selection import cross_val_predict

perceptron = Perceptron(max_iter=5, random_state=45)

# Out-of-fold predictions on the training set (5-fold cross-validation);
# the test set is left untouched until the final evaluation.
labels_pred = cross_val_predict(perceptron, encoded_train, labels_train, cv=5)

# Evaluate the accuracy
accuracy = accuracy_score(labels_train, labels_pred)
print(f"Accuracy: {accuracy * 100:.2f}%")

print('precision:', precision_score(labels_train, labels_pred))

print('recall:', recall_score(labels_train, labels_pred))

print('f1:', f1_score(labels_train, labels_pred))

Accuracy: 97.50%
precision: 0.9521276595744681
recall: 0.8927680798004988
f1: 0.9214929214929216
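
For intuition about what the model learns, here is a minimal pure-Python sketch of the classic perceptron update rule. It is not Scikit-Learn’s implementation, and the two features (counts of the words "free" and "meeting") and the four samples are hypothetical.

```python
def predict(weights, bias, x):
    """Return 1 (spam) if the weighted sum is positive, else 0 (ham)."""
    score = sum(w * xi for w, xi in zip(weights, x)) + bias
    return 1 if score > 0 else 0


def train_perceptron(samples, labels, epochs=5):
    weights = [0.0] * len(samples[0])
    bias = 0.0
    for _ in range(epochs):
        for x, y in zip(samples, labels):
            error = y - predict(weights, bias, x)  # -1, 0 or +1
            if error:  # update only on misclassified samples
                weights = [w + error * xi for w, xi in zip(weights, x)]
                bias += error
    return weights, bias


# Hypothetical counts of the words ("free", "meeting") in four messages.
toy_samples = [[2, 0], [0, 2], [3, 1], [1, 3]]
toy_targets = [1, 0, 1, 0]  # 1 = spam, 0 = ham
toy_weights, toy_bias = train_perceptron(toy_samples, toy_targets)
print([predict(toy_weights, toy_bias, x) for x in toy_samples])  # [1, 0, 1, 0]
```

Each mistake nudges the weights toward the misclassified sample (for a missed spam) or away from it (for a false alarm), so words that appear mostly in spam end up with positive weights.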


**Perceptron prediction**

In [44]:
perceptron.fit(encoded_train,labels_train)
labels_pred = perceptron.predict(encoded_test)

print('accuracy:', accuracy_score(labels_test, labels_pred))

print('precision:', precision_score(labels_test, labels_pred))

print('recall:', recall_score(labels_test, labels_pred))

print('f1:', f1_score(labels_test, labels_pred))

accuracy: 0.8690671031096563
precision: 0.5581395348837209
recall: 0.96
f1: 0.7058823529411763
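
The four scores are tied together through the confusion matrix. The sketch below recomputes them from one set of counts that is consistent with the output above for a 611-message test set; the counts are inferred from the printed scores, not read from the notebook, and the function name `scores` is made up.

```python
def scores(tp, fp, fn, tn):
    """Return accuracy, precision, recall and F1 from confusion counts."""
    accuracy = (tp + tn) / (tp + fp + fn + tn)
    precision = tp / (tp + fp)  # share of predicted spams that are spam
    recall = tp / (tp + fn)     # share of actual spams that were caught
    f1 = 2 * precision * recall / (precision + recall)
    return accuracy, precision, recall, f1


# Inferred counts (an assumption): 96 spams caught, 4 missed,
# 76 hams wrongly flagged, 435 hams correctly passed.
accuracy, precision, recall, f1 = scores(tp=96, fp=76, fn=4, tn=435)
print(f'{accuracy:.4f} {precision:.4f} {recall:.4f} {f1:.4f}')
# 0.8691 0.5581 0.9600 0.7059
```

Read this way, the high recall and low precision mean the model catches almost every spam in the test set but also flags a sizeable number of hams.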


