# Downloading the training data set

In [None]:
!pip install contractions



In [None]:
from sklearn.datasets import fetch_20newsgroups
import numpy as np


In [None]:

# Binary task definition: every selected newsgroup belongs to one of two super-groups.

# Group 1 -> technology & science
group_1 = [
    'comp.graphics',
    'comp.os.ms-windows.misc',
    'comp.sys.ibm.pc.hardware',
    'comp.sys.mac.hardware',
    'comp.windows.x',
    'sci.crypt',
    'sci.electronics',
    'sci.med',
    'sci.space',
]

# Group 2 -> sports, politics, religion & miscellaneous
group_2 = [
    'rec.autos',
    'rec.motorcycles',
    'rec.sport.baseball',
    'rec.sport.hockey',
    'talk.politics.misc',
    'talk.politics.guns',
    'talk.politics.mideast',
    'talk.religion.misc',
    'misc.forsale',
    'alt.atheism',
    'soc.religion.christian',
]

# Training split restricted to the categories above; the `remove` option asks
# scikit-learn to strip headers, footers and quotes from every post.
newsgroups_train = fetch_20newsgroups(
    subset='train',
    categories=group_1 + group_2,
    remove=('headers', 'footers', 'quotes'),
)

In [None]:
# Look up the integer target id that the dataset assigned to each category name.
group_1_labels = list(map(newsgroups_train.target_names.index, group_1))
group_2_labels = list(map(newsgroups_train.target_names.index, group_2))

# Binary relabelling: 0 when the document's target is a Group 1 id, otherwise 1.
binary_labels = [int(label not in group_1_labels) for label in newsgroups_train.target]

print(f'Number of training data points: {len(newsgroups_train.data)}')
print(f'First 500 class labels: {binary_labels[:500]}')

