# Spam Classifier

## Download examples of spam and ham (non-spam) from https://spamassassin.apache.org/old/publiccorpus/

In [1]:
import os
import tarfile
from six.moves import urllib

DOWNLOAD_ROOT = "https://spamassassin.apache.org/old/publiccorpus/"
SPAM_PATH = os.path.join("datasets", "spam") #creating dir datasets and datasets/spam
SPAM_URL = DOWNLOAD_ROOT + "20030228_spam.tar.bz2"
HAM_URL = DOWNLOAD_ROOT + "20030228_easy_ham.tar.bz2"

def fetch_spam_data(spam_url=SPAM_URL, spam_path=SPAM_PATH):
    if not os.path.isdir(spam_path):
        os.makedirs(spam_path)
        
    for filename, url in (("ham.tar.bz2", HAM_URL), ("spam.tar.bz2", SPAM_URL)): # iterate through tuple of tuples
        path = os.path.join(spam_path, filename)
        if not os.path.isfile(path):
            urllib.request.urlretrieve(url, path)
        tar_bz2_file = tarfile.open(path)
        tar_bz2_file.extractall(path=SPAM_PATH)
        tar_bz2_file.close()
        
fetch_spam_data()

## Load filenames of each email onto variables

In [2]:
ham_path = os.path.join(SPAM_PATH, "easy_ham")
spam_path = os.path.join(SPAM_PATH, "spam")
ham_filenames = [filename for filename in sorted(os.listdir(ham_path)) if len(filename) > 20]
spam_filenames = [filename for filename in sorted(os.listdir(spam_path)) if len(filename) > 20]

In [3]:
import email
import email.policy

def load_email(is_spam, filename, spam_path=SPAM_PATH):
    directory = "spam" if is_spam else "easy_ham"
    with open(os.path.join(spam_path, directory, filename), "rb") as f:
        return email.parser.BytesParser(policy=email.policy.default).parse(f)

In [4]:
ham_emails = [load_email(is_spam=False, filename=name) for name in ham_filenames]
spam_emails = [load_email(is_spam=True, filename=name) for name in spam_filenames]

In [5]:
print(spam_emails[1].get_content().strip())

1) Fight The Risk of Cancer!
http://www.adclick.ws/p.cfm?o=315&s=pk007

2) Slim Down - Guaranteed to lose 10-12 lbs in 30 days
http://www.adclick.ws/p.cfm?o=249&s=pk007

3) Get the Child Support You Deserve - Free Legal Advice
http://www.adclick.ws/p.cfm?o=245&s=pk002

4) Join the Web's Fastest Growing Singles Community
http://www.adclick.ws/p.cfm?o=259&s=pk007

5) Start Your Private Photo Album Online!
http://www.adclick.ws/p.cfm?o=283&s=pk007

Have a Wonderful Day,
Offer Manager
PrizeMama













If you wish to leave this list please use the link below.
http://www.qves.com/trim/?ilug@linux.ie%7C17%7C114258


-- 
Irish Linux Users' Group: ilug@linux.ie
http://www.linux.ie/mailman/listinfo/ilug for (un)subscription information.
List maintainer: listmaster@linux.ie


#### Some image are multipart (with images and attachments)

In [6]:
def get_email_structure(email):
    if isinstance(email, str):
        return email
    payload = email.get_payload() # get content
    if isinstance(payload, list):
        # recursively get the types of the subemails
        return "multipart({})".format(", ".join([
            get_email_structure(sub_email) for sub_email in payload
        ]))
    return email.get_content_type()

In [7]:
from collections import Counter

def structures_counter(emails):
    structures = Counter()
    for email in emails:
        structure = get_email_structure(email) # structure of email is the key
        structures[structure] += 1
    return structures

In [8]:
structures_counter(ham_emails).most_common()

