<a class="anchor" id="Toc"><a>
# Table of Content
1. [Install libraries](#install_libraries)
1. [Import libraries](#import_libraries)
1. [Load and clean dataset](#load_and_clean_dataset)
1. [Split data, Vectorize(TF-IDF) and Binarize](#split_vectorize_&_binarize)
1. [Implement NSA](#evaluation)
1. [Validation](#validation)
1. [Evaluate](#optimization)


<a class="anchor" id="install_libraries"></a>
# Install libraries

In [78]:
# install requirements from requirements.txt
%pip install -r ../requirements.txt

Note: you may need to restart the kernel to use updated packages.


<a class="anchor" id="import_libraries"></a>
# Import libraries

In [79]:
# Basic data handling
import pandas as pd
import numpy as np
import re

# Machine learning & text processing
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    classification_report, confusion_matrix,
    precision_recall_curve, f1_score, auc
)

# Sparse matrix operations
import scipy.sparse as sp

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Utility
from dataclasses import dataclass
import os





<a class="anchor" id="load_and_clean_dataset"></a>
# Load and clean dataset

In [80]:
# Load dataset
df = pd.read_csv('../data/SMSSpamCollection', sep='\t', names=['label', 'message'])
#df.head()

# data cleaning function
'''we will remove html tags, urls, punctuation, numbers, emails, and extra whitespace.
but we will keep the option to not remove urls and emails if needed in future because they might be relevant for spam detection.'''

def clean_text(text: str, replace_url=True, replace_email=True):
    if not isinstance(text, str):
        return ""

    text = text.lower()

    # replace URLs
    if replace_url:
        text = re.sub(r'http\S+|www\S+', ' <URL> ', text)

    # replace emails
    if replace_email:
        text = re.sub(r'\S+@\S+', ' <EMAIL> ', text)

    # remove html tags (very rare in SMS)
    text = re.sub(r'<[^>]+>', ' ', text)

    # keep numbers, letters, placeholders
    text = re.sub(r'[^a-z0-9<> ]+', ' ', text)

    # collapse whitespace
    text = re.sub(r'\s+', ' ', text).strip()

    return text

# apply cleaning

df['cleaned_message'] = df['message'].apply(clean_text)

df['cleaned_message_with_url_email'] = df['message'].apply(lambda x: clean_text(x, replace_url=False, replace_email=False))
# encode labels
df['label'] = df['label'].map({'ham': 0, 'spam': 1})

# see the cleaned data and filter rows where cleaning made a difference to check the effectiveness of cleaning

mask = (
    (df['message'] != df['cleaned_message']) &
    (df['message'] != df['cleaned_message_with_url_email']) &
    (df['cleaned_message'] != df['cleaned_message_with_url_email'])
)

df_filtered = df[mask]
print(df_filtered.head(5))




     label                                            message  \
12       1  URGENT! You have won a 1 week FREE membership ...   
15       1  XXXMobileMovieClub: To use your credit, click ...   
135      1  Want 2 get laid tonight? Want real Dogging loc...   
136      0             I only haf msn. It's yijue@hotmail.com   
164      1  -PLS STOP bootydelious (32/F) is inviting you ...   

                                       cleaned_message  \
12   urgent you have won a 1 week free membership i...   
15   xxxmobilemovieclub to use your credit click th...   
135  want 2 get laid tonight want real dogging loca...   
136                                i only haf msn it s   
164  pls stop bootydelious 32 f is inviting you to ...   

                        cleaned_message_with_url_email  
12   urgent you have won a 1 week free membership i...  
15   xxxmobilemovieclub to use your credit click th...  
135  want 2 get laid tonight want real dogging loca...  
136              i only haf msn 

We checked if the cleaning mechanism is working and it is working as we want in message 136 the email is removed in cleaned_messages while in cleaned_message_with_url it is preserved and sanitized and that is exactly how we want this column to behave.

<a class="anchor" id="split_vectorize_&_binarize"></a>
# Split data, Vectorize (TF-IDF) and Binarize

We want to split the data into training(60% of ham) and the rest of ham and spam into half for validation and half for testing.

In [81]:
# Separete ham and spam
ham = df[df['label'] == 0]['cleaned_message']
spam = df[df['label'] == 1]['cleaned_message']
# those with url and email preserved
ham_with_url_email = df[df['label'] == 0]['cleaned_message_with_url_email']
spam_with_url_email = df[df['label'] == 1]['cleaned_message_with_url_email']

print(f"Total ham messages: {len(ham)}")
print(f"Total spam messages: {len(spam)}")

# split ham
def split_ham(ham: pd.Series, train_size=0.6, test_size= 0.5):
    ham_train, ham_temp = train_test_split(ham, train_size=train_size, random_state=42)
    ham_val, ham_test = train_test_split(ham_temp, test_size=test_size, random_state=42)
    return ham_train, ham_val, ham_test

ham_train, ham_val, ham_test = split_ham(ham)
ham_train_with_url_email, ham_val_with_url_email, ham_test_with_url_email = split_ham(ham_with_url_email)

# split spam
def split_spam(spam: pd.Series, test_size=0.5):
    spam_val, spam_test = train_test_split(spam, test_size=test_size, random_state=42)
    return spam_val, spam_test
spam_val, spam_test = split_spam(spam)
spam_val_with_url_email, spam_test_with_url_email = split_spam(spam_with_url_email)

print(f"Ham train: {len(ham_train)}, Ham val: {len(ham_val)}, Ham test: {len(ham_test)}")
print(f"Spam val: {len(spam_val)}, Spam test: {len(spam_test)}")

Total ham messages: 4825
Total spam messages: 747
Ham train: 2895, Ham val: 965, Ham test: 965
Spam val: 373, Spam test: 374


To vectorize the message we chose to use TfidfVectorizer which is a scikit-learn tool that turns a pile of text into numbers our model will be able to use.

In [95]:
# fit vectorizer only on ham_train using the TF-IDF vectorizer 
'''we chose decide to change som of the parameters of the TfidfVectorizer to better suit our SMS spam detection task:
- ngram_range=(1, 2): This allows the model to capture both unigrams and bigrams, which can be useful for understanding context in short messages.
- max_df=0.95: This setting ignores terms that appear in more than 95% of the documents, helping to eliminate very common words that may not be informative.
- min_df=2: This setting ignores terms that appear in less than 2 documents, which helps to reduce noise from very rare words that may not contribute significantly to the model's performance.
- sublinear_tf=True: This applies sublinear term frequency scaling, which can help to moderate the impact of very frequent terms.
'''
# Vectorizer for normal cleaned messages
vectorizer_plain = TfidfVectorizer(
    ngram_range=(1, 2),
    max_df=0.95,
    min_df=2,
    sublinear_tf=True
)

Xh_train_plain = vectorizer_plain.fit_transform(ham_train)
Xh_val_plain   = vectorizer_plain.transform(ham_val)
Xh_test_plain  = vectorizer_plain.transform(ham_test)
Xs_val_plain   = vectorizer_plain.transform(spam_val)
Xs_test_plain  = vectorizer_plain.transform(spam_test)

print("Vocabulary (plain):", len(vectorizer_plain.vocabulary_))
print("Train matrix shape (plain):", Xh_train_plain.shape)

# Vectorizer for messages with URL/email preserved
vectorizer_url = TfidfVectorizer(
    ngram_range=(1, 2),
    max_df=0.95,
    min_df=2,
    sublinear_tf=True
)

Xh_train_url = vectorizer_url.fit_transform(ham_train_with_url_email)
Xh_val_url   = vectorizer_url.transform(ham_val_with_url_email)
Xh_test_url  = vectorizer_url.transform(ham_test_with_url_email)
Xs_val_url   = vectorizer_url.transform(spam_val_with_url_email)
Xs_test_url  = vectorizer_url.transform(spam_test_with_url_email)

print("Vocabulary (url_email):", len(vectorizer_url.vocabulary_))
print("Train matrix shape (url_email):", Xh_train_url.shape)


Vocabulary (plain): 7012
Train matrix shape (plain): (2895, 7012)
Vocabulary (url_email): 7012
Train matrix shape (url_email): (2895, 7012)


The vocabulary is identical so we check if there is enough difference between the one will url and they one that dont. Other than that, 7012 feature spaces is quit good . 

In [98]:
df['has_url'] = df['message'].str.contains(r'http|www', case=False, na=False)
df['has_email'] = df['message'].str.contains(r'@', na=False)

print(df['has_url'].sum(), "messages with URLs")
print(df['has_email'].sum(), "messages with emails")

diff = df[df['cleaned_message'] != df['cleaned_message_with_url_email']]
print(len(diff), "messages differ")



110 messages with URLs
36 messages with emails
126 messages differ


it makes sense now since there is difference only in 126 messages only the is no big difference but we are going with the one with url just to make sure we take advantage of the presence of emails and urls.
Now we will binarize these because detectors operate on binary patterns.

In [103]:
# binarization function
def binarize_matrix(X: sp.csr_matrix, threshold=0.2) -> sp.csr_matrix:
    '''convert sparse matrix to binary based on threshold
    we chose 0.2 but we will experiment with different thresholds later '''
    X_bin = X.copy()
    X_bin.data = np.where(X_bin.data > threshold, 1, 0)
    X_bin.eliminate_zeros()
    return X_bin

# apply binarization
Xh_train_url_bin = binarize_matrix(Xh_train_url)
Xh_val_url_bin   = binarize_matrix(Xh_val_url)
Xh_test_url_bin  = binarize_matrix(Xh_test_url)
Xs_val_url_bin   = binarize_matrix(Xs_val_url)
Xs_test_url_bin  = binarize_matrix(Xs_test_url)

# sanity check
print("Binarized train matrix shape (url_email):", Xh_train_url_bin.shape)

Binarized train matrix shape (url_email): (2895, 7012)


the shapes match our TF-IDF shapes so that is good.

## Train and predict NSA

In [83]:
@dataclass
class Detector:
    idx: np.ndarray   # active bits indices
    radius: int       # required overlap threshold
    
class VDetectorNSA_Binary:
    """
    Binary Negative Selection Algorithm using r-overlap matching rule.
    Detectors are binary sparse vectors that should NOT match any self sample
    with overlap >= radius.
    """
    def __init__(self,
                 k: int,                 # active bits per detector
                 r_min: int, r_max: int, # min/max overlap radius
                 max_detectors: int,     # how many detectors to accept
                 max_tries: int,         # max attempts to generate
                 batch_size: int,        # batch size for overlap calc
                 sampling: str = "antiprofile",
                 random_state: int = 42):
        self.k = k
        self.r_min = r_min
        self.r_max = r_max
        self.max_detectors = max_detectors
        self.max_tries = max_tries
        self.batch_size = batch_size
        self.sampling = sampling
        self.random_state = random_state

        self.dim = None
        self.detectors: list[Detector] = []
        self.p_detect = None
        np.random.seed(self.random_state)

    # --------------------------------------------------------------
    # internal helpers
    # --------------------------------------------------------------
    def _build_antiprofile_probs(self, X_ham_train: sp.csr_matrix) -> None:
        """Build antiprofile sampling probabilities from self data."""
        assert sp.issparse(X_ham_train)
        p_ham = (X_ham_train.sum(axis=0) / X_ham_train.shape[0]).A1
        p = np.clip(1.0 - p_ham, 1e-8, 1.0)
        self.p_detect = p / p.sum()

    def _sample_indices(self) -> np.ndarray:
        if self.sampling == "antiprofile" and self.p_detect is not None:
            return np.sort(np.random.choice(self.dim, size=min(self.k, self.dim),
                                            replace=False, p=self.p_detect))
        return np.sort(np.random.choice(self.dim, size=min(self.k, self.dim),
                                        replace=False))

    @staticmethod
    def _vec_from_idx(idx: np.ndarray, dim: int) -> sp.csr_matrix:
        """Construct a 1×dim sparse row vector with 1s at idx."""
        data = np.ones(len(idx), dtype=np.uint8)
        rows = np.zeros(len(idx), dtype=np.int32)
        return sp.csr_matrix((data, (rows, idx)), shape=(1, dim))

    @staticmethod
    def _max_overlap(X: sp.csr_matrix, detector_vec: sp.csr_matrix, batch_size: int) -> int:
        """Return the maximum overlap between detector and any row in X."""
        best = 0
        n = X.shape[0]
        for s in range(0, n, batch_size):
            e = min(s + batch_size, n)
            overlap = (X[s:e] @ detector_vec.T).A.ravel()
            if overlap.size:
                best = max(best, int(overlap.max()))
        return best

    # --------------------------------------------------------------
    # main API
    # --------------------------------------------------------------
    def fit(self, X_ham_train: sp.csr_matrix):
        assert sp.issparse(X_ham_train)
        self.dim = X_ham_train.shape[1]
        if self.sampling == "antiprofile":
            self._build_antiprofile_probs(X_ham_train)

        accepted, tries = 0, 0
        while accepted < self.max_detectors and tries < self.max_tries:
            tries += 1
            idx = self._sample_indices()
            det_vec = self._vec_from_idx(idx, self.dim)

            # choose a random radius within the allowed range
            r = np.random.randint(self.r_min, self.r_max + 1)

            # reject if detector matches any ham within radius
            m_o = self._max_overlap(X_ham_train, det_vec, self.batch_size)
            if m_o >= r:      # too similar to self, reject
                continue

            # accept detector
            self.detectors.append(Detector(idx=idx, radius=r))
            accepted += 1

        print(f"Generated {accepted} detectors after {tries} tries.")
        return self

    def predict(self, X_bin: sp.csr_matrix, k_hits: int = 1, return_score: bool = False):
        """Predict non-self (1=spam) vs self (0=ham)."""
        assert self.detectors, "Model not fitted."
        n = X_bin.shape[0]
        predictions = np.zeros(n, dtype=np.uint8)

        # build detector matrix
        rows, cols, data, r_list = [], [], [], []
        for i, det in enumerate(self.detectors):
            cols.extend(det.idx.tolist())
            rows.extend([i] * len(det.idx))
            data.extend([1] * len(det.idx))
            r_list.append(det.radius)
        det_matrix = sp.csr_matrix((data, (rows, cols)), shape=(len(self.detectors), self.dim))
        r_array = np.array(r_list)

        scores = np.zeros(n, dtype=np.int32) if return_score else None
        for s in range(0, n, self.batch_size):
            e = min(s + self.batch_size, n)
            overlaps = (X_bin[s:e] @ det_matrix.T).A
            hits = (overlaps >= r_array)
            if return_score:
                scores[s:e] = hits.sum(axis=1)
            if k_hits == 1:
                predictions[s:e] = hits.any(axis=1).astype(np.uint8)
            else:
                predictions[s:e] = (hits.sum(axis=1) >= k_hits).astype(np.uint8)
        return predictions if not return_score else (predictions, scores)

## Evaluate

In [84]:
# function to binarize sparse matrix
def binarize_sparse_matrix(X: sp.csr_matrix, tau: float= 0.05) -> sp.csr_matrix:
    '''
    Binarize sparse matrix X using threshold tau.
    Values greater than tau are set to 1, others to 0.
    '''
    assert sp.issparse(X), "Input matrix must be a sparse matrix."
    X_bin = X.copy()
    X_bin.data = np.where(X_bin.data > tau, 1, 0)
    return X_bin

In [85]:
Xh_train_bin = binarize_sparse_matrix(Xh_train_tfidf, tau=0.02)
Xh_val_bin = binarize_sparse_matrix(Xh_val_tfidf, tau=0.02)
Xh_test_bin = binarize_sparse_matrix(Xh_test_tfidf, tau=0.02)
Xs_val_bin = binarize_sparse_matrix(Xs_val_tfidf, tau=0.02)
Xs_test_bin = binarize_sparse_matrix(Xs_test_tfidf, tau=0.02)

In [86]:
nsa = VDetectorNSA_Binary(
    k=20,
    r_min=1, r_max=3,
    max_detectors=2000,
    max_tries=50000,
    sampling="antiprofile",
    random_state=42,
    batch_size=1000
).fit(Xh_train_bin)


Generated 2000 detectors after 5783 tries.


In [87]:
def nsa_diagnostics(nsa, Xh_val_bin, Xs_val_bin):
    print(f"#detectors: {len(nsa.detectors)}  | dim: {nsa.dim}")

    # per-detector stats on validation
    rows, cols, data, r_list = [], [], [], []
    for i, det in enumerate(nsa.detectors):
        cols.extend(det.idx.tolist())
        rows.extend([i]*len(det.idx))
        data.extend([1]*len(det.idx))
        r_list.append(det.radius)
    D = sp.csr_matrix((data, (rows, cols)), shape=(len(nsa.detectors), nsa.dim))
    r = np.array(r_list)

    # how many ham/spam each detector hits
    ham_hits = (Xh_val_bin @ D.T).A >= r
    spam_hits = (Xs_val_bin @ D.T).A >= r
    print(f"Mean ham-hit rate per detector: {ham_hits.mean():.6f}")
    print(f"Mean spam-hit rate per detector: {spam_hits.mean():.6f}")

    # coverage: fraction of samples hit by at least one detector
    ham_cov = ham_hits.any(axis=1).mean()
    spam_cov = spam_hits.any(axis=1).mean()
    print(f"HAM coverage(any hit):  {ham_cov:.4f}")
    print(f"SPAM coverage(any hit): {spam_cov:.4f}")
nsa_diagnostics(nsa, Xh_val_bin, Xs_val_bin)

#detectors: 2000  | dim: 7012
Mean ham-hit rate per detector: 0.000021
Mean spam-hit rate per detector: 0.000052
HAM coverage(any hit):  0.0394
SPAM coverage(any hit): 0.0965


In [88]:
'''-----Validation phase----- 
we will tune 3 parameters here: k_hits, tau, k and r'''
X_eval = sp.vstack([Xh_val_bin, Xs_val_bin])
y_val = np.hstack([
    np.zeros(Xh_val_bin.shape[0], dtype=np.uint8),
    np.ones(Xs_val_bin.shape[0], dtype=np.uint8)
])


In [89]:
best_f1, best_hits = 0, None
for hits in [1, 2, 3, 5, 7, 10]:
    y_pred = nsa.predict(X_val, k_hits=hits)
    report = classification_report(y_val, y_pred, output_dict=True)
    f1 = report['1']['f1-score']
    print(f"k_hits={hits}: F1-score={f1:.4f}")
    if f1 > best_f1:
        best_f1 = f1
        best_hits = hits
print(f"Best k_hits: {best_hits} with F1-score: {best_f1:.4f}")

ValueError: dimension mismatch

### Visualization of Evaluation

In [None]:
# === Predict ===
y_pred, scores = nsa.predict(X_eval, k_hits=best_hits, return_score=True)

# === Confusion Matrix ===
cm = confusion_matrix(y_val, y_pred)
cm_df = pd.DataFrame(cm, index=["True Ham", "True Spam"],
                     columns=["Pred Ham", "Pred Spam"])

plt.figure(figsize=(6,5))
sns.heatmap(cm_df, annot=True, fmt='d', cmap='Blues')
plt.title("Confusion Matrix (k_hits=1)")
plt.ylabel("Actual")
plt.xlabel("Predicted")
plt.tight_layout()
plt.show()

# === Classification Report ===
print("\nClassification Report:\n")
print(classification_report(y_val, y_pred, target_names=["Ham", "Spam"]))

# === Precision–Recall Curve ===
prec, rec, _ = precision_recall_curve(y_val, scores)
pr_auc = auc(rec, prec)

plt.figure(figsize=(7,5))
plt.plot(rec, prec, color="darkorange", linewidth=2,
         label=f"PR Curve (AUC = {pr_auc:.3f})")
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.title("Precision–Recall Curve (Spam as Positive)")
plt.legend(loc="lower left")
plt.grid(True)
plt.tight_layout()
plt.show()

print(f"\nPrecision–Recall AUC: {pr_auc:.3f}")

AssertionError: Model not fitted.