In [38]:
import os
import re
import tarfile
from email import policy
from email.message import EmailMessage
from email.parser import BytesParser
from html import unescape
from typing import Optional
from urllib import request

import numpy as np
import urlextract
from nltk.stem import PorterStemmer
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import precision_score, recall_score
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.pipeline import Pipeline

In [14]:
# NOTE(review): the corpus is fetched over plain HTTP; consider switching to
# HTTPS once it is confirmed that the host serves these files over TLS.
DOWNLOAD_ROOT = "http://spamassassin.apache.org/old/publiccorpus/"
HAM_URL = DOWNLOAD_ROOT + "20030228_easy_ham.tar.bz2"
SPAM_URL = DOWNLOAD_ROOT + "20030228_spam.tar.bz2"
SPAM_PATH = os.path.join("datasets", "spam")


def featch_spam_data(ham_url: str = HAM_URL, spam_url: str = SPAM_URL, spam_path: str = SPAM_PATH) -> None:
    """Download the ham and spam archives (if missing) and unpack them.

    The archives are stored as ``ham.tar.bz2`` and ``spam.tar.bz2`` inside
    ``spam_path`` and extracted into that same directory. An archive file
    that already exists is not downloaded again, but it is always
    re-extracted.

    Args:
        ham_url: URL of the ham archive.
        spam_url: URL of the spam archive.
        spam_path: Directory that receives the archives and their contents;
            created if it does not exist.

    Raises:
        urllib.error.URLError: If an archive cannot be downloaded.
        tarfile.TarError: If an archive cannot be read or contains a member
            rejected by the extraction filter.
    """
    os.makedirs(spam_path, exist_ok=True)

    for file_name, url in (("ham.tar.bz2", ham_url), ("spam.tar.bz2", spam_url)):
        path = os.path.join(spam_path, file_name)
        if not os.path.isfile(path):
            # Download under a temporary name and rename only on success, so
            # an interrupted transfer never leaves a truncated archive that
            # the isfile() check above would treat as complete next time.
            partial_path = path + ".part"
            try:
                request.urlretrieve(url, partial_path)
                os.replace(partial_path, path)
            finally:
                if os.path.exists(partial_path):
                    os.remove(partial_path)

        with tarfile.open(path) as tar:
            if hasattr(tarfile, "data_filter"):
                # Refuse members that would escape spam_path (absolute paths,
                # "..", links pointing outside) on interpreters that support
                # extraction filters.
                tar.extractall(path=spam_path, filter="data")
            else:
                tar.extractall(path=spam_path)


# Download (only if the archive files are missing) and unpack both archives,
# using the default URLs and the default target directory SPAM_PATH.
featch_spam_data()

In [15]:
# Directories holding the unpacked ham and spam messages.
HAM_DIR = os.path.join(SPAM_PATH, 'easy_ham')
SPAM_DIR = os.path.join(SPAM_PATH, 'spam')

# Keep only entries whose names are longer than 20 characters, in sorted order.
# NOTE(review): presumably this skips non-message helper files that sit next
# to the emails — confirm against the extracted directory contents.
ham_filenames = sorted(entry for entry in os.listdir(HAM_DIR) if len(entry) > 20)
spam_filenames = sorted(entry for entry in os.listdir(SPAM_DIR) if len(entry) > 20)

for label, names in (("ham_filenames: ", ham_filenames), ("spam_filenames: ", spam_filenames)):
    print(label, len(names))

ham_filenames:  2500
spam_filenames:  500


In [16]:
def load_email(is_spam: bool, file_name: str, spam_path: str = SPAM_PATH) -> EmailMessage:
    """Parse one message file from the corpus into an ``EmailMessage``.

    Args:
        is_spam: Selects the sub-directory to read from: ``"spam"`` when
            true, ``"easy_ham"`` otherwise.
        file_name: Name of the message file inside that sub-directory.
        spam_path: Root directory that contains both sub-directories.

    Returns:
        The message parsed from the file's bytes with ``policy.default``.
    """
    if is_spam:
        directory = "spam"
    else:
        directory = "easy_ham"

    email_parser = BytesParser(policy=policy.default)
    # Binary mode: the parser decodes the bytes itself.
    with open(os.path.join(spam_path, directory, file_name), 'rb') as handle:
        return email_parser.parse(handle)
    
# Parse every listed file; list order follows the sorted file names.
ham_emails = [load_email(False, ham_name) for ham_name in ham_filenames]
spam_emails = [load_email(True, spam_name) for spam_name in spam_filenames]

