In [1]:
! pip install --user -q google-cloud-aiplatform[ray]==1.56.0 \
                        ray[data,train,tune,serve]==2.9.3 \
                        datasets \
                        evaluate \
                        accelerate \
                        transformers \
                        torch \
                        numpy \
                        pandas \
                        deepspeed

In [None]:
# ! pip install "datasets" "evaluate" "accelerate==0.18.0" "transformers==4.26.0" "torch>=1.12.0" "deepspeed==0.12.3"


In [2]:
import pandas as pd
pd.__version__

'2.1.4'

In [3]:
import time
import numpy as np
import joblib
import pandas as pd
import seaborn as sns
import xgboost as xgb

from sklearn.preprocessing import MinMaxScaler, OneHotEncoder, LabelEncoder
from sklearn.compose import ColumnTransformer,make_column_transformer
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split, KFold, cross_val_score
from sklearn.metrics import accuracy_score, f1_score, recall_score, precision_score, confusion_matrix,confusion_matrix,classification_report
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier

from google.cloud import bigquery
from google.cloud import aiplatform
from google.cloud.aiplatform.preview import vertex_ray
import ray
from ray.runtime_env import RuntimeEnv
from ray.air.config import RunConfig
from ray.air import CheckpointConfig, ScalingConfig
from ray.util.joblib import register_ray

In [8]:

PROJECT_NBR = "721521243942"
PROJECT_ID = "ai-hangsik"
REGION="us-central1"
RAY_CLUSTER_NM = "cluster-20250216-085207"


In [9]:
model_name = "EleutherAI/gpt-j-6B"
use_gpu = True
num_workers = 3
cpus_per_worker = 6
# local_dir = "/tmp/llm_checkpoints/"

In [10]:
aiplatform.init(project=PROJECT_ID, location=REGION)
RAY_ADDRESS=f"vertex_ray://projects/{PROJECT_NBR}/locations/{REGION}/persistentResources/{RAY_CLUSTER_NM}"
RAY_ADDRESS

'vertex_ray://projects/721521243942/locations/us-central1/persistentResources/cluster-20250216-085207'

In [29]:

# import joblib, sys
# sys.modules['sklearn.externals.joblib'] = joblib
# from ray.util.joblib import register_ray
# register_ray()


ray.shutdown()



In [12]:
RUNTIME_ENV = {
  "pip": [
      "google-cloud-aiplatform[ray]>=1.56.0",
      "ray[data]==2.9.3",
      "ray[train]==2.9.3",
      "ray[tune]==2.9.3",
      "datasets",
      "evaluate",
      "accelerate==0.18.0",
      "transformers==4.26.0",
      "torch>=1.12.0",
      "deepspeed==0.12.3",
  ],
}

ray.init(address=RAY_ADDRESS,runtime_env=RUNTIME_ENV)

[Ray on Vertex AI]: Cluster State = State.RUNNING


SIGTERM handler is not set because current thread is not the main thread.


0,1
Python version:,3.10.14
Ray version:,2.9.3
Vertex SDK version:,1.56.0
Dashboard:,b35d9e635d081c04-dot-us-central1.aiplatform-training.googleusercontent.com
Interactive Terminal Uri:,d66a42511c5883bb-dot-us-central1.aiplatform-training.googleusercontent.com
Cluster Name:,cluster-20250216-085207


Put failed:
Put failed:
Put failed:
Put failed:


In [28]:
"""
Minimal Ray Train + DeepSpeed example adapted from
https://github.com/huggingface/accelerate/blob/main/examples/nlp_example.py

Fine-tune a BERT model with DeepSpeed ZeRO-3 and Ray Train and Ray Data
"""

from tempfile import TemporaryDirectory

import deepspeed
import torch
from datasets import load_dataset
from deepspeed.accelerator import get_accelerator
from torchmetrics.classification import BinaryAccuracy, BinaryF1Score
from transformers import AutoModelForSequenceClassification, AutoTokenizer, set_seed

import ray
import ray.train
from ray.train import Checkpoint, DataConfig, ScalingConfig
from ray.train.torch import TorchTrainer


