-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add draft for distributed pipeline (#264)
* add gitignore
* add argo pipeline dependencies
* add environment log component
* add pipeline construction method
* add run pipeline script
* add envrc example
- Loading branch information
1 parent
b3fd7de
commit c9a03a8
Showing 9 changed files with 1,082 additions and 102 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
# Example .envrc for the distributed Argo/Vertex pipeline.
# Copy to .envrc and fill in real values; the pipeline scripts read these
# via load_dotenv(".envrc").

# Container image the pipeline components run in (Artifact Registry ref).
export ARGO_PIPELINE_BASE_IMAGE=us-central1-docker.pkg.dev/project/package/image:hash
# GCS bucket used as the KFP pipeline root for artifacts.
export ARGO_PIPELINE_ROOT=gs://storagebucket
# Machine type for the environment-log custom job.
export ARGO_ENVIRONMENT_LOG_MACHINE_TYPE=n1-standard-4
# Accelerator attached to the job (type and count).
export ARGO_ACCELERATOR_TYPE=NVIDIA_TESLA_T4
export ARGO_ACCELERATOR_COUNT=1
# GCP project and region the pipeline is submitted to.
export ARGO_GCP_PROJECT_ID=project
export ARGO_GCP_REGION=us-central1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
# Hydra/hydra-zen run output directories.
outputs/
multirun/
# Local scratch directory.
testing/
# Local environment configuration (contains project-specific values) —
# keep out of version control; see the committed .envrc example.
.envrc
Empty file.
75 changes: 75 additions & 0 deletions
75
reproducibility/pipelines/argo/components/environment_log_component.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
from google_cloud_pipeline_components.v1.custom_job import ( | ||
create_custom_training_job_from_component, | ||
) | ||
from kfp.v2 import dsl | ||
from kfp.v2.dsl import Dataset # , Input, Model, Artifact | ||
from kfp.v2.dsl import Output | ||
|
||
|
||
def create_environment_log_component(
    base_image: str,
    display_name: str,
    machine_type: str,
    accelerator_type: str,
    accelerator_count: int,
):
    """Build a custom-training-job component that logs the runtime environment.

    The inner ``environment_log`` component runs a battery of shell
    diagnostics (conda/mamba, pip, nvidia-smi), dumps selected environment
    variables, and reports torch/CUDA device info, writing everything to a
    ``Dataset`` output artifact as well as stdout.

    Args:
        base_image: Container image the component executes in.
        display_name: Display name for the Vertex AI custom job.
        machine_type: GCE machine type for the job.
        accelerator_type: Accelerator type (e.g. ``NVIDIA_TESLA_T4``).
        accelerator_count: Number of accelerators to attach.

    Returns:
        The component wrapped by ``create_custom_training_job_from_component``.
    """

    @dsl.component(
        base_image=base_image,
    )
    def environment_log(message: str, environment_info: Output[Dataset]):
        print(message)
        print("Active conda environment and installed packages:")
        # Imports sit inside the body because KFP serializes this function
        # and executes it inside the remote container.
        import os
        import subprocess

        diagnostic_cmds = [
            ["ls", "-alh"],
            ["pwd"],
            ["which", "python"],
            ["python", "--version"],
            ["mamba", "info"],
            ["mamba", "info", "--envs"],
            ["mamba", "list"],
            ["pip", "list"],
            ["pip", "freeze"],
            ["ls", "-alh", "/usr/local/nvidia/lib64"],
            ["/usr/local/nvidia/bin/nvidia-smi"],
        ]
        tracked_env_vars = ["NVIDIA_VISIBLE_DEVICES", "PATH", "LD_LIBRARY_PATH"]

        # Append each diagnostic to the output artifact while echoing the
        # same text to stdout so it also lands in the job logs.
        with open(environment_info.path, "a") as sink:
            for cmd in diagnostic_cmds:
                sink.write(" ".join(cmd) + "\n")
                completed = subprocess.run(cmd, capture_output=True, text=True)
                sink.write(completed.stdout + "\n" + completed.stderr + "\n")
                print(completed.stdout)
                print(completed.stderr)

            for var in tracked_env_vars:
                line = f"{var}: {os.environ.get(var, 'Not Found')}"
                sink.write(line + "\n")
                print(line)

        import torch

        print(torch.__version__)

        if not torch.cuda.is_available():
            print("A CUDA-enabled GPU is not available.")
        else:
            print("A CUDA-enabled GPU is available.")
            for device in range(torch.cuda.device_count()):
                print(f"  Device: {device}")
                print(f"  Name: {torch.cuda.get_device_name(device)}")
                print(
                    f"  Compute capability: {torch.cuda.get_device_capability(device)}"
                )
                print(f"  Properties: {torch.cuda.get_device_properties(device)}")

    return create_custom_training_job_from_component(
        environment_log,
        display_name=display_name,
        machine_type=machine_type,
        accelerator_type=accelerator_type,
        accelerator_count=accelerator_count,
    )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
import os | ||
from typing import Callable | ||
|
||
from dotenv import load_dotenv | ||
from hydra_zen import make_custom_builds_fn | ||
from kfp.v2 import dsl | ||
from kfp.v2.dsl import Dataset # Output, Input, Model, Artifact | ||
|
||
|
||
# Load pipeline configuration from the local .envrc into the process
# environment (see the committed .envrc example for the ARGO_* variables).
load_dotenv(".envrc")


# Read at import time; may be None if the variables are unset.
# NOTE(review): base_image appears unused in this module, and the module-level
# pipeline_root is shadowed by create_complete_pipeline's parameter — confirm
# whether these globals are needed here.
base_image = os.getenv("ARGO_PIPELINE_BASE_IMAGE")
pipeline_root = os.getenv("ARGO_PIPELINE_ROOT")

# hydra-zen builds helper that records the full target signature in configs.
builds = make_custom_builds_fn(populate_full_signature=True)
|
||
|
||
def create_complete_pipeline(
    pipeline_root: str,
    environment_log_component: Callable,
):
    """Assemble the complete KFP pipeline from its components.

    Args:
        pipeline_root: GCS root path where pipeline artifacts are written.
        environment_log_component: Component callable that logs the runtime
            environment and produces a ``Dataset`` artifact named
            ``environment_info``.

    Returns:
        The decorated pipeline function, ready to be compiled or submitted.
    """

    @dsl.pipeline(
        name="complete pipeline",
        description="complete run of pipeline",
        pipeline_root=pipeline_root,
    )
    def complete_pipeline(
        project: str,
        location: str,
        message: str,
    ) -> Dataset:
        # Single-step pipeline: run the environment log and expose its
        # output artifact as the pipeline's return value.
        log_task = environment_log_component(
            project=project,
            location=location,
            message=message,
        )
        return log_task.outputs["environment_info"]

    return complete_pipeline
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
import os | ||
from typing import Callable | ||
|
||
from components.environment_log_component import create_environment_log_component | ||
from construct_pipeline import create_complete_pipeline | ||
from dotenv import load_dotenv | ||
from google.cloud import aiplatform | ||
from hydra_zen import instantiate | ||
from hydra_zen import make_config | ||
from hydra_zen import make_custom_builds_fn | ||
from hydra_zen import store | ||
from hydra_zen import zen | ||
from kfp.v2 import dsl | ||
from kfp.v2.dsl import Dataset # Output, Input, Model, Artifact | ||
|
||
|
||
# Load pipeline configuration from the local .envrc into the process
# environment (see the committed .envrc example for the ARGO_* variables).
load_dotenv(".envrc")


# Read at import time; may be None if the variables are unset.
base_image = os.getenv("ARGO_PIPELINE_BASE_IMAGE")
pipeline_root = os.getenv("ARGO_PIPELINE_ROOT")

# hydra-zen builds helpers: full-signature configs, plus a partial variant.
# NOTE(review): pbuilds appears unused in this module — confirm it is needed.
builds = make_custom_builds_fn(populate_full_signature=True)
pbuilds = make_custom_builds_fn(zen_partial=True, populate_full_signature=True)

# Config for the environment-log component, populated from ARGO_* env vars.
# The os.environ[...] lookups raise KeyError if a variable is missing,
# failing fast at import time.
EnvironmentLogComponentConf = builds(create_environment_log_component)
base_envlog = EnvironmentLogComponentConf(
    base_image=base_image,
    display_name="environment_log_component",
    machine_type=os.environ["ARGO_ENVIRONMENT_LOG_MACHINE_TYPE"],
    accelerator_type=os.environ["ARGO_ACCELERATOR_TYPE"],
    accelerator_count=int(os.environ["ARGO_ACCELERATOR_COUNT"]),
)
# Register the component config under the job/environment_log_component group.
envlog_store = store(group="job/environment_log_component")
envlog_store(base_envlog, name="base_for_envlog_component")


# Config for the complete pipeline, wiring in the component config above.
PipelineConf = builds(
    create_complete_pipeline,
)
base_pipeline = PipelineConf(
    pipeline_root=pipeline_root,
    environment_log_component=base_envlog,
)

pipeline_store = store(group="job/pipeline")
pipeline_store(base_pipeline, name="base_pipeline")

# NOTE(review): this rebinds the aiplatform.pipeline_jobs module attribute to
# the PipelineJob class — presumably a workaround so hydra-zen can resolve the
# target path; confirm it is still required.
aiplatform.pipeline_jobs = aiplatform.PipelineJob
# Config for the PipelineJob itself; parameter_values are forwarded to the
# pipeline function's (project, location, message) parameters.
JobConf = builds(
    aiplatform.PipelineJob.from_pipeline_func,
)
base_job = JobConf(
    pipeline_func=base_pipeline,
    parameter_values={
        "project": os.environ["ARGO_GCP_PROJECT_ID"],
        "location": os.environ["ARGO_GCP_REGION"],
        "message": "message text",
    },
)

job_store = store(group="job")
job_store(base_job, name="base_job")
|
||
|
||
@store(name="distributed_pipeline", hydra_defaults=["_self_", {"job": "base_job"}])
def task_function(job):
    """Log the instantiated job config, then submit the pipeline job."""
    for item in ("submitting pipeline", job):
        print(item)
    job.submit()
|
||
|
||
if __name__ == "__main__":
    # Publish all configs registered above to Hydra's global config store,
    # then hand control to Hydra, which instantiates the selected job config
    # and invokes task_function with it.
    store.add_to_hydra_store()
    zen(task_function).hydra_main(
        config_name="distributed_pipeline",
        version_base="1.1",
        config_path=".",
    )