add draft for distributed pipeline (pinellolab#264)
* add gitignore
* add argo pipeline dependencies
* add environment log component
* add pipeline construction method
* add run pipeline script
* add envrc example
cameronraysmith committed May 17, 2023
1 parent b3fd7de commit c9a03a8
Showing 9 changed files with 1,082 additions and 102 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/cml.yml
@@ -21,9 +21,6 @@ on:
- "reproducibility/figures/*.py"
- "reproducibility/figures/dvc.*"
- "reproducibility/figures/config.yaml"
pull_request:
branches:
- master

# Review/set variables via gh CLI:
#
897 changes: 831 additions & 66 deletions poetry.lock

Large diffs are not rendered by default.

80 changes: 47 additions & 33 deletions pyproject.toml
@@ -86,6 +86,10 @@ dvc-gs = { version = ">=2.20.0", optional = true }
jupyter-core = { version = ">=5.1.3", optional = true, source = "pypi" }
pyvis = { version = ">=0.3.2", optional = true }
hypothesis = {version = ">=6.71.0", optional = true }
kfp = { version = ">=1.8.18", optional = true }
google-cloud-aiplatform = { version = ">=1.24.1", optional = true }
google-cloud-pipeline-components = { version = ">=1.0.43", optional = true }
python-dotenv = { version = ">=1.0.0", optional = true }

[tool.conda-lock.dependencies]
seaborn = { version = "0.11.2", source = "conda-forge" }
@@ -104,39 +108,49 @@ platforms = [

[tool.poetry.extras]
dev = [
"Pygments",
"black",
"coverage",
"darglint",
"flake8",
"flake8-bandit",
"flake8-bugbear",
"flake8-docstrings",
"flake8-rst-docstrings",
"furo",
"hypothesis",
"isort",
"ipython",
"mypy",
"pep8-naming",
"pre-commit",
"pre-commit-hooks",
"pytest",
"pyupgrade",
"pyvis",
"safety",
"sphinx",
"sphinx-autobuild",
"sphinx-click",
"typeguard",
"xdoctest",
"myst-parser",
"dparse",
"pytest-cov",
"pytest-mock",
"poethepoet",
"dvc-gs",
"jupyter-core"
"black",
"coverage",
"darglint",
"dparse",
"dvc-gs",
"flake8-bandit",
"flake8-bugbear",
"flake8-docstrings",
"flake8-rst-docstrings",
"flake8",
"hypothesis",
"ipython",
"isort",
"jupyter-core",
"mypy",
"pep8-naming",
"poethepoet",
"pre-commit-hooks",
"pre-commit",
"Pygments",
"pytest-cov",
"pytest-mock",
"pytest",
"pyupgrade",
"pyvis",
"safety",
"typeguard",
"xdoctest",
]

docs = [
"furo",
"myst-parser",
"sphinx-autobuild",
"sphinx-click",
"sphinx",
]

pipeline = [
"google-cloud-aiplatform",
"google-cloud-pipeline-components",
"kfp",
"python-dotenv",
]
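(The Argo/Vertex dependencies above are grouped into their own optional "pipeline" extra, so they can presumably be installed on demand, e.g. with poetry install --extras pipeline, rather than being pulled into every development environment.)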


7 changes: 7 additions & 0 deletions reproducibility/pipelines/argo/.envrc.example
@@ -0,0 +1,7 @@
export ARGO_PIPELINE_BASE_IMAGE=us-central1-docker.pkg.dev/project/package/image:hash
export ARGO_PIPELINE_ROOT=gs://storagebucket
export ARGO_ENVIRONMENT_LOG_MACHINE_TYPE=n1-standard-4
export ARGO_ACCELERATOR_TYPE=NVIDIA_TESLA_T4
export ARGO_ACCELERATOR_COUNT=1
export ARGO_GCP_PROJECT_ID=project
export ARGO_GCP_REGION=us-central1
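These ARGO_* settings are read by the pipeline scripts below through python-dotenv. A minimal sketch of how a filled-in .envrc would be consumed (assuming it was copied from this example):

from dotenv import load_dotenv  # python-dotenv, part of the new "pipeline" extra
import os

load_dotenv(".envrc")  # python-dotenv accepts the leading "export" keywords
base_image = os.getenv("ARGO_PIPELINE_BASE_IMAGE")
pipeline_root = os.getenv("ARGO_PIPELINE_ROOT")
print(base_image, pipeline_root)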
4 changes: 4 additions & 0 deletions reproducibility/pipelines/argo/.gitignore
@@ -0,0 +1,4 @@
outputs/
multirun/
testing/
.envrc
Empty file.
75 changes: 75 additions & 0 deletions reproducibility/pipelines/argo/components/environment_log_component.py
@@ -0,0 +1,75 @@
from google_cloud_pipeline_components.v1.custom_job import (
create_custom_training_job_from_component,
)
from kfp.v2 import dsl
from kfp.v2.dsl import Dataset # , Input, Model, Artifact
from kfp.v2.dsl import Output


def create_environment_log_component(
base_image: str,
display_name: str,
machine_type: str,
accelerator_type: str,
accelerator_count: int,
):
@dsl.component(
base_image=base_image,
)
def environment_log(message: str, environment_info: Output[Dataset]):
print(message)
print("Active conda environment and installed packages:")
import os
import subprocess

commands = [
["ls", "-alh"],
["pwd"],
["which", "python"],
["python", "--version"],
["mamba", "info"],
["mamba", "info", "--envs"],
["mamba", "list"],
["pip", "list"],
["pip", "freeze"],
["ls", "-alh", "/usr/local/nvidia/lib64"],
["/usr/local/nvidia/bin/nvidia-smi"],
]

env_variables = ["NVIDIA_VISIBLE_DEVICES", "PATH", "LD_LIBRARY_PATH"]

with open(environment_info.path, "a") as f:
for command in commands:
f.write(" ".join(command) + "\n")
result = subprocess.run(command, capture_output=True, text=True)
f.write(result.stdout + "\n" + result.stderr + "\n")
print(result.stdout)
print(result.stderr)

for var in env_variables:
f.write(f"{var}: {os.environ.get(var, 'Not Found')}\n")
print(f"{var}: {os.environ.get(var, 'Not Found')}")

import torch

print(torch.__version__)

if torch.cuda.is_available():
print("A CUDA-enabled GPU is available.")
for device in range(torch.cuda.device_count()):
print(f" Device: {device}")
print(f" Name: {torch.cuda.get_device_name(device)}")
print(
f" Compute capability: {torch.cuda.get_device_capability(device)}"
)
print(f" Properties: {torch.cuda.get_device_properties(device)}")
else:
print("A CUDA-enabled GPU is not available.")

return create_custom_training_job_from_component(
environment_log,
display_name=display_name,
machine_type=machine_type,
accelerator_type=accelerator_type,
accelerator_count=accelerator_count,
)
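A minimal usage sketch for the factory above, with placeholder values standing in for the ARGO_* settings from .envrc.example (run_pipeline.py below wires the real values in through hydra-zen):

# Sketch only; the literal values are placeholders taken from .envrc.example.
env_log_job = create_environment_log_component(
    base_image="us-central1-docker.pkg.dev/project/package/image:hash",
    display_name="environment_log_component",
    machine_type="n1-standard-4",
    accelerator_type="NVIDIA_TESLA_T4",
    accelerator_count=1,
)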
39 changes: 39 additions & 0 deletions reproducibility/pipelines/argo/construct_pipeline.py
@@ -0,0 +1,39 @@
import os
from typing import Callable

from dotenv import load_dotenv
from hydra_zen import make_custom_builds_fn
from kfp.v2 import dsl
from kfp.v2.dsl import Dataset # Output, Input, Model, Artifact


load_dotenv(".envrc")


base_image = os.getenv("ARGO_PIPELINE_BASE_IMAGE")
pipeline_root = os.getenv("ARGO_PIPELINE_ROOT")

builds = make_custom_builds_fn(populate_full_signature=True)


def create_complete_pipeline(
pipeline_root: str,
environment_log_component: Callable,
):
@dsl.pipeline(
name="complete pipeline",
description="complete run of pipeline",
pipeline_root=pipeline_root,
)
def complete_pipeline(
project: str,
location: str,
message: str,
) -> Dataset:
return environment_log_component(
project=project,
location=location,
message=message,
).outputs["environment_info"]

return complete_pipeline
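Not part of this commit, but the constructed pipeline could presumably also be compiled to a local job spec for inspection before submission, along these lines (env_log_job refers to the component factory sketch above; the paths are placeholders):

# Sketch only: compile the pipeline definition with the kfp v2 compiler.
from kfp.v2 import compiler

pipeline_func = create_complete_pipeline(
    pipeline_root="gs://storagebucket",
    environment_log_component=env_log_job,
)
compiler.Compiler().compile(
    pipeline_func=pipeline_func,
    package_path="complete_pipeline.json",
)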
79 changes: 79 additions & 0 deletions reproducibility/pipelines/argo/run_pipeline.py
@@ -0,0 +1,79 @@
import os
from typing import Callable

from components.environment_log_component import create_environment_log_component
from construct_pipeline import create_complete_pipeline
from dotenv import load_dotenv
from google.cloud import aiplatform
from hydra_zen import instantiate
from hydra_zen import make_config
from hydra_zen import make_custom_builds_fn
from hydra_zen import store
from hydra_zen import zen
from kfp.v2 import dsl
from kfp.v2.dsl import Dataset # Output, Input, Model, Artifact


load_dotenv(".envrc")


base_image = os.getenv("ARGO_PIPELINE_BASE_IMAGE")
pipeline_root = os.getenv("ARGO_PIPELINE_ROOT")

builds = make_custom_builds_fn(populate_full_signature=True)
pbuilds = make_custom_builds_fn(zen_partial=True, populate_full_signature=True)

EnvironmentLogComponentConf = builds(create_environment_log_component)
base_envlog = EnvironmentLogComponentConf(
base_image=base_image,
display_name="environment_log_component",
machine_type=os.environ["ARGO_ENVIRONMENT_LOG_MACHINE_TYPE"],
accelerator_type=os.environ["ARGO_ACCELERATOR_TYPE"],
accelerator_count=int(os.environ["ARGO_ACCELERATOR_COUNT"]),
)
envlog_store = store(group="job/environment_log_component")
envlog_store(base_envlog, name="base_for_envlog_component")


PipelineConf = builds(
create_complete_pipeline,
)
base_pipeline = PipelineConf(
pipeline_root=pipeline_root,
environment_log_component=base_envlog,
)

pipeline_store = store(group="job/pipeline")
pipeline_store(base_pipeline, name="base_pipeline")

aiplatform.pipeline_jobs = aiplatform.PipelineJob
JobConf = builds(
aiplatform.PipelineJob.from_pipeline_func,
)
base_job = JobConf(
pipeline_func=base_pipeline,
parameter_values={
"project": os.environ["ARGO_GCP_PROJECT_ID"],
"location": os.environ["ARGO_GCP_REGION"],
"message": "message text",
},
)

job_store = store(group="job")
job_store(base_job, name="base_job")


@store(name="distributed_pipeline", hydra_defaults=["_self_", {"job": "base_job"}])
def task_function(job):
print("submitting pipeline")
print(job)
job.submit()


if __name__ == "__main__":
store.add_to_hydra_store()
zen(task_function).hydra_main(
config_name="distributed_pipeline",
version_base="1.1",
config_path=".",
)
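Because the configurations are registered with hydra-zen's store and exposed through hydra_main, the script would presumably be launched as "python run_pipeline.py" (after filling in .envrc), with Hydra-style overrides such as job=base_job selecting a configuration from the store.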
