### $\textbf{Installing and importing required packages}$

In [None]:
'''
installing compatible versions of required libraries.
We pin transformers to <5.1.0 because as of 2026.01,
MoLFormer (ibm/MoLFormer-XL-both-10pct) relies on
internal modules such as transformers.onnx and remote
configuration code that are not fully compatible with
Transformers >=5.1.0. installing torch and onnx ensures
model loading and ONNX-related utilities work correctly.
Runtime version should be 2026.01 if running on Colab.
'''


%pip install -q \
    "transformers<5.1.0" \
    datasets \
    evaluate \
    wandb \
    peft \
    accelerate \
    umap-learn \
    onnx \
    rdkit

In [None]:
from rdkit.Chem import Draw
import pandas as pd

#-------------------------------------------------------------------------#

import torch
import torch.nn as nn
from scipy.special import softmax
import evaluate
from transformers.activations import ACT2FN
from transformers import AutoModel, AutoConfig
from transformers.modeling_outputs import SequenceClassifierOutput
#from peft import LoraConfig, get_peft_model

#-------------------------------------------------------------------------#

from torch.utils.data import Dataset, DataLoader
from rdkit import Chem
from rdkit.Chem import AllChem

#-------------------------------------------------------------------------#

from google.colab import files, drive


#-------------------------------------------------------------------------#

from transformers import TrainingArguments, Trainer, AutoTokenizer

#-------------------------------------------------------------------------#
from datasets import load_dataset
#-------------------------------------------------------------------------#
from torch.nn.functional import cosine_similarity
import matplotlib.pyplot as plt
#-------------------------------------------------------------------------#
import numpy as np
from sklearn.metrics import (
    roc_auc_score,
    average_precision_score,
    precision_score,
    recall_score,
    f1_score,
    mean_squared_error,
    mean_absolute_error,
    r2_score,
)
#-------------------------------------------------------------------------#
import shutil
#-------------------------------------------------------------------------#
import umap
#-------------------------------------------------------------------------#
import wandb
import os

In [None]:
# requires an existing account in wandb. We used a designated API key
# for privacy measures, we did not disclose our API key

wandb.login(key="API-KEY")

### $\textbf{Validating RDKit installation}$

In [None]:
# Two different ways to write the SMILES for Aspirin
smiles_1 = "CC(=O)OC1=CC=CC=C1C(=O)O"
smiles_2 = "O=C(O)c1ccccc1OC(=O)C"

# converting to RDKit Molecule objects
mol1 = Chem.MolFromSmiles(smiles_1)
mol2 = Chem.MolFromSmiles(smiles_2)

# getting the Canonical SMILES
can_smiles_1 = Chem.MolToSmiles(mol1)
can_smiles_2 = Chem.MolToSmiles(mol2)

pd.set_option('display.colheader_justify', 'center')
df = pd.DataFrame(columns=['Smiles', 'Canonical Smiles'])
df.loc[len(df)] = [smiles_1, can_smiles_1]
df.loc[len(df)] = [smiles_2, can_smiles_2]
print(df)
print()

# visualizing them
Draw.MolsToGridImage([mol1, mol2], legends=['Version A', 'Version B'])

### $\textbf{Setting up required classes for extension approaches}$

In [None]:
# The Classification Head Class in Molformer's repository. Will be used in a custom class.
# the attribute "num_labels" was added for execution
# https://huggingface.co/ibm-research/MoLFormer-XL-both-10pct/blob/main/modeling_molformer.py

class MolformerClassificationHead(nn.Module):
    def __init__(self, config, num_labels):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.dense2 = nn.Linear(config.hidden_size, config.hidden_size)
        self.dropout = nn.Dropout(
            config.classifier_dropout_prob
            if config.classifier_dropout_prob is not None
            else config.hidden_dropout_prob
        )
        self.out_proj = nn.Linear(config.hidden_size, num_labels)
        self.classifier_act_fn = ACT2FN[config.hidden_act]
        self.skip_connection = config.classifier_skip_connection

    def forward(self, pooled_output):
        hidden_state = self.dense(pooled_output)
        hidden_state = self.dropout(hidden_state)
        hidden_state = self.classifier_act_fn(hidden_state)
        if self.skip_connection:
            hidden_state = residual = hidden_state + pooled_output
        hidden_state = self.dense2(hidden_state)
        hidden_state = self.dropout(hidden_state)
        hidden_state = self.classifier_act_fn(hidden_state)
        if self.skip_connection:
            hidden_state = hidden_state + residual
        return self.out_proj(hidden_state)

