In [1]:
import numpy as np
import pandas as pd
import os

In [2]:
SPAM_URL = 'https://spamassassin.apache.org/old/publiccorpus/20050311_spam_2.tar.bz2'
EASY_HAM_URL = 'https://spamassassin.apache.org/old/publiccorpus/20030228_easy_ham_2.tar.bz2'
HARD_HAM_URL = 'https://spamassassin.apache.org/old/publiccorpus/20030228_hard_ham.tar.bz2'

In [5]:
DATASETS_DIR = 'datasets'
TAR_DIR = os.path.join(DATASETS_DIR,'tar')

In [4]:
import shutil
if os.path.isdir(DATASETS_DIR):
    shutil.rmtree(DATASETS_DIR)

In [6]:
from urllib.request import urlretrieve
import tarfile
import shutil

def download_dataset(url):
    """download and unzip data from a url into the specified path"""
    
    # create directory if it doesn't exist
    if not os.path.isdir(TAR_DIR):
        os.makedirs(TAR_DIR)
    
    filename = url.rsplit('/', 1)[-1]
    tarpath = os.path.join(TAR_DIR, filename)
    
    # download the tar file if it doesn't exist
    try:
        tarfile.open(tarpath)
    except:
        urlretrieve(url, tarpath)
    
    with tarfile.open(tarpath) as tar:
        dirname = os.path.join(DATASETS_DIR, tar.getnames()[0])
        if os.path.isdir(dirname):
            shutil.rmtree(dirname)
        tar.extractall(path=DATASETS_DIR)
        
        cmds_path = os.path.join(dirname, 'cmds')
        if os.path.isfile(cmds_path):
            os.remove(cmds_path)
    
    return dirname

In [7]:
spam_dir = download_dataset(SPAM_URL)
easy_ham_dir = download_dataset(EASY_HAM_URL)
hard_ham_dir = download_dataset(HARD_HAM_URL)

In [8]:
import numpy as np
import glob

def load_dataset(dirpath):
    """load emails from the specified directory"""
    
    files = []
    filepaths = glob.glob(dirpath + '/*')
    for path in filepaths:
        with open(path, 'rb') as f:
            byte_content = f.read()
            str_content = byte_content.decode('utf-8', errors='ignore')
            files.append(str_content)
    return files

In [9]:
spam = load_dataset(spam_dir)
easy_ham = load_dataset(easy_ham_dir)
hard_ham = load_dataset(hard_ham_dir)

In [10]:
dataset = spam+easy_ham+hard_ham

In [11]:
target = np.concatenate([np.ones(len(spam)),np.zeros(len(easy_ham)+len(hard_ham))])

In [36]:
target.shape

(3046,)

In [16]:
import email
from email import policy
from email.parser import BytesParser
from bs4 import BeautifulSoup
import re
import string

In [17]:
def extract_body(email_string):
    # Parse the email string into an EmailMessage object
    msg = email.message_from_string(email_string, policy=policy.default)
    
    # Initialize an empty string to hold the email body
    body = ""
    
    # Check if the email message is multipart
    if msg.is_multipart():
        # Iterate through the parts
        for part in msg.iter_parts():
            # If the part is text/plain or text/html, extract the content
            if part.get_content_type() == 'text/plain':
                charset = part.get_content_charset()
                try:
                    body += part.get_payload(decode=True).decode(charset, errors='ignore')
                except (TypeError, UnicodeDecodeError):
                    # Fallback to a default decoding or skip this part
                    body += part.get_payload(decode=True).decode('utf-8', errors='ignore')
            elif part.get_content_type() == 'text/html':
                # Optionally handle HTML content
                html_body = part.get_payload(decode=True)
                charset = part.get_content_charset()
                try:
                    html_body = html_body.decode(charset, errors='ignore')
                except (TypeError, UnicodeDecodeError):
                    # Fallback to a default decoding or skip this part
                    html_body = html_body.decode('utf-8', errors='ignore')
                body += html_body
    else:
        # If the email is not multipart, get the payload directly
        charset = msg.get_content_charset()
        try:
            body = msg.get_payload(decode=True).decode('utf-8', errors='ignore')
        except (TypeError, UnicodeDecodeError):
            # Fallback to a default decoding or skip this part
            body = msg.get_payload(decode=True).decode('utf-8', errors='ignore')
    
    return body




