In [1]:
#! pip install "datasets" "evaluate" "accelerate==0.18.0" "transformers==4.26.0" "torch>=1.12.0" "deepspeed==0.12.3"

In [2]:
import numpy as np
import pandas as pd
import os

In [3]:
# Hugging Face model id; passed to the tokenizer and model `from_pretrained` calls.
model_name = "EleutherAI/gpt-j-6B"
# Forwarded to ScalingConfig(use_gpu=...) when the TorchTrainer is built.
use_gpu = True
# Number of Ray Train workers; also used when deriving steps_per_epoch.
num_workers = 2
# CPU count requested for each worker through resources_per_worker["CPU"].
cpus_per_worker = 12

In [4]:
import ray
# Start the local Ray runtime. `ignore_reinit_error=True` keeps this cell safe
# to re-run in a live kernel: a second bare `ray.init()` raises a RuntimeError
# because Ray is already initialised.
ray.init(ignore_reinit_error=True)

2024-04-10 23:18:44,185	INFO worker.py:1743 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8266 [39m[22m


0,1
Python version:,3.10.0
Ray version:,2.10.0
Dashboard:,http://127.0.0.1:8266


[36m(TrainTrainable pid=51932)[0m [2024-04-10 23:19:05,192] [INFO] [real_accelerator.py:158:get_accelerator] Setting ds_accelerator to cuda (auto detect)


[36m(RayTrainWorker pid=52019)[0m Setting up process group for: env:// [rank=0, world_size=2]
[36m(TorchTrainer pid=51932)[0m Started distributed worker processes: 
[36m(TorchTrainer pid=51932)[0m - (ip=10.0.0.239, pid=52019) world_rank=0, local_rank=0, node_rank=0
[36m(TorchTrainer pid=51932)[0m - (ip=10.0.0.239, pid=52020) world_rank=1, local_rank=1, node_rank=0


[36m(RayTrainWorker pid=52020)[0m [2024-04-10 23:19:10,822] [INFO] [real_accelerator.py:158:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[36m(RayTrainWorker pid=52019)[0m [2024-04-10 23:19:10,815] [INFO] [real_accelerator.py:158:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[36m(RayTrainWorker pid=52019)[0m Preparing training arguments
[36m(RayTrainWorker pid=52019)[0m [2024-04-10 23:19:11,764] [INFO] [comm.py:637:init_distributed] cdb=None
[36m(RayTrainWorker pid=52019)[0m Loading model
[36m(RayTrainWorker pid=52019)[0m [2024-04-10 23:20:00,924] [INFO] [partition_parameters.py:348:__exit__] finished initializing model - num_params = 285, num_elems = 6.05B
[36m(RayTrainWorker pid=52020)[0m Preparing training arguments
[36m(RayTrainWorker pid=52020)[0m [2024-04-10 23:19:11,936] [INFO] [comm.py:637:init_distributed] cdb=None
[36m(RayTrainWorker pid=52020)[0m Loading model
[36m(RayTrainWorker pid=52020)[0m Model loaded


In [5]:
from datasets import load_dataset

# Announce the load step in the cell output.
print("Loading tiny_shakespeare dataset")
# `trust_remote_code=True` is passed explicitly; the `datasets` warning captured
# in this notebook states it will become mandatory for loading this dataset.
current_dataset = load_dataset("tiny_shakespeare",trust_remote_code=True)
# Bare last expression so the notebook renders the DatasetDict summary.
current_dataset

Loading tiny_shakespeare dataset


DatasetDict({
    train: Dataset({
        features: ['text'],
        num_rows: 1
    })
    validation: Dataset({
        features: ['text'],
        num_rows: 1
    })
    test: Dataset({
        features: ['text'],
        num_rows: 1
    })
})

In [6]:
import ray.data

# Convert the Hugging Face splits used for training into Ray Datasets,
# keyed by split name (the "test" split is intentionally not converted).
ray_datasets = {
    split: ray.data.from_huggingface(current_dataset[split])
    for split in ("train", "validation")
}

ray_datasets

You can avoid this message in future by passing the argument `trust_remote_code=True`.
Passing `trust_remote_code=True` will be mandatory to load this dataset from the next major release of `datasets`.


{'train': MaterializedDataset(num_blocks=1, num_rows=1, schema={text: string}),
 'validation': MaterializedDataset(num_blocks=1, num_rows=1, schema={text: string})}

In [7]:
# Token length of every example: `tokenize` truncates and pads each line to
# exactly this many tokens (max_length with padding="max_length").
block_size = 256

In [8]:
from transformers import AutoTokenizer


def split_text(batch: pd.DataFrame) -> pd.DataFrame:
    """Break a batch of raw text into one cleaned line per output row.

    All values of the ``text`` column are concatenated (with no separator),
    the result is split on newlines, and each line is stripped of surrounding
    whitespace. Blank lines and lines whose last character is ``:`` are
    discarded.

    Args:
        batch: DataFrame with a ``text`` column of strings.

    Returns:
        A new DataFrame with a single ``text`` column holding the kept lines.
    """
    joined = "".join(batch["text"])

    kept_lines = []
    for raw_line in joined.split("\n"):
        line = raw_line.strip()
        # Skip empty lines and lines that end with a colon.
        if not line or line.endswith(":"):
            continue
        kept_lines.append(line)

    return pd.DataFrame(kept_lines, columns=["text"])


def tokenize(batch: pd.DataFrame) -> dict:
    """Tokenize a batch of text lines into fixed-length model inputs.

    A slow (``use_fast=False``) tokenizer for the notebook-level ``model_name``
    is loaded on every call, with the EOS token reused as the padding token.
    Each line is truncated and padded to ``block_size`` tokens.

    Args:
        batch: DataFrame with a ``text`` column of strings.

    Returns:
        A plain dict of NumPy arrays from the tokenizer output, plus a
        ``labels`` entry that is a copy of ``input_ids``.
    """
    tok = AutoTokenizer.from_pretrained(model_name, use_fast=False)
    # Reuse the end-of-sequence token for padding.
    tok.pad_token = tok.eos_token

    lines = list(batch["text"])
    encoded = tok(
        lines,
        max_length=block_size,
        padding="max_length",
        truncation=True,
        return_tensors="np",
    )
    # Labels start as an independent copy of the input ids.
    encoded["labels"] = encoded["input_ids"].copy()
    return dict(encoded)


# Attach the two preprocessing stages to every split: first break the raw text
# into lines, then tokenize those lines. Both stages operate on pandas batches.
processed_datasets = dict(
    (
        split_name,
        dataset.map_batches(split_text, batch_format="pandas").map_batches(
            tokenize, batch_format="pandas"
        ),
    )
    for split_name, dataset in ray_datasets.items()
)
processed_datasets

{'train': MapBatches(tokenize)
 +- MapBatches(split_text)
    +- Dataset(num_rows=1, schema={text: string}),
 'validation': MapBatches(tokenize)
 +- MapBatches(split_text)
    +- Dataset(num_rows=1, schema={text: string})}

In [9]:
import evaluate
import torch
from transformers import (
    Trainer,
    TrainingArguments,
    GPTJForCausalLM,
    AutoTokenizer,
    default_data_collator,
)
from transformers.utils.logging import disable_progress_bar, enable_progress_bar

from ray import train
from ray.train.huggingface.transformers import prepare_trainer, RayTrainReportCallback


def train_func(config):
    """Per-worker training loop run by Ray Train's ``TorchTrainer``.

    Builds the DeepSpeed ZeRO stage-3 settings and Hugging Face
    ``TrainingArguments``, loads the tokenizer and GPT-J model named by the
    notebook-level ``model_name``, wraps this worker's Ray dataset shards as
    batch iterables, and runs ``Trainer.train()`` with a callback that reports
    checkpoints to Ray Train.

    Args:
        config: Dict of hyperparameters. Recognised keys:
            ``steps_per_epoch`` (required, >= 1): used for ``save_steps`` and,
            multiplied by ``epochs``, for ``max_steps``;
            ``batch_size`` (default 4): per-device batch size;
            ``epochs`` (default 2);
            ``warmup_steps`` (default 0);
            ``learning_rate`` (default 2e-5);
            ``weight_decay`` (default 0.01).

    Raises:
        ValueError: If ``steps_per_epoch`` is missing or smaller than 1.
    """
    # Read and validate hyperparameters first so a bad config fails right away
    # with a clear message instead of a `None * int` TypeError after setup.
    batch_size = config.get("batch_size", 4)
    epochs = config.get("epochs", 2)
    warmup_steps = config.get("warmup_steps", 0)
    learning_rate = config.get("learning_rate", 0.00002)
    weight_decay = config.get("weight_decay", 0.01)
    steps_per_epoch = config.get("steps_per_epoch")
    if steps_per_epoch is None or steps_per_epoch < 1:
        raise ValueError(
            "train_func requires a positive 'steps_per_epoch' in its config "
            f"(got {steps_per_epoch!r}); it is used to derive save_steps and max_steps."
        )

    # Use the actual number of CPUs assigned by Ray
    os.environ["OMP_NUM_THREADS"] = str(
        train.get_context().get_trial_resources().bundles[-1].get("CPU", 1)
    )
    # Enable tf32 for better performance
    torch.backends.cuda.matmul.allow_tf32 = True

    # DeepSpeed settings. Values given as "auto" are left for the Hugging Face
    # integration to fill in from TrainingArguments.
    deepspeed = {
        "fp16": {
            "enabled": "auto",
            "initial_scale_power": 8,
            "hysteresis": 4,
            "consecutive_hysteresis": True,
        },
        "bf16": {"enabled": "auto"},
        "optimizer": {
            "type": "AdamW",
            "params": {
                "lr": "auto",
                "betas": "auto",
                "eps": "auto",
            },
        },
        "zero_optimization": {
            "stage": 3,
            "offload_optimizer": {
                "device": "cpu",
                # Pinned host memory is disabled in this notebook (was True).
                "pin_memory": False,
            },
            # Communication overlap is disabled in this notebook (was True).
            "overlap_comm": False,
            "contiguous_gradients": True,
            "reduce_bucket_size": "auto",
            "stage3_prefetch_bucket_size": "auto",
            "stage3_param_persistence_threshold": "auto",
            "gather_16bit_weights_on_model_save": True,
            "round_robin_gradients": True,
        },
        "gradient_accumulation_steps": "auto",
        "gradient_clipping": "auto",
        "steps_per_print": 10,
        "train_batch_size": "auto",
        "train_micro_batch_size_per_gpu": "auto",
        "wall_clock_breakdown": False,
    }

    print("Preparing training arguments")
    training_args = TrainingArguments(
        "output",
        logging_steps=1,
        save_strategy="steps",
        save_steps=steps_per_epoch,
        max_steps=steps_per_epoch * epochs,
        per_device_train_batch_size=batch_size,
        gradient_accumulation_steps=1,
        learning_rate=learning_rate,
        weight_decay=weight_decay,
        warmup_steps=warmup_steps,
        label_names=["input_ids", "attention_mask"],
        push_to_hub=False,
        report_to="none",
        disable_tqdm=True,  # declutter the output a little
        fp16=True,  # half precision; bf16 is the alternative this notebook left disabled
        gradient_checkpointing=True,
        deepspeed=deepspeed,
    )
    # Silence download/loading progress bars while the tokenizer and model load.
    disable_progress_bar()

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.pad_token = tokenizer.eos_token

    print("Loading model")

    # The key/value cache only helps generation and is incompatible with the
    # gradient checkpointing enabled in TrainingArguments, so it is turned off.
    model = GPTJForCausalLM.from_pretrained(model_name, use_cache=False)
    model.resize_token_embeddings(len(tokenizer))

    print("Model loaded")

    enable_progress_bar()

    metric = evaluate.load("accuracy")

    # Each worker pulls only its own shard of the datasets passed to TorchTrainer.
    train_ds = train.get_dataset_shard("train")
    eval_ds = train.get_dataset_shard("validation")

    train_ds_iterable = train_ds.iter_torch_batches(
        batch_size=batch_size,
        local_shuffle_buffer_size=train.get_context().get_world_size() * batch_size,
    )
    eval_ds_iterable = eval_ds.iter_torch_batches(batch_size=batch_size)

    def compute_metrics(eval_pred):
        """Score the argmax of the logits against the labels with the accuracy metric."""
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=-1)
        return metric.compute(predictions=predictions, references=labels)

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_ds_iterable,
        eval_dataset=eval_ds_iterable,
        compute_metrics=compute_metrics,
        tokenizer=tokenizer,
        data_collator=default_data_collator,
    )

    # Add callback to report checkpoints to Ray Train
    trainer.add_callback(RayTrainReportCallback())
    trainer = prepare_trainer(trainer)
    trainer.train()

[2024-04-10 23:18:59,604] [INFO] [real_accelerator.py:158:get_accelerator] Setting ds_accelerator to cuda (auto detect)


In [10]:
# Passed to RunConfig(storage_path=...) when the trainer is built.
# NOTE(review): this is a machine-specific absolute local path; presumably it
# must be replaced with shared storage (cloud bucket or NFS) before running on
# more than one node — confirm against the Ray Train storage docs.
storage_path = "/home/novelty/ray_test"  # TODO: Set up cloud storage
# storage_path="/mnt/path/to/nfs"     # TODO: Alternatively, set up NFS

In [11]:
# Per-device batch size; forwarded to train_func through train_loop_config.
batch_size = 2
# Row count of the processed training split.
train_ds_size = processed_datasets["train"].count()
# One step consumes batch_size rows on each of num_workers workers; integer
# division means a trailing partial step is not counted.
steps_per_epoch = train_ds_size // (batch_size * num_workers)

In [12]:
from ray.train.torch import TorchTrainer
from ray.train import RunConfig, ScalingConfig, CheckpointConfig

# Assemble the distributed trainer: `train_func` runs on every worker, and each
# worker requests one GPU plus `cpus_per_worker` CPUs.
trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    train_loop_config=dict(
        epochs=1,
        batch_size=batch_size,  # per device
        steps_per_epoch=steps_per_epoch,
    ),
    scaling_config=ScalingConfig(
        num_workers=num_workers,
        use_gpu=use_gpu,
        resources_per_worker=dict(GPU=1, CPU=cpus_per_worker),
    ),
    datasets=processed_datasets,
    run_config=RunConfig(
        storage_path=storage_path,
        # Checkpoint retention is capped with num_to_keep=1.
        checkpoint_config=CheckpointConfig(num_to_keep=1),
    ),
)

In [13]:
# Launch the distributed training run and keep the returned result object.
results = trainer.fit()

0,1
Current time:,2024-04-10 23:20:23
Running for:,00:01:22.25
Memory:,5.8/62.7 GiB

Trial name,# failures,error file
TorchTrainer_f3873_00000,1,/tmp/ray/session_2024-04-10_23-18-41_959806_49334/artifacts/2024-04-10_23-19-01/TorchTrainer_2024-04-10_23-19-01/driver_artifacts/TorchTrainer_f3873_00000_0_2024-04-10_23-19-01/error.txt

Trial name,status,loc
TorchTrainer_f3873_00000,ERROR,10.0.0.239:51932


2024-04-10 23:20:23,619	ERROR tune_controller.py:1332 -- Trial task failed for trial TorchTrainer_f3873_00000
Traceback (most recent call last):
  File "/home/novelty/miniconda3/envs/ray210/lib/python3.10/site-packages/ray/air/execution/_internal/event_manager.py", line 110, in resolve_future
    result = ray.get(future)
  File "/home/novelty/miniconda3/envs/ray210/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/home/novelty/miniconda3/envs/ray210/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/home/novelty/miniconda3/envs/ray210/lib/python3.10/site-packages/ray/_private/worker.py", line 2667, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
  File "/home/novelty/miniconda3/envs/ray210/lib/python3.10/site-packages/ray/_private/worker.py", line 864, in get_objects
    raise value.as_ins