<a href="https://colab.research.google.com/github/mohanseetha/machine-learning/blob/main/spam_classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import tarfile
from pathlib import Path
import urllib.request

def fetch_spam_data():
    spam_root = "https://spamassassin.apache.org/old/publiccorpus/"
    ham_url = spam_root + "20030228_hard_ham.tar.bz2"
    spam_url = spam_root + "20030228_spam.tar.bz2"
    spam_path = Path() / "datasets" / "spam"
    spam_path.mkdir(parents=True, exist_ok=True)
    for dir_name, tar_name, url in (("easy_ham", "ham", ham_url), ("spam", "spam", spam_url)):
        if not (spam_path / dir_name).is_dir():
            path = (spam_path / tar_name).with_suffix(".tar.bz2")
            print("Downloading", path)
            urllib.request.urlretrieve(url, path)
            tar_bz2_file = tarfile.open(path)
            tar_bz2_file.extractall(path=spam_path)
            tar_bz2_file.close()
    return [spam_path / dir_name for dir_name in ("easy_ham", "spam")]

In [2]:
ham_dir, spam_dir = fetch_spam_data()

In [3]:
ham_filenames = [f for f in sorted(ham_dir.iterdir()) if len(f.name) > 20]
spam_filenames = [f for f in sorted(spam_dir.iterdir()) if len(f.name) > 20]

In [4]:
len(ham_filenames)

2500

In [5]:
len(spam_filenames)

500

In [6]:
import email
import email.policy

def load_email(filepath):
    with open(filepath, "rb") as f:
        return email.parser.BytesParser(policy=email.policy.default).parse(f)

In [7]:
ham_emails = [load_email(filepath) for filepath in ham_filenames]
spam_emails = [load_email(filepath) for filepath in spam_filenames]

In [8]:
print(ham_emails[-1].get_content().strip())

Hi,

I think you need to give us a little more detailed information.

On Wed, 04 Dec 2002, Gianni Ponzi wrote:

> I have a prob when trying to install Linux (tried RedHat, Suse) on my
> laptop. 

You get _exactly_ the same problem with Suse and RedHat, is that right?
What versions of these have you used?

> I can start the install but after about 2min, the whole pc just
> dies.

As in freezes, reboots?  Do you get any errors?

> I know it's not a Linux prob and here is what I have encountered:
> 
> I had the same problem when installing Win on it and eventually sorted it
> out by disabling the infrared port. 

Did you disable it in the BIOS or in windows?

> I'm guessing this might be same prob although I'm not sure. I am very new
> to Linux so it's not that easy for me to work it out. I did manage to
> follow the setup procedure at one stage (using images on disks) and it
> cuts out either as it's trying to verify what CD-Rom I have or just after
> (hence my suspicion of the infrared 

In [9]:
print(spam_emails[1].get_content().strip())

1) Fight The Risk of Cancer!
http://www.adclick.ws/p.cfm?o=315&s=pk007

2) Slim Down - Guaranteed to lose 10-12 lbs in 30 days
http://www.adclick.ws/p.cfm?o=249&s=pk007

3) Get the Child Support You Deserve - Free Legal Advice
http://www.adclick.ws/p.cfm?o=245&s=pk002

4) Join the Web's Fastest Growing Singles Community
http://www.adclick.ws/p.cfm?o=259&s=pk007

5) Start Your Private Photo Album Online!
http://www.adclick.ws/p.cfm?o=283&s=pk007

Have a Wonderful Day,
Offer Manager
PrizeMama













If you wish to leave this list please use the link below.
http://www.qves.com/trim/?ilug@linux.ie%7C17%7C114258


-- 
Irish Linux Users' Group: ilug@linux.ie
http://www.linux.ie/mailman/listinfo/ilug for (un)subscription information.
List maintainer: listmaster@linux.ie


In [10]:
def get_email_structure(email):
    if isinstance(email, str):
        return email
    payload = email.get_payload()
    if isinstance(payload, list):
        multipart = ", ".join([get_email_structure(sub_email) for sub_email in payload])
        return f"Multipart: {multipart}"
    return email.get_content_type()

In [11]:
from collections import Counter

def structures_counter(emails):
    structures = Counter()
    for email in emails:
        structure = get_email_structure(email)
        structures[structure] += 1
    return structures

In [12]:
structures_counter(ham_emails).most_common()