Number of training data points: 11314
First 500 class labels: [1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 0, 1, 1, 1, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 0, 0, 0, 0, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 0, 0, 1, 0, 1, 0, 1, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 1, 1, 0, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 1, 1, 1, 0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 0, 1, 0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 0, 1, 0, 0, 1, 1, 1

# Preprocessing Text

In [None]:
import re
import nltk

# Download only the NLTK resources this notebook uses instead of the entire 'all'
# collection: the Punkt tokenizer models (needed by word_tokenize) and the stopword
# lists. Newer NLTK releases look the tokenizer up as 'punkt_tab', older ones as
# 'punkt', so both names are requested.
for resource in ('punkt', 'punkt_tab', 'stopwords'):
    nltk.download(resource, quiet=True)

from nltk.tokenize import word_tokenize

[nltk_data] Downloading collection 'all'
[nltk_data]    | 
[nltk_data]    | Downloading package abc to /root/nltk_data...
[nltk_data]    |   Package abc is already up-to-date!
[nltk_data]    | Downloading package alpino to /root/nltk_data...
[nltk_data]    |   Package alpino is already up-to-date!
[nltk_data]    | Downloading package averaged_perceptron_tagger to
[nltk_data]    |     /root/nltk_data...
[nltk_data]    |   Package averaged_perceptron_tagger is already up-
[nltk_data]    |       to-date!
[nltk_data]    | Downloading package averaged_perceptron_tagger_eng to
[nltk_data]    |     /root/nltk_data...
[nltk_data]    |   Package averaged_perceptron_tagger_eng is already
[nltk_data]    |       up-to-date!
[nltk_data]    | Downloading package averaged_perceptron_tagger_ru to
[nltk_data]    |     /root/nltk_data...
[nltk_data]    |   Package averaged_perceptron_tagger_ru is already
[nltk_data]    |       up-to-date!
[nltk_data]    | Downloading package averaged_perceptron_tagger_r

# Preprocessing

In [None]:
import numpy as np
import re
from collections import defaultdict
from nltk.stem import PorterStemmer
from nltk.corpus import stopwords
import contractions

class TextPreprocessor:
    """Cleans tokenized documents and converts them to Bag-of-Words count vectors.

    The vocabulary is learned from the first batch given to
    `preprocess_and_vectorize` and pruned with a per-word Gini index computed
    against `labels`. Later batches are vectorized with that same vocabulary.
    """

    def __init__(self, labels, threshold =0.5):
        """
        Parameters:
        labels: Class label of each document in the batch used to build the vocabulary.
        threshold (float): Words whose Gini index is >= this value are dropped.
        """
        self.stopwords = set(stopwords.words('english'))
        self.vocabulary = {}  # Use a dictionary for fast lookups {word: index}
        self.gini_scores = {}
        self.labels = labels
        self.stemmer = PorterStemmer()  # Initialize the Porter Stemmer
        self.word_counts = defaultdict(int)  # Total occurrences of each word over all documents
        self.threshold = threshold  # Gini index threshold for filtering
        # Set once the vocabulary has been learned, so that a vocabulary left empty
        # by filtering is never silently re-learned from a later (e.g. validation) batch.
        self._fitted = False

    def preprocess(self, tokens):
        """Cleans one tokenized document and returns the list of stemmed tokens.

        Each token has contractions expanded, URLs / e-mail addresses / @mentions /
        #hashtags removed, is lowercased and stripped of non-alphanumeric characters.
        Stopwords, tokens of length <= 2 and purely numeric tokens are discarded;
        the rest are stemmed.
        """
        cleaned_tokens = []
        for token in tokens:
            # Expand contractions
            expanded = contractions.fix(token)

            # Remove URLs and emails
            expanded = re.sub(r'http\S+|www\S+|https\S+', '', expanded)
            expanded = re.sub(r'\S+@\S+', '', expanded)

            # Remove @mentions and hashtags
            expanded = re.sub(r'@\w+', '', expanded)
            expanded = re.sub(r'#\w+', '', expanded)

            # An expanded contraction may consist of several words (e.g. "you all").
            # Clean each word on its own; stripping the space would glue them into
            # one meaningless token.
            for piece in expanded.split():
                # Remove non-alphanumeric characters and lowercase
                cleaned_token = re.sub(r'[^a-z0-9]', '', piece.lower())

                # Remove stopwords, short words, and digits
                if cleaned_token and cleaned_token not in self.stopwords and len(cleaned_token) > 2 and not cleaned_token.isdigit():
                    cleaned_tokens.append(self.stemmer.stem(cleaned_token))

        return cleaned_tokens

    def build_vocabulary(self, preprocessed_docs):
        """Builds the unfiltered vocabulary {word: index} from preprocessed documents.

        Also stores the total number of occurrences of every word in
        `self.word_counts`. Returns the vocabulary dictionary.
        """
        word_counts = defaultdict(int)

        for doc in preprocessed_docs:
            for word in doc:
                word_counts[word] += 1

        self.word_counts = word_counts

        # Indices follow first-occurrence order of the words.
        self.vocabulary = {word: idx for idx, word in enumerate(word_counts)}

        print(f"Vocabulary: {len(self.vocabulary)} words")

        return self.vocabulary

    def calculate_gini_and_filter(self, docs):
        """
        Calculates the Gini index of every vocabulary word and filters the vocabulary.

        For each word the occurrences per class are counted, turned into a class
        distribution p, and scored as 1 - sum(p ** 2). Words scoring below
        `self.threshold` are kept and re-indexed contiguously.

        Raises:
        ValueError: if `docs` and `self.labels` differ in length.
        """
        if len(docs) != len(self.labels):
            raise ValueError(
                f"Got {len(docs)} documents but {len(self.labels)} labels; "
                "the Gini index needs exactly one label per document."
            )

        # Identify the unique class labels and give each a column position, so that
        # labels do not have to be the integers 0..num_classes-1.
        classes = np.unique(self.labels)
        num_classes = len(classes)
        class_position = {cls: pos for pos, cls in enumerate(classes.tolist())}

        # Per-word occurrence counts, one slot per class
        word_counts = defaultdict(lambda: np.zeros(num_classes, dtype=int))

        # Count word occurrences for each class
        for doc, label in zip(docs, self.labels):
            column = class_position[label]
            for word in doc:
                if word in self.vocabulary:
                    word_counts[word][column] += 1

        # Calculate Gini index for each word
        gini_scores = {}
        for word, counts in word_counts.items():
            total = np.sum(counts)  # Total occurrences of the word across all classes
            if total > 0:
                probabilities = counts / total
                gini_scores[word] = 1 - np.sum(probabilities ** 2)  # Gini index formula

        self.gini_scores = gini_scores

        print(f"Initial Vocabulary Size: {len(self.vocabulary)}")

        # Filter the vocabulary based on the Gini index threshold
        filtered_words = [word for word, gini in gini_scores.items() if gini < self.threshold]

        # Rebuild the vocabulary with new contiguous indices
        self.vocabulary = {word: idx for idx, word in enumerate(filtered_words)}

        print(f"Filtered Vocabulary Size after gini index: {len(self.vocabulary)}")

    def vectorize(self, doc):
        """Converts a preprocessed document into a Bag-of-Words count vector.

        Words that are not in the vocabulary are ignored.
        """
        vector = np.zeros(len(self.vocabulary))
        for word in doc:
            if word in self.vocabulary:
                vector[self.vocabulary[word]] += 1
        return vector

    def preprocess_and_vectorize(self, tokenized_docs):
        """Preprocesses and vectorizes a list of tokenized documents.

        On the first call (no vocabulary yet) the vocabulary is built from this
        batch and filtered by Gini index; subsequent calls reuse it.

        Returns:
        (np.ndarray, list): matrix of shape (n_docs, vocabulary_size) and the
        cleaned token lists.
        """
        preprocessed_docs = [self.preprocess(doc) for doc in tokenized_docs]

        if not self._fitted and not self.vocabulary:
            # Build the vocabulary once
            self.build_vocabulary(preprocessed_docs)
            self.calculate_gini_and_filter(preprocessed_docs)
            self._fitted = True

        # Fill a pre-allocated matrix row by row: this keeps the (n_docs, vocab)
        # shape even for an empty batch and avoids holding a second copy of all rows.
        vectors = np.zeros((len(preprocessed_docs), len(self.vocabulary)))
        for row, doc in enumerate(preprocessed_docs):
            vectors[row] = self.vectorize(doc)

        return vectors, preprocessed_docs

# Multinomial Naive Bayes

## Class definition

In [None]:
class MultinomialNaiveBayes:
    """Multinomial Naive Bayes classifier over Bag-of-Words count vectors."""

    def __init__(self, alpha=1.0):
        """
        Initialize the Multinomial Naive Bayes classifier.
        alpha: Additive (Laplace) smoothing parameter; the default 1.0 is classic
               Laplace smoothing. With alpha=0 a word unseen in a class gets
               probability 0 (log-probability -inf).
        Raises:
        ValueError: if alpha is negative.
        """
        if alpha < 0:
            raise ValueError("alpha must be non-negative")
        self.alpha = alpha
        self.class_log_prior = {}  # Log prior probabilities log(P(class))
        self.word_log_probs = {}   # Log conditional probabilities log(P(word | class))
        self.vocabulary = {}       # Vocabulary from the preprocessor (word -> index)
        self.classes = []          # Class labels

    def fit(self, X, y, vocabulary):
        """
        Train the Naive Bayes classifier using the provided vocabulary.
        Parameters:
        X (np.ndarray): Array of document vectors (Bag-of-Words representation),
                        shape (n_docs, vocab_size)
        y (sequence of int): Class labels, one per row of X (list or array)
        vocabulary (dict): Vocabulary from the preprocessor (word -> index)
        Raises:
        ValueError: if X is not 2-D, is empty, or its row count differs from len(y).
        """
        X = np.asarray(X, dtype=float)
        # Convert to an array so that `y == cls` is an element-wise comparison even
        # when the labels arrive as a plain list.
        y = np.asarray(y)
        if X.ndim != 2:
            raise ValueError("X must be a 2-D array of shape (n_docs, vocab_size)")
        n_docs, vocab_size = X.shape
        if n_docs == 0:
            raise ValueError("Cannot fit on an empty training set")
        if y.shape[0] != n_docs:
            raise ValueError(f"X has {n_docs} rows but y has {y.shape[0]} labels")

        self.vocabulary = vocabulary  # Use the preprocessor's vocabulary
        self.classes = np.unique(y)  # Find unique class labels

        # Start from empty tables so a refit never keeps entries of a previous fit
        self.class_log_prior = {}
        self.word_log_probs = {}

        for cls in self.classes:
            in_class = (y == cls)

            # Log class prior log(P(class))
            self.class_log_prior[cls] = np.log(in_class.sum() / n_docs)

            # Per-word counts summed over the documents of this class. The
            # indicator-vector product avoids copying the selected rows of X.
            class_word_counts = in_class.astype(float) @ X

            # Log conditional probabilities log(P(word | class)) with additive smoothing
            self.word_log_probs[cls] = np.log(
                (class_word_counts + self.alpha) /
                (class_word_counts.sum() + self.alpha * vocab_size)
            )

    def predict(self, X):
        """
        Predict the class labels for the given document vectors.
        Parameters:
        X (np.ndarray): Array of document vectors (Bag-of-Words representation)
        Returns:
        list of int: Predicted class labels (ties go to the smallest class label)
        Raises:
        ValueError: if called with documents before `fit`.
        """
        X = np.asarray(X, dtype=float)
        if X.shape[0] == 0:
            return []
        if len(self.classes) == 0:
            raise ValueError("The classifier must be fitted before calling predict")

        # Rows follow the order of self.classes
        log_likelihood = np.vstack([self.word_log_probs[cls] for cls in self.classes])
        log_prior = np.array([self.class_log_prior[cls] for cls in self.classes])

        # score[d, c] = log P(c) + sum_w count(d, w) * log P(w | c)
        scores = X @ log_likelihood.T + log_prior

        # Predict the class with the highest score
        return self.classes[np.argmax(scores, axis=1)].tolist()

## Train & Validation split

In [None]:
def train_val_split(documents, labels, val_ratio=0.2, random_seed=None):
    """
    Randomly split documents and labels into training and validation sets.

    Parameters:
    - documents (list): List of documents (strings or token lists).
    - labels (list of int): List of corresponding labels.
    - val_ratio (float): Fraction of data to be used as the validation set (default 0.2).
    - random_seed (int): Random seed for reproducibility (default None, which draws
      from NumPy's global random state).

    Returns:
    - train_docs (list): Training documents.
    - val_docs (list): Validation documents.
    - train_labels (list of int): Training labels.
    - val_labels (list of int): Validation labels.

    Raises:
    - ValueError: if documents and labels differ in length, or val_ratio is outside [0, 1].
    """
    if len(documents) != len(labels):
        raise ValueError(
            f"documents ({len(documents)}) and labels ({len(labels)}) must have the same length"
        )
    if not 0.0 <= val_ratio <= 1.0:
        raise ValueError("val_ratio must be between 0 and 1")

    # A private RandomState yields the same permutation as seeding the global
    # generator with this seed, but leaves the global random state untouched.
    rng = np.random if random_seed is None else np.random.RandomState(random_seed)

    # Convert documents and labels to numpy arrays so they can be fancy-indexed
    documents = np.array(documents, dtype=object)
    labels = np.array(labels)

    # Shuffle the indices randomly
    shuffled_indices = rng.permutation(len(documents))

    # The first val_size shuffled indices form the validation set, the rest the training set
    val_size = int(len(documents) * val_ratio)
    val_indices = shuffled_indices[:val_size]
    train_indices = shuffled_indices[val_size:]

    # Return the splits as lists
    return (
        documents[train_indices].tolist(),
        documents[val_indices].tolist(),
        labels[train_indices].tolist(),
        labels[val_indices].tolist(),
    )



### Split data into training and validation sets

In [None]:
# Hold out 20% of the training documents for validation; the fixed seed makes
# the split reproducible.
train_docs, val_docs, train_labels, val_labels = train_val_split(
    newsgroups_train.data,
    binary_labels,
    val_ratio=0.2,
    random_seed=42,
)

# Report how many documents ended up in each split
print(f"Training set size: {len(train_docs)}")
print(f"Validation set size: {len(val_docs)}")

Training set size: 9052
Validation set size: 2262


## Preprocessing the train and validation data for multinomial naive bayes

In [None]:
# Preprocessor bound to the training labels; vocabulary words with a Gini index
# of 0.3 or more are filtered out.
preprocessor = TextPreprocessor(labels=train_labels, threshold=0.3)

# Word-level tokenization of both splits with NLTK
tokenized_docs = list(map(word_tokenize, train_docs))
tokenized_val_docs = list(map(word_tokenize, val_docs))

# First call: the vocabulary is learned from the training documents, which are then vectorized
doc_vector, preprocessed_docs = preprocessor.preprocess_and_vectorize(tokenized_docs)

# Second call: the validation documents are vectorized with the already-learned vocabulary
val_vectors, preprocessed_val_docs = preprocessor.preprocess_and_vectorize(tokenized_val_docs)

Vocabulary: 88265 words
Initial Vocabulary Size: 88265
Filtered Vocabulary Size after gini index: 79839


## Training the multinomial naive bayes model

In [None]:
# Initialize and train the Multinomial Naive Bayes classifier
nb = MultinomialNaiveBayes()
nb.fit(doc_vector, np.array(train_labels), preprocessor.vocabulary)

## Evaluation functions for performance metrics

In [None]:
def evaluate_metrics(predictions, labels, positive_label=1):
    """Evaluates accuracy, precision, recall, and F1-score for a binary task.

    Parameters:
    predictions: Predicted class labels.
    labels: True class labels, aligned with `predictions`.
    positive_label: Label treated as the positive class for precision/recall/F1
                    (default 1).

    Returns:
    tuple: (accuracy, precision, recall, f1). A metric whose denominator is zero
    is reported as 0.0; for empty input all four are 0.0.

    Raises:
    ValueError: if `predictions` and `labels` differ in length.
    """
    n_samples = len(labels)
    if len(predictions) != n_samples:
        raise ValueError(
            f"predictions ({len(predictions)}) and labels ({n_samples}) must have the same length"
        )
    if n_samples == 0:
        return 0.0, 0.0, 0.0, 0.0

    # One pass over the pairs collects everything the four metrics need
    correct_predictions = true_positive = false_positive = false_negative = 0
    for true, pred in zip(labels, predictions):
        if true == pred:
            correct_predictions += 1
        if pred == positive_label:
            if true == positive_label:
                true_positive += 1
            else:
                false_positive += 1
        elif true == positive_label:
            false_negative += 1

    accuracy = correct_predictions / n_samples
    predicted_positive = true_positive + false_positive
    actual_positive = true_positive + false_negative
    precision = true_positive / predicted_positive if predicted_positive > 0 else 0.0
    recall = true_positive / actual_positive if actual_positive > 0 else 0.0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0

    return accuracy, precision, recall, f1

## Validation

In [None]:
# Score the Naive Bayes classifier on the held-out validation vectors
val_predictions = nb.predict(val_vectors)
accuracy, precision, recall, f1 = evaluate_metrics(val_predictions, val_labels)

# Banner followed by the four metrics, one per line
print("*"*10 + "VALIDATION METRICS FOR NAIVE BAYES CLASSIFIER"+ "*"*10 +"\n")
print(
    f"Accuracy: {accuracy:.4f}",
    f"Precision: {precision:.4f}",
    f"Recall: {recall:.4f}",
    f"F1-Score: {f1:.4f}",
    sep="\n",
)


**********VALIDATION METRICS FOR NAIVE BAYES CLASSIFIER**********

Accuracy: 0.8948
Precision: 0.8768
Recall: 0.9336
F1-Score: 0.9043


# Logistic Regression

## Class definition for the Logistic Regression Model

In [None]:
class LogisticRegression:
    """Binary logistic regression trained by batch gradient descent with L2
    regularization and loss-based early stopping."""

    def __init__(self, learning_rate=0.01, n_iters=1000, reqularization=0.01, tolerance=1e-5, patience=5,
                 regularization=None):
        """
        Initialize the Logistic Regression model.
        learning_rate: Step size for gradient descent.
        n_iters: Maximum number of gradient-descent iterations.
        reqularization: L2 regularization strength (historical, misspelled keyword
                        kept so existing calls keep working).
        tolerance: Loss change below which an iteration counts as "no improvement".
        patience: Number of consecutive no-improvement iterations before stopping early.
        regularization: Correctly spelled alias; when given it overrides `reqularization`.
        """
        self.lr = learning_rate
        self.n_iters = n_iters
        self.regularization = reqularization if regularization is None else regularization
        self.tolerance = tolerance
        self.patience = patience

        self.weights = None  # [bias, w_1, ..., w_n] once fitted
        self.losses = []     # (iteration, loss) pairs recorded every 50 iterations

    def _sigmoid(self, z):
        """Apply the sigmoid function.

        The input is clipped to [-500, 500] so that np.exp cannot overflow.
        """
        return 1 / (1 + np.exp(-np.clip(z, -500, 500)))

    def compute_loss(self, y_true, y_pred):
        """Compute binary cross-entropy loss plus the L2 penalty on all weights
        (bias included)."""

        # Ensure inputs are NumPy arrays
        y_true = np.array(y_true)
        y_pred = np.array(y_pred)
        epsilon = 1e-9  # To prevent log(0)

        return -np.mean(y_true * np.log(y_pred + epsilon) +
                        (1 - y_true) * np.log(1 - y_pred + epsilon)) + self.regularization * 0.5 * np.sum(self.weights ** 2)

    def fit(self, X, y):
        """
        Train the Logistic Regression model using gradient descent with convergence check.
        X: Document-term matrix (shape: [n_samples, n_features]).
        y: Target labels (shape: [n_samples]), values 0 or 1.

        Training stops early once the loss has changed by less than
        `self.tolerance` for `self.patience` consecutive iterations. Every call
        restarts from zero weights and an empty loss history.

        Raises:
        ValueError: if X is empty or its row count differs from len(y).
        """
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=float)
        n_samples, n_features = X.shape
        if n_samples == 0:
            raise ValueError("Cannot fit on an empty training set")
        if y.shape[0] != n_samples:
            raise ValueError(f"X has {n_samples} rows but y has {y.shape[0]} labels")

        # Initialize weights and bias (index 0 is the bias)
        self.weights = np.zeros(1 + n_features)
        self.losses = []

        prev_loss = float('inf')
        no_improvement = 0
        iterations_run = 0  # number of weight updates actually performed

        for i in range(self.n_iters):
            # The bias is added separately instead of prepending a column of ones
            # to X, which would copy the whole matrix.
            Z = X @ self.weights[1:] + self.weights[0]
            y_pred = self._sigmoid(Z)

            # Compute the current loss
            loss = self.compute_loss(y, y_pred)

            # Print progress every 50 iterations
            if i % 50 == 0:
                print(f"Iteration {i} - Loss: {loss:.6f}")
                self.losses.append((i,loss))

            # Check for convergence (loss change < tolerance)
            if abs(prev_loss - loss) < self.tolerance:
                no_improvement += 1  # Increment if improvement is minimal
                if no_improvement >= self.patience:
                    print(f"Convergence achieved after {i} iterations.")
                    break  # Stop early if convergence is detected
            else:
                no_improvement = 0  # Reset if improvement is significant

            # Store the current loss as the previous loss for the next iteration
            prev_loss = loss

            # Compute gradients: data term for bias and weights, plus the L2 term
            error = y_pred - y
            gradient = np.empty_like(self.weights)
            gradient[0] = error.mean()
            gradient[1:] = X.T @ error / n_samples
            gradient += self.regularization * self.weights

            # Update parameters
            self.weights -= self.lr * gradient
            iterations_run = i + 1

        print(f"Training completed in {iterations_run} iterations.")

    def predict(self, X):
        """
        Predict class labels for the given input data.
        X: Document-term matrix (shape: [n_samples, n_features]).
        Returns:
        np.ndarray: Predicted binary class labels (0 or 1).
        Raises:
        ValueError: if the model has not been fitted yet.
        """
        if self.weights is None:
            raise ValueError("The model must be fitted before calling predict")

        X = np.asarray(X, dtype=float)
        linear_model = X @ self.weights[1:] + self.weights[0]
        y_pred = self._sigmoid(linear_model)

        return (y_pred > 0.5).astype(int)

