## Hyperparameter tuning using WandB

In [None]:

#Importing Libraries

import wandb
import os
from scipy.special import softmax
from sklearn.metrics import roc_auc_score, precision_score, recall_score, f1_score,matthews_corrcoef
from datasets import Dataset
from datasets import load_dataset
import datasets
from peft import LoraConfig, get_peft_model
import torch
import torch.nn.functional as F
from sklearn.preprocessing import LabelEncoder
from transformers import TrainingArguments, EarlyStoppingCallback, AutoModelForSequenceClassification, AutoTokenizer
from peft import PeftModel  
import pandas as pd
import numpy as np 

In [None]:
def data_load():
    """Read the flavor training and validation CSV files.

    Both files are loaded with pandas and their 'Unnamed: 0' column is
    dropped in place.

    Returns:
        A ``(train_flav, val_flav)`` tuple of DataFrames.
    """
    csv_paths = (
        '.../flavor_datasets/fart_train.csv',
        '.../flavor_datasets/fart_val.csv',
    )
    # Read both splits first, then strip the column from each.
    train_flav, val_flav = (pd.read_csv(csv_path) for csv_path in csv_paths)
    for frame in (train_flav, val_flav):
        frame.drop('Unnamed: 0', axis=1, inplace=True)

    return train_flav, val_flav

In [None]:
def data_prep(data_process):
    """Convert a pandas DataFrame into a ``datasets.Dataset``.

    Args:
        data_process: DataFrame to convert.

    Returns:
        The result of ``Dataset.from_pandas(data_process)``.
    """
    return Dataset.from_pandas(data_process)

def tokenize_function(examples,tokenizer):
    """Tokenize the "Canonicalized SMILES" entries of ``examples``.

    Args:
        examples: Mapping that provides a "Canonicalized SMILES" entry.
        tokenizer: Callable invoked with that entry plus the padding and
            truncation options below.

    Returns:
        Whatever ``tokenizer`` returns.
    """
    smiles_batch = examples["Canonicalized SMILES"]
    # Every sequence is padded/truncated to exactly 512 tokens.
    tokenizer_options = {
        "padding": "max_length",
        "truncation": True,
        "max_length": 512,
    }
    return tokenizer(smiles_batch, **tokenizer_options)


In [None]:
def label_encoding(dataset):
    """Attach an integer 'label' column and drop the raw metadata columns.

    A fresh ``LabelEncoder`` is fitted on the dataset's
    'Canonicalized Taste' column on every call, so the string-to-integer
    mapping is derived only from the values present in ``dataset``.

    Args:
        dataset: Object exposing ``__getitem__``, ``add_column`` and
            ``remove_columns`` (the callers in this file pass a
            ``datasets.Dataset``).

    Returns:
        The dataset with the new 'label' column and without the six
        listed source columns.
    """
    taste_codes = LabelEncoder().fit_transform(dataset['Canonicalized Taste'])
    labelled = dataset.add_column('label', taste_codes)

    # Raw columns that are no longer needed once tokens and labels exist.
    return labelled.remove_columns([
        "Canonicalized SMILES",
        "Standardized SMILES",
        "Canonicalized Taste",
        "Original Labels",
        "Source",
        "is_multiclass",
    ])

In [None]:
from peft import LoraConfig, get_peft_model

def lora_config(r, lora_alpha, dropout):
    """Build the ``LoraConfig`` used for sequence classification.

    Args:
        r: Rank of the LoRA matrices.
        lora_alpha: LoRA scaling factor.
        dropout: Dropout rate applied by the LoRA layers.

    Returns:
        A ``LoraConfig`` with task type "SEQ_CLS" that targets
        'all-linear' modules.
    """
    settings = {
        "task_type": "SEQ_CLS",
        "r": r,
        "lora_alpha": lora_alpha,
        "target_modules": 'all-linear',
        "lora_dropout": dropout,
    }
    return LoraConfig(**settings)

## Focal Loss 

In [None]:

def focal_loss(inputs, targets, alpha=1, gamma=2):
    """Compute the mean focal loss for a batch of logits.

    For every sample the loss is ``-alpha * (1 - p_t) ** gamma * log(p_t)``,
    where ``p_t`` is the softmax probability of the sample's target class.

    Args:
        inputs: Logits; the last dimension indexes the classes.
        targets: Integer class indices, one per sample (``inputs`` without
            its last dimension).
        alpha: Scalar multiplier applied to every sample's loss.
        gamma: Focusing exponent; ``gamma=0`` with ``alpha=1`` reduces to
            plain cross-entropy.

    Returns:
        Scalar tensor holding the mean loss over all samples.
    """
    log_prob = F.log_softmax(inputs, dim=-1)

    # Pick the target-class log-probability directly. Multiplying by a
    # one-hot mask instead would yield NaN whenever a non-target class has
    # a log-probability of -inf (0 * -inf), e.g. with masked logits.
    log_pt = log_prob.gather(-1, targets.unsqueeze(-1)).squeeze(-1)
    pt = log_pt.exp()  # Probability of the true class

    per_sample_loss = -alpha * (1 - pt) ** gamma * log_pt

    return per_sample_loss.mean()