[('text/plain', 2408),
 ('Multipart: text/plain, application/pgp-signature', 66),
 ('Multipart: text/plain, text/html', 8),
 ('Multipart: text/plain, text/plain', 4),
 ('Multipart: text/plain', 3),
 ('Multipart: text/plain, application/octet-stream', 2),
 ('Multipart: text/plain, text/enriched', 1),
 ('Multipart: text/plain, application/ms-tnef, text/plain', 1),
 ('Multipart: Multipart: text/plain, text/plain, text/plain, application/pgp-signature',
  1),
 ('Multipart: text/plain, video/mng', 1),
 ('Multipart: text/plain, Multipart: text/plain', 1),
 ('Multipart: text/plain, application/x-pkcs7-signature', 1),
 ('Multipart: text/plain, Multipart: text/plain, text/plain, text/rfc822-headers',
  1),
 ('Multipart: text/plain, Multipart: text/plain, text/plain, Multipart: Multipart: text/plain, application/x-pkcs7-signature',
  1),
 ('Multipart: text/plain, application/x-java-applet', 1)]

In [13]:
structures_counter(spam_emails).most_common()

[('text/plain', 218),
 ('text/html', 183),
 ('Multipart: text/plain, text/html', 45),
 ('Multipart: text/html', 20),
 ('Multipart: text/plain', 19),
 ('Multipart: Multipart: text/html', 5),
 ('Multipart: text/plain, image/jpeg', 3),
 ('Multipart: text/html, application/octet-stream', 2),
 ('Multipart: text/plain, application/octet-stream', 1),
 ('Multipart: text/html, text/plain', 1),
 ('Multipart: Multipart: text/html, application/octet-stream, image/jpeg', 1),
 ('Multipart: Multipart: text/plain, text/html, image/gif', 1),
 ('multipart/alternative', 1)]

In [14]:
for header, value in spam_emails[0].items():
    print(header, ":", value)

Return-Path : <12a1mailbot1@web.de>
Delivered-To : zzzz@localhost.spamassassin.taint.org
Received : from localhost (localhost [127.0.0.1])	by phobos.labs.spamassassin.taint.org (Postfix) with ESMTP id 136B943C32	for <zzzz@localhost>; Thu, 22 Aug 2002 08:17:21 -0400 (EDT)
Received : from mail.webnote.net [193.120.211.219]	by localhost with POP3 (fetchmail-5.9.0)	for zzzz@localhost (single-drop); Thu, 22 Aug 2002 13:17:21 +0100 (IST)
Received : from dd_it7 ([210.97.77.167])	by webnote.net (8.9.3/8.9.3) with ESMTP id NAA04623	for <zzzz@spamassassin.taint.org>; Thu, 22 Aug 2002 13:09:41 +0100
From : 12a1mailbot1@web.de
Received : from r-smtp.korea.com - 203.122.2.197 by dd_it7  with Microsoft SMTPSVC(5.5.1775.675.6);	 Sat, 24 Aug 2002 09:42:10 +0900
To : dcek1a1@netsgo.com
Subject : Life Insurance - Why Pay More?
Date : Wed, 21 Aug 2002 20:31:57 -1600
MIME-Version : 1.0
Message-ID : <0103c1042001882DD_IT7@dd_it7>
Content-Type : text/html; charset="iso-8859-1"
Content-Transfer-Encoding : qu

In [15]:
spam_emails[0]["subject"]

'Life Insurance - Why Pay More?'

In [16]:
import numpy as np
from sklearn.model_selection import train_test_split

X = np.array(ham_emails + spam_emails, dtype=object)
y = np.array([0] * len(ham_emails) + [1] * len(spam_emails))

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [17]:
import re
from html import unescape

def html_to_plain_text(html):
    text = re.sub('<head.*?>.*?</head>', '', html, flags=re.M | re.S | re.I)
    text = re.sub('<a\s.*?>', ' HYPERLINK ', text, flags=re.M | re.S | re.I)
    text = re.sub('<.*?>', '', text, flags=re.M | re.S)
    text = re.sub(r'(\s*\n)+', '\n', text, flags=re.M | re.S)
    return unescape(text)

In [18]:
html_spam_mails = [email for email in X_train[y_train==1] if get_email_structure(email) == "text/html"]
sample_html_spam = html_spam_mails[0]
print(sample_html_spam.get_content().strip()[:1000], "...")

