# Large Language Model Artificial Text Classification

This approach uses an LLM (GPT-2) to encode texts and trains a downstream classifier on these encodings to detect artificially generated texts.

In [None]:
from transformers import GPT2Model, GPT2Tokenizer
from torch.optim import AdamW
from torch.utils.data import DataLoader
import dataprocessing.dataset as ds
import numpy as np
import pandas as pd 
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.manifold import TSNE
import xgboost as xgb
import torch
import torch.nn as nn
from tqdm import tqdm
import matplotlib.pyplot as plt

Select the compute device: the first CUDA GPU if one is available, otherwise the CPU.

In [None]:
# Use the first CUDA device when PyTorch reports one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print(f'Device {device}')

Model and training hyperparameters.

In [None]:
# Name of the pretrained checkpoint loaded for both the tokenizer and the encoder.
model_name = 'gpt2'
# Number of target classes.
# NOTE(review): not referenced anywhere else in the visible code.
num_classes = 2
# Mini-batch size of the training and validation data loaders.
batch_size = 20
# Number of passes over the training loader during contrastive fine-tuning.
num_epochs = 5
# Step size handed to the AdamW optimizer.
learning_rate = 1e-5

## Load & Preprocess Dataset

In [None]:
def load_dataset(data_dir="./data", test_size=0.2, random_state=None):
    """Read the three essay CSV files, merge them and split into train/validation samples.

    Sources, all read from ``data_dir``:

    * ``train_drcat_04.csv``: the ``text`` and ``label`` columns are used as given.
    * ``daigt_external_dataset.csv``: only ``source_text`` is used and every row is
      labelled 1; any label-like column of that file is ignored.
    * ``train_essays_RDizzl3_seven_v1.csv``: concatenated as read; its ``text`` and
      ``label`` columns are used (presumably present -- verify against the file).

    Newline characters are removed from the text of the first two sources. Rows
    that end up without a text or a label are dropped, because ``len()`` on a
    missing (NaN) text would raise further down.

    Args:
        data_dir: Directory containing the three CSV files.
        test_size: Share of the samples placed in the validation split.
        random_state: Seed forwarded to ``train_test_split``; ``None`` keeps the
            split non-deterministic.

    Returns:
        Tuple ``(data_train, data_val, max_sequence_length)``. The first two are
        lists of ``{'text': ..., 'label': ...}`` dicts. ``max_sequence_length`` is
        the length of the longest text measured with ``len()`` on the raw string,
        i.e. in characters, not tokens (0 when there are no samples).
    """
    from pathlib import Path

    root = Path(data_dir)

    # Copy the column subset so the assignment below modifies an independent frame
    # instead of a slice of the frame returned by read_csv.
    drcat = pd.read_csv(root / "train_drcat_04.csv")[["text", "label"]].copy()
    drcat["text"] = drcat["text"].str.replace("\n", "", regex=False)

    external = pd.read_csv(root / "daigt_external_dataset.csv", sep=",")
    external = pd.DataFrame({
        "text": external["source_text"].str.replace("\n", "", regex=False),
        "label": 1,
    })

    essays = pd.read_csv(root / "train_essays_RDizzl3_seven_v1.csv")

    train_set = pd.concat([drcat, external, essays], ignore_index=True)[["text", "label"]]

    # Incomplete rows cannot be used for training; report how many were removed
    # so that data problems do not go unnoticed.
    rows_before = len(train_set)
    train_set = train_set.dropna(subset=["text", "label"])
    if len(train_set) < rows_before:
        print(f'Dropped {rows_before - len(train_set)} rows with missing text or label')

    X_train, X_val, y_train, y_val = train_test_split(
        train_set["text"],
        train_set["label"],
        test_size=test_size,
        random_state=random_state,
    )

    data_train = [
        {'text': text, 'label': label}
        for text, label in zip(X_train.values, y_train.values)
    ]
    data_val = [
        {'text': text, 'label': label}
        for text, label in zip(X_val.values, y_val.values)
    ]

    max_sequence_length = max(
        (len(sample['text']) for sample in data_train + data_val),
        default=0,
    )

    print(f'Number of Training Data: {len(y_train)}, Number of Validation Data: {len(y_val)}')

    return data_train, data_val, max_sequence_length