In [None]:
class SMILESDataset(Dataset):

    '''
    This is a custom PyTorch Dataset designed to handle
    molecular data represented by SMILES (Simplified
    Molecular Input Line Entry System) strings. It is
    specifically tailored for tasks involving sequence
    classification or regression using models like MoLFormer.

    Upon initialization, the dataset canonicalizes all
    SMILES strings using RDKit. This ensures that each
    molecule has a unique and standardized representation,
    which is crucial for consistent model training. It
    handles invalid or unparsable SMILES strings by
    filtering them out, preventing errors during downstream
    processing.
    '''

    def __init__(self, df, tokenizer, label_column="label", task_type="cls"):
        processed_data = []

        # processing only valid and acceptable smiles sequences,
        # otherwise they produce NaN values in predictions
        for _, row in df.iterrows():
            smiles = self.canonicalize(row['smiles'])
            if smiles is not None:
                processed_data.append({
                    "smiles": smiles,
                    "label": row[label_column]
                  })

        self.smiles_list = [item['smiles'] for item in processed_data]
        self.labels = [item['label'] for item in processed_data]
        self.tokenizer = tokenizer
        self.max_length = 202 # fixed max length that matches the MoLFormer model
        self.task_type = task_type # Store task_type

    def __len__(self):
        return len(self.smiles_list)

    # https://github.com/IBM/molformer/blob/main/notebooks/pretrained_molformer/frozen_embeddings_classification.ipynb
    def canonicalize(self, s):
        mol = Chem.MolFromSmiles(s) # getting the molecule from smiles sequence
        if mol is not None:
            return Chem.MolToSmiles(mol, canonical=True, isomericSmiles=True) # after canonicalization - returning to smiles sequence
        return None # Returning None if RDKit fails to parse the SMILES


    def get_num_classes(self):
      # For regression, num_labels is 1. For classification, it's the number of unique labels.
      return len(set(self.labels)) if self.task_type == "cls" else 1


    def get_length_stats(self, mode="effective"):
        '''
         mode:
         'true': tokenized length without truncation/padding
         'effective': non-padding tokens after truncation
        '''

        if not self.smiles_list:
            return 0.0, 0, 0

        lengths = []

        if mode == "true":
            for smiles in self.smiles_list:
                lengths.append(
                    len(self.tokenizer(smiles, padding=False, truncation=False)["input_ids"])
                )

        elif mode == "effective":
            for i in range(len(self.smiles_list)):
              item = self.__getitem__(i)
              attention_mask = item['attention_mask']
              lengths.append(
                    attention_mask.sum().item()
                )

        else:
            raise ValueError(f"Unknown mode: {mode}")

        avg = sum(lengths) / len(lengths)
        max_length = max(lengths)
        min_length = min(lengths)

        return avg, max_length, min_length

    def __getitem__(self, idx):
        # This converts a SMILES string into a list of numbers (tokens)
        smiles = self.smiles_list[idx]
        label = self.labels[idx]

        inputs = self.tokenizer(smiles,
                                  truncation=True,
                                  padding="max_length",
                                  max_length=self.max_length,
                                  return_tensors="pt")

        input_ids = inputs['input_ids'].squeeze()
        attention_mask = inputs['attention_mask'].squeeze()

        # for best practices, we ensure required data types
        if self.task_type == "reg":
            labels = torch.tensor(label, dtype=torch.float)
        else:
            labels = torch.tensor(label, dtype=torch.long)

        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'labels': labels # the column name as required by the hugging face API
        }

In [None]:
class CustomModelForSequenceClassification(nn.Module):

    """
    This class is a custom PyTorch nn.Module designed
    to adapt the pre-trained MoLFormer encoder
    for sequence classification or regression tasks.
    """
    def __init__(self, model_name, num_labels):
        super().__init__()
        self.encoder = AutoModel.from_pretrained(model_name, trust_remote_code=True)
        # Pass num_labels to the classification head
        self.num_labels = num_labels
        self.classifier = MolformerClassificationHead(self.encoder.config, num_labels)


        if self.num_labels == 1:
            self.loss_fn = nn.MSELoss()
        elif self.num_labels == 2:
            self.loss_fn = nn.CrossEntropyLoss()
        else: # for multi-label classification, adapted from the MoLFormer's code on HF.
            self.loss_fn = nn.BCEWithLogitsLoss()
            # we did not conduct experiments on multi-label classification, this is never used
            # possible future direction - to extend to multi-label classification

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.encoder(
            input_ids=input_ids,
            attention_mask=attention_mask)

        # take first token in the sequence for all items in the batch
        cls_embeddings = outputs.last_hidden_state[:, 0, :]

        logits = self.classifier(cls_embeddings)

        loss = None
        if labels is not None:
            if self.num_labels == 1: # reshaping labels for regression to (batch_size, 1)
                labels = labels.view(-1, 1).float()
            if self.num_labels > 2: # for multi-label classification
                labels = labels.float()
            loss = self.loss_fn(logits, labels)

        return {"loss": loss, "logits": logits}