<HTML>
<BODY BGCOLOR="#ffffff">
<P>
<<HTML>
<TABLE WIDTH=400 BORDER=0 CELLPADDING=0 CELLSPACING=0>
  <TR>
    <TD ALIGN="LEFT" VALIGN="TOP"><FONT FACE="Tahoma, Arial, Verdana" SIZE=2></FONT>
      <H2>
	<FONT COLOR="#FF0000">GET HIGH...LEGALLY!</FONT>
      </H2>
      <P>
      <B>IT REALLY WORKS!<BR>
      PASSES ALL DRUG TESTS!<BR>
      EXTREMELY POTENT!</B>
      <P>
      <A HREF="http://www.greenmatrix.net/herb/index.html"><B>CLICK HERE for more
      info on Salvia Divinorum</B></A>
      <P>
      <B> <A HREF="http://www.greenmatrix.net/herb/5x.html">CLICK HERE for SALVIA
      5X EXTRACT!</A> <BR>
      <P>
      <B> <A HREF="http://www.greenmatrix.net/herb/13x.html">CLICK HERE for SALVIA
      13X</A>. The most POTENT, LEGAL, SMOKABLE herb on the planet! 13 times the
      power of Salvia Divinorum!<BR>
      <P>
      <P>
    ...


In [19]:
print(html_to_plain_text(sample_html_spam.get_content().strip())[:1000], "...")


	GET HIGH...LEGALLY!
      IT REALLY WORKS!
      PASSES ALL DRUG TESTS!
      EXTREMELY POTENT!
       HYPERLINK CLICK HERE for more
      info on Salvia Divinorum
        HYPERLINK CLICK HERE for SALVIA
      5X EXTRACT!
        HYPERLINK CLICK HERE for SALVIA
      13X. The most POTENT, LEGAL, SMOKABLE herb on the planet! 13 times the
      power of Salvia Divinorum!
      Removal Information:
      We are strongly against sending unsolicited emails to those who do not wish
      to receive our special mailings. You have opted in to one or more of our
      affiliate sites requesting to be notified of any special offers we may run
      from time to time. This is NOT unsolicited email. If you do not wish to receive
      further mailings, please
       HYPERLINK click here to be removed
      from the list. Please accept our apologies if you have been sent this
      email in error. We honor all removal requests within  ...


In [20]:
def email_to_text(email):
    html = None
    for part in email.walk():
        ctype = part.get_content_type()
        if not ctype in ("text/plain", "text/html"):
            continue
        try:
            content = part.get_content()
        except:
            content = str(part.get_payload())
        if ctype == "text/plain":
            return content
        else:
            html = content
    if html:
        return html_to_plain_text(html)

In [21]:
print(email_to_text(sample_html_spam)[:100], "...")


	GET HIGH...LEGALLY!
      IT REALLY WORKS!
      PASSES ALL DRUG TESTS!
      EXTREMELY POTENT!
   ...


In [22]:
import nltk

stemmer = nltk.PorterStemmer()
for word in ("Computations", "Computation", "Computing", "Computed", "Compute", "Compulsive"):
    print(word, "=>", stemmer.stem(word))

Computations => comput
Computation => comput
Computing => comput
Computed => comput
Compute => comput
Compulsive => compuls


In [23]:
%pip install -q -U urlextract

In [24]:
import urlextract

url_extractor = urlextract.URLExtract()
some_text = "for tesing github.com and also https://youtu.be/random"
url_extractor.find_urls(some_text)

['github.com', 'https://youtu.be/random']

In [25]:
from sklearn.base import BaseEstimator, TransformerMixin

class EmailToWordCounterTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, strip_headers=True, lower_case=True,
                 remove_punctuation=True, replace_urls=True,
                 replace_numbers=True, stemming=True):
        self.strip_headers = strip_headers
        self.lower_case = lower_case
        self.remove_punctuation = remove_punctuation
        self.replace_urls = replace_urls
        self.replace_numbers = replace_numbers
        self.stemming = stemming

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        X_transformed = []
        for email in X:
            text = email_to_text(email) or ""
            if self.lower_case:
                text = text.lower()
            if self.replace_urls and url_extractor is not None:
                urls = list(set(url_extractor.find_urls(text)))
                urls.sort(key=lambda url: len(url), reverse=True)
                for url in urls:
                    text = text.replace(url, " URL ")
            if self.replace_numbers:
                text = re.sub(r'\d+(?:\.\d*)?(?:[eE][+-]?\d+)?', 'NUMBER', text)
            if self.remove_punctuation:
                text = re.sub(r'\W+', ' ', text, flags=re.M)
            word_counts = Counter(text.split())
            if self.stemming and stemmer is not None:
                stemmed_word_counts = Counter()
                for word, count in word_counts.items():
                    stemmed_word = stemmer.stem(word)
                    stemmed_word_counts[stemmed_word] += count
                word_counts = stemmed_word_counts
            X_transformed.append(word_counts)
        return np.array(X_transformed)

