## Torch CRF install

In [None]:
# Install the pytorch-crf package; it provides the `torchcrf` module imported in the Imports cell.
!pip install pytorch-crf

In [None]:
# Mount Google Drive at /content/drive; the CSV files read later live under that path.
from google.colab import drive
drive.mount('/content/drive')

## Imports

In [None]:
import torch
import torch.nn as nn
from transformers import AutoModel, AutoTokenizer
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from torchcrf import CRF
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, matthews_corrcoef

## Dataset and Input Preparation

In [None]:
class MixedTextDataset(Dataset):
    """Token-classification dataset for locating a class boundary inside a text.

    Each sample is a text plus an integer label holding the index of the last
    whitespace-separated word that belongs to class 0; every later word
    belongs to class 1.  ``__getitem__`` tokenises the text and expands the
    word labels to one label per token, writing -100 at positions that carry
    no label (special tokens and padding).
    """

    # Label value written at positions that carry no word label.
    IGNORE_INDEX = -100

    def __init__(self, texts, labels, tokenizer, max_len):
        """Store the raw samples and the tokenisation settings.

        Args:
            texts: Indexable collection of input texts.
            labels: Indexable collection of boundary word indices, aligned
                with ``texts``.
            tokenizer: Hugging Face-style tokenizer callable.
            max_len: Length every encoded sample is padded/truncated to.
        """
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the number of samples."""
        return len(self.texts)

    @staticmethod
    def _word_index_per_token(text, offsets, special_mask):
        """Return, for every token, the index of the word it starts in.

        ``offsets`` are (start, end) character spans into ``text``.  Special
        tokens and tokens covering no non-whitespace character map to ``None``.
        """
        # char position -> word index (-1 for whitespace)
        char_to_word = []
        word_idx = -1
        inside_word = False
        for ch in text:
            if ch.isspace():
                inside_word = False
                char_to_word.append(-1)
            else:
                if not inside_word:
                    word_idx += 1
                    inside_word = True
                char_to_word.append(word_idx)

        token_words = []
        for (start, end), is_special in zip(offsets, special_mask):
            word = None
            if not is_special:
                # Skip leading whitespace in case the span includes it.
                for pos in range(start, min(end, len(char_to_word))):
                    if char_to_word[pos] >= 0:
                        word = char_to_word[pos]
                        break
            token_words.append(word)
        return token_words

    def __getitem__(self, idx):
        """Encode sample ``idx``.

        Returns:
            dict with ``input_ids``, ``attention_mask`` and ``labels``, each a
            1-D tensor of length ``max_len``.
        """
        text = str(self.texts[idx])
        boundary_idx = int(self.labels[idx])
        words = text.split()

        # Keep at most max_len - 2 words (presumably leaving room for the two
        # special tokens the tokenizer adds).
        window = self.max_len - 2
        if len(words) > window:
            if boundary_idx > window:
                # The boundary lies beyond the head window: keep the tail and
                # re-express the boundary relative to the kept words.  It may
                # become negative, in which case every kept word is class 1.
                dropped = len(words) - window
                words = words[dropped:]
                boundary_idx -= dropped
            else:
                words = words[:window]

        word_labels = [0 if i <= boundary_idx else 1 for i in range(len(words))]
        truncated_text = " ".join(words)

        # Offset mappings are only available from "fast" tokenizers.
        use_offsets = bool(getattr(self.tokenizer, "is_fast", False))
        encoding = self.tokenizer(
            truncated_text,
            max_length=self.max_len,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
            return_special_tokens_mask=True,
            return_offsets_mapping=use_offsets,
        )
        special_tokens_mask = encoding["special_tokens_mask"][0].tolist()

        if use_offsets:
            # A word may be split into several subword tokens; every piece
            # inherits the label of the word it belongs to.
            token_words = self._word_index_per_token(
                truncated_text,
                encoding["offset_mapping"][0].tolist(),
                special_tokens_mask,
            )
        else:
            # Fallback without offsets: one word label per non-special token.
            # This is only exact when the tokenizer emits one token per word.
            token_words = []
            next_word = 0
            for is_special in special_tokens_mask:
                if is_special or next_word >= len(word_labels):
                    token_words.append(None)
                else:
                    token_words.append(next_word)
                    next_word += 1

        token_labels = [
            self.IGNORE_INDEX if word is None else word_labels[word]
            for word in token_words
        ]
        # Guarantee exactly max_len labels whatever the tokenizer returned.
        token_labels = token_labels[:self.max_len]
        token_labels.extend([self.IGNORE_INDEX] * (self.max_len - len(token_labels)))

        return {
            "input_ids": encoding["input_ids"].squeeze(0),
            "attention_mask": encoding["attention_mask"].squeeze(0),
            "labels": torch.tensor(token_labels)
        }

## Define Main Model

In [None]:
class TransformerTaggerCRF(nn.Module):
    """Transformer encoder followed by a linear emission layer and a CRF.

    With ``labels`` the forward pass returns the CRF negative log-likelihood;
    without them it returns the decoded label paths.
    """

    def __init__(self, model_name, num_labels):
        """Build the encoder, the emission classifier and the CRF.

        Args:
            model_name: Name or path passed to ``AutoModel.from_pretrained``.
            num_labels: Number of tag classes.
        """
        super().__init__()
        self.num_labels = num_labels
        self.transformer = AutoModel.from_pretrained(model_name)
        self.dropout = nn.Dropout(0.1)
        hidden_size = self.transformer.config.hidden_size
        self.classifier = nn.Linear(hidden_size, num_labels)
        nn.init.xavier_uniform_(self.classifier.weight)
        nn.init.constant_(self.classifier.bias, 0)
        self.crf = CRF(num_labels, batch_first=True)

    def forward(self, input_ids, attention_mask, labels=None):
        """Compute the training loss or decode predictions.

        Args:
            input_ids: Token ids passed to the encoder.
            attention_mask: Mask passed to the encoder and, as a boolean
                mask, to the CRF.
            labels: Optional per-token labels; -100 entries are replaced by
                0 before being passed to the CRF.

        Returns:
            The negated mean CRF log-likelihood when ``labels`` is given.
            Otherwise a CPU long tensor of decoded labels with one row per
            sequence, right-padded with 0 to the longest decoded path.
        """
        outputs = self.transformer(input_ids, attention_mask=attention_mask)
        sequence_output = self.dropout(outputs.last_hidden_state)
        logits = self.classifier(sequence_output)
        mask = attention_mask.bool()

        if labels is not None:
            # -100 is not a valid tag index for the CRF, so map it to tag 0.
            # NOTE(review): those positions are still scored wherever the
            # attention mask is on (e.g. special tokens) - confirm intended.
            crf_labels = labels.clone()
            crf_labels[crf_labels == -100] = 0
            return -self.crf(logits, crf_labels, mask=mask, reduction='mean')

        # decode() returns one list per sequence covering only the unmasked
        # positions, so the lists differ in length within a batch.  Building a
        # tensor directly from that ragged list fails; pad the paths first.
        decoded = self.crf.decode(logits, mask=mask)
        paths = [torch.tensor(path, dtype=torch.long) for path in decoded]
        return torch.nn.utils.rnn.pad_sequence(paths, batch_first=True, padding_value=0)

## Metrics, Train and Evaluate Model

In [None]:
def compute_metrics(predictions, labels):
    """Score binary predictions against reference labels.

    Args:
        predictions: Sequence of predicted class ids.
        labels: Sequence of reference class ids, aligned with ``predictions``.

    Returns:
        Tuple ``(accuracy, precision, recall, f1, mcc)``.  Precision, recall
        and F1 use binary averaging and evaluate to 0 instead of warning when
        their denominator is zero.
    """
    y_pred = np.array(predictions)
    y_true = np.array(labels)

    # Shared settings for the three scores that are computed for one class.
    binary_opts = {"average": "binary", "zero_division": 0}

    return (
        accuracy_score(y_true, y_pred),
        precision_score(y_true, y_pred, **binary_opts),
        recall_score(y_true, y_pred, **binary_opts),
        f1_score(y_true, y_pred, **binary_opts),
        matthews_corrcoef(y_true, y_pred),
    )

def train_model(model, data_loader, optimizer, scheduler, device):
    """Train ``model`` for one pass over ``data_loader``.

    For every batch the loss returned by the model is back-propagated,
    gradients are clipped to a global norm of 1.0, and both the optimizer and
    the scheduler are stepped.

    Returns:
        The sum of the batch losses divided by the number of batches.
    """
    model.train()
    running_loss = 0

    for batch in data_loader:
        ids, mask, targets = (
            batch[key].to(device)
            for key in ("input_ids", "attention_mask", "labels")
        )

        optimizer.zero_grad()
        batch_loss = model(ids, mask, targets)
        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        # The scheduler advances once per batch, not once per epoch.
        scheduler.step()

        running_loss += batch_loss.item()

    num_batches = len(data_loader)
    return running_loss / num_batches

def evaluate_model(model, data_loader, device):
    """Decode every batch with the CRF and score the token-level predictions.

    Only positions where the attention mask is 1 and the label is not -100
    are scored.

    Args:
        model: Tagger exposing ``transformer``, ``dropout``, ``classifier``
            and ``crf`` sub-modules.
        data_loader: Iterable of batches with ``input_ids``,
            ``attention_mask`` and ``labels`` tensors.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(accuracy, precision, recall, f1, mcc, mae, std_dev)``.
        ``mae`` and ``std_dev`` are the mean and standard deviation of
        ``|prediction - label|`` over the scored tokens, i.e. a token-level
        error statistic rather than a boundary-position distance.
    """
    model.eval()
    all_predictions = []
    all_labels = []

    with torch.no_grad():
        for batch in data_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)

            outputs = model.transformer(input_ids, attention_mask=attention_mask)
            sequence_output = model.dropout(outputs.last_hidden_state)
            logits = model.classifier(sequence_output)
            mask = attention_mask.bool()
            predictions = model.crf.decode(logits, mask)

            for pred_seq, label_seq, mask_seq in zip(predictions, labels, attention_mask):
                # decode() yields labels for the unmasked positions only.
                # Right-pad with zeros to the full sequence length, keeping an
                # integer dtype so predictions never get promoted to float.
                padded_pred = torch.zeros(mask_seq.size(0), dtype=torch.long, device=device)
                padded_pred[:len(pred_seq)] = torch.tensor(
                    pred_seq, dtype=torch.long, device=device
                )

                valid_indices = (mask_seq == 1) & (label_seq != -100)
                all_predictions.extend(padded_pred[valid_indices].cpu().numpy())
                all_labels.extend(label_seq[valid_indices].cpu().numpy())

    all_predictions = np.array(all_predictions)
    all_labels = np.array(all_labels)
    absolute_errors = np.abs(all_predictions - all_labels)
    mae = np.mean(absolute_errors)
    std_dev = np.std(absolute_errors)
    accuracy, precision, recall, f1, mcc = compute_metrics(all_predictions, all_labels)
    return accuracy, precision, recall, f1, mcc, mae, std_dev

## Model Selection and Setting up

In [None]:
MODEL_NAME = 'roberta-base'

def setup_training(train_texts, train_labels, dev_texts, dev_labels, model_name=MODEL_NAME, batch_size=8, max_len=512, num_epochs=3):
    """Create the data loaders, model, optimizer and LR scheduler.

    Args:
        train_texts, train_labels: Training texts and their boundary labels.
        dev_texts, dev_labels: Validation texts and their boundary labels.
        model_name: Checkpoint name used for both tokenizer and encoder.
        batch_size: Batch size of both data loaders.
        max_len: Encoded sequence length passed to the datasets.
        num_epochs: Number of epochs the training loop will run; it sets the
            number of steps over which the scheduler ramps the learning rate.
            Keep it equal to the epoch count of the training loop.

    Returns:
        Tuple ``(model, train_loader, dev_loader, optimizer, scheduler, device)``.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    train_dataset = MixedTextDataset(train_texts, train_labels, tokenizer, max_len)
    dev_dataset = MixedTextDataset(dev_texts, dev_labels, tokenizer, max_len)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    dev_loader = DataLoader(dev_dataset, batch_size=batch_size, shuffle=False)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = TransformerTaggerCRF(model_name, num_labels=2)
    model.to(device)

    # Layer-wise learning rates: smallest for the embeddings, growing through
    # the encoder layers, largest for the freshly initialised classifier/CRF.
    optimizer_grouped_parameters = [
        {"params": model.transformer.embeddings.parameters(), "lr": 1e-6},
        {"params": model.transformer.encoder.layer[:6].parameters(), "lr": 5e-6},
        {"params": model.transformer.encoder.layer[6:].parameters(), "lr": 1e-5},
        {"params": model.classifier.parameters(), "lr": 1e-4},
        {"params": model.crf.parameters(), "lr": 1e-4},
    ]
    # Every group sets its own "lr", so the lr=5e-6 default is not used by them.
    optimizer = torch.optim.AdamW(optimizer_grouped_parameters, lr=5e-6, weight_decay=0.01)

    # The scheduler is stepped once per batch, so its horizon is the total
    # number of batches over all epochs.
    train_steps = len(train_loader) * num_epochs
    # Linearly ramps each group's LR from 0.1x to 1x of its value over train_steps.
    scheduler = torch.optim.lr_scheduler.LinearLR(optimizer, start_factor=0.1, total_iters=train_steps)
    return model, train_loader, dev_loader, optimizer, scheduler, device