## Preprocessing the train and validation datasets

In [None]:
# Training and validation documents, with their labels as NumPy arrays
X_train, X_val = train_docs, val_docs
y_train, y_val = np.array(train_labels), np.array(val_labels)

# Separate preprocessor for logistic regression, bound to the training labels
logistic_preprocessor = TextPreprocessor(labels=y_train, threshold=0.3)

# Tokenize the training and validation documents
tokenized_train_docs = list(map(word_tokenize, X_train))
tokenized_val_docs = list(map(word_tokenize, X_val))

# Vectorize both splits; the first call learns the vocabulary from the training
# documents and the second call reuses it. The cleaned token lists are discarded.
X_train_vectors, _ = logistic_preprocessor.preprocess_and_vectorize(tokenized_train_docs)
X_val_vectors, _ = logistic_preprocessor.preprocess_and_vectorize(tokenized_val_docs)


## Training the model with different hyperparameters

In [None]:
# Initialize and train the Logistic Regression model
model = LogisticRegression(learning_rate=0.1, n_iters=1000, reqularization=0.01, tolerance=1e-5, patience=3)
model.fit(X_train_vectors, y_train)

## Validation of the logistic regression model

In [None]:
# Predict on the validation set
y_pred = model.predict(X_val_vectors)

# Calculate the metrics
accuracy, precision, recall, f1 = evaluate_metrics(y_pred, y_val)