In [None]:
def tokenize_function(data, tokenizer, max_length):
    """Tokenize the text of every sample and pair the encoding with its label.

    Args:
        data: Indexable collection of dicts with the keys ``'text'`` and ``'label'``.
        tokenizer: Object providing ``encode_plus`` (a GPT-2 tokenizer in this notebook).
        max_length: Length limit forwarded to the tokenizer. A truthy value pads
            every sample to exactly this length; a falsy value (``None`` or 0)
            selects ``'longest'`` padding instead.

    Returns:
        List with one dict per sample. Note that the ``'text'`` key holds the
        tokenizer output (created with ``return_tensors='pt'``), not the raw
        string; ``'label'`` is passed through unchanged.
    """
    # The padding strategy depends only on max_length, so decide it once.
    padding_mode = 'max_length' if max_length else 'longest'

    def encode_sample(sample):
        # Tokenize the text using the GPT tokenizer
        encoding = tokenizer.encode_plus(
            sample['text'],
            add_special_tokens=True,
            max_length=max_length,
            truncation=True,
            padding=padding_mode,
            return_tensors='pt'
        )
        return {'text': encoding, 'label': sample['label']}

    progress = tqdm(range(len(data)), desc='Tokenize')
    return [encode_sample(data[position]) for position in progress]

In [None]:
# Load the raw samples together with the longest text length found while loading.
data_train, data_val, max_sequence_length = load_dataset()

# Never hand more than 50 to the tokenizer as its length limit.
# NOTE(review): load_dataset() measures this value with len() on the raw strings
# (characters), whereas the tokenizer's max_length counts tokens -- confirm that
# the fixed cap of 50 is the intended sequence length.
max_sequence_length = min(max_sequence_length, 50)
print(f'Maximum Sequence Lenght: {max_sequence_length}')


### Tokenization

In [None]:
# Tokenizer belonging to the pretrained checkpoint named by model_name.
tokenizer = GPT2Tokenizer.from_pretrained(model_name)
# No dedicated padding token is configured here, so the end-of-sequence token
# doubles as PAD (id 50256 according to the original note).
tokenizer.pad_token = tokenizer.eos_token
# Padding is inserted on the left-hand side of each sequence.
tokenizer.padding_side = "left"

# Replace the raw samples by their tokenized counterparts (training split first).
data_train = tokenize_function(data=data_train, tokenizer=tokenizer, max_length=max_sequence_length)
data_val = tokenize_function(data=data_val, tokenizer=tokenizer, max_length=max_sequence_length)

# Wrap both splits in the project's dataset class.
train_custom_dataset = ds.CustomDataset(data_train)
val_custom_dataset = ds.CustomDataset(data_val)

# Only the training batches are reshuffled; validation keeps a fixed order.
train_data_loader = DataLoader(
    train_custom_dataset,
    batch_size=batch_size,
    shuffle=True,
)
val_data_loader = DataLoader(
    val_custom_dataset,
    batch_size=batch_size,
    shuffle=False,
)

