diff --git a/dspy/clients/anyscale.py b/dspy/clients/anyscale.py
index 2c599ec7f9..d65c0e9912 100644
--- a/dspy/clients/anyscale.py
+++ b/dspy/clients/anyscale.py
@@ -52,66 +52,61 @@ def finetune_anyscale(
     """Start the finetune job."""
     train_kwargs = train_kwargs or {}
     assert "model" not in train_kwargs, "Model should not be in the train_kwargs"
+    assert anyscale.__version__ >= "0.24.65", "Anyscale version >= 0.24.65 is required to use the dataset upload feature"
+    assert all([x in train_kwargs for x in ["job_config_path", "llmforge_config_path"]]), "Both job_config_path and llmforge_config_path are required"
     train_kwargs_copy = train_kwargs.copy()
     train_kwargs_copy["model"] = model
-    logger.info("[Finetune] Starting training process...")
+
+    job_config_path = train_kwargs.get("job_config_path", None)
+    llmforge_config_path = train_kwargs.get("llmforge_config_path", None)
+    serve_config_path = train_kwargs.get("serve_config_path", None)
+
     if train_method not in TRAINING_METHODS_ANYSCALE:
         raise NotImplementedError(f"AnyScale can only support {TRAINING_METHODS_ANYSCALE} for the time being")

-    logger.info("[Finetune] Validating the dataset format...")
     if not verify_dataset(train_data):
         # TODO: Does AnyScale support text completion models?
         err = "[Finetune] Error: Unable to verify that the dataset is in the correct format."
         logger.error(err)
         raise RuntimeError(err)

-    logger.info("[Finetune] Converting data to JSONL format...")
     train_data_path = save_data(train_data, provider_name=PROVIDER_ANYSCALE)

-    logger.info("[Finetune] Submitting data to remote storage...")
-    remote_train_path, _ = submit_data(train_path=train_data_path)
-    logger.info(f"[Finetune] Data submitted. Remote train path: {remote_train_path}")
-
-    logger.info("[Finetune] Generating configuration files...")
-    _, compute_config = generate_config_files(train_path=remote_train_path, **train_kwargs_copy)
-
-    logger.info("[Finetune] Starting remote training...")
-    job_id = start_remote_training(compute_config=compute_config, **train_kwargs_copy)
-    job.job_id = job_id
-    logger.info(f"[Finetune] Remote training started. Job ID: {job_id}")
+    # TODO(Isaac): Figure out a better pattern
+    job_config_temp = yaml.safe_load(open(job_config_path, "r"))
+    remote_train_path = submit_data(train_path=train_data_path, job_config=job_config_temp)
+
+    job_config = generate_config_files(train_path=remote_train_path, llmforge_config_path=llmforge_config_path, job_config_path=job_config_path, model=model)
+
+    # Remove potential duplicate compute config
+
+    job.job_id = start_remote_training(job_config=job_config)

-    logger.info("[Finetune] Waiting for training to complete...")
     wait_for_training(job.job_id)
-    logger.info("[Finetune] Training completed.")

-    logger.info("[Finetune] Retrieving model information...")
     model_info = get_model_info(job.job_id)
-    logger.info(f"[Finetune] Model info retrieved: {model_info}")
+    # model_info[storage_uri] is a path to your cloud where the best(last if no eval) checkpoint weights are forwarded
     storage_uri = model_info["storage_uri"]

-    logger.info(f"[Finetune] Copying LoRA weights from {storage_uri}...")
-    model_names, lora_dynamic_path = copy_lora_weights(storage_uri, model_info, job.job_id)
-    logger.info(f"[Finetune] LoRA weights copied. Model names: {model_names}")
-
-
-    logger.info("[Finetune] Setting result in future object...")
-    model_step_pairs = sorted([(model_name, int(model_name.split("-")[-1])) for model_name in model_names], key=lambda x: x[1])
-    last_model_checkpoint = model_step_pairs[-1][0]
-    logger.info("[Finetune] Training process completed successfully.")
-    logger.info("[Finetune] Updating model config with the proper dynamic path")
-    serve_config_path = train_kwargs.pop("serve_config_path", "serve_1B.yaml")
-    update_model_config(lora_dynamic_path, serve_config_path, job_id)
-    job.model_names = model_names
+    lora_dynamic_path = storage_uri.split(model)[0]
+    final_model_name = model + storage_uri.split(model)[1]
+
+    if serve_config_path:
+        update_serve_model_config(lora_dynamic_path, serve_config_path)
+    job.model_names = [final_model_name]

-    return last_model_checkpoint
+    return "openai/" + final_model_name


 def wait_for_training(job_id):
     """Wait for the training to complete."""
-    anyscale.job.wait(id=job_id, timeout_s=18000)
+    logger.info("[Finetune] Waiting for training to complete...")
+    anyscale.job.wait(id=job_id)
+    logger.info("[Finetune] Training completed.")


-def update_model_config(lora_dynamic_path: str, serve_config_path: str, job_id: str):
+def update_serve_model_config(lora_dynamic_path: str, serve_config_path: str):
     """Update the model config storage location with the job_id."""
     with open(serve_config_path, "r") as f:
         serve_config = yaml.safe_load(f)
@@ -121,8 +116,7 @@ def update_model_config(lora_dynamic_path: str, serve_config_path: str, job_id:
     with open(model_config_location, "r") as f:
         model_config = yaml.safe_load(f)

-    dynamic_path_until_job_id = lora_dynamic_path.split(job_id)[0] + job_id
-    model_config["lora_config"]["dynamic_lora_loading_path"] = dynamic_path_until_job_id
+    model_config["lora_config"]["dynamic_lora_loading_path"] = lora_dynamic_path

     with open(model_config_location, "w") as f:
         yaml.safe_dump(model_config, f)
@@ -130,6 +124,7 @@ def update_model_config(lora_dynamic_path: str, serve_config_path: str, job_id:

 def verify_dataset(dataset: List[dict[str, Any]]) -> bool:
     """Verify the training arguments before starting training."""
+    logger.info("[Finetune] Verifying dataset...")
     dataset_validation = openai_data_validation(dataset)

     if dataset_validation:
@@ -139,125 +134,48 @@ def verify_dataset(dataset: List[dict[str, Any]]) -> bool:
     return True


-def submit_data(train_path: str):
-    """Upload the data to the Workspace cloud storage."""
-    storage = os.environ['ANYSCALE_ARTIFACT_STORAGE']
+def submit_data(train_path: str, job_config: Dict[str, Any]):
+    """Upload the data to cloud storage."""
+    logger.info("[Finetune] Submitting data to remote storage...")
+    dataset_name = f"dataset-{job_config.get('name', 'dspy-llmforge-fine-tuning-job')}"
+    train_path_remote = anyscale.llm.dataset.upload(train_path, name=dataset_name, cloud=job_config.get("cloud", None)).storage_uri
+    logger.info(f"[Finetune] Data submitted. Remote train path: {train_path_remote}")
+
+    return train_path_remote
+
+
+def generate_config_files(train_path: str, llmforge_config_path: str, job_config_path: str, model: str):
+    assert llmforge_config_path, "LLMForge config is required to generate the config files"
+    assert job_config_path, "Job config is required to start the finetuning job"

-    datasets = {"train": train_path}
-
-    fine_tuning_file_ids = {}
-    for name, path in datasets.items():
-        num_items = len(read_jsonl(path))
-        logger.info(f"Number of items in {name} data: {num_items}")
-
-        remote_path = os.path.join(storage, path.split("/")[-1])
-        logger.info(f"Uploading {name} data to S3 at {remote_path}")
-        if remote_path[:2] == "s3":
-            os.system(f"aws s3 cp {path} {remote_path}")
-        elif remote_path[:2] == "gs":
-            os.system(f"gcloud storage cp {path} {remote_path}")
-        else:
-            os.system(f"cp {path} {remote_path}")
-            logger.info(f"Copied {path} to {remote_path}")
-        fine_tuning_file_ids[name] = remote_path
+    llmforge_config = yaml.safe_load(open(llmforge_config_path, "r"))
+    job_config_dict = yaml.safe_load(open(job_config_path, "r"))

-    return fine_tuning_file_ids["train"], fine_tuning_file_ids.get("val", None)
-
-
-def generate_config_files(train_path: str, **kwargs):
-    base_model_yaml_path = kwargs.get("train_config_yaml", None)
-    assert kwargs["model"] is not None, "Model is required to generate the config files"
-
-    use_lora = kwargs.get("use_lora", False)
-    example_dir = ""
-    lora_path = "configs/training/lora" if use_lora else "configs/training/full_param"
-
-
-    if not base_model_yaml_path:
-        def get_yaml_config(model_name):
-            if "llama" in model_name.lower():
-                if "70b" in model_name:
-                    return "llama-3-70b.yaml"
-                elif "13b" in model_name:
-                    return "llama-3-70b.yaml"
-                else:
-                    return "llama-3-8b.yaml"
-            elif "mistral" in model_name.lower():
-                if "mixtral" in model_name.lower():
-                    return "mixtral-8x7b.yaml"
-                else:
-                    return "mistral-7b.yaml"
-            else:
-                raise RuntimeError("No default yaml found for the model")
-
-        default_model_yaml_path = get_yaml_config(kwargs["model"])
-        base_model_yaml_path = os.path.join(example_dir, lora_path, default_model_yaml_path)
-        logger.info(f"Using default yaml template for model: {base_model_yaml_path}")
-
-    model_config_data = yaml.safe_load(open(base_model_yaml_path, "r"))
-    model_config_data.update(kwargs.get("hyperparameters", {}))
+    llmforge_config["model_id"] = model
+    llmforge_config["train_path"] = train_path
+    llmforge_config = {k: v for k, v in llmforge_config.items() if v is not None}
+
+    logger.info(f"Model config data: {llmforge_config}")
+    yaml.safe_dump(llmforge_config, open(llmforge_config_path, "w"))
+
+    if not job_config_dict.get("env_vars", None):
+        job_config_dict["env_vars"] = {}
+
+    for env_var in ["HF_TOKEN", "HF_HOME", "WANDB_API_KEY"]:
+        if env_var not in job_config_dict["env_vars"] and os.environ.get(env_var, None):
+            job_config_dict["env_vars"][env_var] = os.environ[env_var]

-    model_config_data["model_id"] = kwargs["model"]
-
-    custom_modifications = {
-        "model_id": kwargs["model"],
-        "train_path": train_path,
-        "logger": {
-            "provider": "wandb",
-        },
-        "num_checkpoints_to_keep": 10
-    }
-    if kwargs.get("output_dir", None):
-        custom_modifications["output_dir"] = kwargs["output_dir"]
-
-    model_config_data.update(custom_modifications)
-    model_config_data = {k: v for k, v in model_config_data.items() if v is not None}
-
-    def freeze(d):
-        if isinstance(d, dict):
-            return tuple(sorted((key, freeze(value)) for key, value in d.items()))
-        elif isinstance(d, list):
-            return tuple(freeze(value) for value in sorted(d))
-        elif isinstance(d, set):
-            return tuple(freeze(value) for value in sorted(d))
-        return d
-
-    def hash_dict(d):
-        return hash(freeze(d))
-    dict_sorted_hash = hash_dict(model_config_data)
-    if dict_sorted_hash < 0:
-        dict_sorted_hash = -dict_sorted_hash
-    filename = f"model_config_dspy_{dict_sorted_hash}.yaml"
-    logger.info(f"Model config data: {model_config_data}")
-    yaml.safe_dump(model_config_data, open(filename, "w"))
-
-    ft_path = os.path.join("utils", "ft.py")
-
-    compute_config_dict = {
-        "name": "dspy-llmforge-fine-tuning-job",
-        "entrypoint": f"llmforge anyscale finetune {filename}",
-        "working_dir": ".",
-        "image_uri": "localhost:5555/anyscale/llm-forge:0.5.6",
-        "requirements": [
-            "wandb",
-        ],
-        "env_vars": {
-            "WANDB_API_KEY": os.environ.get("WANDB_API_KEY", ""),
-            "HF_TOKEN": os.environ.get("HF_TOKEN", ""),
-            "HF_HOME": os.environ.get("HF_HOME", ""),
-        }
-    }
-    compute_config_kwargs = kwargs.get("compute_config", {})
-    compute_config_dict.update(compute_config_kwargs)
-    compute_config = JobConfig(**compute_config_dict)
-
-    job_runner_config_path = kwargs.get("compute_yaml_path", "job_runner_config.yaml")
-
-    return job_runner_config_path, compute_config
-
-
-def start_remote_training(compute_config, **kwargs) -> str:
-    job_id: str = anyscale.job.submit(compute_config)
+
+    job_config = JobConfig(**job_config_dict)
+
+
+    return job_config
+
+
+def start_remote_training(job_config) -> str:
+    logger.info("[Finetune] Starting remote training...")
+    job_id: str = anyscale.job.submit(job_config)
+    logger.info(f"[Finetune] Remote training started. Job ID: {job_id}")
     return job_id


@@ -267,57 +185,10 @@ def wait_for_training(job_id):


 def get_model_info(job_id):
-    return anyscale.llm.model.get(job_id=job_id).to_dict()
-
-
-def copy_lora_weights(storage_uri, model_info, job_id):
-    try:
-        from google.cloud import storage
-
-        storage_client = storage.Client()
-
-        bucket_name = storage_uri.split('/')[2]
-        source_folder = '/'.join(storage_uri.split('/')[3:-1])
-        logger.info(f"Source folder: {source_folder}")
-
-        bucket = storage_client.bucket(bucket_name)
-
-        blobs = bucket.list_blobs(prefix=source_folder)
-
-        subfolders = set()
-        for blob in blobs:
-            if '/' in blob.name[len(source_folder):]:
-                subfolder = blob.name.split('/')[:-1]
-                subfolders.add('/'.join(subfolder))
-
-        base_model_id = model_info["base_model_id"]
-        lora_dynamic_path = f"dspy/lora_weights/{job_id}/{base_model_id}"
-
-        model_names = []
-        for subfolder in subfolders:
-            subfolder_name = subfolder.split('/')[-1]
-            destination_folder = f"{lora_dynamic_path}:{subfolder_name}"
-            if subfolder_name.startswith("epoch"):
-                model_names.append("/".join(destination_folder.split("/")[-2:]))
-            else:
-                continue
-
-            subfolder_blobs = bucket.list_blobs(prefix=subfolder)
-
-            for blob in subfolder_blobs:
-                source_blob = bucket.blob(blob.name)
-                destination_blob_name = f"{destination_folder}/{blob.name.split('/')[-1]}"
-                bucket.copy_blob(source_blob, bucket, destination_blob_name)
-                logger.info(f"Copied {source_blob.name} to {destination_blob_name}")
-
-        logger.info(f"All subfolders copied to: gs://{bucket_name}/{lora_dynamic_path}")
-        completed_path = f"gs://{bucket_name}/{lora_dynamic_path}"
-        return model_names, completed_path
-
-    except Exception as e:
-        logger.error(f"An error occurred: {str(e)}")
-        raise e
-
+    logger.info("[Finetune] Retrieving model information from Anyscale Models SDK...")
+    info = anyscale.llm.model.get(job_id=job_id).to_dict()
+    logger.info(f"[Finetune] Model info retrieved: {info}")
+    return info

 def read_jsonl(filename):
     with open(filename, "r") as f:
diff --git a/dspy/clients/finetune.py b/dspy/clients/finetune.py
index d9d0df8e8e..2920cdac0b 100644
--- a/dspy/clients/finetune.py
+++ b/dspy/clients/finetune.py
@@ -4,6 +4,7 @@
 from enum import Enum
 from pathlib import Path
 from typing import Any, Dict, List, Optional
+from dspy.utils.logging import logger

 import ujson
 from datasets.fingerprint import Hasher
@@ -111,6 +112,7 @@ def save_data(
     provider_name: Optional[str] = None,
 ) -> str:
     """Save the fine-tuning data to a file."""
+    logger.info("[Finetune] Converting data to JSONL format...")
     # Construct the file name based on the data hash
     hash = Hasher.hash(data)
     file_name = f"{hash}.jsonl"