In [None]:
#import lib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import os

import re
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
import string

from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer

import torch
from torch.utils.data import DataLoader, TensorDataset
from torch import nn

# download data

In [None]:
# Colab-only step: Google Drive has to be mounted before files under it can be read.
from google.colab import drive
drive.mount('/drive')

# Print the full path of every file found under the notebooks folder.
notebooks_dir = '/drive/MyDrive/Colab Notebooks'
for folder, _, names in os.walk(notebooks_dir):
    for name in names:
        print(os.path.join(folder, name))

# Read the email dataset and preview its first rows as the cell output.
df = pd.read_csv("/drive/MyDrive/Colab Notebooks/emails.csv")
df.head()


In [None]:
# Report how many fully duplicated rows exist before removing them.
duplicate_rows = df.duplicated().sum()
print(f"number of duplicate: {duplicate_rows}")

# Remove the duplicates from `df` itself, then show the resulting shape.
df.drop_duplicates(inplace=True)
remaining_shape = df.shape
print(f"Successfully deleted duplicates {remaining_shape}")

# Count of each value in the 'spam' label column (displayed as the cell output).
df['spam'].value_counts()

In [None]:
# Visualize the class balance of the 'spam' label column.
# Label 1 is plotted as "Spam" and label 0 as "Not Spam".
label_counts = df['spam'].value_counts()
spam_count = label_counts[1]
non_spam_count = label_counts[0]

# Bar chart with one bar per class.
fig, ax = plt.subplots()
ax.bar(['Spam', 'Not Spam'], [spam_count, non_spam_count])
ax.set_title('Spam vs Not Spam')
ax.set_xlabel('Email Type')
ax.set_ylabel('Count')
plt.show()

In [None]:
# Download packages
# Fetch NLTK data used by the preprocessing step further down:
#   - 'stopwords': the corpus read through stopwords.words('english')
#   - 'punkt': tokenizer data (presumably needed by word_tokenize -- confirm
#     against the installed NLTK version)
nltk.download('stopwords')
nltk.download('punkt')

In [None]:
# Print three randomly chosen raw emails, each followed by a blank line.
randomSample = df["text"].sample(3)
for email_body in randomSample:
  print(email_body, '\n')

In [None]:
# Preprocessing function
# Cache for the objects that are identical for every email. Filled lazily so the
# stop-word corpus is only read when the first email is actually processed.
_PREPROCESSING_RESOURCES = {}


def _get_preprocessing_resources():
  """Return the shared (stop-word set, stemmer) pair, creating it on first use."""
  if not _PREPROCESSING_RESOURCES:
    _PREPROCESSING_RESOURCES['stop_words'] = frozenset(stopwords.words('english'))
    _PREPROCESSING_RESOURCES['stemmer'] = PorterStemmer()
  return (_PREPROCESSING_RESOURCES['stop_words'],
          _PREPROCESSING_RESOURCES['stemmer'])


def preprocessing_text(text):
  """Normalize one email into a space-separated string of stemmed tokens.

  Steps, in order: lowercase the text, tokenize it with ``word_tokenize``,
  keep only tokens for which ``str.isalnum()`` is true (this drops
  punctuation tokens), drop English stop words, and stem what is left with
  the Porter stemmer.

  Args:
    text: The raw email text as a ``str``.

  Returns:
    The processed tokens joined by single spaces; an empty string when no
    token survives the filters.
  """
  # The stop-word set and the stemmer do not depend on `text`, so they are
  # built once and reused instead of being recreated for every email.
  stop_words, stemmer = _get_preprocessing_resources()

  # Lowercase first so that the stop-word lookup matches regardless of case.
  tokens = word_tokenize(text.lower())

  # Filter and stem in one pass; the order (alphanumeric check, stop-word
  # check, then stemming) is the same as applying the three steps separately.
  stemmed_tokens = [
      stemmer.stem(word)
      for word in tokens
      if word.isalnum() and word not in stop_words
  ]

  # Join tokens back into a processed text
  return ' '.join(stemmed_tokens)

# Fetch the 'punkt_tab' resource before any text is tokenized.
nltk.download('punkt_tab')

# Run the preprocessing on every raw email and keep the result as a new column.
processed_column = df['text'].apply(preprocessing_text)
df['processed_text'] = processed_column

# Show five random processed emails as the cell output.
df['processed_text'].sample(5)

