# In [None]:
import torch.nn as nn
import torch
import numpy as np
import pandas as pd
from datasets import Dataset
from transformers import (
    AutoTokenizer,
    AutoConfig,
    AutoModel,
    Trainer,
    TrainingArguments,
)
from sklearn.metrics import accuracy_score, f1_score, matthews_corrcoef, classification_report, confusion_matrix
# Hugging Face Hub identifier of the pretrained ChemBERTa checkpoint.
CHEMBERT_NAME = "seyonec/ChemBERTa-zinc-base-v1"

# The encoder is loaded with hidden-state output switched on in its config,
# because pool_hidden_states() reads `outputs.hidden_states`.
config = AutoConfig.from_pretrained(CHEMBERT_NAME, output_hidden_states=True)
encoder = AutoModel.from_pretrained(CHEMBERT_NAME, config=config)

# Tokenizer loaded from the same checkpoint as the encoder.
tokenizer = AutoTokenizer.from_pretrained(CHEMBERT_NAME)

def _load_split(csv_path):
    """Read one split CSV and keep only complete (smiles, label) rows.

    The "LD50_class" column is renamed to "label" before the two columns
    are selected and rows with missing values are dropped.
    """
    frame = pd.read_csv(csv_path).rename(columns={"LD50_class": "label"})
    return frame[['smiles', 'label']].dropna()


# One cleaned DataFrame per split (read in the order train, test, val).
df_train = _load_split('./NIHDataset/train_df_class.csv')
df_test = _load_split('./NIHDataset/test_df_class.csv')
df_val = _load_split('./NIHDataset/val_df_class.csv')

# Convert each split to a Hugging Face Dataset; the index is reset first so
# the rows dropped above do not leave gaps in it.
train_ds, val_ds, test_ds = (
    Dataset.from_pandas(frame.reset_index(drop=True))
    for frame in (df_train, df_val, df_test)
)

def tokenize(batch):
    """Encode the SMILES strings of a batch with the module-level tokenizer.

    Args:
        batch: Mapping whose "smiles" entry holds the strings to encode.

    Returns:
        The tokenizer's output for those strings, padded to ``max_length``
        and truncated at 160 tokens.
    """
    encode_options = {
        "padding": "max_length",
        "truncation": True,
        "max_length": 160,
    }
    return tokenizer(batch["smiles"], **encode_options)

# Tokenize every split in batches and rename the target column from
# "label" to "labels", the keyword that the model's forward() accepts.
train_ds = train_ds.map(tokenize, batched=True).rename_column("label", "labels")
val_ds = val_ds.map(tokenize, batched=True).rename_column("label", "labels")
test_ds = test_ds.map(tokenize, batched=True).rename_column("label", "labels")

# Expose only the model inputs and targets, formatted as torch tensors.
cols = ["input_ids", "attention_mask", "labels"]
for split_ds in (train_ds, val_ds, test_ds):
    split_ds.set_format("torch", columns=cols)
def pool_hidden_states(outputs, attention_mask, mode="cls"):
    """Reduce the encoder's last hidden layer to one vector per sequence.

    Args:
        outputs: Encoder output exposing a ``hidden_states`` sequence; only
            its last element is used.
            (assumes each element is (batch, seq, hidden) — TODO confirm)
        attention_mask: Mask with 1 for real tokens and 0 for padding,
            shaped (batch, seq). Only read when ``mode == "mean"``.
        mode: ``"cls"`` returns the vector at sequence position 0;
            ``"mean"`` returns the mask-weighted average over positions.

    Returns:
        The pooled tensor with the sequence dimension removed.

    Raises:
        ValueError: If ``mode`` is neither ``"cls"`` nor ``"mean"``.
    """
    last_layer = outputs.hidden_states[-1]

    if mode == "cls":
        return last_layer[:, 0]

    if mode == "mean":
        mask = attention_mask.unsqueeze(-1)
        summed = (last_layer * mask).sum(1)
        # A row whose mask is all zeros would divide by zero and yield NaN;
        # clamping the token count to 1 makes such a row pool to zeros and
        # leaves every non-empty row unchanged (counts are whole numbers).
        token_counts = mask.sum(1).clamp(min=1)
        return summed / token_counts

    # Fail loudly instead of implicitly returning None for a typo'd mode.
    raise ValueError(
        f"Unsupported pooling mode {mode!r}; expected 'cls' or 'mean'"
    )
class ClassificationHead(nn.Module):
    """Dropout followed by a single linear layer producing class logits."""

    def __init__(self, hidden_dim, num_classes, dropout=0.1):
        """Build the head.

        Args:
            hidden_dim: Size of the input feature vector.
            num_classes: Number of output logits.
            dropout: Dropout probability applied before the projection.
        """
        super().__init__()
        regulariser = nn.Dropout(dropout)
        projection = nn.Linear(hidden_dim, num_classes)
        # Kept as `net` with the same layer order so parameter names
        # ("net.1.weight", "net.1.bias") are unchanged.
        self.net = nn.Sequential(regulariser, projection)

    def forward(self, x):
        """Return the logits computed from the feature tensor ``x``."""
        logits = self.net(x)
        return logits