In [None]:
class ContrastiveLoss(nn.Module):
    """Pairwise contrastive loss over all sample pairs of a batch.

    For every ordered pair ``(i, j)`` with ``i != j`` and Euclidean distance ``d``:

    * same label:      ``d ** 2`` (pulls the pair together)
    * different label: ``max(margin - d, 0) ** 2`` (pushes the pair apart until
      it is at least ``margin`` away)

    The result is the mean over all these pairs.

    Args:
        margin: Distance beyond which pairs with different labels no longer
            contribute to the loss.
    """

    def __init__(self, margin=1.0):
        super().__init__()
        self.margin = margin

    def forward(self, embeddings, labels):
        """Return the scalar contrastive loss of one batch.

        Args:
            embeddings: 2-D tensor with one row per sample.
            labels: One label per sample; any shape with ``batch`` elements
                (e.g. ``(batch,)`` or ``(batch, 1)``) is accepted.

        Returns:
            Scalar tensor. A batch with fewer than two samples contains no
            pairs and yields 0 instead of NaN.
        """
        # Flatten so that column-shaped labels still produce a (batch, batch)
        # comparison matrix rather than broadcasting to three dimensions.
        labels = labels.reshape(-1)

        if embeddings.size(0) < 2:
            # The mean over an empty pair selection would be NaN. Return a zero
            # that is still connected to `embeddings`, so backward() keeps working.
            return embeddings.sum() * 0.0

        # Calculate pairwise distances
        pairwise_distances = torch.cdist(embeddings, embeddings)

        # True where the two samples of a pair share the same label
        same_label = labels.unsqueeze(0) == labels.unsqueeze(1)

        # Calculate loss based on pairwise distances and labels
        attract = same_label.float() * torch.pow(pairwise_distances, 2)
        repel = (~same_label).float() * torch.pow(
            torch.clamp(self.margin - pairwise_distances, min=0.0), 2
        )
        loss_matrix = attract + repel

        # Ignore diagonal elements (each sample paired with itself)
        off_diagonal = ~torch.eye(
            loss_matrix.size(0), dtype=torch.bool, device=loss_matrix.device
        )
        return torch.masked_select(loss_matrix, off_diagonal).mean()

## Define Model

In [None]:
# Pretrained GPT-2 transformer; its hidden states serve as text encodings below.
base_model = GPT2Model.from_pretrained(model_name)
# Keep the embedding matrix in line with the tokenizer's vocabulary size.
base_model.resize_token_embeddings(len(tokenizer))
# Mirror the tokenizer setup: the end-of-sequence id is also the padding id.
base_model.config.pad_token_id = base_model.config.eos_token_id
base_model.to(device)

# Loss used for contrastive fine-tuning, placed on the same device as the model.
contrastive_loss = ContrastiveLoss()
contrastive_loss = contrastive_loss.to(device)

# AdamW updates every parameter of the encoder.
optimizer = AdamW(
    base_model.parameters(),
    lr=learning_rate,
)

## XG-Boost Classifier