In [None]:
# Features are the processed email text; the target is the 'spam' label column.
x_text, y = df['processed_text'], df['spam']

# Hold out 20% of the rows for testing; the fixed seed makes the split repeatable.
split_parts = train_test_split(x_text, y, test_size=0.2, random_state=42)
x_train, x_test, y_train, y_test = split_parts

In [None]:
# Count each label once per split, then arrange the counts as [train, test].
train_label_counts = y_train.value_counts()
test_label_counts = y_test.value_counts()
spam_counts = [train_label_counts[1], test_label_counts[1]]
non_spam_counts = [train_label_counts[0], test_label_counts[0]]

x_labels = ["Train", "Test"]

# Stacked bars: the "Not Spam" segment sits on top of the "Spam" segment.
fig, ax = plt.subplots()
ax.bar(x_labels, spam_counts, label='Spam')
ax.bar(x_labels, non_spam_counts, bottom=spam_counts, label='Not Spam')
ax.set_title('Spam vs Not Spam in Train and Test Sets')
ax.set_xlabel('Dataset')
ax.set_ylabel('Count')
ax.legend()
plt.show()

In [None]:
# Vectorize the data: fit the CountVectorizer on the training text only, then
# encode the test text with that same fitted vectorizer.
vectorize = CountVectorizer()
x_train_vectorized = vectorize.fit_transform(x_train)
x_test_vectorized = vectorize.transform(x_test)

# Read .shape straight from the vectorized matrices. Calling .toarray() here
# would build a full dense copy of each matrix only to print its dimensions.
print(f"X_train_vec: {x_train_vectorized.shape}")
print(f"X_test_vec: {x_test_vectorized.shape}")

In [None]:
# Use the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
device

In [None]:
# Convert the vectorized features and the labels to float32 tensors on `device`.
# Every tensor shares the same dtype and device, so the options are named once.
tensor_options = {'dtype': torch.float32, 'device': device}

# The vectorized matrices are densified with .toarray() before conversion.
X_train_tensor = torch.tensor(x_train_vectorized.toarray(), **tensor_options)
y_train_tensor = torch.tensor(y_train.values, **tensor_options)
X_test_tensor = torch.tensor(x_test_vectorized.toarray(), **tensor_options)
y_test_tensor = torch.tensor(y_test.values, **tensor_options)

In [None]:
# Pair each feature tensor with its label tensor.
train_set = TensorDataset(X_train_tensor, y_train_tensor)
test_set = TensorDataset(X_test_tensor, y_test_tensor)

# Mini-batches of 64 samples; only the training data is shuffled.
batch_size = 64
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

In [None]:
class SpamFilter(nn.Module):
  """Fully connected binary classifier: input_size -> 128 -> 64 -> 1.

  The two hidden layers use ReLU and the single output unit uses a sigmoid,
  so every output value lies between 0 and 1.
  """

  def __init__(self, input_size):
    """Create the three linear layers.

    Args:
      input_size: Number of features in each input vector.
    """
    super().__init__()

    # The attribute names become the keys of the state dict.
    self.layer1 = nn.Linear(in_features=input_size, out_features=128)
    self.layer2 = nn.Linear(in_features=128, out_features=64)
    self.layer3 = nn.Linear(in_features=64, out_features=1)

  def forward(self, x):
    """Apply the three layers to `x` and return the sigmoid of the last one."""
    hidden = self.layer1(x).relu()
    hidden = self.layer2(hidden).relu()
    return self.layer3(hidden).sigmoid()


In [None]:
#init model
# One input feature per column of the vectorized training matrix.
input_size = x_train_vectorized.shape[1]
# Move the parameters to `device`: the data tensors are created on `device`,
# and a model left on the CPU fails with a device-mismatch error on the first
# forward pass when `device` is CUDA.
model_spam_filter = SpamFilter(input_size).to(device)

#loss
# Binary cross-entropy on probabilities; SpamFilter already ends in a sigmoid.
criterion = nn.BCELoss()

# optimizer
# Created after the model is moved so it holds the on-device parameters.
optimizer = torch.optim.Adam(params=model_spam_filter.parameters(),
                            lr=0.001)