In [None]:
"""
------------------------------------------------------------------
Preliminary PEFT Experiment: LoRA Fine-Tuning on MoLFormer

As an exploratory extension of our work, we implemented a parameter-
efficient fine-tuning (PEFT) approach using LoRA (Low-Rank Adaptation)
on top of the MoLFormer backbone.

In this setup, LoRA adapters are injected into the transformer attention
layers (query and value projections), while a custom classification head
is trained on top of the CLS token representation.

This experiment is still preliminary and was not fully optimized or
systematically evaluated. However, we include it here as a potential
direction for future research on efficient molecular representation
fine-tuning using MoLFormer
------------------------------------------------------------------


class LoraMoLFormer(nn.Module):
    def __init__(self, model_name, num_labels, lora_r=8, lora_alpha=16):
        super().__init__()

        # Loading the base (foundation) Model
        self.base_model = AutoModel.from_pretrained(model_name, trust_remote_code=True)
        self.base_model.config.num_labels = num_labels

        # Configuring LoRA, either with input or default values
        peft_config = LoraConfig(
            task_type=None, # We are using a custom head, so we manage the task manually
            r=lora_r,
            lora_alpha=lora_alpha,
            target_modules=["query", "value"], # Standard for transformer LoRA
            lora_dropout=0.1,
            bias="none"
        )

        self.peft_model = get_peft_model(self.base_model, peft_config)


        self.custom_head = MolformerClassificationHead(self.base_model.config, num_labels)

        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, input_ids, attention_mask=None, labels=None):
        # Pass inputs to the Base Model
        outputs = self.peft_model(input_ids=input_ids, attention_mask=attention_mask)

        # Extract the CLS token representation (the "summary" of the molecule)
        # MoLFormer uses the first token [0] for this.
        molecule_embedding = outputs.last_hidden_state[:, 0, :]

        # Pass the embedding through your custom head
        logits = self.custom_head(molecule_embedding)

        # Calculate loss manually so the Trainer doesn't crash
        loss = None
        if labels is not None:
            loss = self.loss_fn(logits, labels)

        # Return everything in a format the Hugging Face Trainer understands
        return SequenceClassifierOutput(
            loss=loss,
            logits=logits
        )

"""

### $\textbf{Loading and creating the datasets}$

In [None]:
from google.colab import drive

# if files are loaded from google drive, use this and follow the instructions that pop up

drive.mount('/content/drive')

In [None]:
# https://moleculenet.org/datasets-1

def load_datasets(task_type, model_name=None, tokenizer=None, datasets_folder=None):

    '''
    Loads datasets from a specified folder, processes them
    using a tokenizer, and returns a dictionary of
    SMILESDataset instances.
    '''

    if datasets_folder is None:
        datasets_folder = f"/content/drive/MyDrive/deep-learning-final-project/{task_type}"

    # this code is intended for the MoLFormer model, but for modularity,
    # we allow passing a custom tokenizer and model ID

    # if no tokenizer is provided, load the default one based on the model ID
    if tokenizer is None:
        if model_name is None:
            model_name = "ibm/MoLFormer-XL-both-10pct"

        tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)

    dataframes = {}
    datasets = {}

    # looping through all CSV files in the folder
    for filename in os.listdir(datasets_folder):
        if filename.endswith(".csv"):
            name = filename.replace(".csv", "")
            file_path = os.path.join(datasets_folder, filename)

            print(f"Loaded: {name}")

            try:
                df = pd.read_csv(file_path)
            except UnicodeDecodeError:
                df = pd.read_csv(file_path, encoding='latin1') # a fallback encoding if the file can't be read (only used for the bbbp files)

            dataframes[name] = df
            datasets[name] = SMILESDataset(df, tokenizer, task_type=task_type)

    return datasets


In [None]:

'''
#------if files are loaded locally on to google colab and not from google drive, use the following code.------#

# https://moleculenet.org/datasets-1

def load_datasets(task_type, model_name=None, tokenizer=None):

  uploaded = files.upload()

  dataframes = {}
  datasets = {}

     # if no tokenizer is provided, load the default one based on the model ID
  if tokenizer is None:
      if model_name is None:
          model_name = "ibm/MoLFormer-XL-both-10pct"

      tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)

  for filename in uploaded.keys():
      name = filename.replace(".csv", "")  # e.g. bace_train
      print(f"Loaded: {name}")

      # Try reading with 'latin1' encoding to resolve UnicodeDecodeError
      try:
          df = pd.read_csv(filename)
      except UnicodeDecodeError:
          df = pd.read_csv(filename, encoding='latin1') # a fallback encoding if the file can't be read (only used for the bbbp files)

      dataframes[name] = df
      datasets[name] = SMILESDataset(df, tokenizer, task_type=task_type)

  return datasets
'''

In [None]:
# loading datasets for classification tasks

cls_datasets = load_datasets("cls")
cls_datasets

In [None]:
# CLASSIFICATION TASKS DATASETS

bace_train = cls_datasets['bace_train']
bace_test = cls_datasets['bace_test']
bace_eval = cls_datasets['bace_valid']

bbbp_train = cls_datasets['bbbp_train']
bbbp_test = cls_datasets['bbbp_test']
bbbp_eval = cls_datasets['bbbp_valid']

clintox_train = cls_datasets['clintox_train']
clintox_test = cls_datasets['clintox_test']
clintox_eval = cls_datasets['clintox_valid']

'''
we wanted to use the hiv dataset, however it was signficantly
larger than the rest of the datasets (~33K samples) Therefore,
training took longer and we wanted to keep the training datasets
as balanced as we can in terms of size.

# hiv_train = datasets['hiv_train']
# hiv_test = datasets['hiv_test']
# hiv_eval = datasets['hiv_valid']
'''


cls_datasets = {
    "BBBP": {"train": bbbp_train,
             "eval": bbbp_eval,
             "test": bbbp_test},

    "BACE": {"train": bace_train,
             "eval": bace_eval,
             "test": bace_test},

    "CLINTOX": {"train": clintox_train,
                "eval": clintox_eval,
                "test": clintox_test},
}

In [None]:
# loading datasets for regression tasks

reg_datasets = load_datasets("reg")
reg_datasets

In [None]:
# REGRESSION TASKS DATASETS

lipo_train = reg_datasets['lipo_train']
lipo_test = reg_datasets['lipo_test']
lipo_eval = reg_datasets['lipo_valid']

esol_train = reg_datasets['esol_train']
esol_test = reg_datasets['esol_test']
esol_eval = reg_datasets['esol_valid']

freesolv_train = reg_datasets['freesolv_train']
freesolv_test = reg_datasets['freesolv_test']
freesolv_eval = reg_datasets['freesolv_valid']


reg_datasets = {
    "LIPO": {"train": lipo_train,
             "eval": lipo_eval,
             "test": lipo_test},

    "ESOL": {"train": esol_train,
             "eval": esol_eval,
             "test": esol_test},

    "FreeSolv": {"train": freesolv_train,
                 "eval": freesolv_eval,
                 "test": freesolv_test},

}


### $\textbf{Motivation through toy example: Visualization of Pooled Embedding vs. First Token Embedding ([CLS], <bos>)}$

In [None]:
# loading a common dataset for Chemistry property prediction task (ESol)
dataset = load_dataset("gayane/esol")

In [None]:
toy_df = pd.DataFrame(dataset['train']) # extracting the training molecular sequences

'''
SMILES (Simplified Molecular Input Line Entry System)
is a way to represent chemical structures using a single
line of text. The problem is that a single molecule can
often be written in many different "correct" ways.
Canonicalization means to pick exactly one unique version
of that string to be the "standard" (or "canonical") name.
Once a SMILES string is canonicalized, it acts like a
"fingerprint" or a unique ID for that specific molecule.
'''

toy_df['smiles'] = toy_df['smiles'].apply(lambda x: Chem.MolToSmiles(Chem.MolFromSmiles(x), canonical=True, isomericSmiles=True))
toy_df

In [None]:
# loading the Molformer foundation model
model_name = "ibm-research/MoLFormer-XL-both-10pct"

model = AutoModel.from_pretrained(model_name, deterministic_eval=True, trust_remote_code=True)
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)

smiles_list = toy_df['smiles'].tolist()

# tokenizing the sequences
inputs = tokenizer(smiles_list, padding=True, return_tensors="pt")

In [None]:
# getting inference *token-level* contextual embeddings for toy dataset sequences
with torch.no_grad():
    out = model(**inputs)

In [None]:
# calculating cosine similarity between the pooled output
# and the first token embedding in the sequence

# Calculating all cosine similarities and storing them
cosine_similarities = []
pooled_embs = []
first_token_embs = []