In [None]:
# Read the sentence-level HC3 train/dev splits from the mounted Drive.
train_df = pd.read_csv('/content/drive/MyDrive/SCI AIGC/sentence_level_train_hc3.csv')
dev_df = pd.read_csv('/content/drive/MyDrive/SCI AIGC/sentence_level_dev_hc3.csv')

# Pull the text and boundary-label columns out as plain Python lists.
train_texts, train_labels = train_df["text"].tolist(), train_df["label"].tolist()
dev_texts, dev_labels = dev_df["text"].tolist(), dev_df["label"].tolist()

# Build loaders, model, optimizer and scheduler with the default batch size and length.
model, train_loader, dev_loader, optimizer, scheduler, device = setup_training(
    train_texts=train_texts,
    train_labels=train_labels,
    dev_texts=dev_texts,
    dev_labels=dev_labels,
    model_name=MODEL_NAME,
)


In [None]:
# Sanity check: encode the first training sample and confirm that the input
# ids and the labels come out with the same length.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
dataset = MixedTextDataset(
    texts=[train_texts[0]],
    labels=[train_labels[0]],
    tokenizer=tokenizer,
    max_len=512,
)
sample = dataset[0]
print(f"Input sequence length: {len(sample['input_ids'])}")
print(f"Label sequence length: {len(sample['labels'])}")

