Refactor finetuning implementation to be 2.5 compatible #1594
Merged
Changes from all commits · 132 commits
1cc717a
Add helper methods to control module LMs
dilarasoylu 7336cbc
Add structural equivalency check to predict
dilarasoylu 8c283c5
First pass at OpenAIModel. Needs more implementations
isaacbmiller 87e8370
Update module structural equivalency check to use named_predictors
dilarasoylu 44f452b
Update _assert_lm_consistency in program.py
dilarasoylu 417dcd4
Method rename in program.py
dilarasoylu 5b461f9
Add val set to OAI
isaacbmiller bed26d9
Adjust abstract args and fix retrieve
isaacbmiller 008ec06
Remove extra comments
isaacbmiller efe8a25
Address comments
dilarasoylu 5b82770
Remove old comment
dilarasoylu 790d8fd
Improve readability
dilarasoylu f9ae0bb
Add data functions
dilarasoylu f312910
Merge branch 'main_finetune' into fintuneable-lm
isaacbmiller c12263d
added data collection callback example
isaacbmiller 8f31aea
WIP
isaacbmiller 6fb426d
Merge branch 'prepare_finetune' into fintuneable-lm
isaacbmiller 8b0dcda
WIP Future LM + notebook
isaacbmiller 3ca59b7
WIP Lm
isaacbmiller 967dcc5
OAI data kinda working
isaacbmiller ed3e885
Add OAI
isaacbmiller abf263b
Remove data
isaacbmiller e087526
Implement LM Future
isaacbmiller 94bd638
ruff fixes
isaacbmiller e2389b4
move tiktoken dependency
isaacbmiller b56eb2a
Import finetuning method
isaacbmiller 3fc1449
Remove dataclass from LM
isaacbmiller 3469282
Add verification to OpenAIModel
isaacbmiller 8d24655
Smaller changes
isaacbmiller b1ddecc
Review changes
isaacbmiller aa17d54
ruff check
isaacbmiller 41c3be2
Change finetuning method to trainingmethod
isaacbmiller ba8b013
Add docstring and hparams
isaacbmiller bf8caa1
Fix typing
isaacbmiller d917502
Merge branch 'main' into main_finetune
dilarasoylu 6275c54
Update data functions
dilarasoylu 2cbc975
Prepare for merge
dilarasoylu 2589c52
Update data functions called in examples
dilarasoylu 6534082
Add input validation to finetuning
dilarasoylu 76fcaf2
Move data verification method to be a standalone method
dilarasoylu be5d070
Modify error message
dilarasoylu 74ca083
Notebook working
isaacbmiller 7097ba6
ruff fixes
isaacbmiller ef033cb
Test workflows
dilarasoylu f30d572
Convert a method on the Predict class to a function
dilarasoylu 2dde8c9
Convert methods on the Module class to functions
dilarasoylu 07e5736
Merge branch with 'main_finetune'
dilarasoylu ff6ecbc
Expose TrainableLM
dilarasoylu 37f685a
Update finetuning methods
dilarasoylu 484bc6f
Update finetune methods
dilarasoylu 5a895cc
Update finetune demo
dilarasoylu 221e3d8
Remove finetuning notebook v1
dilarasoylu 85f9a44
Colab fix
dilarasoylu 52cd8fe
Update notebook
dilarasoylu d21e818
Update finetuning_demo.ipynb
dilarasoylu 74bbba0
WIP anyscale
9c2fbb0
Moving to A100 node
7479a36
WIP starting notebook fr
765fe47
E2E notebook working - migrate to jobs + rayllm
dd06959
Remove offline modules
86e7740
pre-2.5
35bc427
WIP anyscale
33d54f4
Moving to A100 node
f97c09d
WIP starting notebook fr
2094431
E2E notebook working - migrate to jobs + rayllm
4082d5a
Remove offline modules
e3bb048
pre-2.5
e7ef3c4
Merge branch 'vllm-offline' of https://github.com/isaacbmiller/dspy i…
6da66a0
Merge
dilarasoylu 2c93154
Add LM.finetune()
dilarasoylu 56d64df
Remove dsp modifications
dilarasoylu c9c992c
Add launch kwargs to LM.finetune()
dilarasoylu 8f03141
Update LM
dilarasoylu 00014ef
WIP Converting to 2.5 LM FT
024fee6
Remove anyscale example
cc1f702
Clean files for PR
1684250
Updating trainable anyscale - to test
5289c70
Undo ensemble changes
b44de88
PR Cleaining
492d2e6
Job working
81accfe
Fix eval path pointing to default dataset
f231146
Fix eval path pointing to default dataset
9951d9b
Merge branch 'vllm-offline' of https://github.com/isaacbmiller/dspy i…
5ae10be
Remove trainable_openai
d7210d3
Update anyscale.py
dilarasoylu 3a948e1
Add AnyScale launch/kill calls to lm.py
dilarasoylu 731f5a3
Merge remote-tracking branch 'private/vllm-offline' into origin/dev_f…
isaacbmiller baa5f49
WIP switching to Job
fc587b7
WIP FT working evals not
42c581f
confirmed new FT working
9d8e6d3
Ruff fixes
b408892
Add OpenAI fine-tune methods
dilarasoylu 2305204
Support lm.copy()
dilarasoylu 1298880
Copy lm in execute_finetune_job
dilarasoylu d6bbb09
Merge remote-tracking branch 'upstream/main' into dev_finetune
9b7d2a3
Cleanup for PR
33e82be
WIP Remove process_dataset_threaded
00101a7
Update anyscale client
25f04c1
Copy edits
dilarasoylu e7d5046
Merge main
dilarasoylu 511b177
Merge branch 'main' into dev_finetune
dilarasoylu bee6bfe
Merge branch 'dev_finetune' into anyscale_dev_finetune
dilarasoylu 0d515a9
Re-add comment
dilarasoylu 6888694
Re-add space
dilarasoylu 4d3e2cf
Merge pull request #1633 from stanfordnlp/anyscale_dev_finetune
dilarasoylu 62b1f8d
Anyscale working
572a27d
ruff
cbdcbaf
Remove eval_set
88c3f96
fix failing test
82a9a64
Merge pull request #1641 from stanfordnlp/ft-changes
isaacbmiller 76a4630
Update openai.py
dilarasoylu 1ff79b3
small list fix
4800ae1
remove unnecesary comment
424c185
update method to train_method
b7229b6
remove finetuning_demo
f698df9
remove finetune_test
ac5fd2e
Merge branch 'main' into dev_finetune
044c954
Initial comment revisions
c72619f
ruff + remove self hosted
f2c4bed
remove unused utils and comment delimiters
decc074
Remove unused utils
3dd7bb1
Remove duplicate copy function
2c1ee9f
Clean up ft_teleprompter
2f30cb8
Move error message inside set_lm in program.py
dilarasoylu e428d07
Update program.py
dilarasoylu 4212ff5
Remove a duplicate set_lm
dilarasoylu dfe9ee8
Address comments
dilarasoylu 7ba3d01
Add comments to the comment
dilarasoylu 2e49def
Fix spacing
dilarasoylu 1eb95ce
Revert changes in predict.py
dilarasoylu 3881eb7
lint fixes
chenmoneygithub 9f040a8
Fix bad type hint
The diff adds one new file (+324 lines), an AnyScale fine-tuning client:

```python
from typing import Any, Dict, List, Optional

import json
import yaml
import os

from dspy.utils.logging import logger
from dspy.clients.finetune import (
    FinetuneJob,
    TrainingMethod,
    save_data,
)
from dspy.clients.openai import openai_data_validation

try:
    # AnyScale fine-tuning requires the following additional imports
    import anyscale
    from anyscale.job import JobConfig
except ImportError:
    anyscale = None


# List of training methods supported by AnyScale
TRAINING_METHODS_ANYSCALE = [
    TrainingMethod.SFT,
]

PROVIDER_ANYSCALE = "anyscale"


def is_anyscale_model(model: str) -> bool:
    """Check if the model is an AnyScale model."""
    # TODO: This needs to be implemented to support fine-tuning
    logger.info("Is AnyScale model is not implemented, returning False as a default to not break lm.py")
    return False


class FinetuneJobAnyScale(FinetuneJob):

    def __init__(self, *args, **kwargs):
        self.job_id = None
        self.model_names = None
        super().__init__(*args, **kwargs)


def finetune_anyscale(
    job: FinetuneJobAnyScale,
    model: str,
    train_data: List[Dict[str, Any]],
    train_kwargs: Optional[Dict[str, Any]] = None,
    train_method: TrainingMethod = TrainingMethod.SFT,
) -> str:
    """Start the finetune job."""
    train_kwargs = train_kwargs or {}
    assert "model" not in train_kwargs, "Model should not be in the train_kwargs"
    train_kwargs_copy = train_kwargs.copy()
    train_kwargs_copy["model"] = model

    logger.info("[Finetune] Starting training process...")
    if train_method not in TRAINING_METHODS_ANYSCALE:
        raise NotImplementedError(f"AnyScale can only support {TRAINING_METHODS_ANYSCALE} for the time being")

    logger.info("[Finetune] Validating the dataset format...")
    if not verify_dataset(train_data):
        # TODO: Does AnyScale support text completion models?
        err = "[Finetune] Error: Unable to verify that the dataset is in the correct format."
        logger.error(err)
        raise RuntimeError(err)

    logger.info("[Finetune] Converting data to JSONL format...")
    train_data_path = save_data(train_data, provider_name=PROVIDER_ANYSCALE)
    logger.info("[Finetune] Submitting data to remote storage...")
    remote_train_path, _ = submit_data(train_path=train_data_path)
    logger.info(f"[Finetune] Data submitted. Remote train path: {remote_train_path}")

    logger.info("[Finetune] Generating configuration files...")
    _, compute_config = generate_config_files(train_path=remote_train_path, **train_kwargs_copy)

    logger.info("[Finetune] Starting remote training...")
    job_id = start_remote_training(compute_config=compute_config, **train_kwargs_copy)
    job.job_id = job_id
    logger.info(f"[Finetune] Remote training started. Job ID: {job_id}")

    logger.info("[Finetune] Waiting for training to complete...")
    wait_for_training(job.job_id)
    logger.info("[Finetune] Training completed.")

    logger.info("[Finetune] Retrieving model information...")
    model_info = get_model_info(job.job_id)
    logger.info(f"[Finetune] Model info retrieved: {model_info}")

    storage_uri = model_info["storage_uri"]
    logger.info(f"[Finetune] Copying LoRA weights from {storage_uri}...")
    model_names, lora_dynamic_path = copy_lora_weights(storage_uri, model_info, job.job_id)
    logger.info(f"[Finetune] LoRA weights copied. Model names: {model_names}")

    logger.info("[Finetune] Setting result in future object...")
    model_step_pairs = sorted([(model_name, int(model_name.split("-")[-1])) for model_name in model_names], key=lambda x: x[1])
    last_model_checkpoint = model_step_pairs[-1][0]
    logger.info("[Finetune] Training process completed successfully.")

    logger.info("[Finetune] Updating model config with the proper dynamic path")
    serve_config_path = train_kwargs.pop("serve_config_path", "serve_1B.yaml")
    update_model_config(lora_dynamic_path, serve_config_path, job_id)
    job.model_names = model_names

    return last_model_checkpoint
```
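For reference, the keys that `finetune_anyscale` and its helpers pull out of `train_kwargs` are scattered across the functions below. A minimal sketch of what a caller might pass; only the keys come from this file, and every value is an illustrative placeholder rather than a project default (except where noted in the comments):

```python
# Illustrative train_kwargs for finetune_anyscale. The keys are the ones read by
# generate_config_files, update_model_config, and the JobConfig setup in this
# module; the values are placeholders.
train_kwargs = {
    "use_lora": True,                       # pick the LoRA config templates instead of full_param
    "train_config_yaml": None,              # custom llmforge YAML; omit to use the built-in templates
    "hyperparameters": {"num_epochs": 3},   # merged into the generated model config (hypothetical key)
    "output_dir": "s3://my-bucket/ft-out",  # optional override for the training output location
    "compute_config": {},                   # extra fields merged into the anyscale JobConfig
    "serve_config_path": "serve_1B.yaml",   # serve config rewritten by update_model_config (module default)
    # Note: "model" must NOT appear here; finetune_anyscale asserts on this and injects it itself.
}
```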
```python
def wait_for_training(job_id):
    """Wait for the training to complete."""
    anyscale.job.wait(id=job_id, timeout_s=18000)


def update_model_config(lora_dynamic_path: str, serve_config_path: str, job_id: str):
    """Update the model config storage location with the job_id."""
    with open(serve_config_path, "r") as f:
        serve_config = yaml.safe_load(f)

    model_config_location = serve_config["applications"][0]["args"]["llm_configs"][0]

    with open(model_config_location, "r") as f:
        model_config = yaml.safe_load(f)

    dynamic_path_until_job_id = lora_dynamic_path.split(job_id)[0] + job_id
    model_config["lora_config"]["dynamic_lora_loading_path"] = dynamic_path_until_job_id

    with open(model_config_location, "w") as f:
        yaml.safe_dump(model_config, f)
```
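`update_model_config` only touches two nested keys, so the shapes it assumes can be reconstructed from the lookups above. A minimal sketch of the two YAML files involved; the filenames and bucket path are hypothetical, and everything beyond the two key paths is an assumption:

```python
import yaml

# Serve config (e.g. serve_1B.yaml): applications[0].args.llm_configs[0] must
# point at a model config file.
serve_config = {
    "applications": [
        {"args": {"llm_configs": ["model_config_llama_3_8b.yaml"]}},  # hypothetical filename
    ],
}

# Model config: update_model_config overwrites lora_config.dynamic_lora_loading_path
# with the remote path up to and including the job id.
model_config = {
    "lora_config": {
        "dynamic_lora_loading_path": "gs://my-bucket/dspy/lora_weights/prodjob_abc123",  # illustrative
    },
}

print(yaml.safe_dump(serve_config))
print(yaml.safe_dump(model_config))
```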
```python
def verify_dataset(dataset: List[dict[str, Any]]) -> bool:
    """Verify the training arguments before starting training."""
    dataset_validation = openai_data_validation(dataset)

    if dataset_validation:
        logger.error(f"Dataset validation failed: {dataset_validation}")
        return False

    return True


def submit_data(train_path: str):
    """Upload the data to the Workspace cloud storage."""
    storage = os.environ['ANYSCALE_ARTIFACT_STORAGE']

    datasets = {"train": train_path}

    fine_tuning_file_ids = {}
    for name, path in datasets.items():
        num_items = len(read_jsonl(path))
        logger.info(f"Number of items in {name} data: {num_items}")

        remote_path = os.path.join(storage, path.split("/")[-1])
        logger.info(f"Uploading {name} data to S3 at {remote_path}")
        if remote_path[:2] == "s3":
            os.system(f"aws s3 cp {path} {remote_path}")
        elif remote_path[:2] == "gs":
            os.system(f"gcloud storage cp {path} {remote_path}")
        else:
            os.system(f"cp {path} {remote_path}")
        logger.info(f"Copied {path} to {remote_path}")
        fine_tuning_file_ids[name] = remote_path

    return fine_tuning_file_ids["train"], fine_tuning_file_ids.get("val", None)
```
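The dataset handed to `verify_dataset` goes through `openai_data_validation`, so the records presumably follow the standard OpenAI chat fine-tuning shape. A minimal sketch of one such record and the JSONL serialization that `save_data`/`read_jsonl` work with; the message content is made up:

```python
import json

# One chat-format training record in the standard OpenAI fine-tuning shape,
# which openai_data_validation presumably checks.
train_data = [
    {
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "What is 2 + 2?"},
            {"role": "assistant", "content": "4"},
        ]
    },
]

# save_data serializes records like these to JSONL before submit_data uploads
# the file to $ANYSCALE_ARTIFACT_STORAGE; the equivalent done by hand:
with open("train.jsonl", "w") as f:
    for record in train_data:
        f.write(json.dumps(record) + "\n")
```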
```python
def generate_config_files(train_path: str, **kwargs):
    base_model_yaml_path = kwargs.get("train_config_yaml", None)
    assert kwargs["model"] is not None, "Model is required to generate the config files"

    use_lora = kwargs.get("use_lora", False)
    example_dir = ""
    lora_path = "configs/training/lora" if use_lora else "configs/training/full_param"

    if not base_model_yaml_path:
        def get_yaml_config(model_name):
            if "llama" in model_name.lower():
                if "70b" in model_name:
                    return "llama-3-70b.yaml"
                elif "13b" in model_name:
                    return "llama-3-70b.yaml"
                else:
                    return "llama-3-8b.yaml"
            elif "mistral" in model_name.lower():
                if "mixtral" in model_name.lower():
                    return "mixtral-8x7b.yaml"
                else:
                    return "mistral-7b.yaml"
            else:
                raise RuntimeError("No default yaml found for the model")

        default_model_yaml_path = get_yaml_config(kwargs["model"])
        base_model_yaml_path = os.path.join(example_dir, lora_path, default_model_yaml_path)
        logger.info(f"Using default yaml template for model: {base_model_yaml_path}")

    model_config_data = yaml.safe_load(open(base_model_yaml_path, "r"))
    model_config_data.update(kwargs.get("hyperparameters", {}))

    model_config_data["model_id"] = kwargs["model"]

    custom_modifications = {
        "model_id": kwargs["model"],
        "train_path": train_path,
        "logger": {
            "provider": "wandb",
        },
        "num_checkpoints_to_keep": 10,
    }
    if kwargs.get("output_dir", None):
        custom_modifications["output_dir"] = kwargs["output_dir"]

    model_config_data.update(custom_modifications)
    model_config_data = {k: v for k, v in model_config_data.items() if v is not None}

    def freeze(d):
        if isinstance(d, dict):
            return tuple(sorted((key, freeze(value)) for key, value in d.items()))
        elif isinstance(d, list):
            return tuple(freeze(value) for value in sorted(d))
        elif isinstance(d, set):
            return tuple(freeze(value) for value in sorted(d))
        return d

    def hash_dict(d):
        return hash(freeze(d))

    dict_sorted_hash = hash_dict(model_config_data)
    if dict_sorted_hash < 0:
        dict_sorted_hash = -dict_sorted_hash
    filename = f"model_config_dspy_{dict_sorted_hash}.yaml"
    logger.info(f"Model config data: {model_config_data}")
    yaml.safe_dump(model_config_data, open(filename, "w"))

    ft_path = os.path.join("utils", "ft.py")

    compute_config_dict = {
        "name": "dspy-llmforge-fine-tuning-job",
        "entrypoint": f"llmforge anyscale finetune {filename}",
        "working_dir": ".",
        "image_uri": "localhost:5555/anyscale/llm-forge:0.5.6",
        "requirements": [
            "wandb",
        ],
        "env_vars": {
            "WANDB_API_KEY": os.environ.get("WANDB_API_KEY", ""),
            "HF_TOKEN": os.environ.get("HF_TOKEN", ""),
            "HF_HOME": os.environ.get("HF_HOME", ""),
        },
    }
    compute_config_kwargs = kwargs.get("compute_config", {})
    compute_config_dict.update(compute_config_kwargs)
    compute_config = JobConfig(**compute_config_dict)

    job_runner_config_path = kwargs.get("compute_yaml_path", "job_runner_config.yaml")

    return job_runner_config_path, compute_config


def start_remote_training(compute_config, **kwargs) -> str:
    job_id: str = anyscale.job.submit(compute_config)
    return job_id


def wait_for_training(job_id):
    logger.info("Waiting for training to complete")
    anyscale.job.wait(id=job_id, timeout_s=18000)


def get_model_info(job_id):
    return anyscale.llm.model.get(job_id=job_id).to_dict()


def copy_lora_weights(storage_uri, model_info, job_id):
    try:
        from google.cloud import storage

        storage_client = storage.Client()

        bucket_name = storage_uri.split('/')[2]
        source_folder = '/'.join(storage_uri.split('/')[3:-1])
        logger.info(f"Source folder: {source_folder}")

        bucket = storage_client.bucket(bucket_name)

        blobs = bucket.list_blobs(prefix=source_folder)

        subfolders = set()
        for blob in blobs:
            if '/' in blob.name[len(source_folder):]:
                subfolder = blob.name.split('/')[:-1]
                subfolders.add('/'.join(subfolder))

        base_model_id = model_info["base_model_id"]
        lora_dynamic_path = f"dspy/lora_weights/{job_id}/{base_model_id}"

        model_names = []
        for subfolder in subfolders:
            subfolder_name = subfolder.split('/')[-1]
            destination_folder = f"{lora_dynamic_path}:{subfolder_name}"
            if subfolder_name.startswith("epoch"):
                model_names.append("/".join(destination_folder.split("/")[-2:]))
            else:
                continue

            subfolder_blobs = bucket.list_blobs(prefix=subfolder)

            for blob in subfolder_blobs:
                source_blob = bucket.blob(blob.name)
                destination_blob_name = f"{destination_folder}/{blob.name.split('/')[-1]}"
                bucket.copy_blob(source_blob, bucket, destination_blob_name)
                logger.info(f"Copied {source_blob.name} to {destination_blob_name}")

        logger.info(f"All subfolders copied to: gs://{bucket_name}/{lora_dynamic_path}")
        completed_path = f"gs://{bucket_name}/{lora_dynamic_path}"
        return model_names, completed_path

    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")
        raise e


def read_jsonl(filename):
    with open(filename, "r") as f:
        return [json.loads(line) for line in f]
```
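As a small worked example of the checkpoint selection at the end of `finetune_anyscale`: `copy_lora_weights` emits names roughly of the form `<base_model_id>:epoch-<n>`, and the caller sorts on the trailing integer to pick the final checkpoint. The base model id below is hypothetical:

```python
# Hypothetical model names in the "<base_model_id>:epoch-<n>" shape produced by
# copy_lora_weights; the base model id is illustrative.
model_names = [
    "meta-llama/Llama-3-8B:epoch-1",
    "meta-llama/Llama-3-8B:epoch-3",
    "meta-llama/Llama-3-8B:epoch-2",
]

# Same selection logic as finetune_anyscale: sort on the trailing step number
# and keep the last (highest) checkpoint.
model_step_pairs = sorted(
    [(name, int(name.split("-")[-1])) for name in model_names],
    key=lambda pair: pair[1],
)
last_model_checkpoint = model_step_pairs[-1][0]
print(last_model_checkpoint)  # meta-llama/Llama-3-8B:epoch-3
```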