In [1]:
import pandas as pd
import urllib
from pathlib import Path
import tarfile
import html2text
from urlextract import URLExtract
import nltk
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB

[nltk_data] Downloading package punkt to /home/vamsi10010/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /home/vamsi10010/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /home/vamsi10010/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [2]:
def fetch_spam_data():
    spam_root = "http://spamassassin.apache.org/old/publiccorpus/"
    ham_url = spam_root + "20030228_easy_ham.tar.bz2"
    spam_url = spam_root + "20030228_spam.tar.bz2"

    spam_path = Path() / "datasets" / "spam"
    spam_path.mkdir(parents=True, exist_ok=True)
    for dir_name, tar_name, url in (("easy_ham", "ham", ham_url),
                                    ("spam", "spam", spam_url)):
        if not (spam_path / dir_name).is_dir():
            path = (spam_path / tar_name).with_suffix(".tar.bz2")
            print("Downloading", path)
            urllib.request.urlretrieve(url, path)
            tar_bz2_file = tarfile.open(path)
            tar_bz2_file.extractall(path=spam_path)
            tar_bz2_file.close()
    return [spam_path / dir_name for dir_name in ("easy_ham", "spam")]

ham_dir, spam_dir = fetch_spam_data()

In [3]:
ham_filenames = [f for f in sorted(ham_dir.iterdir()) if len(f.name) > 20]
spam_filenames = [f for f in sorted(spam_dir.iterdir()) if len(f.name) > 20]

In [4]:
import email
import email.policy

def load_file(file):
    with open(file, 'rb') as f:
        return email.parser.BytesParser(policy=email.policy.default).parse(f)

In [5]:
ham_emails = [load_file(file) for file in ham_filenames]
spam_emails = [load_file(file) for file in spam_filenames]

In [6]:
#converts email to raw text

def email_to_text(email):
    output = ''
    for part in email.walk():
        if part.get_content_type() == 'text/plain':
            try:
                output += part.get_content()
            except:
                output += str(part.get_payload())
        elif part.get_content_type() == 'text/html':
            try:
                output += html2text.html2text(part.get_content())
            except:
                output += html2text.html2text(str(part.get_payload()))
    return output

In [7]:
#train and test data

import numpy as np
from sklearn.model_selection import train_test_split

X = np.array(ham_emails + spam_emails, dtype=object)
y = np.array([0] * len(ham_emails) + [1] * len(spam_emails))

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,
                                                    random_state=42)

In [8]:
def url_replacer(text):
    extractor = URLExtract()
    urls = list(set(extractor.find_urls(text)))
    urls.sort(key=lambda url: len(url), reverse=True)
    for url in urls:
        text = text.replace(url, " URL ")
    return text

In [9]:
import re

def remove_punctuation(text):
    return re.sub(r'\W+', ' ', text, flags=re.M)

In [12]:
from nltk.tokenize import word_tokenize
from nltk import PorterStemmer
from nltk.corpus import stopwords
from sklearn.base import BaseEstimator, TransformerMixin

class EmailProcessor(BaseEstimator, TransformerMixin):
    def __init__(self, replace_urls=True, replace_numbers=True, remove_stopwords=True, use_headers=False):
        self.use_headers = use_headers
        self.replace_urls = replace_urls
        self.replace_numbers = replace_numbers
        self.remove_stopwords = remove_stopwords
    def fit(self, X, y=None):
        return self
    def transform(self, X, y=None):
        content = []
        for email in X:
            sub = email['Subject']
            text = email_to_text(email)
            if self.replace_urls:
                text = url_replacer(text)
            if self.replace_numbers:
                text = re.sub(r'\d+(?:\.\d*)?(?:[eE][+-]?\d+)?', 'NUMBER', text)
                sub = re.sub(r'\d+(?:\.\d*)?(?:[eE][+-]?\d+)?', 'NUMBER', sub)
            
            stemmer = PorterStemmer()
            tokens = [stemmer.stem(s) for s in word_tokenize(remove_punctuation(text))]
            sub_tokens = [stemmer.stem(s) for s in word_tokenize(remove_punctuation(sub))]
            if self.remove_stopwords:
                stop_words = set(stopwords.words('english'))
                tokens = [i for i in tokens if i not in stop_words]
                sub_tokens = [i for i in sub_tokens if i not in stop_words]
            text = " ".join(tokens)
            sub = " ".join(sub_tokens)
            
            content.append((sub + text) if self.use_headers else text)
            
        # output = pd.DataFrame(content, columns=['Content'])
        return content
            

In [13]:
from sklearn.pipeline import Pipeline

preprocess = Pipeline([
    ('email_processor', EmailProcessor(use_headers=True)),
    ('vectorizer', TfidfVectorizer())
])

X_train_tf = preprocess.fit_transform(X_train)

In [14]:
from sklearn.naive_bayes import MultinomialNB
from sklearn.model_selection import cross_val_score

model = MultinomialNB()
score = cross_val_score(model, X_train_tf.toarray(), y_train, cv=10)

score.mean()

0.85