In [None]:
#training loop
num_epochs = 10
for epoch in range(num_epochs):
  model_spam_filter.train()  # Set the model to training mode
  running_loss = 0.0  # sum of the per-batch losses in this epoch

  for batch_inputs, batch_labels in train_loader:
    # Clear gradients left over from the previous step.
    optimizer.zero_grad()

    # Forward pass; labels get a trailing dimension to match the model output.
    predictions = model_spam_filter(batch_inputs)
    batch_loss = criterion(predictions, batch_labels.unsqueeze(-1))

    # Backward pass and parameter update.
    batch_loss.backward()
    optimizer.step()

    running_loss += batch_loss.item()

  print(f'Epoch {epoch+1}/{num_epochs}, Loss: {running_loss}')

In [None]:
# Evaluate on the held-out test set: switch to eval mode and disable autograd.
model_spam_filter.eval()

correct = 0
total = 0

with torch.inference_mode():
  # The loop variable is `inputs`, not `input`, so the builtin input() is not
  # overwritten at notebook-global scope.
  for inputs, labels in test_loader:
    outputs = model_spam_filter(inputs)
    # Outputs above 0.5 are predicted as class 1.0, the rest as 0.0.
    predicted = (outputs > 0.5).float()

    total += labels.size(0)
    # Labels get a trailing dimension so they compare elementwise with the
    # model output instead of broadcasting.
    correct += (predicted == labels.unsqueeze(1)).sum().item()

accuracy = correct / total
print(f'Accuracy: {accuracy * 100} %')

In [None]:
# Hand-written sample message with spam-style wording (prize claim, urgency);
# passed to classify_email below.
spam_email = """Subject: Urgent: Claim Your Prize Now!

Congratulations! You have been selected as the lucky winner of our grand prize giveaway! Claim your prize now by clicking on the link below. Don't miss out on this amazing opportunity!

Click here to claim your prize: superlottery@gmail.com

Hurry, this offer is only available for a limited time!

Best Regards,
Spammy Marketing Team
"""

# Hand-written sample of an ordinary work email (meeting reminder with agenda);
# passed to classify_email below.
non_spam_email = """Subject: Meeting Agenda for Tomorrow

Hi Team,

I hope this email finds you well. I wanted to remind everyone about the meeting scheduled for tomorrow at 10:00 AM. Below is the agenda:

1. Review of project milestones
2. Discussion on upcoming deadlines
3. Any other business

Please come prepared with any updates or questions you may have. Looking forward to a productive meeting.

Best regards,
Mr. Bannerjee
"""
def classify_email(email_text):
    """Classify one raw email as "spam" or "not spam" with the trained model.

    The text goes through the same steps as the training data: it is cleaned
    with ``preprocessing_text``, encoded with the fitted ``vectorize``
    CountVectorizer, and converted to a float32 tensor on ``device``.

    Args:
        email_text: The raw email text as a ``str``.

    Returns:
        "spam" when the model output is above 0.5, otherwise "not spam".
    """
    # Preprocess the email
    preprocessed_email = preprocessing_text(email_text)

    # Transform the email with the fitted vectorizer (a batch of one document).
    vectorized_email = vectorize.transform([preprocessed_email])

    # Convert to a dense float tensor on the same device as the model.
    email_tensor = torch.tensor(vectorized_email.toarray(),
                               dtype=torch.float32,
                               device=device)

    # Prediction needs no gradients; inference mode skips autograd bookkeeping,
    # as the test-set evaluation loop does.
    with torch.inference_mode():
        spam_probability = model_spam_filter(email_tensor).item()

    return "spam" if spam_probability > 0.5 else "not spam"

# Classify both sample emails, then report the verdicts in order.
spam_email_verdict = classify_email(spam_email)
non_spam_email_verdict = classify_email(non_spam_email)
print(f"email 1 is : {spam_email_verdict}")
print(f"email 2 is : {non_spam_email_verdict}")

In [None]:
# Saving PyTorch Model
from pathlib import Path

# Folder for saved models and the file name used inside it.
MODEL_PATH = Path("models")
MODEL_NAME = "Spam_Classification.pth"

# Create the folder (and any missing parents); do nothing if it already exists.
MODEL_PATH.mkdir(parents=True, exist_ok=True)
MODEL_SAVE_PATH = MODEL_PATH.joinpath(MODEL_NAME)

# Only the model's state dict is written to disk.
print(f"Saving model to {MODEL_SAVE_PATH}")
torch.save(model_spam_filter.state_dict(), MODEL_SAVE_PATH)