In [1]:
from pprint import pprint
import ray

ray.init()

2023-10-24 13:34:52,777	INFO worker.py:1633 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


0,1
Python version:,3.11.5
Ray version:,2.7.1
Dashboard:,http://127.0.0.1:8265


In [2]:
pprint(ray.cluster_resources())

{'CPU': 12.0,
 'memory': 7480537908.0,
 'node:127.0.0.1': 1.0,
 'node:__internal_head__': 1.0,
 'object_store_memory': 2147483648.0}


In [3]:
use_gpu = False  # set this to False to run on CPUs
num_workers = 1  # set this to number of GPUs or CPUs you want to use

In [5]:
GLUE_TASKS = [
    "cola",
    "mnli",
    "mnli-mm",
    "mrpc",
    "qnli",
    "qqp",
    "rte",
    "sst2",
    "stsb",
    "wnli",
]

In [6]:
task = "cola"
model_checkpoint = "distilbert-base-uncased"
batch_size = 16

In [7]:
from datasets import load_dataset

actual_task = "mnli" if task == "mnli-mm" else task
datasets = load_dataset("glue", actual_task)

In [8]:
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained(model_checkpoint, use_fast=True)

In [9]:
task_to_keys = {
    "cola": ("sentence", None),
    "mnli": ("premise", "hypothesis"),
    "mnli-mm": ("premise", "hypothesis"),
    "mrpc": ("sentence1", "sentence2"),
    "qnli": ("question", "sentence"),
    "qqp": ("question1", "question2"),
    "rte": ("sentence1", "sentence2"),
    "sst2": ("sentence", None),
    "stsb": ("sentence1", "sentence2"),
    "wnli": ("sentence1", "sentence2"),
}

In [10]:
import ray.data

ray_datasets = {
    "train": ray.data.from_huggingface(datasets["train"]),
    "validation": ray.data.from_huggingface(datasets["validation"]),
    "test": ray.data.from_huggingface(datasets["test"]),
}
ray_datasets

{'train': MaterializedDataset(
    num_blocks=1,
    num_rows=8551,
    schema={sentence: string, label: int64, idx: int32}
 ),
 'validation': MaterializedDataset(
    num_blocks=1,
    num_rows=1043,
    schema={sentence: string, label: int64, idx: int32}
 ),
 'test': MaterializedDataset(
    num_blocks=1,
    num_rows=1063,
    schema={sentence: string, label: int64, idx: int32}
 )}

In [21]:
import numpy as np
from typing import Dict


# Tokenize input sentences
def collate_fn(examples: Dict[str, np.array]):
    sentence1_key, sentence2_key = task_to_keys[task]
    if sentence2_key is None:
        outputs = tokenizer(
            list(examples[sentence1_key]),
            truncation=True,
            padding="longest",
            return_tensors="pt",
        )
    else:
        outputs = tokenizer(
            list(examples[sentence1_key]),
            list(examples[sentence2_key]),
            truncation=True,
            padding="longest",
            return_tensors="pt",
        )

    outputs["labels"] = torch.LongTensor(examples["label"])

    # Move all input tensors to GPU
    for key, value in outputs.items():
        outputs[key] = value.cuda()

    return outputs

