## Æther: Pipeline


In [None]:
import os
import torch
import pandas as pd
from pathlib import Path
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModel
import torch.nn as nn
from sklearn.model_selection import train_test_split
from tqdm import tqdm

## Dataset


In [None]:
class PlagiarismDataset(Dataset):
    """
    PyTorch Dataset of code-file pairs for plagiarism detection.

    Each item is one pair of files. Both files are read from disk on access,
    tokenized together as a single two-segment input, and returned with the
    pair's label.

    Attributes:
        file_pairs (list): List of tuples containing pairs of file IDs (id1, id2)
        labels (list): List of binary labels (0 for no plagiarism, 1 for plagiarism)
        base_dir (Path): Base directory containing the 'plagio' and 'no_plagio'
            sub-directories
        tokenizer: HuggingFace tokenizer for encoding code pairs
        max_length (int): Maximum sequence length for tokenization
    """

    def __init__(self, file_pairs, labels, base_dir, tokenizer, max_length=512):
        """
        Initialize the plagiarism dataset.

        Args:
            file_pairs (list): List of tuples containing pairs of file IDs to compare
            labels (list): Corresponding binary labels (0/1) indicating plagiarism
            base_dir (str or Path): Root directory containing the code files
            tokenizer: HuggingFace tokenizer for encoding the code
            max_length (int, optional): Maximum sequence length for tokenization. Defaults to 512.
        """

        self.file_pairs = file_pairs
        self.labels = labels
        self.base_dir = Path(base_dir)
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """
        Get the number of code pairs in the dataset.

        Returns:
            int: Number of code pairs
        """

        return len(self.file_pairs)

    def __getitem__(self, idx):
        """
        Get a code pair and its label by index.

        This method:
        1. Retrieves file IDs and label for the specified index
        2. Reads code content from both files
        3. Tokenizes both code samples together
        4. Returns a dictionary with tokenized inputs and label

        Args:
            idx (int): Index of the code pair to retrieve

        Returns:
            dict: Dictionary containing:
                - 'input_ids': Tokenized input IDs
                - 'attention_mask': Attention mask for the tokenized input
                - 'label': Tensor containing the plagiarism label
        """

        id1, id2 = self.file_pairs[idx]
        label = self.labels[idx]

        code1 = self._read_file(id1)
        code2 = self._read_file(id2)

        encoded = self.tokenizer(
            code1,
            code2,
            padding="max_length",
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt",
        )

        # Drop only the leading batch axis added by return_tensors="pt".
        # A bare squeeze() would also collapse the sequence axis whenever
        # max_length == 1, producing 0-dim tensors that cannot be batched
        # consistently.
        return {
            "input_ids": encoded["input_ids"].squeeze(0),
            "attention_mask": encoded["attention_mask"].squeeze(0),
            "label": torch.tensor(label, dtype=torch.long),
        }

    def _read_file(self, file_id):
        """
        Read a code file from either the plagiarism or non-plagiarism directory.

        Candidate paths are probed in this order and the first regular file
        found is returned:
        1. <base_dir>/plagio/<file_id>
        2. <base_dir>/no_plagio/<file_id>
        3. <base_dir>/plagio/<file_id>.txt
        4. <base_dir>/no_plagio/<file_id>.txt

        The file is decoded as UTF-8, falling back to Latin-1 if that fails.

        Args:
            file_id: ID of the file to read

        Returns:
            str: Content of the code file or empty string if file not found
        """

        plagio_path = self.base_dir / "plagio" / f"{file_id}"
        no_plagio_path = self.base_dir / "no_plagio" / f"{file_id}"

        candidates = [
            plagio_path,
            no_plagio_path,
            Path(str(plagio_path) + ".txt"),
            Path(str(no_plagio_path) + ".txt"),
        ]

        for path in candidates:
            # is_file() rather than exists(): a directory that happens to be
            # named like a file ID must be skipped, not read.
            if not path.is_file():
                continue

            try:
                return path.read_text(encoding="utf-8")
            except UnicodeDecodeError:
                # Latin-1 maps every byte to a character, so this cannot fail.
                return path.read_text(encoding="latin-1")

        return ""

## Model