In [18]:
def remove_html_tags(html_content):
    # Parse the HTML content
    soup = BeautifulSoup(html_content, 'html.parser')
    # Extract and return the text without HTML tags
    return soup.get_text()

In [19]:
def clean_text(text):
    # Convert text to lowercase
    text = text.lower()
    
    # Replace new lines with spaces
    text = text.replace('\n', ' ')
    
    # Replace URLs with 'URL'
    text = re.sub(r'http[s]?://\S+', 'URL', text)
    
    # Replace numbers with 'num'
    text = re.sub(r'\b\d+\b', 'num ', text)
    
    # Remove punctuation
    text = re.sub(f"[{string.punctuation}]", '', text)
    
    # Replace multiple spaces with a single space
    text = re.sub(r'\s+', ' ', text).strip()
    
    return text


In [32]:
X = []

In [33]:
for i in range(0,len(dataset)):
    cleaned_data = clean_text(remove_html_tags(extract_body(dataset[i])))
    X.append(cleaned_data)

  soup = BeautifulSoup(html_content, 'html.parser')


In [34]:
len(X)

3046

In [39]:
cleaned_X = X

In [38]:
from sklearn.feature_extraction.text import CountVectorizer

In [40]:
from sklearn.feature_extraction.text import CountVectorizer


# Create a CountVectorizer instance
vectorizer = CountVectorizer()

# Fit and transform the data
X = vectorizer.fit_transform(cleaned_X)

# Get feature names (vocabulary)
feature_names = vectorizer.get_feature_names_out()

# Convert to dense matrix
dense_matrix = X.toarray()

# Print results
print("Vocabulary:", feature_names)
print("Dense Matrix:")
print(dense_matrix)


Vocabulary: ['000101c231b8fedc77400200a8c0michaels' '0002august02' '000above' ...
 '２７日' '３７日' '𤣨χaqupi']
Dense Matrix:
[[0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 ...
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]]


In [41]:
dense_matrix[0]

array([0, 0, 0, ..., 0, 0, 0], dtype=int64)

In [42]:
dense_matrix.shape

(3046, 46871)

In [43]:
from sklearn.model_selection import train_test_split
X_train,X_test, y_train, y_test = train_test_split(dense_matrix, target, test_size = 0.2, stratify=target, random_state = 42)

In [45]:
from sklearn.linear_model import SGDClassifier
sgd = SGDClassifier()
#sgd.fit(X_train, y_train)

In [47]:
from sklearn.model_selection import StratifiedKFold

skf = StratifiedKFold(n_splits=3)
scores = cross_val_score(sgd, X_train, y_train, cv=skf)

In [48]:
scores

array([0.94581281, 0.95566502, 0.96182266])

In [49]:
from sklearn.ensemble import RandomForestClassifier
rnd = RandomForestClassifier()

In [50]:
from sklearn.model_selection import StratifiedKFold

skf = StratifiedKFold(n_splits=3)
scores_rnd = cross_val_score(rnd, X_train, y_train, cv=skf)

In [51]:
scores_rnd

array([0.96674877, 0.96305419, 0.96674877])

In [52]:
from sklearn.ensemble import AdaBoostClassifier
adb = AdaBoostClassifier()

In [53]:
from sklearn.model_selection import StratifiedKFold

skf = StratifiedKFold(n_splits=3)
scores_adb = cross_val_score(adb, X_train, y_train, cv=skf)



KeyboardInterrupt: 

In [None]:
scores_adb