[2m[33m(raylet)[0m [2023-10-25 11:38:41,575 E 84852 1276729] (raylet) file_system_monitor.cc:111: /tmp/ray/session_2023-10-24_13-34-51_109442_84836 is over 95% full, available space: 24603598848; capacity: 494384795648. Object creation will fail if spilling is required.
[2m[33m(raylet)[0m [2023-10-25 11:38:51,577 E 84852 1276729] (raylet) file_system_monitor.cc:111: /tmp/ray/session_2023-10-24_13-34-51_109442_84836 is over 95% full, available space: 24498843648; capacity: 494384795648. Object creation will fail if spilling is required.
[2m[33m(raylet)[0m [2023-10-25 11:39:01,669 E 84852 1276729] (raylet) file_system_monitor.cc:111: /tmp/ray/session_2023-10-24_13-34-51_109442_84836 is over 95% full, available space: 24498454528; capacity: 494384795648. Object creation will fail if spilling is required.
[2m[33m(raylet)[0m [2023-10-25 11:39:11,768 E 84852 1276729] (raylet) file_system_monitor.cc:111: /tmp/ray/session_2023-10-24_13-34-51_109442_84836 is over 95% full, available

In [12]:
import torch
import numpy as np

from datasets import load_metric
from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer

import ray.train
from ray.train.huggingface.transformers import prepare_trainer, RayTrainReportCallback

num_labels = 3 if task.startswith("mnli") else 1 if task == "stsb" else 2
metric_name = (
    "pearson"
    if task == "stsb"
    else "matthews_correlation"
    if task == "cola"
    else "accuracy"
)
model_name = model_checkpoint.split("/")[-1]
validation_key = (
    "validation_mismatched"
    if task == "mnli-mm"
    else "validation_matched"
    if task == "mnli"
    else "validation"
)
name = f"{model_name}-finetuned-{task}"

# Calculate the maximum steps per epoch based on the number of rows in the training dataset.
# Make sure to scale by the total number of training workers and the per device batch size.
max_steps_per_epoch = ray_datasets["train"].count() // (batch_size * num_workers)


def train_func(config):
    print(f"Is CUDA available: {torch.cuda.is_available()}")

    metric = load_metric("glue", actual_task)
    tokenizer = AutoTokenizer.from_pretrained(model_checkpoint, use_fast=True)
    model = AutoModelForSequenceClassification.from_pretrained(
        model_checkpoint, num_labels=num_labels
    )

    train_ds = ray.train.get_dataset_shard("train")
    eval_ds = ray.train.get_dataset_shard("eval")

    train_ds_iterable = train_ds.iter_torch_batches(
        batch_size=batch_size, collate_fn=collate_fn
    )
    eval_ds_iterable = eval_ds.iter_torch_batches(
        batch_size=batch_size, collate_fn=collate_fn
    )

    print("max_steps_per_epoch: ", max_steps_per_epoch)

    args = TrainingArguments(
        name,
        evaluation_strategy="epoch",
        save_strategy="epoch",
        logging_strategy="epoch",
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        learning_rate=config.get("learning_rate", 2e-5),
        num_train_epochs=config.get("epochs", 2),
        weight_decay=config.get("weight_decay", 0.01),
        push_to_hub=False,
        max_steps=max_steps_per_epoch * config.get("epochs", 2),
        disable_tqdm=True,  # declutter the output a little
        no_cuda=not use_gpu,  # you need to explicitly set no_cuda if you want CPUs
        report_to="none",
    )

    def compute_metrics(eval_pred):
        predictions, labels = eval_pred
        if task != "stsb":
            predictions = np.argmax(predictions, axis=1)
        else:
            predictions = predictions[:, 0]
        return metric.compute(predictions=predictions, references=labels)

    trainer = Trainer(
        model,
        args,
        train_dataset=train_ds_iterable,
        eval_dataset=eval_ds_iterable,
        tokenizer=tokenizer,
        compute_metrics=compute_metrics,
    )

    trainer.add_callback(RayTrainReportCallback())

    trainer = prepare_trainer(trainer)

    print("Starting training")
    trainer.train()

In [18]:
from ray.train.torch import TorchTrainer
from ray.train import RunConfig, ScalingConfig, CheckpointConfig

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
    datasets={
        "train": ray_datasets["train"],
        "eval": ray_datasets["validation"],
    },
    run_config=RunConfig(
        checkpoint_config=CheckpointConfig(
            num_to_keep=1,
            checkpoint_score_attribute="eval_loss",
            checkpoint_score_order="min",
        ),
    ),
)

In [19]:
result = trainer.fit()

0,1
Current time:,2023-10-24 13:44:59
Running for:,00:00:09.03
Memory:,13.4/16.0 GiB

Trial name,# failures,error file
TorchTrainer_67715_00000,1,/Users/piyush/ray_results/TorchTrainer_2023-10-24_13-44-50/TorchTrainer_67715_00000_0_2023-10-24_13-44-50/error.txt

Trial name,status,loc
TorchTrainer_67715_00000,ERROR,127.0.0.1:85636


[2m[36m(TorchTrainer pid=85636)[0m Starting distributed worker processes: ['85638 (127.0.0.1)']
[2m[36m(RayTrainWorker pid=85638)[0m Setting up process group for: env:// [rank=0, world_size=1]


[2m[36m(RayTrainWorker pid=85638)[0m Is CUDA available: False


[2m[36m(SplitCoordinator pid=85641)[0m Auto configuring locality_with_output=['6dd89804ce3641d8f56642502c88751c22edee93e0f8b822d791e3e6']


[2m[36m(RayTrainWorker pid=85638)[0m max_steps_per_epoch:  534
[2m[36m(RayTrainWorker pid=85638)[0m Starting training


[2m[36m(RayTrainWorker pid=85638)[0m Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'pre_classifier.weight', 'pre_classifier.bias', 'classifier.weight']
[2m[36m(RayTrainWorker pid=85638)[0m You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
[2m[36m(RayTrainWorker pid=85638)[0m torch.distributed process group is initialized, but parallel_mode != ParallelMode.DISTRIBUTED. In order to use Torch DDP, launch your script with `python -m torch.distributed.launch


(pid=85640) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=85640)[0m Executing DAG InputDataBuffer[Input] -> OutputSplitter[split(1, equal=True)]
[2m[36m(SplitCoordinator pid=85640)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['6dd89804ce3641d8f56642502c88751c22edee93e0f8b822d791e3e6'], preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(SplitCoordinator pid=85640)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
2023-10-24 13:44:59,910	ERROR tune_controller.py:1502 -- Trial task failed for trial TorchTrainer_67715_00000
Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.11/site-packages/ray/air/execution/_internal/event_manager.py", line 110, in resolve_future
    result = ray.get(future)
             ^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/ray/_private/

TrainingFailedError: The Ray Train run failed. Please inspect the previous error messages for a cause. After fixing the issue (assuming that the error is not caused by your own application logic, but rather an error such as OOM), you can restart the run from scratch or continue this run.
To continue this run, you can use: `trainer = TorchTrainer.restore("/Users/piyush/ray_results/TorchTrainer_2023-10-24_13-44-50")`.
To start a new run that will retry on training failures, set `train.RunConfig(failure_config=train.FailureConfig(max_failures))` in the Trainer's `run_config` with `max_failures > 0`, or `max_failures = -1` for unlimited retries.

In [20]:
print(result)

Result(
  metrics={},
  path='/Users/piyush/ray_results/TorchTrainer_2023-10-24_13-42-04/TorchTrainer_03fef_00000_0_2023-10-24_13-42-04',
  filesystem='local',
  checkpoint=None
)
