In [1]:
import os
import tarfile
import urllib.request
from pathlib import Path

def fetch_spam_data():
  """Download and unpack the SpamAssassin ham and spam corpora if missing.

  Each archive is fetched into ``datasets/Spam`` (relative to the current
  working directory), extracted there and then deleted. A corpus whose
  directory already exists is skipped, so repeated calls do not download
  it again.

  Returns:
    A list of two ``Path`` objects, in order: the ``easy_ham`` directory
    and the ``spam`` directory.
  """
  spam_root = "http://spamassassin.apache.org/old/publiccorpus/"
  urls = {
    "easy_ham": spam_root + "20030228_easy_ham.tar.bz2",
    "spam": spam_root + "20030228_spam.tar.bz2",
  }

  spam_path = Path() / "datasets" / "Spam"
  spam_path.mkdir(parents=True, exist_ok=True)

  for dir_name, url in urls.items():
    if (spam_path / dir_name).is_dir():
      continue  # already extracted on a previous run
    path = (spam_path / dir_name).with_suffix(".tar.bz2")
    print("Downloading", path)
    urllib.request.urlretrieve(url, path)
    # The context manager closes the archive; no explicit close() is needed.
    with tarfile.open(path) as tar_bz2_file:
      if hasattr(tarfile, "data_filter"):
        # The archive was fetched over the network, so use the "data"
        # extraction filter, which rejects unsafe members such as paths
        # that would land outside the destination directory.
        tar_bz2_file.extractall(path=spam_path, filter="data")
      else:
        # Python versions without extraction-filter support.
        tar_bz2_file.extractall(path=spam_path)
    os.remove(path)
  return [spam_path / dir_name for dir_name in urls]

# Fetch the corpora (downloaded only when their directories are missing) and
# keep the paths of the ham and spam directories.
ham_dir, spam_dir = fetch_spam_data()



Downloading datasets\Spam\easy_ham.tar.bz2
Downloading datasets\Spam\spam.tar.bz2


In [2]:
import email
import email.policy

def load_email(filepath):
  """Parse one raw email file into a message object.

  Args:
    filepath: Path of the file holding the raw message bytes.

  Returns:
    The message produced by ``BytesParser`` using ``email.policy.default``.
  """
  # Import the submodule explicitly: ``import email`` is not guaranteed to
  # load ``email.parser``, so ``email.parser.BytesParser`` only resolves if
  # some other module has already imported it.
  from email.parser import BytesParser

  with open(filepath, "rb") as f:
    return BytesParser(policy=email.policy.default).parse(f)

# Keep only entries whose file name is longer than 20 characters, in sorted order.
ham_filenames = sorted(p for p in ham_dir.iterdir() if len(p.name) > 20)
spam_filenames = sorted(p for p in spam_dir.iterdir() if len(p.name) > 20)
print("Ham files:", len(ham_filenames))
print("Spam Files:", len(spam_filenames))

# Parse every selected file into a message object.
ham_emails = list(map(load_email, ham_filenames))
spam_emails = list(map(load_email, spam_filenames))


Ham files: 2500
Spam Files: 500


In [3]:
import numpy as np
from sklearn.model_selection import train_test_split

# Ham messages come first, so the label vector is zeros followed by ones.
ham_labels = [0] * len(ham_emails)
spam_labels = [1] * len(spam_emails)
X = np.array(ham_emails + spam_emails, dtype=object)
y = np.array(ham_labels + spam_labels)

# Hold out 20% of the messages for evaluation; the seed fixes the split.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

In [4]:
# preprocessing 

import re
from html import unescape

# convert HTML to Plain Text
def html_to_plain_text(html):
  """Convert an HTML document to rough plain text.

  Args:
    html: HTML markup as a string.

  Returns:
    The text with the ``<head>`` section and all tags removed, every
    ``<a ...>`` opening tag replaced by the word HYPERLINK (padded with
    spaces), runs of blank lines collapsed to a single newline, and HTML
    entities unescaped.
  """
  # All patterns are raw strings so that regex escapes reach the regex engine
  # untouched instead of being treated as (invalid) Python string escapes.
  # Drop the whole <head> section, contents included.
  text = re.sub(r'<head.*?>.*?</head>', '', html, flags=re.M | re.S | re.I)
  # Replace every <a ...> opening tag with the word HYPERLINK.
  text = re.sub(r'<a\s.*?>', ' HYPERLINK ', text, flags=re.M | re.S | re.I)
  # Remove all remaining tags.
  text = re.sub(r'<.*?>', '', text, flags=re.M | re.S)
  # Collapse runs of newlines (and the whitespace before them) into one newline.
  text = re.sub(r'(\s*\n)+', '\n', text, flags=re.M | re.S)
  return unescape(text)

# convert email to plain text
def email_to_text(email):
  """Return the text content of a message, preferring plain text over HTML.

  Args:
    email: A parsed message object providing ``walk()``. Note that this
      parameter name shadows the ``email`` module inside the function.

  Returns:
    The content of the first ``text/plain`` part found; otherwise the last
    ``text/html`` part converted with ``html_to_plain_text``; otherwise an
    empty string.
  """
  html = None
  for part in email.walk():
    ctype = part.get_content_type()
    if ctype not in ("text/plain", "text/html"):
      continue
    try:
      content = part.get_content()
    except Exception:  # e.g. an unknown or broken charset
      # Fall back to the undecoded payload. Catching Exception instead of
      # using a bare except lets KeyboardInterrupt/SystemExit propagate.
      content = str(part.get_payload())
    if ctype == "text/plain":
      return content
    html = content
  if html:
    return html_to_plain_text(html)
  return ""


  text = re.sub('<a\s.*?>', ' HYPERLINK ', text, flags=re.M | re.S | re.I)


