From 7f75a83a186dcd55252cd5aa3f30bf28e0bfb507 Mon Sep 17 00:00:00 2001 From: Isaac Miller Date: Mon, 21 Oct 2024 21:47:52 +0000 Subject: [PATCH 1/8] move kwargs into anyscale config and remove checkpointing --- dspy/clients/anyscale.py | 128 +++++++++++---------------------------- dspy/clients/finetune.py | 2 + 2 files changed, 39 insertions(+), 91 deletions(-) diff --git a/dspy/clients/anyscale.py b/dspy/clients/anyscale.py index 2c599ec7f9..32acbf6276 100644 --- a/dspy/clients/anyscale.py +++ b/dspy/clients/anyscale.py @@ -55,63 +55,49 @@ def finetune_anyscale( train_kwargs_copy = train_kwargs.copy() train_kwargs_copy["model"] = model - logger.info("[Finetune] Starting training process...") if train_method not in TRAINING_METHODS_ANYSCALE: raise NotImplementedError(f"AnyScale can only support {TRAINING_METHODS_ANYSCALE} for the time being") - logger.info("[Finetune] Validating the dataset format...") if not verify_dataset(train_data): # TODO: Does AnyScale support text completion models? err = "[Finetune] Error: Unable to verify that the dataset is in the correct format." logger.error(err) raise RuntimeError(err) - logger.info("[Finetune] Converting data to JSONL format...") train_data_path = save_data(train_data, provider_name=PROVIDER_ANYSCALE) - logger.info("[Finetune] Submitting data to remote storage...") remote_train_path, _ = submit_data(train_path=train_data_path) - logger.info(f"[Finetune] Data submitted. Remote train path: {remote_train_path}") - logger.info("[Finetune] Generating configuration files...") _, compute_config = generate_config_files(train_path=remote_train_path, **train_kwargs_copy) - - logger.info("[Finetune] Starting remote training...") - job_id = start_remote_training(compute_config=compute_config, **train_kwargs_copy) - job.job_id = job_id - logger.info(f"[Finetune] Remote training started. Job ID: {job_id}") + # Remove potential duplicate compute config + train_kwargs_copy.pop("compute_config") + + job.job_id = start_remote_training(compute_config=compute_config, **train_kwargs_copy) - logger.info("[Finetune] Waiting for training to complete...") wait_for_training(job.job_id) - logger.info("[Finetune] Training completed.") - logger.info("[Finetune] Retrieving model information...") model_info = get_model_info(job.job_id) - logger.info(f"[Finetune] Model info retrieved: {model_info}") - + # model_info[storage_uri] is a path to your cloud where the best(last if no eval) checkpoint weights are forwarded storage_uri = model_info["storage_uri"] - logger.info(f"[Finetune] Copying LoRA weights from {storage_uri}...") - model_names, lora_dynamic_path = copy_lora_weights(storage_uri, model_info, job.job_id) - logger.info(f"[Finetune] LoRA weights copied. 
Model names: {model_names}") + lora_dynamic_path = storage_uri.split(model)[0] + final_model_name = model + storage_uri.split(model)[1] - logger.info("[Finetune] Setting result in future object...") - model_step_pairs = sorted([(model_name, int(model_name.split("-")[-1])) for model_name in model_names], key=lambda x: x[1]) - last_model_checkpoint = model_step_pairs[-1][0] - logger.info("[Finetune] Training process completed successfully.") + assert "serve_config_path" in train_kwargs, "serve_config_path is required to update the model config" + serve_config_path = train_kwargs.pop("serve_config_path") + update_model_config(lora_dynamic_path, serve_config_path) + job.model_names = [final_model_name] - logger.info("[Finetune] Updating model config with the proper dynamic path") - serve_config_path = train_kwargs.pop("serve_config_path", "serve_1B.yaml") - update_model_config(lora_dynamic_path, serve_config_path, job_id) - job.model_names = model_names - - return last_model_checkpoint + # NOTE: For Litellm we need to prepend "openai/" to the model name + return "openai/" + final_model_name def wait_for_training(job_id): """Wait for the training to complete.""" + logger.info("[Finetune] Waiting for training to complete...") anyscale.job.wait(id=job_id, timeout_s=18000) + logger.info("[Finetune] Training completed.") -def update_model_config(lora_dynamic_path: str, serve_config_path: str, job_id: str): +def update_model_config(lora_dynamic_path: str, serve_config_path: str): """Update the model config storage location with the job_id.""" with open(serve_config_path, "r") as f: serve_config = yaml.safe_load(f) @@ -121,8 +107,7 @@ def update_model_config(lora_dynamic_path: str, serve_config_path: str, job_id: with open(model_config_location, "r") as f: model_config = yaml.safe_load(f) - dynamic_path_until_job_id = lora_dynamic_path.split(job_id)[0] + job_id - model_config["lora_config"]["dynamic_lora_loading_path"] = dynamic_path_until_job_id + model_config["lora_config"]["dynamic_lora_loading_path"] = lora_dynamic_path with open(model_config_location, "w") as f: yaml.safe_dump(model_config, f) @@ -130,6 +115,7 @@ def update_model_config(lora_dynamic_path: str, serve_config_path: str, job_id: def verify_dataset(dataset: List[dict[str, Any]]) -> bool: """Verify the training arguments before starting training.""" + logger.info("[Finetune] Verifying dataset...") dataset_validation = openai_data_validation(dataset) if dataset_validation: @@ -141,6 +127,7 @@ def verify_dataset(dataset: List[dict[str, Any]]) -> bool: def submit_data(train_path: str): """Upload the data to the Workspace cloud storage.""" + logger.info("[Finetune] Submitting data to remote storage...") storage = os.environ['ANYSCALE_ARTIFACT_STORAGE'] datasets = {"train": train_path} @@ -160,6 +147,8 @@ def submit_data(train_path: str): os.system(f"cp {path} {remote_path}") logger.info(f"Copied {path} to {remote_path}") fine_tuning_file_ids[name] = remote_path + + logger.info(f"[Finetune] Data submitted. 
Remote train path: {fine_tuning_file_ids['train']}") return fine_tuning_file_ids["train"], fine_tuning_file_ids.get("val", None) @@ -202,9 +191,7 @@ def get_yaml_config(model_name): custom_modifications = { "model_id": kwargs["model"], "train_path": train_path, - "logger": { - "provider": "wandb", - }, + "logger": kwargs.get("logging_kwargs", {}), "num_checkpoints_to_keep": 10 } if kwargs.get("output_dir", None): @@ -232,8 +219,8 @@ def hash_dict(d): yaml.safe_dump(model_config_data, open(filename, "w")) ft_path = os.path.join("utils", "ft.py") - - compute_config_dict = { + assert kwargs["compute_config"], "Compute config is required to start the finetuning job" + compute_config_dict = kwargs.pop("compute_config", { "name": "dspy-llmforge-fine-tuning-job", "entrypoint": f"llmforge anyscale finetune {filename}", "working_dir": ".", @@ -246,9 +233,13 @@ def hash_dict(d): "HF_TOKEN": os.environ.get("HF_TOKEN", ""), "HF_HOME": os.environ.get("HF_HOME", ""), } - } - compute_config_kwargs = kwargs.get("compute_config", {}) - compute_config_dict.update(compute_config_kwargs) + }) + compute_config_dict["env_vars"] = compute_config_dict.get("env_vars", {}) + for env_var in ["WANDB_API_KEY", "HF_TOKEN", "HF_HOME"]: + if env_var not in compute_config_dict["env_vars"]: + compute_config_dict["env_vars"][env_var] = os.environ.get(env_var, "") + + compute_config_dict["entrypoint"] = compute_config_dict["entrypoint"].format(filename=filename) compute_config = JobConfig(**compute_config_dict) job_runner_config_path = kwargs.get("compute_yaml_path", "job_runner_config.yaml") @@ -257,7 +248,9 @@ def hash_dict(d): def start_remote_training(compute_config, **kwargs) -> str: + logger.info("[Finetune] Starting remote training...") job_id: str = anyscale.job.submit(compute_config) + logger.info(f"[Finetune] Remote training started. 
Job ID: {job_id}") return job_id @@ -267,57 +260,10 @@ def wait_for_training(job_id): def get_model_info(job_id): - return anyscale.llm.model.get(job_id=job_id).to_dict() - - -def copy_lora_weights(storage_uri, model_info, job_id): - try: - from google.cloud import storage - - storage_client = storage.Client() - - bucket_name = storage_uri.split('/')[2] - source_folder = '/'.join(storage_uri.split('/')[3:-1]) - logger.info(f"Source folder: {source_folder}") - - bucket = storage_client.bucket(bucket_name) - - blobs = bucket.list_blobs(prefix=source_folder) - - subfolders = set() - for blob in blobs: - if '/' in blob.name[len(source_folder):]: - subfolder = blob.name.split('/')[:-1] - subfolders.add('/'.join(subfolder)) - - base_model_id = model_info["base_model_id"] - lora_dynamic_path = f"dspy/lora_weights/{job_id}/{base_model_id}" - - model_names = [] - for subfolder in subfolders: - subfolder_name = subfolder.split('/')[-1] - destination_folder = f"{lora_dynamic_path}:{subfolder_name}" - if subfolder_name.startswith("epoch"): - model_names.append("/".join(destination_folder.split("/")[-2:])) - else: - continue - - subfolder_blobs = bucket.list_blobs(prefix=subfolder) - - for blob in subfolder_blobs: - source_blob = bucket.blob(blob.name) - destination_blob_name = f"{destination_folder}/{blob.name.split('/')[-1]}" - bucket.copy_blob(source_blob, bucket, destination_blob_name) - logger.info(f"Copied {source_blob.name} to {destination_blob_name}") - - logger.info(f"All subfolders copied to: gs://{bucket_name}/{lora_dynamic_path}") - completed_path = f"gs://{bucket_name}/{lora_dynamic_path}" - return model_names, completed_path - - except Exception as e: - logger.error(f"An error occurred: {str(e)}") - raise e - + logger.info("[Finetune] Retrieving model information from Anyscale Models SDK...") + info = anyscale.llm.model.get(job_id=job_id).to_dict() + logger.info(f"[Finetune] Model info retrieved: {info}") + return info def read_jsonl(filename): with open(filename, "r") as f: diff --git a/dspy/clients/finetune.py b/dspy/clients/finetune.py index d9d0df8e8e..2920cdac0b 100644 --- a/dspy/clients/finetune.py +++ b/dspy/clients/finetune.py @@ -4,6 +4,7 @@ from enum import Enum from pathlib import Path from typing import Any, Dict, List, Optional +from dspy.utils.logging import logger import ujson from datasets.fingerprint import Hasher @@ -111,6 +112,7 @@ def save_data( provider_name: Optional[str] = None, ) -> str: """Save the fine-tuning data to a file.""" + logger.info("[Finetune] Converting data to JSONL format...") # Construct the file name based on the data hash hash = Hasher.hash(data) file_name = f"{hash}.jsonl" From c771e35a1aeefb396fc670328f999b652a948b99 Mon Sep 17 00:00:00 2001 From: Isaac Miller Date: Mon, 21 Oct 2024 22:30:48 +0000 Subject: [PATCH 2/8] remove default compuite confg --- dspy/clients/anyscale.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/dspy/clients/anyscale.py b/dspy/clients/anyscale.py index 32acbf6276..c27570153d 100644 --- a/dspy/clients/anyscale.py +++ b/dspy/clients/anyscale.py @@ -220,20 +220,7 @@ def hash_dict(d): ft_path = os.path.join("utils", "ft.py") assert kwargs["compute_config"], "Compute config is required to start the finetuning job" - compute_config_dict = kwargs.pop("compute_config", { - "name": "dspy-llmforge-fine-tuning-job", - "entrypoint": f"llmforge anyscale finetune {filename}", - "working_dir": ".", - "image_uri": "localhost:5555/anyscale/llm-forge:0.5.6", - "requirements": [ - "wandb", - ], - 
"env_vars": { - "WANDB_API_KEY": os.environ.get("WANDB_API_KEY", ""), - "HF_TOKEN": os.environ.get("HF_TOKEN", ""), - "HF_HOME": os.environ.get("HF_HOME", ""), - } - }) + compute_config_dict = kwargs.pop("compute_config") compute_config_dict["env_vars"] = compute_config_dict.get("env_vars", {}) for env_var in ["WANDB_API_KEY", "HF_TOKEN", "HF_HOME"]: if env_var not in compute_config_dict["env_vars"]: From 027b1c540236e65ef1d1d7e6b2a8a2de6c43f515 Mon Sep 17 00:00:00 2001 From: Isaac Miller Date: Tue, 22 Oct 2024 00:10:07 +0000 Subject: [PATCH 3/8] Change submit data --- dspy/clients/anyscale.py | 67 +++++++++++++++------------------------- 1 file changed, 25 insertions(+), 42 deletions(-) diff --git a/dspy/clients/anyscale.py b/dspy/clients/anyscale.py index c27570153d..0f6e7580fc 100644 --- a/dspy/clients/anyscale.py +++ b/dspy/clients/anyscale.py @@ -52,6 +52,9 @@ def finetune_anyscale( """Start the finetune job.""" train_kwargs = train_kwargs or {} assert "model" not in train_kwargs, "Model should not be in the train_kwargs" + assert "serve_config_path" in train_kwargs, "serve_config_path is required to update the model config" + assert anyscale.__version__ >= "0.24.65", "Anyscale version >= 0.24.65 is required to use the dataset upload feature" + train_kwargs_copy = train_kwargs.copy() train_kwargs_copy["model"] = model @@ -65,13 +68,13 @@ def finetune_anyscale( raise RuntimeError(err) train_data_path = save_data(train_data, provider_name=PROVIDER_ANYSCALE) - remote_train_path, _ = submit_data(train_path=train_data_path) + remote_train_path = submit_data(train_path=train_data_path, job_config=train_kwargs.get("job_config", {})) - _, compute_config = generate_config_files(train_path=remote_train_path, **train_kwargs_copy) + _, job_config = generate_config_files(train_path=remote_train_path, **train_kwargs_copy) # Remove potential duplicate compute config - train_kwargs_copy.pop("compute_config") + train_kwargs_copy.pop("job_config") - job.job_id = start_remote_training(compute_config=compute_config, **train_kwargs_copy) + job.job_id = start_remote_training(job_config=job_config, **train_kwargs_copy) wait_for_training(job.job_id) @@ -82,12 +85,10 @@ def finetune_anyscale( lora_dynamic_path = storage_uri.split(model)[0] final_model_name = model + storage_uri.split(model)[1] - assert "serve_config_path" in train_kwargs, "serve_config_path is required to update the model config" serve_config_path = train_kwargs.pop("serve_config_path") update_model_config(lora_dynamic_path, serve_config_path) job.model_names = [final_model_name] - # NOTE: For Litellm we need to prepend "openai/" to the model name return "openai/" + final_model_name def wait_for_training(job_id): @@ -125,38 +126,21 @@ def verify_dataset(dataset: List[dict[str, Any]]) -> bool: return True -def submit_data(train_path: str): - """Upload the data to the Workspace cloud storage.""" +def submit_data(train_path: str, job_config: Dict[str, Any]): + """Upload the data to cloud storage.""" logger.info("[Finetune] Submitting data to remote storage...") - storage = os.environ['ANYSCALE_ARTIFACT_STORAGE'] - - datasets = {"train": train_path} - - fine_tuning_file_ids = {} - for name, path in datasets.items(): - num_items = len(read_jsonl(path)) - logger.info(f"Number of items in {name} data: {num_items}") - - remote_path = os.path.join(storage, path.split("/")[-1]) - logger.info(f"Uploading {name} data to S3 at {remote_path}") - if remote_path[:2] == "s3": - os.system(f"aws s3 cp {path} {remote_path}") - elif remote_path[:2] == "gs": - 
os.system(f"gcloud storage cp {path} {remote_path}") - else: - os.system(f"cp {path} {remote_path}") - logger.info(f"Copied {path} to {remote_path}") - fine_tuning_file_ids[name] = remote_path - - logger.info(f"[Finetune] Data submitted. Remote train path: {fine_tuning_file_ids['train']}") - - return fine_tuning_file_ids["train"], fine_tuning_file_ids.get("val", None) + dataset_name = f"dataset-{job_config.get('name', 'dspy-llmforge-fine-tuning-job')}" + train_path_remote = anyscale.llm.dataset.upload(train_path, name=dataset_name, cloud=job_config.get("cloud", None)).storage_uri + logger.info(f"[Finetune] Data submitted. Remote train path: {train_path_remote}") + + return train_path_remote def generate_config_files(train_path: str, **kwargs): base_model_yaml_path = kwargs.get("train_config_yaml", None) assert kwargs["model"] is not None, "Model is required to generate the config files" - + assert kwargs["job_config"], "Job config is required to start the finetuning job" + use_lora = kwargs.get("use_lora", False) example_dir = "" lora_path = "configs/training/lora" if use_lora else "configs/training/full_param" @@ -219,24 +203,23 @@ def hash_dict(d): yaml.safe_dump(model_config_data, open(filename, "w")) ft_path = os.path.join("utils", "ft.py") - assert kwargs["compute_config"], "Compute config is required to start the finetuning job" - compute_config_dict = kwargs.pop("compute_config") - compute_config_dict["env_vars"] = compute_config_dict.get("env_vars", {}) + job_config_dict = kwargs.pop("job_config") + job_config_dict["env_vars"] = job_config_dict.get("env_vars", {}) for env_var in ["WANDB_API_KEY", "HF_TOKEN", "HF_HOME"]: - if env_var not in compute_config_dict["env_vars"]: - compute_config_dict["env_vars"][env_var] = os.environ.get(env_var, "") + if env_var not in job_config_dict["env_vars"]: + job_config_dict["env_vars"][env_var] = os.environ.get(env_var, "") - compute_config_dict["entrypoint"] = compute_config_dict["entrypoint"].format(filename=filename) - compute_config = JobConfig(**compute_config_dict) + job_config_dict["entrypoint"] = job_config_dict["entrypoint"].format(filename=filename) + job_config = JobConfig(**job_config_dict) job_runner_config_path = kwargs.get("compute_yaml_path", "job_runner_config.yaml") - return job_runner_config_path, compute_config + return job_runner_config_path, job_config -def start_remote_training(compute_config, **kwargs) -> str: +def start_remote_training(job_config, **kwargs) -> str: logger.info("[Finetune] Starting remote training...") - job_id: str = anyscale.job.submit(compute_config) + job_id: str = anyscale.job.submit(job_config) logger.info(f"[Finetune] Remote training started. 
Job ID: {job_id}") return job_id From 1fd9e28c40b5125012c98a580adde1e05490f333 Mon Sep 17 00:00:00 2001 From: Isaac Miller Date: Tue, 22 Oct 2024 18:12:21 +0000 Subject: [PATCH 4/8] Remove DSPy enforcing configs --- dspy/clients/anyscale.py | 118 +++++++++++++-------------------------- 1 file changed, 39 insertions(+), 79 deletions(-) diff --git a/dspy/clients/anyscale.py b/dspy/clients/anyscale.py index 0f6e7580fc..148fa4d53d 100644 --- a/dspy/clients/anyscale.py +++ b/dspy/clients/anyscale.py @@ -54,10 +54,16 @@ def finetune_anyscale( assert "model" not in train_kwargs, "Model should not be in the train_kwargs" assert "serve_config_path" in train_kwargs, "serve_config_path is required to update the model config" assert anyscale.__version__ >= "0.24.65", "Anyscale version >= 0.24.65 is required to use the dataset upload feature" - + assert all([x in train_kwargs for x in ["job_config_path", "llmforge_config_path", "serve_config_path"]]), "All of job_config_path, llmforge_config_path, and serve_config_path are required" train_kwargs_copy = train_kwargs.copy() train_kwargs_copy["model"] = model + + job_config_path = train_kwargs.get("job_config_path", None) + llmforge_config_path = train_kwargs.get("llmforge_config_path", None) + serve_config_path = train_kwargs.get("serve_config_path", None) + # llmforge_config = yaml.safe_load(open(llmforge_config_path, "r")) + if train_method not in TRAINING_METHODS_ANYSCALE: raise NotImplementedError(f"AnyScale can only support {TRAINING_METHODS_ANYSCALE} for the time being") @@ -68,13 +74,17 @@ def finetune_anyscale( raise RuntimeError(err) train_data_path = save_data(train_data, provider_name=PROVIDER_ANYSCALE) - remote_train_path = submit_data(train_path=train_data_path, job_config=train_kwargs.get("job_config", {})) - _, job_config = generate_config_files(train_path=remote_train_path, **train_kwargs_copy) + # TODO: Figure out a better pattern + job_config_temp = yaml.safe_load(open(job_config_path, "r")) + remote_train_path = submit_data(train_path=train_data_path, job_config=job_config_temp) + + job_config = generate_config_files(train_path=remote_train_path, llmforge_config_path=llmforge_config_path, job_config_path=job_config_path, model=model) + # Remove potential duplicate compute config - train_kwargs_copy.pop("job_config") + # train_kwargs_copy.pop("job_config") - job.job_id = start_remote_training(job_config=job_config, **train_kwargs_copy) + job.job_id = start_remote_training(job_config=job_config) wait_for_training(job.job_id) @@ -85,8 +95,7 @@ def finetune_anyscale( lora_dynamic_path = storage_uri.split(model)[0] final_model_name = model + storage_uri.split(model)[1] - serve_config_path = train_kwargs.pop("serve_config_path") - update_model_config(lora_dynamic_path, serve_config_path) + update_serve_model_config(lora_dynamic_path, train_kwargs.get("serve_config_path")) job.model_names = [final_model_name] return "openai/" + final_model_name @@ -94,11 +103,11 @@ def finetune_anyscale( def wait_for_training(job_id): """Wait for the training to complete.""" logger.info("[Finetune] Waiting for training to complete...") - anyscale.job.wait(id=job_id, timeout_s=18000) + anyscale.job.wait(id=job_id) logger.info("[Finetune] Training completed.") -def update_model_config(lora_dynamic_path: str, serve_config_path: str): +def update_serve_model_config(lora_dynamic_path: str, serve_config_path: str): """Update the model config storage location with the job_id.""" with open(serve_config_path, "r") as f: serve_config = yaml.safe_load(f) @@ -136,88 
+145,39 @@ def submit_data(train_path: str, job_config: Dict[str, Any]): return train_path_remote -def generate_config_files(train_path: str, **kwargs): - base_model_yaml_path = kwargs.get("train_config_yaml", None) - assert kwargs["model"] is not None, "Model is required to generate the config files" - assert kwargs["job_config"], "Job config is required to start the finetuning job" +def generate_config_files(train_path: str, llmforge_config_path: str, job_config_path: str, model: str): + assert llmforge_config_path, "LLMForge config is required to generate the config files" + assert job_config_path, "Job config is required to start the finetuning job" + + llmforge_config = yaml.safe_load(open(llmforge_config_path, "r")) + job_config_dict = yaml.safe_load(open(job_config_path, "r")) - use_lora = kwargs.get("use_lora", False) - example_dir = "" - lora_path = "configs/training/lora" if use_lora else "configs/training/full_param" - - - if not base_model_yaml_path: - def get_yaml_config(model_name): - if "llama" in model_name.lower(): - if "70b" in model_name: - return "llama-3-70b.yaml" - elif "13b" in model_name: - return "llama-3-70b.yaml" - else: - return "llama-3-8b.yaml" - elif "mistral" in model_name.lower(): - if "mixtral" in model_name.lower(): - return "mixtral-8x7b.yaml" - else: - return "mistral-7b.yaml" - else: - raise RuntimeError("No default yaml found for the model") - - default_model_yaml_path = get_yaml_config(kwargs["model"]) - base_model_yaml_path = os.path.join(example_dir, lora_path, default_model_yaml_path) - logger.info(f"Using default yaml template for model: {base_model_yaml_path}") - - model_config_data = yaml.safe_load(open(base_model_yaml_path, "r")) - model_config_data.update(kwargs.get("hyperparameters", {})) + model_config_data = llmforge_config.copy() + if "model_id" or "train_path" in model_config_data: + logger.warning(f"model_id and train_path inside {llmforge_config_path} are going to be overridden") - model_config_data["model_id"] = kwargs["model"] - - custom_modifications = { - "model_id": kwargs["model"], - "train_path": train_path, - "logger": kwargs.get("logging_kwargs", {}), - "num_checkpoints_to_keep": 10 - } - if kwargs.get("output_dir", None): - custom_modifications["output_dir"] = kwargs["output_dir"] - - model_config_data.update(custom_modifications) + model_config_data["model_id"] = model + model_config_data["train_path"] = train_path model_config_data = {k: v for k, v in model_config_data.items() if v is not None} - def freeze(d): - if isinstance(d, dict): - return tuple(sorted((key, freeze(value)) for key, value in d.items())) - elif isinstance(d, list): - return tuple(freeze(value) for value in sorted(d)) - elif isinstance(d, set): - return tuple(freeze(value) for value in sorted(d)) - return d - - def hash_dict(d): - return hash(freeze(d)) - dict_sorted_hash = hash_dict(model_config_data) - if dict_sorted_hash < 0: - dict_sorted_hash = -dict_sorted_hash - filename = f"model_config_dspy_{dict_sorted_hash}.yaml" logger.info(f"Model config data: {model_config_data}") - yaml.safe_dump(model_config_data, open(filename, "w")) + yaml.safe_dump(model_config_data, open(llmforge_config_path, "w")) - ft_path = os.path.join("utils", "ft.py") - job_config_dict = kwargs.pop("job_config") - job_config_dict["env_vars"] = job_config_dict.get("env_vars", {}) - for env_var in ["WANDB_API_KEY", "HF_TOKEN", "HF_HOME"]: - if env_var not in job_config_dict["env_vars"]: - job_config_dict["env_vars"][env_var] = os.environ.get(env_var, "") + if not 
job_config_dict.get("env_vars", None): + job_config_dict["env_vars"] = {} + + for env_var in ["HF_TOKEN", "HF_HOME", "WANDB_API_KEY"]: + if env_var not in job_config_dict["env_vars"] and os.environ.get(env_var, None): + job_config_dict["env_vars"][env_var] = os.environ[env_var] + - job_config_dict["entrypoint"] = job_config_dict["entrypoint"].format(filename=filename) job_config = JobConfig(**job_config_dict) - job_runner_config_path = kwargs.get("compute_yaml_path", "job_runner_config.yaml") - return job_runner_config_path, job_config + return job_config -def start_remote_training(job_config, **kwargs) -> str: +def start_remote_training(job_config) -> str: logger.info("[Finetune] Starting remote training...") job_id: str = anyscale.job.submit(job_config) logger.info(f"[Finetune] Remote training started. Job ID: {job_id}") From 0f5eeeaefbaf4b7586cb430a1ed42e3af7e64b10 Mon Sep 17 00:00:00 2001 From: Isaac Miller Date: Tue, 22 Oct 2024 18:30:58 +0000 Subject: [PATCH 5/8] Modify original file --- dspy/clients/anyscale.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/dspy/clients/anyscale.py b/dspy/clients/anyscale.py index 148fa4d53d..8ec574d16a 100644 --- a/dspy/clients/anyscale.py +++ b/dspy/clients/anyscale.py @@ -151,17 +151,16 @@ def generate_config_files(train_path: str, llmforge_config_path: str, job_config llmforge_config = yaml.safe_load(open(llmforge_config_path, "r")) job_config_dict = yaml.safe_load(open(job_config_path, "r")) - - model_config_data = llmforge_config.copy() - if "model_id" or "train_path" in model_config_data: + + if "model_id" or "train_path" in llmforge_config: logger.warning(f"model_id and train_path inside {llmforge_config_path} are going to be overridden") - model_config_data["model_id"] = model - model_config_data["train_path"] = train_path - model_config_data = {k: v for k, v in model_config_data.items() if v is not None} + llmforge_config["model_id"] = model + llmforge_config["train_path"] = train_path + llmforge_config = {k: v for k, v in llmforge_config.items() if v is not None} - logger.info(f"Model config data: {model_config_data}") - yaml.safe_dump(model_config_data, open(llmforge_config_path, "w")) + logger.info(f"Model config data: {llmforge_config}") + yaml.safe_dump(llmforge_config, open(llmforge_config_path, "w")) if not job_config_dict.get("env_vars", None): job_config_dict["env_vars"] = {} From 131c65626506c411516c64eaaf2d93441c5d3b62 Mon Sep 17 00:00:00 2001 From: Isaac Miller Date: Tue, 22 Oct 2024 23:23:49 +0000 Subject: [PATCH 6/8] Make serve_config_path optional --- dspy/clients/anyscale.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dspy/clients/anyscale.py b/dspy/clients/anyscale.py index 8ec574d16a..5c9db1ad52 100644 --- a/dspy/clients/anyscale.py +++ b/dspy/clients/anyscale.py @@ -52,9 +52,8 @@ def finetune_anyscale( """Start the finetune job.""" train_kwargs = train_kwargs or {} assert "model" not in train_kwargs, "Model should not be in the train_kwargs" - assert "serve_config_path" in train_kwargs, "serve_config_path is required to update the model config" assert anyscale.__version__ >= "0.24.65", "Anyscale version >= 0.24.65 is required to use the dataset upload feature" - assert all([x in train_kwargs for x in ["job_config_path", "llmforge_config_path", "serve_config_path"]]), "All of job_config_path, llmforge_config_path, and serve_config_path are required" + assert all([x in train_kwargs for x in ["job_config_path", "llmforge_config_path"]]), "Both 
job_config_path and llmforge_config_path are required" train_kwargs_copy = train_kwargs.copy() train_kwargs_copy["model"] = model @@ -62,7 +61,6 @@ def finetune_anyscale( job_config_path = train_kwargs.get("job_config_path", None) llmforge_config_path = train_kwargs.get("llmforge_config_path", None) serve_config_path = train_kwargs.get("serve_config_path", None) - # llmforge_config = yaml.safe_load(open(llmforge_config_path, "r")) if train_method not in TRAINING_METHODS_ANYSCALE: raise NotImplementedError(f"AnyScale can only support {TRAINING_METHODS_ANYSCALE} for the time being") @@ -75,7 +73,7 @@ def finetune_anyscale( train_data_path = save_data(train_data, provider_name=PROVIDER_ANYSCALE) - # TODO: Figure out a better pattern + # TODO(Isaac): Figure out a better pattern job_config_temp = yaml.safe_load(open(job_config_path, "r")) remote_train_path = submit_data(train_path=train_data_path, job_config=job_config_temp) @@ -89,13 +87,15 @@ def finetune_anyscale( wait_for_training(job.job_id) model_info = get_model_info(job.job_id) + # model_info[storage_uri] is a path to your cloud where the best(last if no eval) checkpoint weights are forwarded storage_uri = model_info["storage_uri"] lora_dynamic_path = storage_uri.split(model)[0] final_model_name = model + storage_uri.split(model)[1] - - update_serve_model_config(lora_dynamic_path, train_kwargs.get("serve_config_path")) + + if serve_config_path: + update_serve_model_config(lora_dynamic_path, serve_config_path) job.model_names = [final_model_name] return "openai/" + final_model_name From f7030a0357c1e4afd69a945d3b3206cc3699b4ec Mon Sep 17 00:00:00 2001 From: Isaac Miller Date: Tue, 22 Oct 2024 23:24:24 +0000 Subject: [PATCH 7/8] Remove excess comment --- dspy/clients/anyscale.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dspy/clients/anyscale.py b/dspy/clients/anyscale.py index 5c9db1ad52..b7871a1275 100644 --- a/dspy/clients/anyscale.py +++ b/dspy/clients/anyscale.py @@ -80,7 +80,6 @@ def finetune_anyscale( job_config = generate_config_files(train_path=remote_train_path, llmforge_config_path=llmforge_config_path, job_config_path=job_config_path, model=model) # Remove potential duplicate compute config - # train_kwargs_copy.pop("job_config") job.job_id = start_remote_training(job_config=job_config) From ee572cd84fc4ee31f707f1f413c25c20b6018fdf Mon Sep 17 00:00:00 2001 From: Isaac Miller Date: Tue, 22 Oct 2024 23:24:58 +0000 Subject: [PATCH 8/8] Remove warning --- dspy/clients/anyscale.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/dspy/clients/anyscale.py b/dspy/clients/anyscale.py index b7871a1275..d65c0e9912 100644 --- a/dspy/clients/anyscale.py +++ b/dspy/clients/anyscale.py @@ -150,9 +150,6 @@ def generate_config_files(train_path: str, llmforge_config_path: str, job_config llmforge_config = yaml.safe_load(open(llmforge_config_path, "r")) job_config_dict = yaml.safe_load(open(job_config_path, "r")) - - if "model_id" or "train_path" in llmforge_config: - logger.warning(f"model_id and train_path inside {llmforge_config_path} are going to be overridden") llmforge_config["model_id"] = model llmforge_config["train_path"] = train_path