In [26]:
X_few = X_train[:3]
X_few_wordcounts = EmailToWordCounterTransformer().fit_transform(X_few)
X_few_wordcounts

array([Counter({'chuck': 1, 'murcko': 1, 'wrote': 1, 'stuff': 1, 'yawn': 1, 'r': 1}),
       Counter({'the': 11, 'of': 9, 'and': 8, 'all': 3, 'christian': 3, 'to': 3, 'by': 3, 'jefferson': 2, 'i': 2, 'have': 2, 'superstit': 2, 'one': 2, 'on': 2, 'been': 2, 'ha': 2, 'half': 2, 'rogueri': 2, 'teach': 2, 'jesu': 2, 'some': 1, 'interest': 1, 'quot': 1, 'url': 1, 'thoma': 1, 'examin': 1, 'known': 1, 'word': 1, 'do': 1, 'not': 1, 'find': 1, 'in': 1, 'our': 1, 'particular': 1, 'redeem': 1, 'featur': 1, 'they': 1, 'are': 1, 'alik': 1, 'found': 1, 'fabl': 1, 'mytholog': 1, 'million': 1, 'innoc': 1, 'men': 1, 'women': 1, 'children': 1, 'sinc': 1, 'introduct': 1, 'burnt': 1, 'tortur': 1, 'fine': 1, 'imprison': 1, 'what': 1, 'effect': 1, 'thi': 1, 'coercion': 1, 'make': 1, 'world': 1, 'fool': 1, 'other': 1, 'hypocrit': 1, 'support': 1, 'error': 1, 'over': 1, 'earth': 1, 'six': 1, 'histor': 1, 'american': 1, 'john': 1, 'e': 1, 'remsburg': 1, 'letter': 1, 'william': 1, 'short': 1, 'again': 1, 'becom

In [27]:
from scipy.sparse import csr_matrix

class WordCounterToVectorTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, vocabulary_size=1000):
        self.vocabulary_size = vocabulary_size
    def fit(self, X, y=None):
        total_count = Counter()
        for word_count in X:
            for word, count in word_count.items():
                total_count[word] += min(count, 10)
        most_common = total_count.most_common()[:self.vocabulary_size]
        self.vocabulary_ = {word: index + 1
                            for index, (word, count) in enumerate(most_common)}
        return self
    def transform(self, X, y=None):
        rows = []
        cols = []
        data = []
        for row, word_count in enumerate(X):
            for word, count in word_count.items():
                rows.append(row)
                cols.append(self.vocabulary_.get(word, 0))
                data.append(count)
        return csr_matrix((data, (rows, cols)),
                          shape=(len(X), self.vocabulary_size + 1))

In [28]:
vocab_transformer = WordCounterToVectorTransformer(vocabulary_size=10)
X_few_vectors = vocab_transformer.fit_transform(X_few_wordcounts)
X_few_vectors.toarray()

array([[ 6,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0],
       [99, 11,  9,  8,  3,  1,  3,  1,  3,  2,  3],
       [67,  0,  1,  2,  3,  4,  1,  2,  0,  1,  0]])

In [29]:
vocab_transformer.vocabulary_

{'the': 1,
 'of': 2,
 'and': 3,
 'to': 4,
 'url': 5,
 'all': 6,
 'in': 7,
 'christian': 8,
 'on': 9,
 'by': 10}

In [30]:
from sklearn.pipeline import Pipeline

preprocess_pipeline = Pipeline([
    ("email_to_wordcount", EmailToWordCounterTransformer()),
    ("wordcount_to_vector", WordCounterToVectorTransformer()),
])

X_train_transformed = preprocess_pipeline.fit_transform(X_train)

In [31]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

log_clf = LogisticRegression(max_iter=1000, random_state=42)
score = cross_val_score(log_clf, X_train_transformed, y_train, cv=3)
score.mean()

np.float64(0.985)

In [32]:
from sklearn.metrics import precision_score, recall_score

X_test_transformed = preprocess_pipeline.transform(X_test)

log_clf.fit(X_train_transformed, y_train)

y_pred = log_clf.predict(X_test_transformed)

print(f"Precision: {precision_score(y_test, y_pred):.2%}")
print(f"Recall: {recall_score(y_test, y_pred):.2%}")

Precision: 96.88%
Recall: 97.89%
