# Disclaimer
This material was prepared as an account of work sponsored by an agency of the United States Government.  Neither the United States Government nor the United States Department of Energy, nor Battelle, nor any of their employees, nor any jurisdiction or organization that has cooperated in the development of these materials, makes any warranty, express or implied, or assumes any legal liability or responsibility for the accuracy, completeness, or usefulness of any information, apparatus, product, software, or process disclosed, or represents that its use would not infringe privately owned rights. Reference herein to any specific commercial product, process, or service by trade name, trademark, manufacturer, or otherwise does not necessarily constitute or imply its endorsement, recommendation, or favoring by the United States Government or any agency thereof, or Battelle Memorial Institute. The views and opinions of authors expressed herein do not necessarily state or reflect those of the United States Government or any agency thereof.

PACIFIC NORTHWEST NATIONAL LABORATORY operated by BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY under Contract DE-AC05-76RL01830.

# Hyperparameter Search for Fine-Tuning on Azure Databricks

Wraps `nukelm.run_fine_tune` to install dependencies, provide a drop-down menu for parameters, and copy results to Azure storage. It utilizes multiple GPUs and integrates with MLFlow.

The search space is assumed to be a simple grid to be explored exhaustively. `hyperopt` is used for its Spark integrations, not its adaptive search algorithms.

Please use Databricks Runtime 7.3 LTS ML (preferably with GPU support).

| Parameter | Description |
| --- | --- |
| Label Type | Whether to use fine-grained (multi-class) or coarse-grained (binary) labels. |
| RNG Seed | Integer with which to seed a random number generator. |

In [None]:
# Note: this approach is for running on Databricks and assumes installation of nukelm via the wheel file.
%pip install nukelm-1.0.0-py3-none-any.whl

In [None]:
import json
import logging
import os
import pickle
import shutil
import uuid
from functools import partial
from pathlib import Path
from typing import Dict

import hyperopt
import mlflow
import numpy as np
import torch

from nukelm import run_fine_tune
from nukelm.hyperopt_utils import suggest, validate_space_exhaustive_search


# py4j is very chatty at its default level; only let warnings and above through
logging.getLogger("py4j").setLevel(logging.WARNING)

# root location on Azure storage; stop immediately if it cannot be reached
blob_path = Path("")
assert blob_path.exists()

# both working locations currently alias the storage root
working_path = dbfs_working_path = blob_path

# fine-tuning data directory and the number of samples it holds
data_path = working_path.joinpath("osti", "finetune")
DATASET_SIZE = 188654

In [None]:
# The grid to sweep, one row per hyperparameter:
# (key handed to the objective, hyperopt label, candidate values)
space = {
    name: hyperopt.hp.choice(label, options)
    for name, label, options in (
        ("Batch Size", "batch_size", [16, 64]),
        ("Learning Rate", "learning_rate", [1e-5, 2e-5, 5e-5]),
        (
            "Model Name",
            "Model Name",
            [
                "roberta_base-ots",
                "roberta_base-trained",
                "roberta_large-ots",
                "roberta_large-trained",
                "scibert-ots",
                "scibert-trained",
            ],
        ),
    )
}
# raises error if exhaustive search not possible
validate_space_exhaustive_search(space)

In [None]:
# The number of training runs equals the number of grid points; count them by
# sweeping the search space once with an objective that only records its input.
_ALL_ARGS = []


def dummy_objective(args):
    """Record one sampled point of the search space and report a constant loss.

    Args:
        args: The point handed to the objective by `hyperopt.fmin`.

    Returns:
        int: Always 0; only the recorded points matter, not the loss.
    """
    constant_loss = 0
    _ALL_ARGS.append(args)
    return constant_loss


# Sweep the grid with the recording objective; nothing is trained here.
# NOTE(review): `max_evals` is unbounded, so the search presumably stops when
# the exhaustive `suggest` algorithm runs out of unseen points -- confirm.
_ = hyperopt.fmin(
    dummy_objective,
    space,
    algo=partial(suggest, nbMaxSucessiveFailures=1000),
    max_evals=np.inf,
    trials=hyperopt.Trials(),
    show_progressbar=False,
)

# one training run per recorded grid point
NUM_ARGS = len(_ALL_ARGS)

print(f"Running {NUM_ARGS} training iterations.")

