In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.metrics import f1_score, confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.metrics import roc_curve, roc_auc_score
import matplotlib.pyplot as plt
import numpy as np
import string

In [None]:
# Set random seed for repeatability
SEED = 42
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
np.random.seed(SEED)
os.environ['PYTHONHASHSEED'] = str(SEED)

In [None]:
# Character-level processing
CHARSET = string.ascii_lowercase + string.ascii_uppercase + string.digits + string.punctuation + ' '
CHAR_TO_INDEX = {ch: i + 1 for i, ch in enumerate(CHARSET)}  # 0 is reserved for padding
MAX_LEN = 1024  # Use only the first X characters

In [None]:
class TextDataset(Dataset):
    def __init__(self, folder_path, label):
        self.files = [os.path.join(folder_path, f) for f in os.listdir(folder_path)]
        self.label = label
    
    def __len__(self):
        return len(self.files)
    
    def __getitem__(self, idx):
        file_path = self.files[idx]
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
            text = file.read()[:MAX_LEN]
        
        encoded_text = [CHAR_TO_INDEX.get(ch, 0) for ch in text]  # Convert chars to indices
        if len(encoded_text) < MAX_LEN:
            encoded_text += [0] * (MAX_LEN - len(encoded_text))  # Pad to MAX_LEN
        
        return torch.tensor(encoded_text, dtype=torch.long), torch.tensor(self.label, dtype=torch.float32), file_path

In [None]:
class CharCNN(nn.Module):
    def __init__(self, vocab_size, embed_dim=16, num_filters=64, kernel_sizes=(7, 5, 3), num_classes=1, dropout_rate=0.5):
        super(CharCNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.conv_layers = nn.ModuleList([nn.Conv1d(embed_dim, num_filters, k) for k in kernel_sizes])
        self.batch_norm_layers = nn.ModuleList([nn.BatchNorm1d(num_filters) for _ in kernel_sizes])
        self.pool = nn.AdaptiveMaxPool1d(1)
        self.fc = nn.Linear(num_filters * len(kernel_sizes), num_classes)
        self.dropout = nn.Dropout(dropout_rate)
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        x = self.embedding(x).permute(0, 2, 1)  # (batch, embed_dim, seq_len)
        x = [self.batch_norm_layers[i](torch.relu(conv(x))) for i, conv in enumerate(self.conv_layers)]
        x = [self.pool(layer).squeeze(-1) for layer in x]
        x = torch.cat(x, dim=1)
        x = self.dropout(x)
        x = self.sigmoid(self.fc(x))
        return x

In [None]:
# Define paths
# negative_folder = "enron/kaminski-nyt" # llm generated
# negative_folder = "enron/top11-o" # 11 enron senders
negative_folder = "enron/BEC-2-emails" # BEC-2 + llm generated + 11 enron senders 
# negative_folder = "enron/nyt-alt" # BEC-2 + 11 enron senders; to be deleted
# positive_folder = "enron/kaminski-v" # kaminski-v sender
# positive_folder = "enron/kaminski-nyt"
positive_folder = "enron/stclair-c" # stclair-c sender

In [None]:
# Prepare datasets
negative_dataset = TextDataset(negative_folder, 0)
positive_dataset = TextDataset(positive_folder, 1)

dataset = negative_dataset + positive_dataset
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
# Model setup
vocab_size = len(CHAR_TO_INDEX) + 1  # +1 for padding character
model = CharCNN(vocab_size)
criterion = nn.BCELoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, verbose=True)

In [None]:
def train_model(model, train_loader, criterion, optimizer, scheduler, epochs=100, patience=10):
    model.train()
    best_loss = float('inf')
    patience_counter = 0
    for epoch in range(epochs):
        total_loss = 0
        for inputs, labels, file_paths in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs).squeeze()
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        avg_loss = total_loss / len(train_loader)
        scheduler.step(avg_loss)
        print(f"Epoch {epoch+1}, Loss: {avg_loss:.6f}, LR: {optimizer.param_groups[0]['lr']:.6f}")
        
        if avg_loss < best_loss:
            best_loss = avg_loss
            patience_counter = 0
        else:
            patience_counter += 1
        
        if patience_counter >= patience:
            print("Early stopping triggered.")
            break

