In [None]:
# Offline dependency setup: each line upgrades one package using only the files found
# under ../input/llm-detect-pip/ (--no-index disables the package index, -q reduces output).
!pip install -q -U peft --no-index --find-links ../input/llm-detect-pip/
!pip install -q -U accelerate --no-index --find-links ../input/llm-detect-pip/
!pip install -q -U bitsandbytes --no-index --find-links ../input/llm-detect-pip/
!pip install -q -U transformers --no-index --find-links ../input/llm-detect-pip/

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import pandas as pd 
import numpy as np
from transformers import DataCollatorWithPadding
from transformers import AutoTokenizer, AutoModel
from tqdm import tqdm
import pandas as pd
import numpy as np
import pickle
from transformers import BitsAndBytesConfig
import peft

In [None]:
class ClassHead(nn.Module):
    """Classification head: one affine layer mapping features to output logits.

    Args:
        input_dim: Size of the last dimension of the incoming features.
        num_outputs: Number of values produced per input row.
    """

    def __init__(self, input_dim, num_outputs):
        super().__init__()
        # The attribute name is part of the saved state_dict keys
        # ("linear.weight" / "linear.bias"), so it must stay "linear".
        self.linear = nn.Linear(in_features=input_dim, out_features=num_outputs)

    def forward(self, x):
        """Project ``x`` through the linear layer and return the raw logits."""
        logits = self.linear(x)
        return logits
    
class Pooling(nn.Module):
    """Mean pooling over the sequence axis that ignores masked-out positions.

    Args:
        device: Stored on the instance for backward compatibility. The forward pass
            allocates its result on the device of the input tensor instead, so the
            module cannot end up mixing devices.
    """

    def __init__(self, device):
        super().__init__()
        self.device = device

    def forward(self, x, attention_mask):
        """Average ``x`` along dim 1 using ``attention_mask`` as 0/1 weights.

        Args:
            x: 3-D tensor indexed as (batch, position, feature).
            attention_mask: 2-D tensor indexed as (batch, position); positions with
                value 0 are excluded from the average.

        Returns:
            A float32 tensor of shape (batch, feature). Rows whose mask is entirely
            zero yield zeros rather than NaN.
        """
        # Accumulate in float32: summing many half-precision activations can overflow.
        mask = attention_mask.unsqueeze(-1).to(device=x.device, dtype=torch.float32)
        summed = (x.to(torch.float32) * mask).sum(dim=1)
        # Clamp so a fully masked row divides by 1 instead of 0.
        token_counts = mask.sum(dim=1).clamp(min=1.0)
        return summed / token_counts
    
class ClassificationModel(nn.Module):
    """Backbone encoder/decoder + masked mean pooling + linear classification head.

    Args:
        backbone: Either a checkpoint name/path (loaded with ``AutoModel.from_pretrained``)
            or an already constructed model object, which is used as-is.
        device: Device identifier; stored on the instance and passed to ``Pooling``.
        num_outputs: Number of logits produced per example.
        quantization_config: Optional config forwarded to ``AutoModel.from_pretrained``.
            Only relevant when ``backbone`` is a string, because an already
            instantiated model is not reloaded.
        lora_config: Optional PEFT config; when given, the backbone is wrapped with
            ``peft.get_peft_model``.

    Notes:
        The two optional configs are applied independently. Previously they only took
        effect when both were supplied, and the PEFT path failed with ``NameError``
        because ``get_peft_model`` was never imported as a bare name.
    """

    def __init__(self, backbone, device, num_outputs, quantization_config=None, lora_config=None):
        super().__init__()
        if isinstance(backbone, str):
            load_kwargs = {}
            if quantization_config is not None:
                load_kwargs['quantization_config'] = quantization_config
            backbone = AutoModel.from_pretrained(backbone, **load_kwargs)
        if lora_config is not None:
            # The file imports the `peft` module, so the helper is reached through it.
            backbone = peft.get_peft_model(backbone, lora_config)
        self.backbone = backbone
        self.num_outputs = num_outputs
        self.hidden_dim = self.backbone.config.hidden_size
        self.device = device
        self.pooling = Pooling(self.device)
        self.head = ClassHead(self.hidden_dim, num_outputs)

    def forward(self, x, apply_softmax=False):
        """Run the full model on a tokenised batch.

        Args:
            x: Mapping of keyword arguments for the backbone; must contain
                ``'attention_mask'``.
            apply_softmax: When true, normalise the logits over the output axis.

        Returns:
            A 1-D tensor with one value per example when ``num_outputs == 1``,
            otherwise a 2-D tensor of shape (batch, num_outputs).
        """
        attention_mask = x['attention_mask']
        hidden_states = self.backbone(**x)['last_hidden_state']
        pooled = self.pooling(hidden_states, attention_mask)
        logits = self.head(pooled)
        if apply_softmax:
            # Normalise over the class axis *before* flattening: after `view(-1)` there
            # is no dim 1 left, which used to raise for single-output models.
            logits = nn.functional.softmax(logits, dim=-1)
        if self.num_outputs == 1:
            logits = logits.view(-1)
        return logits
    