In [5]:
# email to word counter transformer

import nltk
from collections import Counter

# Module-level Porter stemmer; EmailToWordCounterTransformer.transform uses it
# when its `stemming` flag is set and this name is not None.
stemmer = nltk.PorterStemmer()
import urlextract
# Module-level URL finder; EmailToWordCounterTransformer.transform uses it
# when its `replace_urls` flag is set and this name is not None.
url_extractor = urlextract.URLExtract()

from sklearn.base import BaseEstimator, TransformerMixin

class EmailToWordCounterTransformer(BaseEstimator, TransformerMixin):
    """Turn parsed emails into one word-count ``Counter`` per message.

    Each constructor flag switches one normalisation step of ``transform``
    on or off. ``strip_headers`` is stored but is not consulted anywhere in
    this class.
    """

    def __init__(self, strip_headers=True, lower_case=True,
                 remove_punctuation=True, replace_urls=True,
                 replace_numbers=True, stemming=True):
        self.stemming = stemming
        self.replace_numbers = replace_numbers
        self.replace_urls = replace_urls
        self.remove_punctuation = remove_punctuation
        self.lower_case = lower_case
        self.strip_headers = strip_headers

    def fit(self, X, y=None):
        """Nothing is learned; return the transformer itself."""
        return self

    def transform(self, X, y=None):
        """Return an array holding one word-count ``Counter`` per message in ``X``."""
        per_message = []
        for message in X:
            per_message.append(self._count_words(message))
        return np.array(per_message)

    def _count_words(self, message):
        """Normalise the text of one message and count its words."""
        text = email_to_text(message) or ""
        if self.lower_case:
            text = text.lower()
        if self.replace_urls and url_extractor is not None:
            # Substitute the longest URLs first so that a URL contained in a
            # longer one does not split the longer one apart.
            found = sorted(set(url_extractor.find_urls(text)),
                           key=len, reverse=True)
            for url in found:
                text = text.replace(url, " URL ")
        if self.replace_numbers:
            text = re.sub(r'\d+(?:\.\d*)?(?:[eE][+-]?\d+)?', 'NUMBER', text)
        if self.remove_punctuation:
            text = re.sub(r'\W+', ' ', text, flags=re.M)
        counts = Counter(text.split())
        if not (self.stemming and stemmer is not None):
            return counts
        # Merge the counts of words that share the same stem.
        stemmed = Counter()
        for word, occurrences in counts.items():
            stemmed[stemmer.stem(word)] += occurrences
        return stemmed


In [6]:
# word counter to vector transformer
from scipy.sparse import csr_matrix

class WordCounterToVectorTransformer(BaseEstimator, TransformerMixin):
    """Encode word-count mappings as rows of a sparse count matrix.

    Column 0 accumulates the counts of every word that is not in the fitted
    vocabulary; vocabulary words occupy columns 1 to ``vocabulary_size``.
    """

    def __init__(self, vocabulary_size=1000):
        self.vocabulary_size = vocabulary_size

    def fit(self, X, y=None):
        """Build ``vocabulary_`` from the most frequent words in ``X``.

        A word adds at most 10 per message to its running total, so a single
        message cannot dominate the ranking.
        """
        totals = Counter()
        for word_count in X:
            for word in word_count:
                totals[word] += min(word_count[word], 10)
        ranked = totals.most_common()[:self.vocabulary_size]
        # Indices start at 1 because column 0 is reserved for unknown words.
        self.vocabulary_ = {
            word: position
            for position, (word, _) in enumerate(ranked, start=1)
        }
        return self

    def transform(self, X, y=None):
        """Return a CSR matrix with one row per word-count mapping in ``X``."""
        row_idx, col_idx, values = [], [], []
        lookup = self.vocabulary_.get
        for i, word_count in enumerate(X):
            for word, count in word_count.items():
                row_idx.append(i)
                col_idx.append(lookup(word, 0))
                values.append(count)
        n_cols = self.vocabulary_size + 1
        return csr_matrix((values, (row_idx, col_idx)),
                          shape=(len(X), n_cols))

In [7]:
# data transformation
from sklearn.pipeline import Pipeline

# Two-stage preprocessing: emails -> word counts -> sparse count vectors.
preprocess_pipeline = Pipeline(steps=[
    ("email_to_wordcount", EmailToWordCounterTransformer()),
    ("wordcount_to_vector", WordCounterToVectorTransformer()),
])

# Fit the pipeline on the training messages and vectorise them in one call.
X_train_transformed = preprocess_pipeline.fit_transform(X_train)

In [8]:
# training the classifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

# Logistic regression on the vectorised training messages; the seed is fixed
# and max_iter is set explicitly to 1000.
log_clf = LogisticRegression(max_iter=1000, random_state=42)
log_clf.fit(X_train_transformed,y_train)

In [10]:
# eval clf
from sklearn.metrics import precision_score, recall_score

# Vectorise the held-out messages with the already-fitted pipeline
# (transform only, no refitting), then predict.
X_test_transformed = preprocess_pipeline.transform(X_test)
y_pred = log_clf.predict(X_test_transformed)

precision = precision_score(y_test, y_pred)
print(f"Precision: {precision:.2%}")
recall = recall_score(y_test, y_pred)
print(f"Recall: {recall:.2%}")

Precision: 95.88%
Recall: 97.89%