# Display the evaluation metrics
print("*"*10 + "VALIDATION METRICS FOR LOGISTIC REGRESSION CLASSIFIER"+ "*"*10 +"\n")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-Score: {f1:.4f}")


# Testing

## Getting the test data set

In [None]:
# pull the test data from the dataset
newsgroups_test = fetch_20newsgroups(subset='test', categories=group_1 + group_2, remove=('headers', 'footers', 'quotes'))
test_documents = newsgroups_test.data

# Relabel the target: 0 for Group 1, 1 for Group 2
group_1_labels = [newsgroups_test.target_names.index(cat) for cat in group_1]
group_2_labels = [newsgroups_test.target_names.index(cat) for cat in group_2]

test_binary_labels = [0 if label in group_1_labels else 1 for label in newsgroups_test.target]

print(f'Number of test data points: {len(newsgroups_test.data)}')
print(f'First 500 class labels: {test_binary_labels[:500]}')

## Testing Multinomial Naive Bayes

In [None]:
# Tokenize the test documents
tokenized_test_docs = [word_tokenize(doc) for doc in test_documents]

# Preprocess and vectorize the tokenized test documents
test_doc_vector, preprocessed_test_docs = preprocessor.preprocess_and_vectorize(tokenized_test_docs)

# Predict the test set labels using the Naive Bayes classifier
predictions = nb.predict(test_doc_vector)