class CustomDataset(Dataset):
    """CSV-backed text dataset that tokenises one row per ``__getitem__`` call.

    Args:
        backbone: Checkpoint name/path used to load the tokenizer when ``tokenizer``
            is not given.
        path: CSV file to read.
        max_length: Truncation length passed to the tokenizer.
        text_col: Column holding the text to tokenise.
        sep: Field separator of the CSV file.
        id_col: Optional column whose values are exposed as ``self.ids``.
        target_col: Optional column whose values are attached as ``'labels'``.
        cat_col: Optional column prepended to the text as ``"<cat> [SEP] <text>"``.
        sample: Optional number of rows to draw with ``DataFrame.sample``.
        tokenizer: Optional tokenizer name/path; takes precedence over ``backbone``.

    Attributes:
        tokenizer: The loaded tokenizer.
        collator: ``DataCollatorWithPadding`` built on that tokenizer, intended to be
            used as the ``collate_fn`` of a ``DataLoader``.
        ids, texts, labels: Column values as lists, or ``None`` when the column is absent.
    """

    def __init__(self, backbone, path, max_length, text_col, sep=',', id_col=None, target_col=None,
                 cat_col=None, sample=None, tokenizer=None) -> None:
        super().__init__()
        df = pd.read_csv(path, sep=sep)
        if sample:
            df = df.sample(sample)

        # An explicit tokenizer wins; otherwise fall back to the backbone checkpoint.
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer if tokenizer else backbone)
        # Only borrow EOS as the padding token when none is defined. Overwriting
        # unconditionally would replace a valid pad token, and would set it to None
        # for tokenizers that have no EOS token at all.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        self.collator = DataCollatorWithPadding(self.tokenizer, padding=True)
        # Silences the "Asking-to-pad-a-fast-tokenizer" warning, since padding is
        # done by the collator after tokenisation.
        self.tokenizer.deprecation_warnings["Asking-to-pad-a-fast-tokenizer"] = True

        columns = df.columns
        self.ids = df[id_col].values.tolist() if id_col in columns else None
        self.texts = None
        if text_col in columns:
            if cat_col in columns:
                df[text_col] = df[cat_col] + ' [SEP] ' + df[text_col]
            self.texts = df[text_col].values.tolist()
        self.labels = df[target_col].values.tolist() if target_col in columns else None
        self.max_length = max_length

    def __len__(self):
        """Number of text rows."""
        return len(self.texts)

    def __getitem__(self, index):
        """Tokenise row ``index`` and return its tensors (plus ``'labels'`` if available)."""
        encoded = self.tokenizer(
            text=self.texts[index],
            max_length=self.max_length,
            truncation=True,
            return_tensors='pt'
        )
        # The tokenizer returns a batch of size one; drop that leading axis.
        item = {key: value[0] for key, value in encoded.items()}
        if self.labels is not None:
            item['labels'] = self.labels[index]
        return item