class ChemBERTClassifier(nn.Module):
    """Encoder plus pooled classification head for sequence classification.

    The encoder output is pooled with ``pool_hidden_states`` and passed to a
    ``ClassificationHead`` sized from ``encoder.config.hidden_size``.
    """

    # Pooling modes understood by pool_hidden_states().
    _POOLING_MODES = ("cls", "mean")

    def __init__(self, encoder, num_classes, pooling="cls"):
        """Wrap ``encoder`` with a classification head.

        Args:
            encoder: Model exposing ``config.hidden_size`` and callable with
                ``input_ids`` / ``attention_mask`` keyword arguments.
            num_classes: Number of output classes (logit dimension).
            pooling: ``"cls"`` or ``"mean"``.

        Raises:
            ValueError: If ``pooling`` is not a supported mode.
        """
        super().__init__()
        # Reject a bad mode at construction time rather than failing deep
        # inside the first forward pass.
        if pooling not in self._POOLING_MODES:
            raise ValueError(
                f"Unsupported pooling mode {pooling!r}; "
                f"expected one of {self._POOLING_MODES}"
            )
        self.encoder = encoder
        self.pooling = pooling
        self.classifier = ClassificationHead(
            hidden_dim=encoder.config.hidden_size,
            num_classes=num_classes
        )
        # Stateless loss module, created once instead of on every forward.
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, input_ids, attention_mask, labels=None):
        """Run the encoder, pool its output and classify.

        Args:
            input_ids: Token ids passed to the encoder.
            attention_mask: Attention mask passed to the encoder and used
                for mean pooling.
            labels: Optional class-index targets; cast to ``long`` before
                the loss is computed.

        Returns:
            Dict with ``"loss"`` (``None`` when ``labels`` is ``None``) and
            ``"logits"``.
        """
        outputs = self.encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            # Pooling reads `outputs.hidden_states`; request them explicitly
            # so this does not depend on how the encoder config was built.
            output_hidden_states=True,
        )

        pooled = pool_hidden_states(outputs, attention_mask, self.pooling)
        logits = self.classifier(pooled)

        loss = None
        if labels is not None:
            loss = self.loss_fn(logits, labels.long())

        return {"loss": loss, "logits": logits}
def compute_metrics(eval_pred):
    """Score predictions with accuracy, macro F1 and Matthews correlation.

    Args:
        eval_pred: Pair ``(logits, labels)``; the predicted class for each
            row is the argmax of the logits along axis 1.

    Returns:
        Dict with the keys "accuracy", "f1_macro" and "mcc".
    """
    scores, references = eval_pred
    predicted = np.argmax(scores, axis=1)

    metrics = {}
    metrics["accuracy"] = accuracy_score(references, predicted)
    metrics["f1_macro"] = f1_score(references, predicted, average="macro")
    metrics["mcc"] = matthews_corrcoef(references, predicted)
    return metrics
# Training configuration: evaluate and checkpoint once per epoch, then
# reload the checkpoint with the highest validation MCC when training ends.
training_args = TrainingArguments(
    output_dir="./chembert_class",
    eval_strategy="epoch",
    save_strategy="epoch",
    # With one checkpoint per epoch over 100 epochs, an unlimited history
    # would keep 100 full model+optimizer snapshots on disk. Cap it; the
    # best checkpoint is still retained because load_best_model_at_end is set.
    save_total_limit=2,
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=100,
    weight_decay=0.01,
    load_best_model_at_end=True,
    metric_for_best_model="mcc",
    greater_is_better=True,
    # Mixed precision only when a CUDA device is available.
    fp16=torch.cuda.is_available(),
    report_to="none"
)
# The head emits one logit per distinct training label, and the loss treats
# each label as a class index. Check up front that the labels really are
# 0..num_classes-1; otherwise training dies later with an opaque
# out-of-range index error inside the loss.
train_labels = np.sort(df_train["label"].unique())
num_classes = len(train_labels)
if not np.array_equal(train_labels, np.arange(num_classes)):
    raise ValueError(
        f"Training labels must be the contiguous class ids "
        f"0..{num_classes - 1}, got {train_labels.tolist()}"
    )

model = ChemBERTClassifier(
    encoder=encoder,
    num_classes=num_classes,
    pooling="cls"
)

# NOTE(review): newer transformers releases reportedly prefer
# `processing_class=` over `tokenizer=` — confirm against the installed
# version before changing it.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_ds,
    eval_dataset=val_ds,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)
trainer.train()

# Run the trainer's current model on the held-out test split.
outputs = trainer.predict(test_ds)
logits, labels = outputs.predictions, outputs.label_ids

# Predicted class = index of the largest logit in each row.
preds = np.argmax(logits, axis=1)

print("\n===== Test Results =====")
# Scalar metrics, computed and printed one at a time in this order.
for metric_title, metric_fn in (
    ("Accuracy:", accuracy_score),
    ("MCC:", matthews_corrcoef),
    ("F1:", lambda y_true, y_pred: f1_score(y_true, y_pred, average="macro")),
):
    print(metric_title, metric_fn(labels, preds))

# Multi-line reports, each under its own heading.
for section_title, section_fn in (
    ("\nConfusion Matrix", confusion_matrix),
    ("\nClassification Report", classification_report),
):
    print(section_title)
    print(section_fn(labels, preds))