for i in range(len(toy_df)):
    pooled_embedding = out.pooler_output[i].unsqueeze(0) # the pooled embedding
    first_token_embedding = out.last_hidden_state[i][0].unsqueeze(0) # cls token embedding
    pooled_embs.append(pooled_embedding)
    first_token_embs.append(first_token_embedding)

    similarity = torch.nn.functional.cosine_similarity(pooled_embedding, first_token_embedding)
    cosine_similarities.append(similarity.item())

# converting to a PyTorch tensor for easier statistical operations
cosine_similarities_tensor = torch.tensor(cosine_similarities)

# printing descriptive statistics
print(f"\nDescriptive Statistics for Cosine Similarities:")
print(f"Average Cosine Similarity: {torch.mean(cosine_similarities_tensor):.4f}")
print(f"Minimum Cosine Similarity: {torch.min(cosine_similarities_tensor):.4f}")
print(f"Maximum Cosine Similarity: {torch.max(cosine_similarities_tensor):.4f}")
print(f"Standard Deviation: {torch.std(cosine_similarities_tensor):.4f}")
print()

# plotting a histogram of the cosine similarities
plt.figure(figsize=(10, 6))
plt.hist(cosine_similarities, bins=20, edgecolor='black', alpha=0.7)
plt.title('Distribution of Cosine Similarities Between Pooled Output and First Token Embedding')
plt.xlabel('Cosine Similarity')
plt.ylabel('Frequency')
plt.grid(axis='y', alpha=0.75)
plt.show()

In [None]:
# performing UMAP to compare embeddings of the first token and the pooled one
# code inspired by the practical notebooks in the Deep Learning course

pooled_embs_tensor = torch.cat(pooled_embs, dim=0) # tensor for pooled embeddings
first_token_embs_tensor = torch.cat(first_token_embs, dim=0) # tensor for cls token embeddings
combined_embeddings = torch.cat((pooled_embs_tensor, first_token_embs_tensor), dim=0) # combined tensor

In [None]:

pooled_embs_np = pooled_embs_tensor.cpu().numpy()
first_token_embs_np = first_token_embs_tensor.cpu().numpy()
combined_embeddings_np = combined_embeddings.cpu().numpy()

# initializing UMAP reducer
umap_reducer_2d = umap.UMAP(n_components=2,
                            min_dist=0.0,
                            metric='cosine',
                            random_state=42)

# fitting and transforming the data
umap_transformed_embeddings_2d = umap_reducer_2d.fit_transform(combined_embeddings_np)

print(f"Shape of umap_transformed_embeddings: {umap_transformed_embeddings_2d.shape}")

In [None]:
# separating the transformed embeddings back into pooled and first_token components for plotting
half_size = len(toy_df)
umap_pooled_output = umap_transformed_embeddings_2d[:half_size]
umap_first_token_embeddings = umap_transformed_embeddings_2d[half_size:]

plt.figure(figsize=(8, 8))
plt.scatter(umap_pooled_output[:, 0], umap_pooled_output[:, 1], label='Pooled Output', alpha=0.7, s=10, color="maroon")
plt.scatter(umap_first_token_embeddings[:, 0], umap_first_token_embeddings[:, 1], label='First Token Embeddings', alpha=0.7, s=10)
plt.title('UMAP of Pooled Output vs. First Token Embedding')
plt.xlabel('UMAP Component 1')
plt.ylabel('UMAP Component 2')
plt.legend()
plt.grid(True)
plt.show()

### $\textbf{Additional auxiliary functions and classes (that rely on the datasets)}$

In [None]:
# source: https://huggingface.co/docs/transformers/v5.0.0rc2/en/main_classes/trainer#transformers.TrainingArguments
# source: https://www.manning.com/books/domain-specific-small-language-models