In [None]:
class PlagiarismModel(nn.Module):
    """
    Neural network model for code plagiarism detection based on CodeBERT embeddings.

    A pretrained encoder produces contextual embeddings for a tokenized pair
    of code samples; the hidden state at sequence position 0 (the CLS token)
    is passed through a small feed-forward classifier that outputs two logits.

    Attributes:
        codebert: Pretrained encoder loaded via ``AutoModel.from_pretrained``
        classifier: Sequential neural network for binary classification
    """

    def __init__(self, model_name):
        """
        Initialize the plagiarism detection model.

        Args:
            model_name (str): Name or path of the pretrained CodeBERT model
                              (e.g., "microsoft/codebert-base")

        Note:
            The classifier's input width is taken from the loaded encoder's
            ``config.hidden_size`` so encoders of other sizes work unchanged.
            If the config does not expose ``hidden_size``, 768 (the width of
            codebert-base) is assumed.
        """

        super().__init__()

        self.codebert = AutoModel.from_pretrained(model_name)

        # Size the classifier from the encoder instead of hard-coding 768.
        hidden_size = getattr(self.codebert.config, "hidden_size", 768)

        self.classifier = nn.Sequential(
            nn.Linear(hidden_size, 256),  # Project encoder width down to 256
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(256, 2),
        )

    def forward(self, input_ids, attention_mask):
        """
        Forward pass of the model.

        Runs the encoder on the tokenized pair, takes the hidden state at
        sequence position 0 for every item in the batch, and feeds it to the
        classifier.

        Args:
            input_ids (torch.Tensor): Tokenized input IDs of code pairs
            attention_mask (torch.Tensor): Attention mask for the input

        Returns:
            torch.Tensor: Logits for binary classification (no plagiarism, plagiarism)
        """

        outputs = self.codebert(input_ids=input_ids, attention_mask=attention_mask)
        cls_output = outputs.last_hidden_state[:, 0, :]

        return self.classifier(cls_output)

## Detection