In [17]:
# Preview the body of the second ham message (index 1), with leading and
# trailing whitespace stripped.
print(ham_emails[1].get_content().strip())

Martin A posted:
Tassos Papadopoulos, the Greek sculptor behind the plan, judged that the
 limestone of Mount Kerdylio, 70 miles east of Salonika and not far from the
 Mount Athos monastic community, was ideal for the patriotic sculpture. 
 
 As well as Alexander's granite features, 240 ft high and 170 ft wide, a
 museum, a restored amphitheatre and car park for admiring crowds are
planned
---------------------
So is this mountain limestone or granite?
If it's limestone, it'll weather pretty fast.

------------------------ Yahoo! Groups Sponsor ---------------------~-->
4 DVDs Free +s&p Join Now
http://us.click.yahoo.com/pt6YBB/NXiEAA/mG3HAA/7gSolB/TM
---------------------------------------------------------------------~->

To unsubscribe from this group, send an email to:
forteana-unsubscribe@egroups.com

 

Your use of Yahoo! Groups is subject to http://docs.yahoo.com/info/terms/


In [18]:
# Preview the body of the second spam message (index 1), with leading and
# trailing whitespace stripped.
print(spam_emails[1].get_content().strip())

1) Fight The Risk of Cancer!
http://www.adclick.ws/p.cfm?o=315&s=pk007

2) Slim Down - Guaranteed to lose 10-12 lbs in 30 days
http://www.adclick.ws/p.cfm?o=249&s=pk007

3) Get the Child Support You Deserve - Free Legal Advice
http://www.adclick.ws/p.cfm?o=245&s=pk002

4) Join the Web's Fastest Growing Singles Community
http://www.adclick.ws/p.cfm?o=259&s=pk007

5) Start Your Private Photo Album Online!
http://www.adclick.ws/p.cfm?o=283&s=pk007

Have a Wonderful Day,
Offer Manager
PrizeMama













If you wish to leave this list please use the link below.
http://www.qves.com/trim/?ilug@linux.ie%7C17%7C114258


-- 
Irish Linux Users' Group: ilug@linux.ie
http://www.linux.ie/mailman/listinfo/ilug for (un)subscription information.
List maintainer: listmaster@linux.ie


In [19]:
# Features: all parsed messages, ham first and then spam, kept as Python objects.
X = np.array([*ham_emails, *spam_emails], dtype=object)
# Labels in the same order: 0 for every ham message, 1 for every spam message.
y = np.repeat([0, 1], [len(ham_emails), len(spam_emails)])

# Hold out 20% of the messages; stratifying on y keeps the ham/spam ratio
# the same in both splits, and the fixed seed makes the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
    stratify=y,
)

In [20]:
# Shared text-processing helpers, created once and read as globals by later cells.
stemmer = PorterStemmer()  # applied to every token by the vectorizer's tokenizer
url_extractor = urlextract.URLExtract()  # used by EmailToWordCounterTransformer to locate URLs

In [39]:
def html_to_plain_text(html: str) -> str:
    """Reduce an HTML document to rough plain text.

    Steps, in order:
      1. drop the whole ``<head>`` section,
      2. replace every opening anchor tag that carries attributes with the
         marker `` HYPERLINK ``,
      3. strip all remaining tags,
      4. collapse runs of blank or whitespace-only line endings into a
         single newline,
      5. decode HTML entities (for example ``&amp;`` becomes ``&``).

    Args:
        html: HTML markup.

    Returns:
        The text content with tags removed and entities decoded.
    """
    # All patterns are raw strings so that regex escapes reach the regex
    # engine unchanged instead of being invalid Python string escapes.
    # re.S lets '.' span newlines (tags and the head section may be
    # multi-line); re.I makes the tag names case-insensitive.
    text = re.sub(r'<head.*?>.*?</head>', '', html, flags=re.S | re.I)
    text = re.sub(r'<a\s.*?>', ' HYPERLINK ', text, flags=re.S | re.I)
    text = re.sub(r'<.*?>', '', text, flags=re.S)
    text = re.sub(r'(\s*\n)+', '\n', text, flags=re.S)
    return unescape(text)