In [None]:
def class_inference_loop(model, dataloader, device, **predict_kwargs):
    """Run ``model`` over ``dataloader`` without gradients and collect sigmoid scores.

    Args:
        model: Module called as ``model(batch, **predict_kwargs)``; moved to ``device``
            and switched to eval mode.
        dataloader: Iterable of mapping-like batches of tensors. A ``'labels'`` entry,
            if present, is removed from the batch before the forward pass.
        device: Target device as a string (``'cuda'``, ``'cuda:0'``, ``'cpu'``) or a
            ``torch.device``.
        **predict_kwargs: Extra keyword arguments forwarded to the model call.

    Returns:
        ``(predictions, labels)``: two Python lists accumulated over all batches.
        ``labels`` is empty when no batch carried labels.
    """
    # autocast only accepts the bare device *type* string, so strip any index and
    # accept torch.device objects as well.
    device_type = torch.device(device).type

    model.to(device)
    model.eval()

    predictions = []
    labels = []
    with torch.no_grad(), torch.autocast(device_type=device_type):
        for batch_dict in tqdm(dataloader):
            y = batch_dict.pop('labels', None)
            if y is not None:
                y = y.float().to(device)
            x = {k: v.to(device) for k, v in batch_dict.items()}

            logits = model(x, **predict_kwargs)
            probs = torch.sigmoid(logits)
            predictions.extend(probs.cpu().tolist())
            if y is not None:
                labels.extend(y.cpu().tolist())
    return predictions, labels

In [None]:
# Disabled alternative pipeline (plain AutoModel backbone, max_length 512). It lives inside
# a string literal, so none of it is executed.
'''
backbone = AutoModel.from_pretrained('/kaggle/input/detect-ai-llm-train/encoder_model')
model = ClassificationModel(backbone, 'cuda', 1, None, None)
model.head.load_state_dict(torch.load('/kaggle/input/detect-ai-llm-train/encoder_head.pth'))

model.eval()

test_ds = CustomDataset('', '/kaggle/input/llm-detect-ai-generated-text/test_essays.csv', 512, 
                        'text', ',', 'id', None, None, sample=None, tokenizer='/kaggle/input/detect-ai-llm-train/encoder_tokenizer')
test_dl = DataLoader(test_ds, batch_size=1, shuffle=False, collate_fn=test_ds.collator)
pred, _ =  class_inference_loop(model, test_dl, 'cuda')
df = pd.DataFrame({'id': test_ds.ids, 'generated': pred})
df.to_csv('submission.csv', sep=',', index=False)
'''

# ---- Active pipeline: 4-bit quantised Mistral backbone + PEFT adapter + linear head ----
DEVICE = 'cuda'
BASE_MODEL_PATH = "/kaggle/input/mistral-7b-v0-1/Mistral-7B-v0.1"
ADAPTER_PATH = '/kaggle/input/detect-ai-llm-train/decoder_model'
HEAD_WEIGHTS_PATH = '/kaggle/input/detect-ai-llm-train/decoder_head.pth'
TOKENIZER_PATH = '/kaggle/input/detect-ai-llm-train/decoder_tokenizer'
TEST_CSV_PATH = '/kaggle/input/llm-detect-ai-generated-text/test_essays.csv'
SUBMISSION_PATH = 'submission.csv'
MAX_LENGTH = 64

# NF4 4-bit weights with double quantisation; float16 is used as the compute dtype.
NF4_SETTINGS = {
    'load_in_4bit': True,
    'bnb_4bit_quant_type': "nf4",
    'bnb_4bit_use_double_quant': True,
    'bnb_4bit_compute_dtype': torch.float16,
}
quantization_config = BitsAndBytesConfig(**NF4_SETTINGS)

# Load the quantised base model, then attach the saved adapter in inference-only mode.
mistral = AutoModel.from_pretrained(BASE_MODEL_PATH, quantization_config=quantization_config)
backbone = peft.PeftModel.from_pretrained(
    model=mistral,
    model_id=ADAPTER_PATH,
    is_trainable=False
)

# The backbone is already built, so no quantisation/LoRA config is passed to the wrapper.
model = ClassificationModel(
    backbone=backbone,
    device=DEVICE,
    num_outputs=1,
    quantization_config=None,
    lora_config=None,
)
head_state = torch.load(HEAD_WEIGHTS_PATH)
model.head.load_state_dict(head_state)

model.eval()

test_ds = CustomDataset(
    backbone='',
    path=TEST_CSV_PATH,
    max_length=MAX_LENGTH,
    text_col='text',
    sep=',',
    id_col='id',
    target_col=None,
    cat_col=None,
    sample=None,
    tokenizer=TOKENIZER_PATH,
)
test_dl = DataLoader(test_ds, batch_size=1, shuffle=False, collate_fn=test_ds.collator)

# No target column is read, so the returned label list is empty and is discarded.
pred, _ = class_inference_loop(model, test_dl, DEVICE)

df = pd.DataFrame({'id': test_ds.ids, 'generated': pred})
df.to_csv(SUBMISSION_PATH, sep=',', index=False)