In [None]:
class PlagiarismDetection:
    """
    Main class for code plagiarism detection that orchestrates the entire workflow.

    This class handles the complete plagiarism detection pipeline including:
    - Model initialization and configuration
    - Data loading and preprocessing
    - Model training and evaluation
    - Model saving and loading

    Attributes:
        model_name (str): Name or path of the CodeBERT model
        tokenizer: HuggingFace tokenizer for encoding code pairs
        model: The neural network model for plagiarism detection
        device: PyTorch device for computation (CPU or GPU)
    """

    def __init__(self, model_name="microsoft/codebert-base"):
        """
        Initialize the plagiarism detection system.

        Loads the tokenizer and model for ``model_name`` and moves the model
        to CUDA when available, otherwise CPU.

        Args:
            model_name (str, optional): Pretrained model identifier to use.
                                       Defaults to "microsoft/codebert-base".
        """

        self.model_name = model_name
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = PlagiarismModel(model_name)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)

    def load_data(
        self,
        csv_path,
        data_dir,
        *,
        test_size=0.2,
        batch_size=16,
        max_length=512,
        random_state=42,
    ):
        """
        Load and prepare data for training and testing.

        Reads the CSV file containing code pair IDs and plagiarism labels,
        makes a stratified train/test split, and wraps each split in a
        DataLoader. Only the training loader shuffles.

        Args:
            csv_path (str): Path to the CSV file with columns 'id1', 'id2', 'plagio'
            data_dir (str): Directory containing code files
            test_size (float, optional): Fraction of pairs held out for testing.
                Defaults to 0.2.
            batch_size (int, optional): Batch size for both loaders. Defaults to 16.
            max_length (int, optional): Maximum token length per pair. Defaults to 512.
            random_state (int, optional): Seed for the split. Defaults to 42.

        Returns:
            tuple: (train_loader, test_loader) - DataLoader instances for training and testing
        """

        df = pd.read_csv(csv_path)
        file_pairs = list(zip(df["id1"], df["id2"]))
        labels = df["plagio"].tolist()

        train_pairs, test_pairs, train_labels, test_labels = train_test_split(
            file_pairs,
            labels,
            test_size=test_size,
            random_state=random_state,
            stratify=labels,
        )

        train_dataset = PlagiarismDataset(
            train_pairs, train_labels, data_dir, self.tokenizer, max_length=max_length
        )
        test_dataset = PlagiarismDataset(
            test_pairs, test_labels, data_dir, self.tokenizer, max_length=max_length
        )

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=batch_size)

        return train_loader, test_loader

    def _run_batch(self, batch):
        """
        Move one batch to the model's device and run the forward pass.

        Args:
            batch (dict): Mapping with 'input_ids', 'attention_mask' and 'label' tensors

        Returns:
            tuple: (logits, labels), both on ``self.device``
        """

        input_ids = batch["input_ids"].to(self.device)
        attention_mask = batch["attention_mask"].to(self.device)
        labels = batch["label"].to(self.device)

        return self.model(input_ids, attention_mask), labels

    def train(self, train_loader, epochs=3, lr=2e-5):
        """
        Train the plagiarism detection model.

        Optimizes all model parameters with AdamW and cross-entropy loss,
        printing the mean batch loss and training accuracy after each epoch.
        An empty loader reports 0 for both metrics instead of raising.

        Args:
            train_loader (DataLoader): DataLoader containing training data
            epochs (int, optional): Number of training epochs. Defaults to 3.
            lr (float, optional): AdamW learning rate. Defaults to 2e-5.
        """

        optimizer = torch.optim.AdamW(self.model.parameters(), lr=lr)
        criterion = nn.CrossEntropyLoss()

        for epoch in range(epochs):
            # Re-assert train mode each epoch in case evaluate() ran in between.
            self.model.train()

            total_loss = 0.0
            correct = 0
            total = 0
            num_batches = 0

            for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}"):
                # Forward pass
                optimizer.zero_grad()
                outputs, labels = self._run_batch(batch)
                loss = criterion(outputs, labels)

                # Backward pass
                loss.backward()
                optimizer.step()

                # Track metrics
                total_loss += loss.item()
                predicted = outputs.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
                num_batches += 1

            # Guard both divisions so an empty loader does not crash training.
            avg_loss = total_loss / num_batches if num_batches else 0.0
            accuracy = 100 * correct / total if total else 0.0
            print(f"Epoch {epoch+1}: Loss={avg_loss:.4f}, Accuracy={accuracy:.2f}%")

    def evaluate(self, test_loader):
        """
        Evaluate the model on test data.

        Sets the model to evaluation mode and calculates accuracy on the test
        set with gradients disabled.

        Args:
            test_loader (DataLoader): DataLoader containing test data

        Returns:
            float: Accuracy percentage on test data (0.0 if the loader is empty)
        """

        self.model.eval()
        correct = 0
        total = 0

        with torch.no_grad():
            for batch in tqdm(test_loader, desc="Testing"):
                outputs, labels = self._run_batch(batch)
                predicted = outputs.argmax(dim=1)

                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        # An empty loader would otherwise raise ZeroDivisionError.
        accuracy = 100 * correct / total if total else 0.0
        print(f"Test Accuracy: {accuracy:.2f}%")
        return accuracy

    def save_model(self, path):
        """
        Save the trained model's state dict to a file.

        Missing parent directories are created first so a long training run
        is not lost to a non-existent output folder.

        Args:
            path (str): Path where the model should be saved
        """

        # Only filesystem paths have a parent directory; anything else
        # (e.g. a file-like object) is passed to torch.save untouched.
        if isinstance(path, (str, os.PathLike)):
            Path(path).parent.mkdir(parents=True, exist_ok=True)

        torch.save(self.model.state_dict(), path)
        print(f"Model saved to {path}")

    def load_model(self, path):
        """
        Load a trained model's state dict from a file.

        Args:
            path (str): Path to the saved model file
        """

        # NOTE(security): torch.load unpickles the file; only load checkpoints
        # from trusted sources (or pass weights_only=True where the installed
        # PyTorch version supports it).
        state_dict = torch.load(path, map_location=self.device)
        self.model.load_state_dict(state_dict)
        print(f"Model loaded from {path}")

## Execution


In [None]:
# Build the detector with the default checkpoint ("microsoft/codebert-base");
# this loads the tokenizer and model and moves the model to GPU if available.
detector = PlagiarismDetection()

In [None]:
# Read the labelled pairs from the CSV and build the train/test DataLoaders.
# Code files are looked up under datasets/plagio and datasets/no_plagio.
train_loader, test_loader = detector.load_data(
    csv_path="datasets/sdata.csv", data_dir="datasets"
) 

In [None]:
# Fine-tune for 3 epochs, then measure accuracy (a percentage) on the test split.
detector.train(train_loader, epochs=3)
accuracy = detector.evaluate(test_loader)

In [None]:
# Persist the trained weights (state dict) to the current working directory.
detector.save_model("plagiarism_model_codeBERT.pt")