In [None]:
# Display the device chosen by setup_training ("cuda" when available, otherwise "cpu").
device

## Train Model and Save it

In [None]:
import time

epochs = 3
# Start below any attainable F1 so that a checkpoint is always written, even
# if validation F1 stays at 0; the "Load Model" cell needs the file to exist.
best_f1 = float("-inf")
best_epoch = 0
best_model_path = "roberta_best_TC.pth"

for epoch in range(epochs):
    # One epoch = one training pass followed by a validation pass; both are timed together.
    start_time = time.time()
    train_loss = train_model(model, train_loader, optimizer, scheduler, device)
    val_accuracy, val_precision, val_recall, val_f1, val_mcc, val_mae, val_std_dev = evaluate_model(model, dev_loader, device)
    end_time = time.time()
    epoch_duration = end_time - start_time

    print(f"Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4f}")
    print(f"Validation: Accuracy: {val_accuracy:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, "
          f"F1 Score: {val_f1:.4f}, MCC: {val_mcc:.4f}, MAE: {val_mae:.2f}±{val_std_dev:.2f}")
    print(f"Time taken for epoch {epoch + 1}: {epoch_duration:.4f} seconds")

    # Keep only the checkpoint with the best validation F1 seen so far.
    if val_f1 > best_f1:
        best_f1 = val_f1
        best_epoch = epoch + 1
        torch.save({
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'scheduler_state_dict': scheduler.state_dict(),
            'loss': train_loss,
        }, best_model_path)
        print(f"Model saved at epoch {epoch + 1} with F1 Score: {val_f1:.4f}")