def get_training_args(
    output_dir,
    task_type,  # "reg" or "cls"
    best_metric,
    config=None
):
    '''
    Creates HuggingFace TrainingArguments with defaults
    with the option to override specific hyperparameters
    using a config dictionary.
    - output_dir: the directory where the model checkpoints
    and logs will be saved during training
    - task_type: "reg" or "cls"
    - best_metric: the metric to monitor for saving the best model
    - config: a dictionary of hyperparameters to use for training
    '''

    # if no config is provided, we use an empty dictionary
    if config is None:
        config = {}

    # using default config if values are not provided
    train_batch_size = config.get("train_batch_size", 64)
    eval_batch_size = config.get("eval_batch_size", 64)
    num_train_epochs = config.get("num_train_epochs", 10)
    learning_rate = config.get("learning_rate", 1e-5)  # MolFormer used 3e-5
    weight_decay = config.get("weight_decay", 0.01)
    warmup_ratio = config.get("warmup_ratio", 0.1)
    max_grad_norm = config.get("max_grad_norm", 1.0)
    seed = config.get("seed", 42)

    # Regression - lower is better, Classification - higher is better
    greater_is_better = task_type == "cls"

    return TrainingArguments(

        # the directory where the model checkpoints
        # and logs will be saved during training
        output_dir=output_dir,
        overwrite_output_dir=True,

        # evaluation and saving strategy
        # we evaluate and save the model
        # at the end of each epoch
        eval_strategy="epoch",
        save_strategy="epoch",

        load_best_model_at_end=True,
        remove_unused_columns=False, # to avoid issues with the custom dataset and model

        # the specific metric to monitor for saving the best model
        metric_for_best_model=best_metric,
        greater_is_better=greater_is_better,

        # hyperparameters that can be overridden by the config dictionary
        per_device_train_batch_size=train_batch_size,
        per_device_eval_batch_size=eval_batch_size,
        num_train_epochs=num_train_epochs,
        learning_rate=learning_rate,
        weight_decay=weight_decay,
        warmup_ratio=warmup_ratio,
        max_grad_norm=max_grad_norm,

        # logging strategy to log training progress every epoch
        logging_strategy="epoch",
        # logging_steps - if logging steps are desired

        report_to="wandb", # to enable logging to Weights & Biases
        dataloader_num_workers=0,

        # precision settings for training and evaluation
        fp16=False,
        fp16_full_eval=False,

        save_safetensors=False, # to avoid issues with the custom model and Trainer
        seed=seed
    )


In [None]:
def compute_metrics(pred):

    '''
    this function computes the metrics for regression
    and classification tasks and returns a dictionary
    of metrics for evaluation.
    - pred: the predictions from the model
    '''
    labels = pred.label_ids
    preds = pred.predictions

    # metrics for REGRESSION
    if preds.ndim == 1 or preds.shape[-1] == 1:
        predictions = preds.reshape(-1)

        # Safety check
        if np.isnan(predictions).any() or np.isinf(predictions).any():
            print("WARNING: NaN or Inf detected in regression predictions!")
            predictions = np.nan_to_num(predictions)

        mse = mean_squared_error(labels, predictions)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(labels, predictions)
        r2 = r2_score(labels, predictions)

        return {
            "rmse": rmse,
            "mse": mse,
            "mae": mae,
            "r2": r2,
        }

    # metrics for BINARY CLASSIFICATION
    elif preds.shape[-1] == 2:
        logits = preds

        # Safety check for logits before applying softmax
        if np.isnan(logits).any() or np.isinf(logits).any():
            print("WARNING: NaN or Inf detected in logits!")
            logits = np.nan_to_num(logits)

        probs = softmax(logits, axis=-1)[:, 1]
        preds_cls = (probs >= 0.5).astype(int)

        # Safety check for probabilities after softmax
        if np.isnan(probs).any() or np.isinf(probs).any():
            print("WARNING: NaN or Inf detected in probabilities!")
            probs = np.nan_to_num(probs)

        return {
            "roc_auc": roc_auc_score(labels, probs),
            "pr_auc": average_precision_score(labels, probs),
            "precision": precision_score(labels, preds_cls, zero_division=0),
            "recall": recall_score(labels, preds_cls, zero_division=0),
            "f1": f1_score(labels, preds_cls, zero_division=0),
            "pos_rate": preds_cls.mean(),
        }

    else:
        raise ValueError(f"Unsupported prediction shape: {preds.shape}")

### $\textbf{Hyperparameter functions and code}$

In [None]:
def _hp_search_core(config, task_name, task_datasets, model_name=None):
    '''
    This function is the core training loop for a single
    hyperparameter sweep run. It will be called by a
    wrapper function that handles wandb.init.
    Logged with Weights&Biases

    config: a dictionary of hyperparameters to use for training
    task_name: the name of the task to train on (BBBP, BACE, etc.)
    task_datasets: a dictionary containing the train, eval, and test datasets
    model_name: the name of the model to use for training. If None,
                the default model will be used. (MoLFormer)
    '''

    train_dataset = task_datasets["train"]
    eval_dataset = task_datasets["eval"]
    task_type = train_dataset.task_type
    best_metric = "roc_auc" if task_type == "cls" else "rmse"

    num_labels = train_dataset.get_num_classes()

    # we repeat this as a safety net to make sure the model is specified
    if model_name is None:
        model_name = "ibm/MoLFormer-XL-both-10pct"

    model = CustomModelForSequenceClassification(model_name, num_labels)

    # setting training arguments
    training_args = get_training_args(
        output_dir=f"/content/molformer-hp-search/{task_name}",
        task_type=task_type,
        best_metric=best_metric,
        config=config
    )

    # initializing trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics
    )

    trainer.train()