def train_func(config):
    """Your training function that will be launched on each worker."""

    # Unpack training configs
    set_seed(config["seed"])
    num_epochs = config["num_epochs"]
    train_batch_size = config["train_batch_size"]
    eval_batch_size = config["eval_batch_size"]

    # Instantiate the Model
    model = AutoModelForSequenceClassification.from_pretrained(
        "bert-base-cased", return_dict=True
    )

    # Prepare Ray Data Loaders
    # ====================================================
    train_ds = ray.train.get_dataset_shard("train")
    eval_ds = ray.train.get_dataset_shard("validation")

    tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

    def collate_fn(batch):
        outputs = tokenizer(
            list(batch["sentence1"]),
            list(batch["sentence2"]),
            truncation=True,
            padding="longest",
            return_tensors="pt",
        )
        outputs["labels"] = torch.LongTensor(batch["label"])
        return outputs

    train_dataloader = train_ds.iter_torch_batches(
        batch_size=train_batch_size, collate_fn=collate_fn
    )
    eval_dataloader = eval_ds.iter_torch_batches(
        batch_size=eval_batch_size, collate_fn=collate_fn
    )
    # ====================================================

    # Initialize DeepSpeed Engine
    model, optimizer, _, lr_scheduler = deepspeed.initialize(
        model=model,
        model_parameters=model.parameters(),
        config=deepspeed_config,
    )
    device = get_accelerator().device_name(model.local_rank)
    
    # Initialize Evaluation Metrics
    f1 = BinaryF1Score().to(device)
    accuracy = BinaryAccuracy().to(device)

    for epoch in range(num_epochs):
        # Training
        model.train()
        for batch in train_dataloader:
            batch = {k: v.to(device) for k, v in batch.items()}
            outputs = model(**batch)
            loss = outputs.loss
            model.backward(loss)
            optimizer.step()
            lr_scheduler.step()
            optimizer.zero_grad()

        # Evaluation
        model.eval()
        for batch in eval_dataloader:
            batch = {k: v.to(device) for k, v in batch.items()}
            with torch.no_grad():
                outputs = model(**batch)
            predictions = outputs.logits.argmax(dim=-1)

            f1.update(predictions, batch["labels"])
            accuracy.update(predictions, batch["labels"])

        # torchmetrics will aggregate the metrics across all workers
        eval_metric = {
            "f1": f1.compute().item(),
            "accuracy": accuracy.compute().item(),
        }
        f1.reset()
        accuracy.reset()

        if model.global_rank == 0:
            print(f"epoch {epoch}:", eval_metric)

        # Report checkpoint and metrics to Ray Train
        # ==============================================================
        with TemporaryDirectory() as tmpdir:
            # Each worker saves its own checkpoint shard
            model.save_checkpoint(tmpdir)

            # Ensure all workers finished saving their checkpoint shard
            torch.distributed.barrier()

            # Report checkpoint shards from each worker in parallel
            ray.train.report(
                metrics=eval_metric, checkpoint=Checkpoint.from_directory(tmpdir)
            )
        # ==============================================================


if __name__ == "__main__":
    deepspeed_config = {
        "optimizer": {
            "type": "AdamW",
            "params": {
                "lr": 2e-5,
            },
        },
        "scheduler": {"type": "WarmupLR", "params": {"warmup_num_steps": 100}},
        "fp16": {"enabled": True},
        "bf16": {"enabled": False},  # Turn this on if using AMPERE GPUs.
        "zero_optimization": {
            "stage": 3,
            "offload_optimizer": {
                "device": "none",
            },
            "offload_param": {
                "device": "none",
            },
        },
        "gradient_accumulation_steps": 1,
        "gradient_clipping": True,
        "steps_per_print": 10,
        "train_micro_batch_size_per_gpu": 16,
        "wall_clock_breakdown": False,
    }

    training_config = {
        "seed": 42,
        "num_epochs": 3,
        "train_batch_size": 16,
        "eval_batch_size": 32,
        "deepspeed_config": deepspeed_config,
    }

    # Prepare Ray Datasets
    hf_datasets = load_dataset("glue", "mrpc")
    ray_datasets = {
        "train": ray.data.from_huggingface(hf_datasets["train"]),
        "validation": ray.data.from_huggingface(hf_datasets["validation"]),
    }

    trainer = TorchTrainer(
        train_func,
        train_loop_config=training_config,
        scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
        datasets=ray_datasets,
        dataset_config=DataConfig(datasets_to_split=["train", "validation"]),
        # If running in a multi-node cluster, this is where you
        # should configure the run's persistent storage that is accessible
        # across all worker nodes.
        # run_config=ray.train.RunConfig(storage_path="s3://..."),
    )

    result = trainer.fit()

    # Retrieve the best checkponints from results
    result.best_checkpoints