In [None]:
# Lookup from the "Model Name" grid value to what gets loaded, one row per model:
# (grid value, model name or checkpoint path, tokenizer name).
# "-ots" rows use the same identifier for model and tokenizer; "-trained" rows
# load the model from `tmp_pretraining_input` under the working path instead.
MODELS = {
    name: {"Model Name or Path": model, "Tokenizer Name": tokenizer}
    for name, model, tokenizer in (
        ("roberta_base-ots", "roberta-base", "roberta-base"),
        (
            "roberta_base-trained",
            str(working_path / "tmp_pretraining_input" / "roberta-base"),
            "roberta-base",
        ),
        ("roberta_large-ots", "roberta-large", "roberta-large"),
        (
            "roberta_large-trained",
            str(working_path / "tmp_pretraining_input" / "roberta-large"),
            "roberta-large",
        ),
        (
            "scibert-ots",
            "allenai/scibert_scivocab_uncased",
            "allenai/scibert_scivocab_uncased",
        ),
        (
            "scibert-trained",
            str(working_path / "tmp_pretraining_input" / "scibert"),
            "allenai/scibert_scivocab_uncased",
        ),
    )
}

In [None]:
# Register the notebook widgets for the two run-level settings:
# a drop-down offering the "Fine" / "Coarse" label types and a text field for the RNG seed.
# `dbutils` is injected by the Databricks runtime, hence the lint/type suppressions.
dbutils.widgets.dropdown("Label Type", "Coarse", ["Fine", "Coarse"])  # type: ignore # NOQA: F821
dbutils.widgets.text("RNG Seed", "42")  # type: ignore # NOQA: F821

In [None]:
# read back the widget selections for this run
LABEL_TYPE = dbutils.widgets.get("Label Type")  # type: ignore # NOQA: F821
rng_seed = int(dbutils.widgets.get("RNG Seed"))  # type: ignore # NOQA: F821

n_devices = torch.cuda.device_count()

# train/validation files sit in a directory named after the label type
_label_dir = data_path / f"{LABEL_TYPE.lower()}-grained-labels"

# settings that stay the same for every run of the search;
# `train` layers them over the per-run parameters
FIXED_PARAMS = {
    "train_file": str(_label_dir / "train.txt"),
    "validation_file": str(_label_dir / "val.txt"),
    "num_train_epochs": 3,
    "per_device_train_batch_size": 4,
    "per_device_eval_batch_size": 8,
    "seed": rng_seed,
    "do_train": True,
    "do_eval": True,
    "evaluation_strategy": "steps",
    "load_best_model_at_end": True,
    "fp16": True,
    "n_gpu": n_devices,
}

In [None]:
def _gradient_accumulation_steps(batch_size, per_device_batch_size, n_devices):
    """Return the accumulation steps needed to reach an effective minibatch size.

    Args:
        batch_size: Desired number of samples per optimizer step.
        per_device_batch_size: Samples one device processes per forward pass.
        n_devices: Number of CUDA devices; 0 is treated as a single device.

    Returns:
        int: Forward/backward passes to accumulate before each optimizer step.

    Raises:
        ValueError: If `batch_size` is not a positive whole multiple of the
            samples processed per pass across all devices.
    """
    # a host without GPUs is counted as one device to avoid dividing by zero
    # NOTE(review): presumably matches how the trainer counts devices -- confirm
    samples_per_pass = per_device_batch_size * max(1, n_devices)
    steps, remainder = divmod(batch_size, samples_per_pass)
    if remainder or steps < 1:
        raise ValueError(
            f"Check interplay of batch size ({batch_size}), gradient accumulation "
            f"({batch_size / samples_per_pass}), and multiple GPUs ({n_devices})"
        )
    # plain int so the value serializes to JSON as an integer
    return int(steps)