[('text/plain', 2408),
 ('multipart(text/plain, application/pgp-signature)', 66),
 ('multipart(text/plain, text/html)', 8),
 ('multipart(text/plain, text/plain)', 4),
 ('multipart(text/plain)', 3),
 ('multipart(text/plain, application/octet-stream)', 2),
 ('multipart(text/plain, text/enriched)', 1),
 ('multipart(text/plain, application/ms-tnef, text/plain)', 1),
 ('multipart(multipart(text/plain, text/plain, text/plain), application/pgp-signature)',
  1),
 ('multipart(text/plain, video/mng)', 1),
 ('multipart(text/plain, multipart(text/plain))', 1),
 ('multipart(text/plain, application/x-pkcs7-signature)', 1),
 ('multipart(text/plain, multipart(text/plain, text/plain), text/rfc822-headers)',
  1),
 ('multipart(text/plain, multipart(text/plain, text/plain), multipart(multipart(text/plain, application/x-pkcs7-signature)))',
  1),
 ('multipart(text/plain, application/x-java-applet)', 1)]

In [9]:
structures_counter(spam_emails).most_common()

[('text/plain', 218),
 ('text/html', 183),
 ('multipart(text/plain, text/html)', 45),
 ('multipart(text/html)', 20),
 ('multipart(text/plain)', 19),
 ('multipart(multipart(text/html))', 5),
 ('multipart(text/plain, image/jpeg)', 3),
 ('multipart(text/html, application/octet-stream)', 2),
 ('multipart(text/plain, application/octet-stream)', 1),
 ('multipart(text/html, text/plain)', 1),
 ('multipart(multipart(text/html), application/octet-stream, image/jpeg)', 1),
 ('multipart(multipart(text/plain, text/html), image/gif)', 1),
 ('multipart/alternative', 1)]

### Split dataset to training and test sets

In [10]:
import numpy as np
from sklearn.model_selection import train_test_split

training_data = np.array(ham_emails + spam_emails)
target_data = np.array([0] * len(ham_emails) + [1] * len(spam_emails))

X_train, X_test, y_train, y_test = train_test_split(training_data, target_data, test_size=0.2, random_state=42)

  training_data = np.array(ham_emails + spam_emails)


In [11]:
import re
from html import unescape

def html_to_plain_text(html):
    text = re.sub('<head.*?>.*?</head>', '', html, flags=re.M | re.S | re.I)
    text = re.sub('<a\s.*?>', ' HYPERLINK ', text, flags=re.M | re.S | re.I)
    text = re.sub('<.*?>', '', text, flags=re.M | re.S)
    text = re.sub(r'(\s*\n)+', '\n', text, flags=re.M | re.S)
    return unescape(text)

In [12]:
def email_to_text(email):
    html = None
    for part in email.walk():
        ctype = part.get_content_type()
        if not ctype in ("text/plain", "text/html"):
            continue
        try:
            content = part.get_content()
        except: # in case of encoding issues
            content = str(part.get_payload())
        if ctype == "text/plain":
            return content
        else:
            html = content
    if html:
        return html_to_plain_text(html)

#### *experimenting with stemming*

In [13]:
try:
    from nltk import PorterStemmer
    stemmer = PorterStemmer()
    for word in ("Computations", "Computation", "Computing", "Computed", "Compute", "Compulsive"):
        print(word, "=>", stemmer.stem(word))
except ImportError:
    print("Error: stemming requires the NLTK module.")
    stemmer = None

Computations => comput
Computation => comput
Computing => comput
Computed => comput
Compute => comput
Compulsive => compuls


#### *experimenting with url extraction*

In [14]:
try:
    import urlextract
    
    extractor = urlextract.URLExtract()
    text_with_url = "example github.com. https://www.googl.com"
    print(extractor.find_urls(text_with_url))
    
except ImportError:
    print("Error: urlextract library not installed")
    extractor = None

['github.com', 'https://www.googl.com']


In [15]:
from sklearn.base import BaseEstimator, TransformerMixin

class EmailToWordCounterTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, strip_headers=True, lower_case=True, remove_punctuation=True,
                replace_urls=True, replace_numbers=True, stemming=True):
        self.strip_headers = strip_headers
        self.lower_case = lower_case
        self.remove_punctuation = remove_punctuation
        self.replace_urls = replace_urls
        self.replace_numbers = replace_numbers
        self.stemming = stemming
    def fit(self, X, y=None):
        return self
    def transform(self, X, y=None):
        X_transformed = []
        for email in X:
            text = email_to_text(email) or ""
            if self.lower_case:
                text = text.lower()
            if self.remove_punctuation:
                text = re.sub(r'\W+', ' ', text, flags=re.M) # matches any non [a-zA-Z0-9] and replaces with whitespace
            if self.replace_urls and extractor is not None:
                urls = extractor.find_urls(text)
                urls.sort(key=lambda: len(url), reverse=True)
                for url in urls:
                    text = text.replace(url, 'URL')
            if self.replace_numbers:
                text = re.sub(r'\d+(?:\.\d*(?:[eE]\d+))?', 'NUMBER', text) 
            word_counter = Counter(text.split(' '))
            if self.stemming and stemmer is not None:
                stemmed_words_counter = Counter()
                for word, count in word_counter.items():
                    stemmed_word = stemmer.stem(word)
                    stemmed_words_counter[stemmed_word] += count
                word_counter = stemmed_words_counter
            X_transformed.append(word_counter)
        return np.array(X_transformed)

In [16]:
c = EmailToWordCounterTransformer().fit_transform(X_train[:3])

In [17]:
from scipy.sparse import csr_matrix # Compressed Sparse Row (CSR)

# finds the most common words out of all the emails fed it with size vocab_size
# then creates a sparse vector of how many times those common words show up in each email (row)
class WordCounterToVectorTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, vocab_size=1000):
        self.vocab_size = vocab_size
    
    def fit(self, X, y=None):
        total_count = Counter()
        for word_count in X:
            for word, count in word_count.items():
                total_count[word] += min(count, 10)
        most_common = total_count.most_common()[:self.vocab_size]
        self.most_common_ = most_common
        self.vocab_ = {word: index + 1 for index, (word, count) in enumerate(most_common)}
        return self
    
    def transform(self, X, y=None):
        row_ind = []
        col_ind = []
        data = []
        
        for row, word_count in enumerate(X):
            for word, count in word_count.items():
                row_ind.append(row)
                col_ind.append(self.vocab_.get(word, 0))
                data.append(count)
                
        return csr_matrix((data, (row_ind, col_ind)), shape=(len(X), self.vocab_size + 1))

In [18]:
vocab_transformer = WordCounterToVectorTransformer(vocab_size=10)
vocab_transformer.fit_transform(c).toarray()

array([[  6,   0,   0,   0,   0,   1,   0,   0,   0,   0,   0],
       [113,  11,   9,   8,   3,   2,   1,   0,   1,   3,   0],
       [ 94,   0,   1,   2,   3,   2,   4,   5,   3,   1,   4]])

**The first column represents how many words were not part of the vocabulary of the transformer**

In [19]:
vocab_transformer.vocab_

{'the': 1,
 'of': 2,
 'and': 3,
 'to': 4,
 '': 5,
 'http': 6,
 'number': 7,
 'com': 8,
 'all': 9,
 'yahoo': 10}

In [20]:
from sklearn.pipeline import Pipeline

preprocess_pipeline = Pipeline([
    ("email_to_word_counter", EmailToWordCounterTransformer()),
    ("word_counter_to_vector", WordCounterToVectorTransformer())
])

X_train_transformed = preprocess_pipeline.fit_transform(X_train)

In [21]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

log_clf = LogisticRegression(random_state=42, solver="liblinear")
cross_val_score(log_clf, X_train_transformed, y_train, cv=3, scoring="accuracy").mean()

0.9862500000000001

In [22]:
from sklearn.metrics import precision_score, recall_score

X_test_transformed = preprocess_pipeline.transform(X_test)

log_clf = LogisticRegression(solver="liblinear", random_state=42)
log_clf.fit(X_train_transformed, y_train)

y_pred = log_clf.predict(X_test_transformed)

print("Precision: {:.2f}%".format(100 * precision_score(y_test, y_pred)))
print("Recall: {:.2f}%".format(100 * recall_score(y_test, y_pred)))

Precision: 97.89%
Recall: 97.89%