[2025-02-16 01:01:56,236] [INFO] [real_accelerator.py:158:get_accelerator] Setting ds_accelerator to cuda (auto detect)


I0000 00:00:1739667716.187187 3375329 fork_posix.cc:75] Other threads are currently calling into gRPC, skipping fork() handlers
I0000 00:00:1739667716.215294 3375329 fork_posix.cc:75] Other threads are currently calling into gRPC, skipping fork() handlers


In [11]:
# THIS SHOULD BE HIDDEN IN DOCS AND ONLY RAN IN CI
# Download the model from our S3 mirror as it's faster

# import ray
# import subprocess
# import ray.util.scheduling_strategies


# def force_on_node(node_id: str, remote_func_or_actor_class):
#     scheduling_strategy = ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
#         node_id=node_id, soft=False
#     )
#     options = {"scheduling_strategy": scheduling_strategy}
#     return remote_func_or_actor_class.options(**options)


# def run_on_every_node(remote_func_or_actor_class, **remote_kwargs):
#     refs = []
#     for node in ray.nodes():
#         if node["Alive"] and node["Resources"].get("GPU", None):
#             refs.append(
#                 force_on_node(node["NodeID"], remote_func_or_actor_class).remote(
#                     **remote_kwargs
#                 )
#             )
#     return ray.get(refs)


# @ray.remote(num_gpus=1)
# def download_model():
#     from transformers.utils.hub import TRANSFORMERS_CACHE

#     path = os.path.expanduser(
#         os.path.join(TRANSFORMERS_CACHE, "models--EleutherAI--gpt-j-6B")
#     )
    
#     # subprocess.run(["mkdir", "-p", os.path.join(path, "snapshots", "main")])
#     # subprocess.run(["mkdir", "-p", os.path.join(path, "refs")])

#     if os.path.exists(path):
#         return
    
#     subprocess.run(
#         [
#             "gsutil",
#             "-m",
#             "cp",
#             "gs://sllm_checkpoints/EleutherAI/gpt-j-6b/",
#             os.path.join(path, "snapshots", "main"),
#         ]
#     )
#     # with open(os.path.join(path, "snapshots", "main", "hash"), "r") as f:
#     #     f_hash = f.read().strip()
#     with open(os.path.join(path), "w") as f:
#         f.write(f_hash)
    
#     # os.rename(
#     #     os.path.join(path, "snapshots", "main"), os.path.join(path, "snapshots", f_hash)
#     # )

# _ = run_on_every_node(download_model)

In [23]:
from datasets import load_dataset

print("Loading tiny_shakespeare dataset")
current_dataset = load_dataset("tiny_shakespeare")
current_dataset

Loading tiny_shakespeare dataset


DatasetDict({
    train: Dataset({
        features: ['text'],
        num_rows: 1
    })
    validation: Dataset({
        features: ['text'],
        num_rows: 1
    })
    test: Dataset({
        features: ['text'],
        num_rows: 1
    })
})

In [22]:
hf_dataset = load_dataset("wikitext", "wikitext-2-raw-v1")
hf_dataset

README.md:   0%|          | 0.00/10.5k [00:00<?, ?B/s]

test-00000-of-00001.parquet:   0%|          | 0.00/733k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/6.36M [00:00<?, ?B/s]

validation-00000-of-00001.parquet:   0%|          | 0.00/657k [00:00<?, ?B/s]

Generating test split:   0%|          | 0/4358 [00:00<?, ? examples/s]

Generating train split:   0%|          | 0/36718 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/3760 [00:00<?, ? examples/s]

DatasetDict({
    test: Dataset({
        features: ['text'],
        num_rows: 4358
    })
    train: Dataset({
        features: ['text'],
        num_rows: 36718
    })
    validation: Dataset({
        features: ['text'],
        num_rows: 3760
    })
})