In [None]:
def run_hp_search(tasks_to_run, sweep_config, num_trials=10, model_name=None):

    '''
    this function runs hyperparameter search for a list of tasks.

    - tasks_to_run: a list of task names to run hyperparameter search for
    - sweep_config: a dictionary of hyperparameters to use for training
    - model_name: the name of the model to use for training (default: MoLFormer)
    - num_trials: the number of trials to run for each task (default: 10)
    '''

    if model_name is None:
        model_name = "ibm/MoLFormer-XL-both-10pct"

    for task_name, task_datasets in tasks_to_run.items():

        project_name = f"molformer-hp-search-{task_name}"
        sweep_id = wandb.sweep(sweep_config, project=project_name)

        '''
        the wrapper function below facilitates the interaction
        between wandb.agent and the custom training logic,
        ensuring each trial runs with its unique configuration.
        '''
        def _task_sweep_wrapper():

            # initializes a new W&B run
            # corresponds to a new combination of hp
            with wandb.init() as run:

                # retrieves the specific hyperparameter
                # values chosen by the W&B sweep for this particular run.
                config = run.config

                # calling the actual search logic
                _hp_search_core(
                    config=config,
                    task_name=task_name,
                    task_datasets=task_datasets,
                    model_name=model_name
                )


        '''
        the agent launches 'count' individual runs (trials)
        per task. Each run will use a different set
        of hyperparameters (sampled according to sweep_config)
        and execute the _task_sweep_wrapper function.
        '''

        print(f"Starting sweep for task: {task_name}")
        wandb.agent(sweep_id, function=_task_sweep_wrapper, count=num_trials)
        print(f"Sweep for task {task_name} completed.")
        print("-" * 50)
        print()

In [None]:
# adapted from https://wandb.ai/matt24/vit-snacks-sweeps/reports/Hyperparameter-Search-for-HuggingFace-Transformer-Models--VmlldzoyMTUxNTg0

'''
Weights & Biases Sweeps requires a configuration
file to define the hyperparameters to explore,
their range of values, the search strategy, etc.
'''
# randomly searching the different values of hyperparameters
sweep_config = {
    'method': 'random'
}

'''
Note, MoLFormer's original hyperparameters were:
    - batch_size 128
    - d_dropout 0.1
    - dropout 0.1
    - lr_start 3e-5
    - num_workers 8
    - max_epochs 500

    for more information, see the original repository:
    https://github.com/IBM/molformer/tree/main/finetune
'''
# hyperparameters
parameters_dict = {
    'num_train_epochs': {
        'value': 1
    },

    'train_batch_size': {
        'values': [8, 16, 32, 64]
    },

    # the learning rate will be sampled log-uniformly
    'learning_rate': {
        'distribution': 'log_uniform_values',
        'min': 1e-5,
        'max': 3e-5
    },

    # adding a penalty to the loss function based
    # on the magnitude of the model's weights.
    'weight_decay': {
        'values': [0.0, 0.1, 0.2, 0.3, 0.4, 0.5]
    },

    # warmup ratio for the learning rate
    'warmup_ratio': {
            'values': [0.0, 0.1, 0.2, 0.3]
    }

}

'''
we will explore the hyperparameter values for one
epoch because later we will fine-tune for more
epochs using some of the best combinations of
values found with Sweeps
'''

# adding the parameters dictionary to the configuration
sweep_config['parameters'] = parameters_dict


In [None]:
# running hyperparameter search for classification tasks

run_hp_search(cls_datasets, sweep_config)

In [None]:
# running hyperparameter search for regression tasks

run_hp_search(reg_datasets, sweep_config)

### $\textbf{Finetuning}$

In [None]:
# setting configurations according to the hyperparameter search analysis for classification tasks

cls_datasets["BBBP"]["config"] = {
    "num_train_epochs": 20,
    "train_batch_size": 8,
    "learning_rate": 2.91e-5,
    "weight_decay": 0.3,
    "warmup_ratio": 0.3
    }

cls_datasets["BACE"]["config"] = {
    "num_train_epochs": 10,
    "train_batch_size": 8,
    "learning_rate": 2.62e-5,
    "weight_decay": 0.5,
    "warmup_ratio": 0.1
    }

cls_datasets["CLINTOX"]["config"] = {
    "num_train_epochs": 20,
    "train_batch_size": 16,
    "learning_rate": 1.82e-5,
    "weight_decay": 0.4,
    "warmup_ratio": 0.2
    }