In [None]:
# `Trainer` is otherwise only imported further down the file, so import it
# here to keep this class definition valid when the file runs top to bottom.
from transformers import Trainer


class focalloss(Trainer):
    """Trainer variant that optimizes `focal_loss` instead of the model's own loss."""

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Run the model on ``inputs`` and return the focal loss of its logits.

        Args:
            model: Model called as ``model(**inputs)``; its output must
                expose ``.logits``.
            inputs: Batch dict containing a "labels" entry. The labels are
                popped before the forward pass, so the model does not
                receive them.
            return_outputs: When true, return ``(loss, outputs)``.
            **kwargs: Extra arguments passed by the caller; ignored here.

        Returns:
            The loss tensor, or ``(loss, outputs)`` if ``return_outputs``.
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits

        loss = focal_loss(logits, labels)

        return (loss, outputs) if return_outputs else loss

## Weighted Loss

In [None]:
import numpy as np
import torch
from transformers import Trainer
from collections import Counter

class WeightedLossTrainer(Trainer):
    """Trainer that uses class-weighted cross-entropy.

    Class ``i`` gets the weight ``1 - count_i / total``, computed once from
    the training set at construction time. Rarer classes therefore get
    larger weights, and a class absent from the training set gets 1.0.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base Trainer, then derive the class weights.

        Raises:
            KeyError: If the training dataset reports its column names and
                has neither a "labels" nor a "label" column.
        """
        super().__init__(*args, **kwargs)

        # The data pipeline in this file stores targets in a 'label' column,
        # while batches use 'labels'; accept either name.
        column_names = getattr(self.train_dataset, "column_names", None)
        if column_names is None:
            label_column = "labels"
        else:
            label_column = next(
                (name for name in ("labels", "label") if name in column_names),
                None,
            )
            if label_column is None:
                raise KeyError(
                    "train_dataset has neither a 'labels' nor a 'label' column; "
                    f"available columns: {column_names}"
                )

        # Count label frequencies (cast so tensor/NumPy scalars count as ints).
        label_counts = Counter(int(label) for label in self.train_dataset[label_column])
        total_count = sum(label_counts.values())

        # Weight = 1 - relative frequency; Counter yields 0 for unseen classes.
        num_classes = self.model.config.num_labels
        weights = [1.0 - label_counts[i] / total_count for i in range(num_classes)]

        # Use the device chosen by the training arguments rather than assuming CUDA.
        self.class_weights = torch.tensor(weights).float().to(self.args.device)

    def compute_loss(self, model, inputs, return_outputs=False,num_items_in_batch=None, **kwargs):
        """Return the class-weighted cross-entropy loss for one batch.

        Args:
            model: Model called as ``model(**inputs)``.
            inputs: Batch dict that includes a "labels" entry.
            return_outputs: When true, return ``(loss, outputs)``.
            num_items_in_batch: Accepted for Trainer compatibility; unused.
            **kwargs: Extra arguments passed by the caller; ignored here.

        Returns:
            The loss tensor, or ``(loss, outputs)`` if ``return_outputs``.
        """
        outputs = model(**inputs)
        logits = outputs.get("logits")
        labels = inputs.get("labels")

        # Keep the weights on the same device as the logits.
        loss_func = torch.nn.CrossEntropyLoss(weight=self.class_weights.to(logits.device))
        loss = loss_func(logits.view(-1, self.model.config.num_labels), labels.view(-1))

        return (loss, outputs) if return_outputs else loss



In [None]:
import re

# Bayesian sweep over the learning rate and the LoRA hyperparameters.
# The target metric is "eval/mcc_metric", which must be maximized.
sweep_config = {
    "name": "Flavor Hyperparameter Tuning",
    "method": "bayes",
    "metric": {
        "goal": "maximize",
        "name": "eval/mcc_metric",
    },
    "parameters": {
        "lr": {
            "distribution": "uniform",
            "min": 1e-5,
            "max": 2e-3,
        },
        "r": {"values": [4, 8, 16, 32, 64, 128]},
        "lora_alpha": {"values": [4, 8, 16, 32, 64, 128]},
        "dropout": {"values": [0.0, 0.1, 0.2]},
        # `value` pins a single constant; a list here would make the
        # parameter itself a list instead of the string "adamw".
        "optimizer": {"value": "adamw"},
    },
}