In [26]:
import ray.data

@ray.remote
def ray_data_task():
 
    datasets = {
        "train": ray.data.from_huggingface(hf_dataset["train"]),
        "validation": ray.data.from_huggingface(hf_dataset["validation"]),
    }
return datasets

ray_datasets = ray.get(ray_data_task.remote())

RuntimeError: Global node is not initialized.

In [10]:
import os
from typing import Dict

import torch
from filelock import FileLock
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.transforms import Normalize, ToTensor
from tqdm import tqdm

import ray.train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def get_dataloaders(batch_size):
    # Transform to normalize the input images
    transform = transforms.Compose([ToTensor(), Normalize((0.5,), (0.5,))])

    with FileLock(os.path.expanduser("~/data.lock")):
        # Download training data from open datasets
        training_data = datasets.FashionMNIST(
            root="~/data",
            train=True,
            download=True,
            transform=transform,
        )

        # Download test data from open datasets
        test_data = datasets.FashionMNIST(
            root="~/data",
            train=False,
            download=True,
            transform=transform,
        )

    # Create data loaders
    train_dataloader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
    test_dataloader = DataLoader(test_data, batch_size=batch_size)

    return train_dataloader, test_dataloader


# Model Definition
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(512, 10),
            nn.ReLU(),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits


def train_func_per_worker(config: Dict):
    lr = config["lr"]
    epochs = config["epochs"]
    batch_size = config["batch_size_per_worker"]

    # Get dataloaders inside the worker training function
    train_dataloader, test_dataloader = get_dataloaders(batch_size=batch_size)

    # [1] Prepare Dataloader for distributed training
    # Shard the datasets among workers and move batches to the correct device
    # =======================================================================
    train_dataloader = ray.train.torch.prepare_data_loader(train_dataloader)
    test_dataloader = ray.train.torch.prepare_data_loader(test_dataloader)

    model = NeuralNetwork()

    # [2] Prepare and wrap your model with DistributedDataParallel
    # Move the model to the correct GPU/CPU device
    # ============================================================
    model = ray.train.torch.prepare_model(model)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)

    # Model training loop
    for epoch in range(epochs):
        if ray.train.get_context().get_world_size() > 1:
            # Required for the distributed sampler to shuffle properly across epochs.
            train_dataloader.sampler.set_epoch(epoch)

        model.train()
        for X, y in tqdm(train_dataloader, desc=f"Train Epoch {epoch}"):
            pred = model(X)
            loss = loss_fn(pred, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        model.eval()
        test_loss, num_correct, num_total = 0, 0, 0
        with torch.no_grad():
            for X, y in tqdm(test_dataloader, desc=f"Test Epoch {epoch}"):
                pred = model(X)
                loss = loss_fn(pred, y)

                test_loss += loss.item()
                num_total += y.shape[0]
                num_correct += (pred.argmax(1) == y).sum().item()

        test_loss /= len(test_dataloader)
        accuracy = num_correct / num_total

        # [3] Report metrics to Ray Train
        # ===============================
        ray.train.report(metrics={"loss": test_loss, "accuracy": accuracy})


def train_fashion_mnist(num_workers=3, use_gpu=False):
    global_batch_size = 32

    train_config = {
        "lr": 1e-3,
        "epochs": 5,
        "batch_size_per_worker": global_batch_size // num_workers,
    }

    # Configure computation resources
    scaling_config = ScalingConfig(num_workers=num_workers, use_gpu=use_gpu)

    # Initialize a Ray TorchTrainer
    trainer = TorchTrainer(
        train_loop_per_worker=train_func_per_worker,
        train_loop_config=train_config,
        scaling_config=scaling_config,
    )

    # [4] Start distributed training
    # Run `train_func_per_worker` on all workers
    # =============================================
    result = trainer.fit()
    print(f"Training result: {result}")


if __name__ == "__main__":
    train_fashion_mnist(num_workers=3, use_gpu=True)

Training result: Result(
  metrics={'loss': 0.4006380294505395, 'accuracy': 0.8575284943011398},
  path='/root/ray_results/TorchTrainer_2025-02-15_16-47-40/TorchTrainer_92795_00000_0_2025-02-15_16-47-41',
  filesystem='local',
  checkpoint=None
)