print(f"Best model saved at epoch {best_epoch}, saved to {best_model_path}")


## Load Model

In [None]:
# Restore the best weights.  map_location remaps the stored tensors onto the
# current device, so a checkpoint saved on GPU also loads on a CPU-only runtime.
checkpoint = torch.load(best_model_path, map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])

## Evaluate Model on Test Data

In [None]:
# Score the restored model on the held-out test split.
model.eval()

test_df = pd.read_csv('/content/drive/MyDrive/SCI AIGC/sentence_level_test_hc3.csv')
test_texts, test_labels = test_df["text"].tolist(), test_df["label"].tolist()

# Same sequence length and batch size as the setup_training defaults; no shuffling.
test_dataset = MixedTextDataset(test_texts, test_labels, tokenizer, max_len=512)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

(
    test_accuracy,
    test_precision,
    test_recall,
    test_f1,
    test_mcc,
    test_mae,
    test_std_dev,
) = evaluate_model(model, test_loader, device)

print(f"Test Results: Accuracy: {test_accuracy:.4f}, Precision: {test_precision:.4f}, Recall: {test_recall:.4f}, "
      f"F1 Score: {test_f1:.4f}, MCC: {test_mcc:.4f}, MAE: {test_mae:.2f}±{test_std_dev:.2f}")


In [None]:
# Display the test DataFrame as loaded from the CSV.
test_df