# Register the sweep once; every agent started later shares this id.
sweep_id = wandb.sweep(sweep_config, project="huggingface")

# Hugging Face model ids of the pretrained checkpoints; the sweep loop
# below runs once for each entry.
model_list= ["DeepChem/ChemBERTa-77M-MLM",
             "DeepChem/ChemBERTa-10M-MLM",
             "DeepChem/ChemBERTa-10M-MTR",
             "DeepChem/ChemBERTa-5M-MTR",
             "DeepChem/ChemBERTa-77M-MTR",
             "ibm/MoLFormer-XL-both-10pct"]

def safe_model_name(name1):
    """Return ``name1`` with every non-alphanumeric character replaced by "__"."""
    return re.sub(r"[^a-zA-Z0-9]", "__", name1)


for model_name in model_list:
    print(f"Running sweep for model: {model_name}")

    def run_training():
        """Run one sweep trial for the current ``model_name``.

        Starts a W&B run, loads the tokenizer and base model, tokenizes and
        label-encodes the train/validation data, wraps the model with LoRA
        using the sweep-provided hyperparameters, trains with
        `WeightedLossTrainer`, and saves the result under a per-run folder.

        ``model_name`` is read from the enclosing loop when the function is
        called, which is safe here because the agent runs inside the same
        loop iteration that defines this function.
        """
        # Imported locally: accuracy comes from scikit-learn so the metric
        # does not depend on the `evaluate` package, which is never imported.
        from sklearn.metrics import accuracy_score

        run = wandb.init(project="flavor analysis chemberta Hyperparameter Tuning")
        try:
            config = run.config

            model_id_clean = safe_model_name(model_name)
            print(f"Model ID cleaned: {model_id_clean}")

            # Unique output folder per model and run.
            save_dir = f".../{model_id_clean}/{run.id}"
            os.makedirs(save_dir, exist_ok=True)

            tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
            model = AutoModelForSequenceClassification.from_pretrained(
                model_name,
                num_labels=5,
                trust_remote_code=True
            )

            train_data, val_data = data_load()
            training_data = data_prep(train_data)
            validation_data = data_prep(val_data)
            training_data = training_data.map(lambda x: tokenize_function(x, tokenizer), batched=True)
            validation_data = validation_data.map(lambda x: tokenize_function(x, tokenizer), batched=True)

            training_data = label_encoding(training_data)
            validation_data = label_encoding(validation_data)

            peft_config = lora_config(config.r, config.lora_alpha, config.dropout)
            lora_model = get_peft_model(model, peft_config)
            lora_model.print_trainable_parameters()

            training_args = TrainingArguments(
                output_dir=save_dir,
                eval_strategy="steps",
                learning_rate=config.lr,
                per_device_train_batch_size=32,
                per_device_eval_batch_size=32,
                num_train_epochs=5,
                weight_decay=0.01,
                save_strategy="steps",
                logging_dir="./logs_flavor_chem_wandb",
                logging_strategy="steps",
                logging_steps=100,
                report_to="wandb",
                load_best_model_at_end=True,
                metric_for_best_model="eval_mcc_metric",
                greater_is_better=True,
                remove_unused_columns=False,
            )

            def compute_metrics(eval_pred):
                """Compute MCC, accuracy, AUC-ROC and macro precision/recall/F1."""
                logits, labels = eval_pred

                predictions = np.argmax(logits, axis=-1)
                probabilities = softmax(logits, axis=1)
                mcc = matthews_corrcoef(labels, predictions)

                return {
                    "eval_mcc_metric": mcc,
                    "Accuracy": accuracy_score(labels, predictions),
                    "AUC-ROC": roc_auc_score(labels, probabilities, multi_class="ovr"),  # AUC-ROC requires probabilities
                    "Precision": precision_score(labels, predictions, average="macro"),
                    "Recall": recall_score(labels, predictions, average="macro"),
                    "F1-score": f1_score(labels, predictions, average="macro")
                }

            # To train with focal loss instead, build the trainer from the
            # `focalloss` class using the same arguments.
            trainer_flavor = WeightedLossTrainer(
                model=lora_model,
                args=training_args,
                train_dataset=training_data,
                eval_dataset=validation_data,
                tokenizer=tokenizer,
                compute_metrics=compute_metrics,
                callbacks=[EarlyStoppingCallback(early_stopping_patience=5)]
            )

            trainer_flavor.train()
            trainer_flavor.save_model(save_dir)

            print(f"Model saved to {save_dir}")
        finally:
            # Always close the W&B run, even if training raised.
            wandb.finish()

    wandb.agent(sweep_id, function=run_training, count=5)

    api = wandb.Api()
    sweep = api.sweep(f"huggingface/{sweep_id}")
    all_runs = list(sweep.runs)
    if all_runs:
        print(all_runs[0].summary_metrics)

    # NOTE: the same sweep_id is used for every model in this loop, so the
    # runs listed here include those of models processed in earlier iterations.
    runs_with_mcc = [
        candidate for candidate in all_runs
        if 'eval/mcc_metric' in candidate.summary_metrics
    ]
    if not runs_with_mcc:
        raise ValueError("No runs found with 'eval/mcc_metric' metric.")

    # The sweep maximizes MCC, so the best run is the one with the highest value.
    best_run = max(runs_with_mcc, key=lambda candidate: candidate.summary_metrics['eval/mcc_metric'])

    best_hyperparameters = best_run.config
    print(f"Best hyperparameters: {best_hyperparameters}")
    print("completed sweep for model: ",model_name)