In [None]:
def evaluate_model(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    all_labels = []
    all_predictions = []
    false_positives = []
    false_negatives = []

    with torch.no_grad():
        for inputs, labels, paths in test_loader:
            outputs = model(inputs).squeeze()
            predicted = (outputs >= 0.5).float()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            all_labels.extend(labels.tolist())
            all_predictions.extend(predicted.tolist())

            # Identify FP and FN
            for i in range(len(labels)):
                true_label = labels[i].item()
                pred_label = predicted[i].item()
                if true_label == 0 and pred_label == 1:
                    false_positives.append(paths[i])
                elif true_label == 1 and pred_label == 0:
                    false_negatives.append(paths[i])
    
    accuracy = 100 * correct / total
    f1 = f1_score(all_labels, all_predictions, average='macro')
    cm = confusion_matrix(all_labels, all_predictions)
    tn, fp, fn, tp = cm.ravel()
    false_positive_rate = fp / (fp + tn)
    
    print(f"Test Accuracy: {accuracy:.2f}%")
    print(f"F1 Score: {f1:.4f}")
    print(f"False Positive: {fp}")
    print(f"False Negatives: {fn}")

    # Print files corresponding to FPs and FNs
    print("\nFalse Positives:")
    for fp_path in false_positives:
        print(f"  {fp_path}")

    print("\nFalse Negatives:")
    for fn_path in false_negatives:
        print(f"  {fn_path}")

    # Display the confusion matrix
    cm_labels = np.array([1, 0])
    ConfusionMatrixDisplay.from_predictions(all_labels, all_predictions, colorbar=False, labels=cm_labels, cmap='binary')
    plt.show()

In [None]:
def evaluate_model_with_roc(model, test_loader):
    model.eval()
    all_labels = []
    all_probs = []

    with torch.no_grad():
        for inputs, labels, _ in test_loader:
            outputs = model(inputs).squeeze()
            probs = outputs.detach().cpu().numpy()
            all_probs.extend(probs)
            all_labels.extend(labels.cpu().numpy())

    # Compute ROC-AUC
    fpr, tpr, thresholds = roc_curve(all_labels, all_probs)
    auc_score = roc_auc_score(all_labels, all_probs)

    print(f"ROC AUC Score: {auc_score:.4f}")

    # Plot ROC Curve
    plt.figure()
    plt.plot(fpr, tpr, color='black', label=f'ROC curve (AUC = {auc_score:.4f})')
    plt.plot([0, 1], [0, 1], color='gray', linestyle='--', label='Random Classifier')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    # plt.title('ROC Curve - Char-CNN Classifier')
    plt.legend(loc='lower right')
    plt.grid(alpha=0.3)
    plt.tight_layout()
    plt.show()

In [None]:
%%time
train_model(model, train_loader, criterion, optimizer, scheduler, epochs=100, patience=10)

In [None]:
%%time
evaluate_model(model, test_loader)

In [None]:
"""
evaluate_model_with_roc(model, test_loader)
"""

In [None]:
"""
from torchviz import make_dot

# Create a dummy input and get model output
vocab_size = len(CHAR_TO_INDEX) + 1  # +1 for padding character
dummy_input = torch.randint(0, vocab_size, (1, MAX_LEN), dtype=torch.long)
output = model(dummy_input)

# Visualize the graph
dot = make_dot(output, params=dict(model.named_parameters()))
dot.format = 'png'
dot.render("char_cnn_model")
"""

In [None]:
total_params = sum(p.numel() for p in model.parameters())
print(f"Number of parameters: {total_params}")

In [None]:
from torchinfo import summary

summary(model, input_size=(32, 1024), dtypes=[torch.long])