Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 73 additions & 202 deletions dspy/clients/anyscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,66 +52,61 @@ def finetune_anyscale(
"""Start the finetune job."""
train_kwargs = train_kwargs or {}
assert "model" not in train_kwargs, "Model should not be in the train_kwargs"
assert anyscale.__version__ >= "0.24.65", "Anyscale version >= 0.24.65 is required to use the dataset upload feature"
assert all([x in train_kwargs for x in ["job_config_path", "llmforge_config_path"]]), "Both job_config_path and llmforge_config_path are required"
train_kwargs_copy = train_kwargs.copy()
train_kwargs_copy["model"] = model

logger.info("[Finetune] Starting training process...")

job_config_path = train_kwargs.get("job_config_path", None)
llmforge_config_path = train_kwargs.get("llmforge_config_path", None)
serve_config_path = train_kwargs.get("serve_config_path", None)

if train_method not in TRAINING_METHODS_ANYSCALE:
raise NotImplementedError(f"AnyScale can only support {TRAINING_METHODS_ANYSCALE} for the time being")

logger.info("[Finetune] Validating the dataset format...")
if not verify_dataset(train_data):
# TODO: Does AnyScale support text completion models?
err = "[Finetune] Error: Unable to verify that the dataset is in the correct format."
logger.error(err)
raise RuntimeError(err)

logger.info("[Finetune] Converting data to JSONL format...")
train_data_path = save_data(train_data, provider_name=PROVIDER_ANYSCALE)
logger.info("[Finetune] Submitting data to remote storage...")
remote_train_path, _ = submit_data(train_path=train_data_path)
logger.info(f"[Finetune] Data submitted. Remote train path: {remote_train_path}")

logger.info("[Finetune] Generating configuration files...")
_, compute_config = generate_config_files(train_path=remote_train_path, **train_kwargs_copy)

logger.info("[Finetune] Starting remote training...")
job_id = start_remote_training(compute_config=compute_config, **train_kwargs_copy)
job.job_id = job_id
logger.info(f"[Finetune] Remote training started. Job ID: {job_id}")
# TODO(Isaac): Figure out a better pattern
job_config_temp = yaml.safe_load(open(job_config_path, "r"))
remote_train_path = submit_data(train_path=train_data_path, job_config=job_config_temp)

job_config = generate_config_files(train_path=remote_train_path, llmforge_config_path=llmforge_config_path, job_config_path=job_config_path, model=model)

# Remove potential duplicate compute config

job.job_id = start_remote_training(job_config=job_config)

logger.info("[Finetune] Waiting for training to complete...")
wait_for_training(job.job_id)
logger.info("[Finetune] Training completed.")

logger.info("[Finetune] Retrieving model information...")
model_info = get_model_info(job.job_id)
logger.info(f"[Finetune] Model info retrieved: {model_info}")

# model_info[storage_uri] is a path to your cloud where the best(last if no eval) checkpoint weights are forwarded
storage_uri = model_info["storage_uri"]
logger.info(f"[Finetune] Copying LoRA weights from {storage_uri}...")
model_names, lora_dynamic_path = copy_lora_weights(storage_uri, model_info, job.job_id)
logger.info(f"[Finetune] LoRA weights copied. Model names: {model_names}")


logger.info("[Finetune] Setting result in future object...")
model_step_pairs = sorted([(model_name, int(model_name.split("-")[-1])) for model_name in model_names], key=lambda x: x[1])
last_model_checkpoint = model_step_pairs[-1][0]
logger.info("[Finetune] Training process completed successfully.")

logger.info("[Finetune] Updating model config with the proper dynamic path")
serve_config_path = train_kwargs.pop("serve_config_path", "serve_1B.yaml")
update_model_config(lora_dynamic_path, serve_config_path, job_id)
job.model_names = model_names
lora_dynamic_path = storage_uri.split(model)[0]
final_model_name = model + storage_uri.split(model)[1]

if serve_config_path:
update_serve_model_config(lora_dynamic_path, serve_config_path)
job.model_names = [final_model_name]

return last_model_checkpoint
return "openai/" + final_model_name

def wait_for_training(job_id):
"""Wait for the training to complete."""
anyscale.job.wait(id=job_id, timeout_s=18000)
logger.info("[Finetune] Waiting for training to complete...")
anyscale.job.wait(id=job_id)
logger.info("[Finetune] Training completed.")


def update_model_config(lora_dynamic_path: str, serve_config_path: str, job_id: str):
def update_serve_model_config(lora_dynamic_path: str, serve_config_path: str):
"""Update the model config storage location with the job_id."""
with open(serve_config_path, "r") as f:
serve_config = yaml.safe_load(f)
Expand All @@ -121,15 +116,15 @@ def update_model_config(lora_dynamic_path: str, serve_config_path: str, job_id:
with open(model_config_location, "r") as f:
model_config = yaml.safe_load(f)

dynamic_path_until_job_id = lora_dynamic_path.split(job_id)[0] + job_id
model_config["lora_config"]["dynamic_lora_loading_path"] = dynamic_path_until_job_id
model_config["lora_config"]["dynamic_lora_loading_path"] = lora_dynamic_path

with open(model_config_location, "w") as f:
yaml.safe_dump(model_config, f)


def verify_dataset(dataset: List[dict[str, Any]]) -> bool:
"""Verify the training arguments before starting training."""
logger.info("[Finetune] Verifying dataset...")
dataset_validation = openai_data_validation(dataset)

if dataset_validation:
Expand All @@ -139,125 +134,48 @@ def verify_dataset(dataset: List[dict[str, Any]]) -> bool:
return True


def submit_data(train_path: str):
"""Upload the data to the Workspace cloud storage."""
storage = os.environ['ANYSCALE_ARTIFACT_STORAGE']
def submit_data(train_path: str, job_config: Dict[str, Any]):
    """Upload the training file as an Anyscale dataset and return its remote URI.

    The dataset is named ``dataset-<name>``, where ``<name>`` is the job
    config's ``name`` entry or a fixed fallback when that key is absent. The
    upload targets the job config's ``cloud`` entry (``None`` when missing).

    Args:
        train_path: Path of the training file handed to the upload call.
        job_config: Job configuration mapping; only ``name`` and ``cloud``
            are read here.

    Returns:
        The ``storage_uri`` attribute of the object returned by the upload.
    """
    logger.info("[Finetune] Submitting data to remote storage...")

    job_name = job_config.get("name", "dspy-llmforge-fine-tuning-job")
    uploaded = anyscale.llm.dataset.upload(
        train_path,
        name=f"dataset-{job_name}",
        cloud=job_config.get("cloud", None),
    )
    remote_uri = uploaded.storage_uri

    logger.info(f"[Finetune] Data submitted. Remote train path: {remote_uri}")
    return remote_uri


def generate_config_files(train_path: str, llmforge_config_path: str, job_config_path: str, model: str):
assert llmforge_config_path, "LLMForge config is required to generate the config files"
assert job_config_path, "Job config is required to start the finetuning job"

datasets = {"train": train_path}

fine_tuning_file_ids = {}
for name, path in datasets.items():
num_items = len(read_jsonl(path))
logger.info(f"Number of items in {name} data: {num_items}")

remote_path = os.path.join(storage, path.split("/")[-1])
logger.info(f"Uploading {name} data to S3 at {remote_path}")
if remote_path[:2] == "s3":
os.system(f"aws s3 cp {path} {remote_path}")
elif remote_path[:2] == "gs":
os.system(f"gcloud storage cp {path} {remote_path}")
else:
os.system(f"cp {path} {remote_path}")
logger.info(f"Copied {path} to {remote_path}")
fine_tuning_file_ids[name] = remote_path
llmforge_config = yaml.safe_load(open(llmforge_config_path, "r"))
job_config_dict = yaml.safe_load(open(job_config_path, "r"))

return fine_tuning_file_ids["train"], fine_tuning_file_ids.get("val", None)


def generate_config_files(train_path: str, **kwargs):
base_model_yaml_path = kwargs.get("train_config_yaml", None)
assert kwargs["model"] is not None, "Model is required to generate the config files"

use_lora = kwargs.get("use_lora", False)
example_dir = ""
lora_path = "configs/training/lora" if use_lora else "configs/training/full_param"


if not base_model_yaml_path:
def get_yaml_config(model_name):
if "llama" in model_name.lower():
if "70b" in model_name:
return "llama-3-70b.yaml"
elif "13b" in model_name:
return "llama-3-70b.yaml"
else:
return "llama-3-8b.yaml"
elif "mistral" in model_name.lower():
if "mixtral" in model_name.lower():
return "mixtral-8x7b.yaml"
else:
return "mistral-7b.yaml"
else:
raise RuntimeError("No default yaml found for the model")

default_model_yaml_path = get_yaml_config(kwargs["model"])
base_model_yaml_path = os.path.join(example_dir, lora_path, default_model_yaml_path)
logger.info(f"Using default yaml template for model: {base_model_yaml_path}")

model_config_data = yaml.safe_load(open(base_model_yaml_path, "r"))
model_config_data.update(kwargs.get("hyperparameters", {}))
llmforge_config["model_id"] = model
llmforge_config["train_path"] = train_path
llmforge_config = {k: v for k, v in llmforge_config.items() if v is not None}

logger.info(f"Model config data: {llmforge_config}")
yaml.safe_dump(llmforge_config, open(llmforge_config_path, "w"))

if not job_config_dict.get("env_vars", None):
job_config_dict["env_vars"] = {}

for env_var in ["HF_TOKEN", "HF_HOME", "WANDB_API_KEY"]:
if env_var not in job_config_dict["env_vars"] and os.environ.get(env_var, None):
job_config_dict["env_vars"][env_var] = os.environ[env_var]

model_config_data["model_id"] = kwargs["model"]

custom_modifications = {
"model_id": kwargs["model"],
"train_path": train_path,
"logger": {
"provider": "wandb",
},
"num_checkpoints_to_keep": 10
}
if kwargs.get("output_dir", None):
custom_modifications["output_dir"] = kwargs["output_dir"]

model_config_data.update(custom_modifications)
model_config_data = {k: v for k, v in model_config_data.items() if v is not None}

def freeze(d):
if isinstance(d, dict):
return tuple(sorted((key, freeze(value)) for key, value in d.items()))
elif isinstance(d, list):
return tuple(freeze(value) for value in sorted(d))
elif isinstance(d, set):
return tuple(freeze(value) for value in sorted(d))
return d

def hash_dict(d):
return hash(freeze(d))
dict_sorted_hash = hash_dict(model_config_data)
if dict_sorted_hash < 0:
dict_sorted_hash = -dict_sorted_hash
filename = f"model_config_dspy_{dict_sorted_hash}.yaml"
logger.info(f"Model config data: {model_config_data}")
yaml.safe_dump(model_config_data, open(filename, "w"))

ft_path = os.path.join("utils", "ft.py")

compute_config_dict = {
"name": "dspy-llmforge-fine-tuning-job",
"entrypoint": f"llmforge anyscale finetune {filename}",
"working_dir": ".",
"image_uri": "localhost:5555/anyscale/llm-forge:0.5.6",
"requirements": [
"wandb",
],
"env_vars": {
"WANDB_API_KEY": os.environ.get("WANDB_API_KEY", ""),
"HF_TOKEN": os.environ.get("HF_TOKEN", ""),
"HF_HOME": os.environ.get("HF_HOME", ""),
}
}
compute_config_kwargs = kwargs.get("compute_config", {})
compute_config_dict.update(compute_config_kwargs)
compute_config = JobConfig(**compute_config_dict)

job_runner_config_path = kwargs.get("compute_yaml_path", "job_runner_config.yaml")

return job_runner_config_path, compute_config


def start_remote_training(compute_config, **kwargs) -> str:
job_id: str = anyscale.job.submit(compute_config)

job_config = JobConfig(**job_config_dict)


return job_config


def start_remote_training(job_config) -> str:
    """Submit the fine-tuning job to Anyscale and return its job ID.

    Args:
        job_config: Job configuration passed unchanged to
            ``anyscale.job.submit``.

    Returns:
        The value returned by ``anyscale.job.submit`` (annotated as ``str``).
    """
    logger.info("[Finetune] Starting remote training...")
    submitted_id: str = anyscale.job.submit(job_config)
    logger.info(f"[Finetune] Remote training started. Job ID: {submitted_id}")
    return submitted_id


Expand All @@ -267,57 +185,10 @@ def wait_for_training(job_id):


def get_model_info(job_id):
return anyscale.llm.model.get(job_id=job_id).to_dict()


def copy_lora_weights(storage_uri, model_info, job_id):
    """Copy ``epoch*`` checkpoint folders to a job-specific LoRA path in the same bucket.

    The bucket name is the third ``/``-separated component of ``storage_uri``
    and the source folder is everything after it except the final component.
    Every folder under the source prefix whose last path component starts
    with ``"epoch"`` is copied blob by blob to
    ``dspy/lora_weights/<job_id>/<base_model_id>:<folder name>``.

    Args:
        storage_uri: Checkpoint URI; presumably of the form
            ``gs://<bucket>/<path>/<leaf>`` -- TODO confirm against caller.
        model_info: Mapping that must contain ``"base_model_id"``.
        job_id: Identifier embedded in the destination path.

    Returns:
        A ``(model_names, completed_path)`` tuple: ``model_names`` holds the
        last two ``/``-separated components of each destination folder, and
        ``completed_path`` is ``gs://<bucket>/dspy/lora_weights/<job_id>/<base_model_id>``.

    Raises:
        Exception: Any error raised inside the body is logged and re-raised.
    """
    # NOTE(review): block nesting was reconstructed from a flattened listing;
    # confirm the loop/if structure against the real file.
    try:
        from google.cloud import storage

        client = storage.Client()

        uri_parts = storage_uri.split('/')
        bucket_name = uri_parts[2]
        source_folder = '/'.join(uri_parts[3:-1])
        logger.info(f"Source folder: {source_folder}")

        bucket = client.bucket(bucket_name)

        # Collect the parent folder of every blob that sits below the source prefix.
        prefix_len = len(source_folder)
        subfolders = set()
        for blob in bucket.list_blobs(prefix=source_folder):
            if '/' not in blob.name[prefix_len:]:
                continue
            subfolders.add('/'.join(blob.name.split('/')[:-1]))

        base_model_id = model_info["base_model_id"]
        lora_dynamic_path = f"dspy/lora_weights/{job_id}/{base_model_id}"

        model_names = []
        for subfolder in subfolders:
            subfolder_name = subfolder.split('/')[-1]
            destination_folder = f"{lora_dynamic_path}:{subfolder_name}"
            # Only per-epoch checkpoint folders are copied; everything else is skipped.
            if not subfolder_name.startswith("epoch"):
                continue
            model_names.append("/".join(destination_folder.split("/")[-2:]))

            for blob in bucket.list_blobs(prefix=subfolder):
                source_blob = bucket.blob(blob.name)
                destination_blob_name = f"{destination_folder}/{blob.name.split('/')[-1]}"
                bucket.copy_blob(source_blob, bucket, destination_blob_name)
                logger.info(f"Copied {source_blob.name} to {destination_blob_name}")

        logger.info(f"All subfolders copied to: gs://{bucket_name}/{lora_dynamic_path}")
        completed_path = f"gs://{bucket_name}/{lora_dynamic_path}"
        return model_names, completed_path

    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")
        raise e

logger.info("[Finetune] Retrieving model information from Anyscale Models SDK...")
info = anyscale.llm.model.get(job_id=job_id).to_dict()
logger.info(f"[Finetune] Model info retrieved: {info}")
return info

def read_jsonl(filename):
with open(filename, "r") as f:
Expand Down
2 changes: 2 additions & 0 deletions dspy/clients/finetune.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional
from dspy.utils.logging import logger

import ujson
from datasets.fingerprint import Hasher
Expand Down Expand Up @@ -111,6 +112,7 @@ def save_data(
provider_name: Optional[str] = None,
) -> str:
"""Save the fine-tuning data to a file."""
logger.info("[Finetune] Converting data to JSONL format...")
# Construct the file name based on the data hash
hash = Hasher.hash(data)
file_name = f"{hash}.jsonl"
Expand Down
Loading