
update
harborn committed May 22, 2024
1 parent 30b3204 commit bc1b511
Showing 3 changed files with 86 additions and 103 deletions.
173 changes: 78 additions & 95 deletions llm_on_ray/finetune/finetune.py
@@ -22,9 +22,12 @@
from typing import Any, Dict, Union, Optional

import torch

import datasets
import transformers

from peft import get_peft_model, LoraConfig
import deltatuner

import ray
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig
@@ -47,80 +50,6 @@
use_habana = False


def get_accelerate_environment_variable(config: Dict[str, Any]) -> dict:
device = config["Training"]["device"]
accelerate_mode = config["Training"]["accelerate_mode"]
mixed_precision = config["Training"]["mixed_precision"]
mode_env_vars = {
"cpu": {
"DDP": {
"ACCELERATE_USE_CPU": "true",
"ACCELERATE_USE_IPEX": "true",
"ACCELERATE_MIXED_PRECISION": mixed_precision,
}
},
"gpu": {
"DDP": {
"ACCELERATE_USE_CPU": "false",
"ACCELERATE_USE_XPU": "true",
"ACCELERATE_USE_IPEX": "true",
"ACCELERATE_MIXED_PRECISION": mixed_precision,
},
"FSDP": {
"ACCELERATE_USE_CPU": "false",
"ACCELERATE_USE_XPU": "true",
"ACCELERATE_USE_IPEX": "true",
"ACCELERATE_USE_FSDP": "true",
"FSDP_SHARDING_STRATEGY": "1",
"FSDP_OFFLOAD_PARAMS": "false",
"FSDP_AUTO_WRAP_POLICY": "NO_WRAP",
"FSDP_BACKWARD_PREFETCH": "BACKWARD_PRE",
"FSDP_STATE_DICT_TYPE": "SHARDED_STATE_DICT",
"FSDP_FORWARD_PREFETCH": "false",
"FSDP_USE_ORIG_PARAMS": "false",
"FSDP_SYNC_MODULE_STATES": "true",
"ACCELERATE_MIXED_PRECISION": mixed_precision,
},
"DEEPSPEED": {
"ACCELERATE_USE_CPU": "false",
"ACCELERATE_USE_XPU": "true",
"ACCELERATE_USE_IPEX": "true",
"ACCELERATE_USE_DEEPSPEED": "true",
"ACCELERATE_MIXED_PRECISION": mixed_precision,
},
},
"hpu": {
"DDP": {
"ACCELERATE_USE_CPU": "false",
"ACCELERATE_USE_XPU": "false",
"ACCELERATE_USE_IPEX": "false",
"ACCELERATE_MIXED_PRECISION": mixed_precision,
},
"DEEPSPEED": {
"ACCELERATE_USE_CPU": "false",
"ACCELERATE_USE_XPU": "false",
"ACCELERATE_USE_IPEX": "false",
"ACCELERATE_USE_DEEPSPEED": "true",
"ACCELERATE_MIXED_PRECISION": mixed_precision,
},
},
}
if device not in mode_env_vars or accelerate_mode not in mode_env_vars[device]:
supported_mode_info = ""
for k in mode_env_vars.keys():
supported_mode_info += k + ":["
for m in mode_env_vars[k]:
supported_mode_info += m + ","
supported_mode_info = supported_mode_info[:-1]
supported_mode_info += "],"
supported_mode_info = supported_mode_info[:-1]

raise ValueError(
f"device {device} and accelerate mode {accelerate_mode} not supported. supported device and accelerate mode is {supported_mode_info}"
)
return mode_env_vars[device][accelerate_mode]


def convert_to_training_args(cls, config):
device = config["Training"]["device"]
accelerate_mode = config["Training"]["accelerate_mode"]
@@ -172,15 +101,6 @@ def convert_to_training_args(cls, config):
return cls(**args)


def get_device_environment_variable(device):
if device == "hpu":
return {
"HABANA_VISIBLE_DEVICES": "all",
"RAY_EXPERIMENTAL_NOSET_HABANA_VISIBLE_MODULES": "true",
}
return {}


def convert_dtype(dtype: str) -> Optional[torch.dtype]:
supported_dtypes = {
"fp16": torch.float16,
@@ -190,6 +110,75 @@ def convert_to_training_args(cls, config):
return supported_dtypes[dtype]


def load_tokenizer(config: Dict):
name = config.get("name")
load_config = config.get("config", {})
tokenizer = transformers.AutoTokenizer.from_pretrained(name, **load_config)
return tokenizer


def load_datasets(config: Dict):
def local_load(name, **load_config):
if os.path.isfile(name):
file = os.path.basename(os.path.abspath(name))
path = os.path.dirname(os.path.abspath(name))
dataset = datasets.load_dataset(path, data_files=file, **load_config)
else:
dataset = datasets.load_dataset(name, **load_config)
return dataset["train"]

name = config.get("name")
load_from_disk = config.get("load_from_disk", False)
validation_file = config.get("validation_file", None)
validation_split_percentage = config.get("validation_split_percentage", 0)

if name is not None and os.path.exists(name):
train_dataset = local_load(name)
if validation_file is not None:
validation_dataset = local_load(validation_file)
return datasets.DatasetDict({"train": train_dataset, "validation": validation_dataset})
if validation_split_percentage / 100 > 0.0 and validation_split_percentage / 100 < 1.0:
datasets_dict = train_dataset.train_test_split(
test_size=validation_split_percentage / 100
)
datasets_dict["validation"] = datasets_dict["test"]
return datasets_dict
return datasets.DatasetDict({"train": train_dataset})
else:
load_config = config.get("load_config", {})
if load_from_disk:
return datasets.load_from_disk(name, **load_config)
else:
return datasets.load_dataset(name, **load_config)


def load_model(config: Dict):
name = config.get("name")
model_dtype = config.get("dtype")
model_config = config.get("config", {})
model = transformers.AutoModelForCausalLM.from_pretrained(
name, torch_dtype=model_dtype, **model_config
)

lora_config = config.get("lora_config", None)
if lora_config:
peft_config = LoraConfig(**lora_config)
model = get_peft_model(model, peft_config)
deltatuner_config = config.get("deltatuner_config", None)
if deltatuner_config:
model = deltatuner.optimize(model, **deltatuner_config)

enable_gradient_checkpointing = config.get("enable_gradient_checkpointing")
if enable_gradient_checkpointing:
model.enable_input_require_grads()
model.gradient_checkpointing_enable()
model.config.use_cache = False

model.to(dtype=model_dtype, device=config.get("device"))

return model


def train_func(config: Dict[str, Any]):
os.chdir(config["cwd"])

@@ -206,15 +195,10 @@ def train_func(config: Dict[str, Any]):
if seed is not None:
set_seed(seed)

tokenizer = common.tokenizer.Tokenizer.registory.get("HuggingFaceTokenizer")()(
config={
"name": tokenizer_name,
"config": config["General"]["config"],
}
)
tokenizer = load_tokenizer({"name": tokenizer_name, "config": config["General"]["config"]})

datasets = common.dataset.Dataset.registory.get("HuggingfaceDataset")()(
config={
datasets = load_datasets(
{
"name": dataset_file,
"validation_file": config["Dataset"]["validation_file"],
"validation_split_percentage": config["Dataset"]["validation_split_percentage"],
@@ -234,8 +218,8 @@ def train_func(config: Dict[str, Any]):
)
tokenized_datasets = dataprocesser.tokenize_dataset(tokenizer, datasets)

model = common.model.Model.registory.get("HuggingFaceModelForCausalLM")()(
config={
model = load_model(
{
"name": base_model,
"dtype": convert_dtype(config["Training"].get("mixed_precision", "no")),
"device": torch.device(device),
@@ -341,7 +325,6 @@ def main(external_config=None):
"CCL_ZE_IPC_EXCHANGE": "sockets",
"CCL_WORKER_COUNT": str(ccl_worker_count),
"CCL_LOG_LEVEL": "info",
"WORLD_SIZE": str(num_training_workers),
"FI_TCP_IFACE": "lo",
"FI_PROVIDER": "tcp",
}
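For reference, a minimal usage sketch (not part of the commit) of the three helpers added above, assuming they are importable from the module path implied by the file location and that the config dictionaries follow the shape train_func builds in this diff. The model id, the dataset path, and the dtype/device choices below are placeholders.

# Hypothetical standalone use of the new helpers; values below are placeholders.
import torch
from llm_on_ray.finetune.finetune import load_tokenizer, load_datasets, load_model

tokenizer = load_tokenizer({"name": "gpt2", "config": {}})

# A local file path takes the local_load branch; 5% of the training split
# is carved off as "validation" via train_test_split.
dataset_dict = load_datasets(
    {
        "name": "data/train.jsonl",  # placeholder local dataset file
        "validation_file": None,
        "validation_split_percentage": 5,
    }
)

# No LoRA/deltatuner config and no gradient checkpointing: the helper just
# loads the causal-LM weights and moves them to the requested dtype/device.
model = load_model(
    {
        "name": "gpt2",  # placeholder model id
        "dtype": torch.bfloat16,
        "device": torch.device("cpu"),
        "lora_config": None,
        "enable_gradient_checkpointing": False,
    }
)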
4 changes: 2 additions & 2 deletions llm_on_ray/pretrain/plugin/hf_pretrainer.py
@@ -176,14 +176,14 @@ def train(self):
model_config = self.config.get("model")
model_config["deepspeed_zero_stage"] = training_args.deepspeed_plugin.zero_stage
if model_config:
self.model = common.load_model(model_config)
self.model = common.load.load_model(model_config)
else:
common.logger.warn("No internal model plugin provided")
self.model.train()

tokenizer_config = self.config.get("tokenizer")
if tokenizer_config:
self.tokenizer = common.load_tokenizer(tokenizer_config)
self.tokenizer = common.load.load_tokenizer(tokenizer_config)
else:
common.logger.warn("No internal tokenizer plugin provided")

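The call-site updates in this file and in pretrain.py below switch from attributes on the common package (common.load_model, common.load_tokenizer, ...) to an explicit common.load submodule. A rough sketch of the layout this implies follows; the submodule's actual contents are not shown in this diff, so the names and signatures here are assumptions inferred from the call sites.

# llm_on_ray/common/load.py -- hypothetical layout inferred from the call sites.
# Only the names referenced in this commit are listed; bodies are elided.
def load_model(config): ...
def load_tokenizer(config): ...
def load_dataset(config): ...
def load_optimizer(model, config): ...
def get_initializer(config): ...
def get_trainer(config): ...

# Callers now qualify the submodule explicitly, e.g.:
#   from llm_on_ray import common
#   model = common.load.load_model(model_config)
#   tokenizer = common.load.load_tokenizer(tokenizer_config)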
12 changes: 6 additions & 6 deletions llm_on_ray/pretrain/pretrain.py
@@ -52,7 +52,7 @@ def train_func(config: Dict[str, Any]):
initializer_config = config.get("initializer")
if initializer_config:
try:
initializer = common.get_initializer(initializer_config)
initializer = common.load.get_initializer(initializer_config)
initializer.init()
except Exception as e:
common.logger.critical(e, exc_info=True)
@@ -77,32 +77,32 @@ def train_func(config: Dict[str, Any]):

datasets_config = config.get("datasets")
if datasets_config:
datasets = common.load_dataset(datasets_config)
datasets = common.load.load_dataset(datasets_config)
common.logger.info(" ")
else:
common.logger.warn("No datasets plugin provided, use the built-in datasets of trainer")

tokenizer_config = config.get("tokenizer")
if tokenizer_config:
tokenizer = common.load_tokenizer(tokenizer_config)
tokenizer = common.load.load_tokenizer(tokenizer_config)
else:
common.logger.warn("No tokenizer plugin provided, use the built-in tokenizer of trainer")

model_config = config.get("model")
if model_config:
model = common.load_model(model_config)
model = common.load.load_model(model_config)
else:
common.logger.warn("No model plugin provided, use the built-in model of trainer")

optimizer_config = config.get("optimizer")
if optimizer_config:
optimizer = common.load_optimizer(model, config.get("optimizer"))
optimizer = common.load.load_optimizer(model, config.get("optimizer"))
else:
common.logger.warn("No optimizer plugin provided, use the built-in optimizer of trainer")

trainer_config = config.get("trainer")
if trainer_config:
trainer = common.get_trainer(config.get("trainer"))
trainer = common.load.get_trainer(config.get("trainer"))

try:
trainer.prepare(model, tokenizer, datasets, optimizer, accelerator)