In [None]:
import pandas as pd
import torch

def _token_word_indices(text, offsets, special_mask):
    """Return, for every token, the index of the whitespace-separated word it starts in.

    ``offsets`` are (start, end) character spans into ``text``.  Special tokens
    and tokens covering no non-whitespace character map to ``None``.
    """
    # char position -> word index (-1 for whitespace)
    char_to_word = []
    word_idx = -1
    inside_word = False
    for ch in text:
        if ch.isspace():
            inside_word = False
            char_to_word.append(-1)
        else:
            if not inside_word:
                word_idx += 1
                inside_word = True
            char_to_word.append(word_idx)

    token_words = []
    for (start, end), is_special in zip(offsets, special_mask):
        word = None
        if not is_special:
            # Skip leading whitespace in case the span includes it.
            for pos in range(start, min(end, len(char_to_word))):
                if char_to_word[pos] >= 0:
                    word = char_to_word[pos]
                    break
        token_words.append(word)
    return token_words


def predict_boundary(model, text, tokenizer, max_len, device):
    """Predict the boundary word index of a single text.

    The result uses the same unit as the dataset labels: the index of the
    last whitespace-separated word that belongs to class 0.

    Args:
        model: Tagger exposing ``transformer``, ``dropout``, ``classifier``
            and ``crf`` sub-modules.
        text: Text to analyse (converted with ``str``).
        tokenizer: Hugging Face-style tokenizer callable.
        max_len: Length the encoding is padded/truncated to.
        device: Device the encoded tensors are moved to.

    Returns:
        ``w - 1`` where ``w`` is the word containing the first token
        predicted as class 1 (``-1`` if that is the first word).  If no
        token is predicted as class 1, the index of the last word that
        survived truncation (``-1`` for a text without words).
    """
    model.eval()
    text = str(text)

    # Offset mappings, needed to map subword tokens back to words, are only
    # available from "fast" tokenizers.
    use_offsets = bool(getattr(tokenizer, "is_fast", False))
    encoding = tokenizer(
        text,
        max_length=max_len,
        padding="max_length",
        truncation=True,
        return_tensors="pt",
        return_special_tokens_mask=True,
        return_offsets_mapping=use_offsets,
    )

    input_ids = encoding["input_ids"].to(device)
    attention_mask = encoding["attention_mask"].to(device)

    with torch.no_grad():
        outputs = model.transformer(input_ids, attention_mask=attention_mask)
        sequence_output = model.dropout(outputs.last_hidden_state)
        logits = model.classifier(sequence_output)
        mask = attention_mask.bool()
        predictions = model.crf.decode(logits, mask)

    pred_labels = predictions[0]
    special_mask = encoding["special_tokens_mask"][0].tolist()

    if use_offsets:
        token_words = _token_word_indices(
            text, encoding["offset_mapping"][0].tolist(), special_mask
        )
    else:
        # Fallback without offsets: count each non-special token as one word.
        token_words = []
        next_word = 0
        for is_special in special_mask:
            if is_special:
                token_words.append(None)
            else:
                token_words.append(next_word)
                next_word += 1

    # Walk the decoded path, ignoring special tokens, until the first class-1
    # token; the boundary is the word just before the one it belongs to.
    last_word = -1
    for label, word in zip(pred_labels, token_words):
        if word is None:
            continue
        if label == 1:
            return word - 1
        last_word = word
    return last_word


In [None]:
# Run predict_boundary on every test text, one forward pass per row, and store the result as a new column.
test_df["predicted_boundary"] = test_df["text"].apply(lambda x: predict_boundary(model, x, tokenizer, max_len=512, device=device))

In [None]:
# Display the test DataFrame, now including the predicted_boundary column.
test_df

In [None]:
# Write the test rows together with their predictions to a CSV file, without the index column.
test_df.to_csv('Transformer_CRF_RoBERTa_HC3.csv', index = False)

In [None]:
# Mean absolute difference between the gold label and the predicted boundary over all test rows.
mae = (test_df['label'] - test_df['predicted_boundary']).abs().mean()
mae

In [None]:
# Per-row absolute boundary error, summarised as mean ± standard deviation.
# pandas' Series.std() defaults to ddof=1 (sample standard deviation), unlike
# np.std, which defaults to ddof=0.
difference = (test_df['label'] - test_df['predicted_boundary']).abs()
mae = difference.mean()
sd = difference.std()
print(f"MAE ± SD: {mae:.2f} ± {sd:.2f}")