In [None]:
class XGboost():
    """XGBoost classifier trained on sequence embeddings produced by a base model.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self):

        # Underlying gradient-boosting classifier with library defaults.
        self.xgb_cl = xgb.XGBClassifier()
        # Predictions of the most recent predict() call (None before the first call).
        self.preds = None
        # Accuracy of the most recent predict() call (None before the first call).
        self.acc_score = None

    @staticmethod
    def _encode_loader(data_loader, base_model, device, desc):
        """Embed every batch of ``data_loader``; return ``(embeddings, labels)`` arrays.

        ``embeddings`` has shape ``(num_samples, hidden_size)`` and ``labels`` is
        a flat ``int8`` array with one entry per sample.
        """
        hidden_size = base_model.config.hidden_size
        embedding_batches = []
        label_batches = []

        # Features for a classifier should be deterministic: force evaluation
        # mode while encoding and restore the caller's mode afterwards.
        was_training = base_model.training
        base_model.eval()
        try:
            with torch.no_grad():
                for inputs, labels in tqdm(data_loader, desc=desc, leave=True):
                    # Move every tensor explicitly; this works for a BatchEncoding
                    # as well as for a plain dict produced by batch collation.
                    inputs = {name: tensor.to(device) for name, tensor in inputs.items()}
                    outputs = base_model(**inputs)
                    # Average over the sequence axis.
                    # NOTE(review): assumes last_hidden_state carries an extra
                    # singleton axis after the batch axis (from tokenizing each
                    # sample with return_tensors='pt') -- confirm against
                    # CustomDataset.
                    pooled = outputs.last_hidden_state[:, 0].mean(dim=1)
                    embedding_batches.append(
                        pooled.detach().cpu().numpy().reshape(-1, hidden_size)
                    )
                    # Flatten per batch so a smaller final batch can still be
                    # joined with the others.
                    label_batches.append(labels.detach().cpu().numpy().reshape(-1))
        finally:
            base_model.train(was_training)

        encoded_data = np.concatenate(embedding_batches, axis=0)
        labels_data = np.concatenate(label_batches).astype('int8')
        return encoded_data, labels_data

    def fit(self, data_loader, base_model, device):
        """Encode all batches of ``data_loader`` with ``base_model`` and fit the classifier.

        Args:
            data_loader: Iterable yielding ``(inputs, labels)`` batches, where
                ``inputs`` is a mapping of keyword tensors for ``base_model``.
            base_model: Encoder whose output exposes ``last_hidden_state``.
            device: Device the input tensors are moved to before the forward pass.
        """
        encoded_data, labels_data = self._encode_loader(
            data_loader, base_model, device, "Fit XG-Boost"
        )
        self.xgb_cl.fit(encoded_data, labels_data)

    def predict(self, data_loader, base_model, device):
        """Predict labels for ``data_loader`` and record the accuracy.

        Takes the same arguments as ``fit``. The predictions are stored in
        ``self.preds`` and the accuracy against the loader's labels in
        ``self.acc_score``.

        Returns:
            The predicted labels.
        """
        encoded_data, labels_data = self._encode_loader(
            data_loader, base_model, device, "Predict XG-Boost"
        )
        self.preds = self.xgb_cl.predict(encoded_data)
        self.acc_score = accuracy_score(labels_data, self.preds)

        return self.preds

    def get_acc_score(self):
        """Return the accuracy of the last ``predict`` call (``None`` if never called)."""
        return self.acc_score

In [None]:
# Baseline: fit the XGBoost classifier on training embeddings of the encoder
# before the contrastive fine-tuning and score it on the validation loader.
xgb_cl = XGboost()
xgb_cl.fit(data_loader=train_data_loader, base_model=base_model, device=device)
xgb_cl.predict(data_loader=val_data_loader, base_model=base_model, device=device)
acc_pretrained = xgb_cl.get_acc_score()
print(f'Pretraining Accuracy XG-Boost Classifier: {acc_pretrained}')

In [None]:
def plot_embedding(data_loader, model, device, title, perplexity=3):
    """Project the model's sequence embeddings to 2-D with t-SNE and show a scatter plot.

    Args:
        data_loader: Iterable yielding ``(inputs, labels)`` batches, where
            ``inputs`` is a mapping of keyword tensors for ``model``.
        model: Encoder whose output exposes ``last_hidden_state``.
        device: Device the input tensors are moved to before the forward pass.
        title: Title of the figure.
        perplexity: t-SNE perplexity; it has to be smaller than the number of
            embedded samples.
    """
    hidden_size = model.config.hidden_size
    embedding_batches = []
    label_batches = []

    # Encode deterministically: force evaluation mode and restore the caller's
    # mode afterwards.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for inputs, labels in tqdm(data_loader, desc="Visualize Embedding", leave=True):
                # Move every tensor explicitly; this works for a BatchEncoding as
                # well as for a plain dict produced by batch collation.
                inputs = {name: tensor.to(device) for name, tensor in inputs.items()}
                outputs = model(**inputs)
                # Average over the sequence axis.
                # NOTE(review): assumes last_hidden_state carries an extra
                # singleton axis after the batch axis -- confirm against
                # CustomDataset.
                pooled = outputs.last_hidden_state[:, 0].mean(dim=1)
                embedding_batches.append(
                    pooled.detach().cpu().numpy().reshape(-1, hidden_size)
                )
                # Flatten per batch so a smaller final batch can still be joined.
                label_batches.append(labels.detach().cpu().numpy().reshape(-1))
    finally:
        model.train(was_training)

    encoded_data = np.concatenate(embedding_batches, axis=0)
    labels_data = np.concatenate(label_batches).astype('int8')

    tsne = TSNE(n_components=2, learning_rate='auto', init='random', perplexity=perplexity)
    proj_data = tsne.fit_transform(encoded_data)

    fig, ax = plt.subplots()
    points = ax.scatter(proj_data[:, 0], proj_data[:, 1], c=labels_data, s=120)
    ax.set_xlabel('dim_1')
    ax.set_ylabel('dim_2')
    ax.set_aspect('equal')
    # Name the legend entries after the classes that actually occur, so a loader
    # containing a single class is not mislabelled.
    legend_handles = points.legend_elements()[0]
    legend_names = [str(value) for value in np.unique(labels_data)]
    ax.legend(legend_handles, legend_names, loc=2)
    ax.set_title(title)

    plt.show()

In [None]:
# Show how the validation embeddings are laid out before any fine-tuning.
plot_embedding(
    data_loader=val_data_loader,
    model=base_model,
    device=device,
    title='Preprained Embedding Representation',
)

## Training Loop

In [None]:
# Contrastive fine-tuning of the encoder; every epoch ends with a validation pass,
# which also leaves the model in evaluation mode once the loop has finished.
for epoch in range(1, num_epochs+1):
    loss_train_epoch = []
    loss_val_epoch = []

    # from_pretrained() returns the model in evaluation mode, so training mode
    # (dropout active) has to be enabled explicitly.
    base_model.train()
    train_pbar = tqdm(train_data_loader, desc=f'Epoch: {epoch}/{num_epochs}', leave=True)
    for inputs_train, labels_train in train_pbar:
        # A batch with a single sample contains no pairs; its loss would be NaN
        # and would poison the epoch mean.
        if labels_train.numel() < 2:
            continue

        # Move every tensor explicitly; this works for a BatchEncoding as well as
        # for a plain dict produced by batch collation.
        inputs_train = {name: tensor.to(device) for name, tensor in inputs_train.items()}
        labels_train = labels_train.to(device=device, dtype=torch.float)

        optimizer.zero_grad()
        # Forward pass
        outputs_train = base_model(**inputs_train)
        last_hidden_states_train = outputs_train.last_hidden_state
        # Average over the sequence axis.
        # NOTE(review): assumes an extra singleton axis after the batch axis --
        # confirm against CustomDataset.
        sequence_embedding_train = last_hidden_states_train[:,0].mean(dim=1)

        # Compute contrastive loss using representations.
        # NOTE(review): the loss is already a mean over all pairs; the extra
        # division by batch_size only rescales it. Kept so reported values stay
        # comparable -- confirm it is intended.
        contrastive_loss_value_train = contrastive_loss(sequence_embedding_train, labels_train)/batch_size

        loss_train_epoch.append(contrastive_loss_value_train.item())
        train_pbar.set_postfix({'train_loss': np.mean(loss_train_epoch)})

        # Backward pass
        contrastive_loss_value_train.backward()
        optimizer.step()

    # Validation: dropout off, no gradients.
    base_model.eval()
    with torch.no_grad():
        for inputs_val, labels_val in val_data_loader:
            if labels_val.numel() < 2:
                continue

            inputs_val = {name: tensor.to(device) for name, tensor in inputs_val.items()}
            labels_val = labels_val.to(device=device, dtype=torch.float)

            # Forward pass
            outputs_val = base_model(**inputs_val)
            last_hidden_states_val = outputs_val.last_hidden_state
            sequence_embedding_val = last_hidden_states_val[:,0].mean(dim=1)

            # Compute contrastive loss using representations
            contrastive_loss_value_val = contrastive_loss(sequence_embedding_val, labels_val)/batch_size

            loss_val_epoch.append(contrastive_loss_value_val.item())

    print(f'LOSS train: {np.mean(loss_train_epoch)} valid: {np.mean(loss_val_epoch)}')

In [None]:
# Repeat the XGBoost evaluation with a fresh classifier, now on embeddings of the
# fine-tuned encoder.
xgb_cl = XGboost()
xgb_cl.fit(data_loader=train_data_loader, base_model=base_model, device=device)
xgb_cl.predict(data_loader=val_data_loader, base_model=base_model, device=device)
acc_finetuned = xgb_cl.get_acc_score()
print(f'Finetuned Accuracy XG-Boost Classifier: {acc_finetuned}')

In [None]:
# Same visualisation as before training, now for the fine-tuned encoder.
plot_embedding(
    data_loader=val_data_loader,
    model=base_model,
    device=device,
    title='Finetuned Embedding Representation',
)

In [None]:
# Side-by-side summary of the validation accuracy of the XGBoost classifier
# before and after the contrastive fine-tuning of the encoder.
print(f'Accuracy Pretraining: {acc_pretrained} Finetuned: {acc_finetuned}')