# Calculate metrics
accuracy, precision, recall, f1 = evaluate_metrics(predictions, test_binary_labels)

# Display the evaluation metrics
print("*"*10 + "TEST METRICS FOR NAIVE BAYES CLASSIFIER"+ "*"*10 +"\n")
print("Accuracy: ", accuracy)
print("Precision: ", precision)
print("Recall: ", recall)
print("F1 Score: ", f1)

## Testing Logistic Regression

In [None]:
# The logistic-regression model must be scored on features built with the
# vocabulary it was trained on. The Naive Bayes test matrix is reused only when
# both preprocessors hold the same word -> index mapping; otherwise the test
# documents are vectorized with the logistic-regression preprocessor.
if logistic_preprocessor.vocabulary == preprocessor.vocabulary:
    lr_test_vectors = test_doc_vector
else:
    lr_test_vectors, _ = logistic_preprocessor.preprocess_and_vectorize(tokenized_test_docs)

# Make predictions on the test set using the Logistic Regression model
predictions = model.predict(lr_test_vectors)

# Calculate metrics
accuracy, precision, recall, f1 = evaluate_metrics(predictions, test_binary_labels)

# Display the evaluation metrics
print("*"*10 + "TEST METRICS FOR LOGISTIC REGRESSION CLASSIFIER"+ "*"*10 +"\n")
print("Accuracy: ", accuracy)
print("Precision: ", precision)
print("Recall: ", recall)
print("F1 Score: ", f1)

# Miscellaneous

In [None]:
import matplotlib.pyplot as plt

# model.losses holds (iteration, loss) pairs; unzip them into the two plot axes
iterations, losses = zip(*model.losses)

# Draw the training-loss curve on an explicit figure/axes pair
fig, ax = plt.subplots()
ax.plot(iterations, losses)
ax.set_xlabel("Iteration")
ax.set_ylabel("Loss")
ax.set_title(f"Loss vs Iterations [ learning_rate={model.lr}, \n regularization={model.regularization}, tolerance={model.tolerance}, patience={model.patience} ]")
plt.show()
