<a href="https://colab.research.google.com/drive/1MOfgIh1XEKNPeov-RDPYmhe0xuPSyqL0?usp=drive_link" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Spam Identifier

This notebook trains a small neural network to classify text messages as spam or not spam. It is implemented in Python, using PyTorch for the model and scikit-learn for converting text into numerical features.

This cell imports the libraries needed for the spam classifier. `torch` and `torch.nn` are used to build the neural network model. `Dataset` and `DataLoader` from `torch.utils.data` are used to define a custom dataset and load it in batches. `CountVectorizer` from `sklearn.feature_extraction.text` converts text messages into numerical vectors. The `csv` library reads the spam dataset, and `time` measures how long training takes. `sys` is used to format the training log and to raise the CSV field size limit. `re` and `pandas` are imported but are not used in the cells shown here.

In [6]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.feature_extraction.text import CountVectorizer
import csv
import time
import re, sys
import pandas as pd

This cell defines the `SpamClassifier` class, a subclass of `nn.Module` that represents the neural network model. The model has two linear layers: the first maps the input vector to 16 hidden units and the second maps those to a single output. A sigmoid activation follows each layer, so the output is a value between 0 and 1.

In [7]:
class SpamClassifier(nn.Module):
    def __init__(self, input_size):
        super().__init__()

        self.linear1 = nn.Linear(input_size, 16)
        self.linear2 = nn.Linear(16, 1)
        self.activation = nn.Sigmoid()

    def forward(self, x):
        x = self.linear1(x)
        x = self.activation(x)
        x = self.linear2(x)
        x = self.activation(x)

        return x
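
As a quick sanity check of the architecture described above, the following minimal sketch pushes a random batch through the same two-layer network and inspects the output. The batch size of 4 and the input size of 10 are arbitrary values chosen for illustration; they do not come from the notebook's dataset.

```python
import torch
import torch.nn as nn


class SpamClassifier(nn.Module):
    """Same two-layer layout as the cell above, repeated so this sketch runs on its own."""

    def __init__(self, input_size):
        super().__init__()
        self.linear1 = nn.Linear(input_size, 16)
        self.linear2 = nn.Linear(16, 1)
        self.activation = nn.Sigmoid()

    def forward(self, x):
        x = self.activation(self.linear1(x))
        return self.activation(self.linear2(x))


torch.manual_seed(0)
demo_model = SpamClassifier(input_size=10)  # 10 features is an assumption for the demo
demo_batch = torch.rand(4, 10)              # 4 fake messages with 10 features each
demo_output = demo_model(demo_batch)

# One score per message, with a trailing dimension of size 1
print(demo_output.shape)
```

Because the last layer is followed by a sigmoid, every score lies strictly between 0 and 1 and can be read as an estimated spam probability.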

This cell defines the `SpamDataset` class, a subclass of `Dataset` that represents the spam dataset. It reads `label, message` rows from a CSV file, skipping rows that do not have exactly two columns, and converts the messages into numerical vectors using `CountVectorizer`. Labels are converted to integers: 1 for `spam` and 0 for anything else.

In [8]:
class SpamDataset(Dataset):
    def __init__(self, csv_file):
        self.data = []
        self.vectorizer = CountVectorizer()

        messages = []
        labels = []
        with open(csv_file, "r", newline="") as f:  # newline="" is recommended when reading with the csv module
            csv_reader = csv.reader(f)
            for row in csv_reader:
                if len(row) == 2:
                    label, message = row
                    messages.append(message)
                    labels.append(int(label == 'spam'))  # Convert label to integer

        # Convert messages to vectors
        message_vectors = self.vectorizer.fit_transform(messages).toarray()

        for vector, label in zip(message_vectors, labels):
            self.data.append((vector, label))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        message_vector, label = self.data[idx]
        return torch.tensor(message_vector, dtype=torch.float32), torch.tensor(label, dtype=torch.float32)

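This cell defines a small `Reprinter` helper that overwrites the previous progress line in the log, and the `train` function, which trains the model on the training data with a given optimizer and loss function for a given number of epochs. It also defines the `predict` function and selects the device (GPU if available, otherwise CPU).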

In [9]:
class Reprinter:

    def __init__(self):
        self.text = ''

    def clear_line(self):
        """Clears the line before printing the new text."""
        sys.stdout.write('\033[F')  # Move cursor up one line
        sys.stdout.write('\r' + ' ' * len(self.text))

    def __call__(self, text):
        """Prints `text` and clears the previous line."""
        self.clear_line()
        print(text, end='', flush=True)
        self.text = text
        sys.stdout.flush()



def train(model, train_data, train_loader, optimizer, loss_fn, epochs):
    reprint = Reprinter()
    start_time = time.time()  # Record the start time of training

    for epoch in range(epochs):
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            data = data.to(device)
            target = target.to(device)

            optimizer.zero_grad()
            output = model(data).squeeze(1)  # Drop the size-1 output dimension; keeps the batch dimension even for a batch of one
            loss = loss_fn(output, target)
            loss.backward()
            optimizer.step()

            if batch_idx % 10 == 0:
                reprint("Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
                    epoch, batch_idx * len(data), len(train_data), 100.0 * batch_idx / len(train_loader), loss.item()
                ))

    end_time = time.time()  # Record the end time of training
    elapsed_time = end_time - start_time  # Calculate the elapsed time
    print(f"\nTraining took approximately {elapsed_time:.2f} seconds")


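# Define the predict function
def predict(model, message):
    # Convert the input message to a tensor using the vectorizer fitted on the training data
    message_vector = torch.tensor(train_dataset.vectorizer.transform([message]).toarray(), dtype=torch.float32).to(device)

    # Move the model to the appropriate device and switch to evaluation mode
    model = model.to(device)
    model.eval()

    # Make the prediction without tracking gradients
    with torch.no_grad():
        output = model(message_vector)
    # The sigmoid output is the estimated spam probability; express it as a percentage
    confidence = output.item() * 100.0

    return confidence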


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
deviceName = "GPU" if device.type == "cuda" else "CPU"

The cell above also defines the `predict` function, which returns the trained model's spam confidence for a given message as a percentage. The cell below builds the dataset, model, optimizer, and loss function for the selected dataset and, if `train_now` is set, trains the model and saves its weights.

In [10]:
# @title Training Settings
choice = "Small (469 KB)" # @param ["Small (469 KB)", "Medium (10 MB)"]
model_name = "spam_classifier_medium" # @param {type:"string"}
train_now = True # @param {type:"boolean"}
if train_now:
  print("Using", deviceName)
  print("____________________________________________________________")

csv.field_size_limit(sys.maxsize)
if choice=="Small (469 KB)":
  train_dataset = SpamDataset("/content/sample_data/spam.csv")
elif choice=="Medium (10 MB)":
  train_dataset = SpamDataset("/content/sample_data/spam_20.csv")
elif choice=="Large (36 MB)":
  train_dataset = SpamDataset("spam_large.csv")
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

input_size = len(train_dataset.vectorizer.get_feature_names_out())
print(f"INPUT_SIZE FOR {model_name}: {input_size}")
model = SpamClassifier(input_size)
model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
loss_fn = nn.BCELoss()

if train_now:
  train(model, train_dataset, train_loader, optimizer, loss_fn, epochs=100)

  # Save the model
  torch.save(model.state_dict(), f"{model_name}.pt")

Using GPU
____________________________________________________________
INPUT_SIZE FOR spam_classifier_medium: 8709
Training took approximately 58.79 seconds
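
The same optimizer, loss function, and update steps can be tried on toy data. The following is a minimal sketch, not the notebook's training run: the synthetic data, the labeling rule, the learning rate of 0.01, and the 200 full-batch steps are all assumptions chosen so the example runs in a moment.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

# Hypothetical toy data: 64 samples with 10 features; the label is 1 when feature 0 is positive
X = torch.randn(64, 10)
y = (X[:, 0] > 0).float()

# Stand-in with the same layer layout as SpamClassifier
toy_model = nn.Sequential(nn.Linear(10, 16), nn.Sigmoid(), nn.Linear(16, 1), nn.Sigmoid())
toy_optimizer = torch.optim.Adam(toy_model.parameters(), lr=0.01)  # lr is an assumption for the demo
toy_loss_fn = nn.BCELoss()

losses = []
for step in range(200):
    toy_optimizer.zero_grad()             # clear gradients from the previous step
    output = toy_model(X).squeeze(1)      # shape (64, 1) -> (64,), matching the target
    loss = toy_loss_fn(output, y)         # binary cross-entropy on sigmoid outputs
    loss.backward()                       # compute gradients
    toy_optimizer.step()                  # update the weights
    losses.append(loss.item())

print(f"first loss: {losses[0]:.4f}, last loss: {losses[-1]:.4f}")
```

`nn.BCELoss` expects the model output and the target to have the same shape, which is why the size-1 output dimension is removed before the loss is computed.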


# Try The Model Out

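Feel free to try the model out in the cell below! Simply enter your message in the text field on the right. The result is the model's estimated confidence, as a percentage, that the message is spam, and the `accuracy` slider sets how many decimal places are shown.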

In [17]:
# @title Inference { form-width: "50%" }
text_input = "Can you send me the report by tomorrow morning?" # @param {type:"string"}
accuracy = 4 # @param {type:"slider", min:1, max:13, step:1}
model_select = "spam_classifier_small" # @param ["spam_classifier_medium", "spam_classifier_small"]

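# Rebuild the dataset so the vectorizer vocabulary matches the selected model
if model_select == "spam_classifier_small":
  train_dataset = SpamDataset("/content/sample_data/spam.csv")
elif model_select == "spam_classifier_medium":
  train_dataset = SpamDataset("/content/sample_data/spam_20.csv")

# Derive the input size from the selected dataset rather than reusing the value from the training cell
input_size = len(train_dataset.vectorizer.get_feature_names_out())
model = SpamClassifier(input_size)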
if model_select == "spam_classifier_small":
  model.load_state_dict(torch.load("/content/sample_data/spam_classifier_small.pt", map_location=torch.device('cpu'), weights_only=True))
elif model_select == "spam_classifier_medium":
  model.load_state_dict(torch.load("/content/sample_data/spam_classifier_medium.pt", map_location=torch.device('cpu'), weights_only=True))


# Make a prediction
message = text_input
confidence = predict(model, message)
confidence = round(confidence, accuracy)
print(f"Confidence rate: {confidence}%")

Confidence rate: 0.0%
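
Loading saved weights only works when the new model has the same layer sizes as the one that was saved. The sketch below illustrates the save and load round trip under simplified assumptions: `build_model` is a hypothetical helper with the same layer layout as `SpamClassifier`, the input size of 10 is arbitrary, and an in-memory buffer stands in for the `.pt` file.

```python
import io

import torch
import torch.nn as nn

torch.manual_seed(0)


def build_model(input_size):
    # Hypothetical stand-in with the same layer layout as SpamClassifier
    return nn.Sequential(nn.Linear(input_size, 16), nn.Sigmoid(), nn.Linear(16, 1), nn.Sigmoid())


original = build_model(input_size=10)
original.eval()

# Save the weights to an in-memory buffer instead of a .pt file
buffer = io.BytesIO()
torch.save(original.state_dict(), buffer)
buffer.seek(0)

# The restored model must be built with the same input_size as the saved one
restored = build_model(input_size=10)
restored.load_state_dict(torch.load(buffer, map_location="cpu", weights_only=True))
restored.eval()

# Both models should now produce identical outputs for the same input
sample = torch.rand(1, 10)
with torch.no_grad():
    same = torch.equal(original(sample), restored(sample))
print(same)
```

In the notebook the input size is the vocabulary size of the fitted `CountVectorizer`, so the dataset used at inference time has to be the one the selected model was trained on.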