## Evaluation

In [None]:
#compute metrics
#metric = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    """Compute the evaluation metrics for a ``(logits, labels)`` pair.

    Args:
        eval_pred: Two-item sequence ``(logits, labels)``. Predicted classes
            are the argmax over the last logits axis; probabilities are the
            softmax over axis 1.

    Returns:
        Dict with the keys "eval_mcc_metric", "AUC-ROC" (one-vs-rest),
        "Precision", "Recall" and "F1-score" (the last three
        macro-averaged).
    """
    logits, labels = eval_pred

    predicted_classes = np.argmax(logits, axis=-1)
    class_probabilities = softmax(logits, axis=1)

    results = {"eval_mcc_metric": matthews_corrcoef(labels, predicted_classes)}
    # AUC-ROC needs probabilities rather than hard predictions.
    results["AUC-ROC"] = roc_auc_score(labels, class_probabilities, multi_class="ovr")

    macro_scorers = (
        ("Precision", precision_score),
        ("Recall", recall_score),
        ("F1-score", f1_score),
    )
    for metric_name, scorer in macro_scorers:
        results[metric_name] = scorer(labels, predicted_classes, average="macro")

    return results


In [None]:

# Map checkpoint folder names to the base Hugging Face model names.
# Each key is the model id with its non-alphanumeric characters replaced by
# "WL_Flavor_".
# NOTE(review): `safe_model_name` in the training cell replaces them with
# "__" instead, so folders produced by that cell will not match these keys;
# confirm which naming the folders under `models_root_dir` actually use.
MODEL_NAME_MAP = {
    "DeepChemWL_Flavor_ChemBERTaWL_Flavor_5MWL_Flavor_MTR": "DeepChem/ChemBERTa-5M-MTR",
    "DeepChemWL_Flavor_ChemBERTaWL_Flavor_10MWL_Flavor_MTR": "DeepChem/ChemBERTa-10M-MTR",
    "DeepChemWL_Flavor_ChemBERTaWL_Flavor_77MWL_Flavor_MLM": "DeepChem/ChemBERTa-77M-MLM",
    "DeepChemWL_Flavor_ChemBERTaWL_Flavor_10MWL_Flavor_MLM": "DeepChem/ChemBERTa-10M-MLM",
    "DeepChemWL_Flavor_ChemBERTaWL_Flavor_77MWL_Flavor_MTR": "DeepChem/ChemBERTa-77M-MTR",
    "ibmWL_Flavor_MoLFormerWL_Flavor_XLWL_Flavor_bothWL_Flavor_10pct":"ibm/MoLFormer-XL-both-10pct"
    }


# Root directory that is scanned for <model_folder>/<run_id>/checkpoint-* folders.
models_root_dir = ".../models_Flavor_WL"

# Arguments for the evaluation-only Trainer used on the test set.
eval_args = TrainingArguments(
    output_dir="./test_results_flavor",
    per_device_eval_batch_size=32,
    report_to="none",  # Disable logging to W&B for test
    disable_tqdm=True 

)

def find_all_peft_checkpoints(root_dir):
    """Collect the PEFT checkpoints stored under ``root_dir``.

    The expected layout is
    ``root_dir/<model_folder>/<run_id>/checkpoint-*/adapter_config.json``.
    Non-directory entries at the model and run levels are skipped, and a
    ``checkpoint-*`` entry only counts when it contains
    ``adapter_config.json``.

    Args:
        root_dir: Directory to scan.

    Returns:
        List of ``(model_folder, run_id, checkpoint_path)`` tuples in
        directory-listing order.
    """
    def _subdirectories(parent):
        # Yield (name, full_path) for each directory directly inside `parent`.
        for entry in os.listdir(parent):
            full_path = os.path.join(parent, entry)
            if os.path.isdir(full_path):
                yield entry, full_path

    found = []
    for model_folder, model_folder_path in _subdirectories(root_dir):
        for run_id, run_path in _subdirectories(model_folder_path):
            for subdir in os.listdir(run_path):
                if not subdir.startswith("checkpoint-"):
                    continue
                checkpoint_path = os.path.join(run_path, subdir)
                adapter_config = os.path.join(checkpoint_path, "adapter_config.json")
                if os.path.exists(adapter_config):
                    found.append((model_folder, run_id, checkpoint_path))
    return found

from datasets import Dataset
from sklearn.preprocessing import LabelEncoder
from transformers import Trainer

valid_checkpoints = find_all_peft_checkpoints(models_root_dir)
print(f"Found {len(valid_checkpoints)} valid checkpoints.")

# The raw test split and its integer labels are the same for every
# checkpoint, so read the CSV and fit the encoder once, outside the loop.
test_frame = pd.read_csv('.../fart_test.csv')
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(test_frame['Canonicalized Taste'])
columns_to_remove = ["Canonicalized SMILES", "Standardized SMILES",
                     "Canonicalized Taste", "Original Labels", "Source", "is_multiclass"]

for model_folder, run_id, checkpoint_path in valid_checkpoints:
    print("Model folder: ",model_folder)

    hf_model_name = MODEL_NAME_MAP.get(model_folder)
    if hf_model_name is None:
        # An unmapped folder should not abort evaluation of the other checkpoints.
        print(f"Skipping {checkpoint_path}: no base model is mapped to folder '{model_folder}'")
        continue
    print(f"Using base model: {hf_model_name}")

    # Load tokenizer and base model for the model type
    tokenizer = AutoTokenizer.from_pretrained(hf_model_name, trust_remote_code=True)
    base_model = AutoModelForSequenceClassification.from_pretrained(
        hf_model_name,
        num_labels=5,
        problem_type="single_label_classification",
        trust_remote_code=True
    )

    def _tokenize_test_batch(examples, _tokenizer=tokenizer):
        """Tokenize a batch with this checkpoint's tokenizer.

        Named differently from the module-level `tokenize_function` so that
        the two-argument training helper is not overwritten.
        """
        return _tokenizer(examples["Canonicalized SMILES"], padding="max_length", truncation=True, max_length=512)

    # Tokenization depends on the model's tokenizer, so it stays in the loop.
    test_dataset = Dataset.from_pandas(test_frame)
    test_dataset = test_dataset.map(_tokenize_test_batch, batched=True)
    test_dataset = test_dataset.add_column('label', encoded_labels)
    test_dataset = test_dataset.remove_columns(columns_to_remove)

    # Load the adapter checkpoint
    adapter_model = PeftModel.from_pretrained(base_model, checkpoint_path)
    adapter_model.eval()

    # Eval
    trainer = Trainer(
        model=adapter_model,
        args=eval_args,
        eval_dataset=test_dataset,
        tokenizer=tokenizer,
        compute_metrics=compute_metrics
    )

    print(f"\n🔍 Evaluating {model_folder}/{run_id}/{os.path.basename(checkpoint_path)}")

    test_results = trainer.evaluate()
    print(f"Test results: {test_results}")
   
    
   

   



## Load and Merge Base Model with LoRA weights

In [None]:
# Base checkpoint and the LoRA adapter checkpoint to merge into it.
base_model_id = "DeepChem/ChemBERTa-77M-MLM"  #change model name as per your requirement
adapter_checkpoint_dir = ".../DeepChem__ChemBERTa__77M__MLM/x38bwbvz/checkpoint-416"

base_model = AutoModelForSequenceClassification.from_pretrained(
    base_model_id,
    num_labels=5,
    problem_type="single_label_classification",
    trust_remote_code=True,
)

# Attach the adapter to the base model, then merge it into a single model.
adapter_model = PeftModel.from_pretrained(base_model, adapter_checkpoint_dir)

# NOTE(review): the variable name mentions clintox/molformer, but the model
# loaded above is the ChemBERTa-77M-MLM flavor checkpoint.
final_model_clintox_molformer = adapter_model.merge_and_unload()

In [None]:
# Destination directory for the merged model.
save_path = ".../weighted_loss_clin/final_model_chem_77M-MLM-WL"

# Write the merged model to `save_path`.
final_model_clintox_molformer.save_pretrained(save_path)