def email_to_text(email: EmailMessage) -> Optional[str]:
    """Extract a text body from a (possibly multipart) message.

    Parts are visited in ``walk()`` order. The first ``text/plain`` part is
    returned as is. If there is none, the last ``text/html`` part seen is
    converted with ``html_to_plain_text`` and returned.

    Args:
        email: The parsed message.

    Returns:
        The text of the message, or ``None`` when it has no ``text/plain``
        part and no non-empty ``text/html`` part.
    """
    html = None
    for part in email.walk():
        ctype = part.get_content_type()
        if ctype not in ('text/plain', 'text/html'):
            continue
        try:
            content = part.get_content()
        except Exception:
            # Best-effort fallback when the part cannot be decoded (for
            # example an unknown charset): use the raw payload. Only
            # Exception is caught so that KeyboardInterrupt and SystemExit
            # still propagate.
            content = str(part.get_payload())
        if ctype == 'text/plain':
            return content
        html = content
    if html:
        return html_to_plain_text(html)
    return None

In [40]:
class EmailToWordCounterTransformer(BaseEstimator, TransformerMixin):
    """Stateless transformer turning messages into normalised text strings.

    Each message is converted to text, lower-cased, has its URLs replaced by
    `` URL `` and its numbers by ``NUMBER``, and finally has every run of
    non-word characters replaced by a single space. The output is a list of
    strings, one per input message.
    """

    def __init__(self):
        # Digits with an optional fractional part and an optional exponent.
        self.number_pattern = re.compile(r'\d+(?:\.\d*)?(?:[eE][+-]?\d+)?')
        # One or more characters that are not word characters.
        self.non_word_pattern = re.compile(r'\W+')

    def fit(self, X, y=None):
        """Nothing is learned from the data; return the transformer itself."""
        return self

    def transform(self, X, y = None):
        """Return the normalised text of every message in ``X``, in order."""
        return [self._normalize(message) for message in X]

    def _normalize(self, message):
        """Produce the normalised text of a single message."""
        # Messages without any text part are treated as empty text.
        body = (email_to_text(message) or "").lower()

        # Longest URLs are substituted first.
        # NOTE(review): presumably so that a URL which is a substring of a
        # longer one cannot break the longer match — confirm.
        distinct_urls = set(url_extractor.find_urls(body))
        for found_url in sorted(distinct_urls, key=len, reverse=True):
            body = body.replace(found_url, " URL ")

        body = self.number_pattern.sub('NUMBER', body)
        return self.non_word_pattern.sub(' ', body)

In [41]:
def stem_tokenize(text):
    """Split already-normalised text on whitespace and stem every token.

    Defined as a named module-level function rather than a lambda so that
    the fitted pipeline can be pickled.
    """
    return [stemmer.stem(word) for word in text.split()]


# Preprocessing: messages -> normalised text -> sparse matrix of token counts.
preprocess_pipeline = Pipeline([
    ("email_to_wordcount", EmailToWordCounterTransformer()),
    ('vectorizer', CountVectorizer(
        analyzer='word',
        tokenizer=stem_tokenize,
        # A custom tokenizer replaces the default token pattern; passing None
        # states that explicitly and avoids scikit-learn's
        # "token_pattern will not be used" warning.
        token_pattern=None,
    )),
])

# Learn the vocabulary from the training messages and vectorise them.
X_train_transformed = preprocess_pipeline.fit_transform(X_train)



In [42]:
# 3-fold cross-validation of a logistic-regression classifier on the
# vectorised training set, using the estimator's default scorer.
# NOTE(review): X_train_transformed was produced by fitting the preprocessing
# pipeline on all of X_train, so every validation fold has already contributed
# to the vocabulary; the scores may be slightly optimistic.
log_clf = LogisticRegression(max_iter=1000, random_state=42)
score = cross_val_score(log_clf, X_train_transformed, y_train, cv=3, verbose=3)
# Last expression of the cell: displays the mean score over the folds.
score.mean()

[CV] END ................................ score: (test=0.983) total time=   0.2s
[CV] END ................................ score: (test=0.986) total time=   0.4s
[CV] END ................................ score: (test=0.984) total time=   0.2s


np.float64(0.9841666666666667)

In [43]:
# Vectorise the held-out messages with the vocabulary learned on the training
# set (transform only, no refitting).
X_test_transformed = preprocess_pipeline.transform(X_test)

# Train a fresh classifier on the full training set and predict the test labels.
log_clf = LogisticRegression(max_iter=1000, random_state=42).fit(X_train_transformed, y_train)
y_pred = log_clf.predict(X_test_transformed)

# Report both metrics as percentages with two decimals.
test_precision = precision_score(y_test, y_pred)
test_recall = recall_score(y_test, y_pred)
print("Precision: {:.2f}%".format(100 * test_precision))
print("Recall: {:.2f}%".format(100 * test_recall))

Precision: 100.00%
Recall: 92.00%