In [None]:
# setting configurations according to the hyperparameter search analysis for regression tasks


reg_datasets["LIPO"]["config"] = {
    "num_train_epochs": 20,
    "train_batch_size": 8,
    "learning_rate": 2.48e-5,
    "weight_decay": 0.4,
    "warmup_ratio": 0.1
    }


reg_datasets["ESOL"]["config"] = {
    "num_train_epochs": 20,
    "train_batch_size": 8,
    "learning_rate": 2.85e-5,
    "weight_decay": 0.4,
    "warmup_ratio": 0.3
    }


reg_datasets["FreeSolv"]["config"] = {
    "num_train_epochs": 50,
    "train_batch_size": 8,
    "learning_rate": 2.96e-5,
    "weight_decay": 0.1,
    "warmup_ratio": 0.2
    }




In [None]:
def finetune(splits, task_type, model_name=None, project_name="My Project"):
    '''
    This function runs the fine-tuning loop for a given set
    of datasets and a task type (regression or classification).
    It iterates through each task, initializes a model, sets up
    training arguments and trains the model using HuggingFace's
    Trainer API. After training, it evaluates the model on the
    validation and test sets, stores the results, and saves the
    fine-tuned model.

    task type: "reg" - regression or "cls" - classification
    model_name: the name of the model to use for training. If None,
                the default model will be used.
    project_name: the name of the wandb project to use for logging.
    '''

    results = {}

    if model_name is None:
        model_name = "ibm/MoLFormer-XL-both-10pct"

    for task_name, splits in splits.items():
    # task_name corresponds to the property/task that we are fine-tuning for
    # splits is the train, eval, and test datasets for each task

        # printing display
        width = 70
        print("\n" + "-" * width)
        print(f" Fine-tuning task: {task_name} ".center(width, "-"))
        print(f" Type: {task_type.upper()} ".center(width, "-"))
        print("-" * width)

        # creating a folder for each task
        task_output_dir = f"/content/FinetunedModels/{task_name}"
        os.makedirs(task_output_dir, exist_ok=True)

        # extracting task-dependent variables
        num_labels = splits["train"].get_num_classes()
        config = splits["config"]
        train_dataset = splits["train"]
        eval_dataset = splits["eval"]
        test_dataset = splits["test"]

        if task_type == "cls":
          metric_key = "roc_auc"
        elif task_type == "reg":
          metric_key = "rmse"
        else:
          raise ValueError(
              f"Unknown task type: {task_type}. "
              "Please use 'cls' for classification or 'reg' for regression."
          )

        # initializing a fresh model per dataset
        model = CustomModelForSequenceClassification(model_name, num_labels)

        # training args per task
        task_training_args = get_training_args(output_dir=task_output_dir,
                                               task_type=task_type,
                                               best_metric=metric_key,
                                               config=config)

        # according to HF's Trainer API
        trainer = Trainer(
            model=model,
            args=task_training_args,
            train_dataset=train_dataset,
            eval_dataset=eval_dataset,
            compute_metrics=compute_metrics,  # custom metric function per task
          )

        run = wandb.init(
            project=project_name,
            name=f"finetune-{task_name}",
            reinit=True
          )

        trainer.train()

        # evaluating on validation and test set
        # a dictionary of metrics per dataset
        metrics_val = trainer.evaluate(eval_dataset=eval_dataset)
        metrics_test = trainer.evaluate(eval_dataset=test_dataset)

        # extracting the specific metric from the dictionary,
        # based on the task ('eval_roc_auc' or 'eval_rmse')
        eval_key = f"eval_{metric_key}"
        val_score = metrics_val.get(eval_key, 0.0)
        test_score = metrics_test.get(eval_key, 0.0)

        # storing in results
        results[task_name] = {
          f"val_{metric_key}": val_score,
          f"test_{metric_key}": test_score,
          "task_type": task_type
          }

        print(f"[{task_name}] {metric_key.upper()}: VAL - {val_score:.4f} | TEST - {test_score:.4f}")

        # saving the best model checkpoints
        shutil.make_archive(f"/content/{task_name}_best_model", "zip", trainer.state.best_model_checkpoint)

        run.finish()

    print("All results:", results)

In [None]:
# fine-tuning for classification tasks after hyper parameter search
# note: the project name is just an example

finetune(cls_datasets, "cls", project_name="finetuning-post-hp-search-cls-tasks-20-epochs")

In [None]:
# fine-tuning for regression tasks after hyper parameter search
# note: the project name is just an example

finetune(reg_datasets, "reg", project_name="finetuning-post-hp-search-reg-tasks-20-epochs")