def train(params: Dict) -> Dict:
    """Perform single fine-tuning run.

    The run-specific parameters are merged with `FIXED_PARAMS`, translated into
    the keys expected by the training configuration, written to a JSON file and
    handed to `run_fine_tune`.

    Args:
        params (Dict): Training parameters which change run-to-run. Must contain
            "Model Name", "Learning Rate" and "Batch Size". The dictionary is
            not modified.

    Returns:
        Dict: `hyperopt` results including status and loss, if successful.
            A run whose training raises is reported with `hyperopt.STATUS_FAIL`.

    Raises:
        ValueError: If the batch size cannot be reached with a whole number of
            gradient accumulation steps on the available devices.
    """
    # work on a copy so the caller's dictionary is left untouched
    params = dict(params)

    # avoid collisions with other runs
    run_uuid = uuid.uuid4()
    print(f"Run UUID: {run_uuid}")

    # cloud storage for intermediate outputs (to accommodate arbitrary number of serialized models)
    output_path = working_path / "tmp_training_output" / f"{run_uuid}"
    # create the shared parent as well, so a fresh storage location works
    output_path.mkdir(parents=True)
    params["output_dir"] = str(output_path)

    # tensorboard does not connect properly to mounted cloud storage
    log_path = Path("/tmp") / "logs" / f"{run_uuid}"
    log_path.mkdir(parents=True)
    config_path = log_path / "training_config.json"
    params["logging_dir"] = str(log_path)

    # override defaults for some parameters
    params.update(FIXED_PARAMS)

    model_name = params.pop("Model Name")
    params["model_name_or_path"] = MODELS[model_name]["Model Name or Path"]
    params["tokenizer_name"] = MODELS[model_name]["Tokenizer Name"]

    params["learning_rate"] = params.pop("Learning Rate")

    # calculate gradient accumulation steps necessary to achieve effective minibatch size
    batch_size = params.pop("Batch Size")
    n_devices = torch.cuda.device_count()
    params["gradient_accumulation_steps"] = _gradient_accumulation_steps(
        batch_size, params["per_device_train_batch_size"], n_devices
    )

    n_samples = DATASET_SIZE  # samples without resampling
    n_steps = int(params["num_train_epochs"] * np.ceil(n_samples / batch_size))

    # adjust warmup steps to batch size
    params["warmup_steps"] = int(0.06 * n_steps)

    # evaluate & save ~20 times in training, regardless of batch size
    # (never less than every step, so very short runs do not get an interval of 0)
    logging_steps = max(1, n_steps // 20)
    params["logging_steps"] = logging_steps
    params["save_steps"] = logging_steps
    params["eval_steps"] = logging_steps

    with open(config_path, "w") as handle:
        json.dump(params, handle)

    # attempt to clear GPU cache from any previous runs
    torch.cuda.empty_cache()

    # a failing configuration is reported to hyperopt instead of aborting the whole search
    try:
        metrics = run_fine_tune(config_path)
    except Exception as ex:
        print(repr(ex))
        return {"status": hyperopt.STATUS_FAIL}

    # backup Tensorboard logs
    shutil.copytree(log_path, output_path / "raw_logs")

    return {
        "loss": metrics["eval_loss"],
        "status": hyperopt.STATUS_OK,
    }

In [None]:
spark_trials = True  # Set to False to run on head node only
parallelism = 2  # Set to number of nodes in cluster
evals_to_run = NUM_ARGS

if not spark_trials:
    # head-node search: resume from a previously pickled (Trials, RandomState)
    # pair when one can be read, otherwise start from scratch
    trials_file = os.path.join(working_path, "tmp_training_output", f"hyperopt-trials_{LABEL_TYPE.lower()}.pkl")
    try:
        with open(trials_file, "rb") as saved:
            trials, rstate = pickle.load(saved)
        print("Loaded saved Trials")
    except Exception:
        print("Starting a new Trials object")
        trials = hyperopt.Trials()
        rstate = np.random.RandomState(42)
else:
    # SparkTrials do not seem to be picklable for re-starting later
    trials = hyperopt.SparkTrials(parallelism=parallelism)
    rstate = np.random.RandomState(42)

In [None]:
# default is 4 -- retries unlikely to fix errors
spark.conf.set("spark.task.maxFailures", 1)  # type: ignore # NOQA: F821

# run the full grid under a single MLflow run
try:
    with mlflow.start_run():
        best = hyperopt.fmin(
            fn=train,
            space=space,
            algo=suggest,
            trials=trials,
            rstate=rstate,
            max_evals=evals_to_run,
        )
finally:
    # runs whether or not the search succeeded
    mlflow.end_run()
    # only the non-Spark Trials are saved for a later restart
    if not spark_trials:
        with open(trials_file, "wb") as sink:
            pickle.dump((trials, rstate), sink)