Skip to content

Commit

Permalink
[Core] Merge Driver/Job's runtime environment when it conflicts (#39208)
Browse files Browse the repository at this point in the history
When the job and driver both specifies the runtime env, driver's runtime env is ignored. This is a confusing behavior and we'd like to merge instead.

This PR merges the runtime envs, and if there's a conflict raises an exception. User can override the job's runtime env by driver's runtime env by specifying the env var.
  • Loading branch information
rkooo567 committed Sep 7, 2023
1 parent 07d6e67 commit b1356d7
Show file tree
Hide file tree
Showing 10 changed files with 426 additions and 44 deletions.
1 change: 0 additions & 1 deletion .buildkite/pipeline.build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,6 @@
- echo "--- Running the script 100 times with fault of networking delay"
- for i in {1..100}; do ray job submit --address http://localhost:8265 --runtime-env python/ray/tests/chaos/runtime_env.yaml --working-dir python/ray/tests/chaos -- python potato_passer.py --num-actors=3 --pass-times=3 --sleep-secs=0.01; done


# TODO: write a test that needs bandwidth heavy lifting
- label: ":kubernetes: :mending_heart: chaos bandwidth test"
conditions: ["RAY_CI_LINUX_WHEELS_AFFECTED"]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import argparse
import sys
import time

import ray

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Dashboard agent.")
parser.add_argument(
"--conflict",
type=str,
)
parser.add_argument(
"--worker-process-setup-hook",
type=str,
)

args = parser.parse_args()

if args.worker_process_setup_hook:
ray.init(
runtime_env={
"worker_process_setup_hook": lambda: print(
args.worker_process_setup_hook
)
}
)

@ray.remote
def f():
pass

ray.get(f.remote())
time.sleep(5)
sys.exit(0)

if args.conflict == "pip":
ray.init(runtime_env={"pip": ["numpy"]})
print(ray._private.worker.global_worker.runtime_env)
elif args.conflict == "env_vars":
ray.init(runtime_env={"env_vars": {"A": "1"}})
print(ray._private.worker.global_worker.runtime_env)
else:
ray.init(
runtime_env={
"env_vars": {"C": "1"},
}
)
print(ray._private.worker.global_worker.runtime_env)
162 changes: 162 additions & 0 deletions dashboard/modules/job/tests/test_job_inheritance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import time
import json
import sys

import pytest

from ray.job_submission import JobSubmissionClient, JobStatus
from ray.cluster_utils import Cluster
from ray.dashboard.modules.job.tests.conftest import (
_driver_script_path,
)


def wait_until_status(client, job_id, status_to_wait_for, timeout_seconds=20):
start = time.time()
while time.time() - start <= timeout_seconds:
status = client.get_job_status(job_id)
print(f"status: {status}")
if status in status_to_wait_for:
return
time.sleep(1)
raise Exception


def test_job_driver_inheritance():
try:
c = Cluster()
c.add_node(num_cpus=1)
# If using a remote cluster, replace 127.0.0.1 with the head node's IP address.
client = JobSubmissionClient("http://127.0.0.1:8265")
driver_script_path = _driver_script_path("driver_runtime_env_inheritance.py")
job_id = client.submit_job(
entrypoint=f"python {driver_script_path}",
runtime_env={
"env_vars": {"A": "1", "B": "2"},
"pip": ["requests"],
},
)

def wait(job_id):
wait_until_status(
client,
job_id,
{JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED},
timeout_seconds=60,
)

def get_runtime_env_from_logs(client, job_id):
wait(job_id)
logs = client.get_job_logs(job_id)
print(logs)
assert client.get_job_status(job_id) == JobStatus.SUCCEEDED
return json.loads(logs.strip().split("\n")[-1])

# Test key is merged
print("Test key merged")
runtime_env = get_runtime_env_from_logs(client, job_id)
assert runtime_env["env_vars"] == {"A": "1", "B": "2", "C": "1"}
assert runtime_env["pip"] == {"packages": ["requests"], "pip_check": False}

# Test worker process setuphook works.
print("Test key setup hook")
expected_str = "HELLOWORLD"
job_id = client.submit_job(
entrypoint=(
f"python {driver_script_path} "
f"--worker-process-setup-hook {expected_str}"
),
runtime_env={
"env_vars": {"A": "1", "B": "2"},
},
)
wait(job_id)
logs = client.get_job_logs(job_id)
assert expected_str in logs

# Test raise an exception upon key conflict
print("Test conflicting pip")
job_id = client.submit_job(
entrypoint=f"python {driver_script_path} --conflict=pip",
runtime_env={"pip": ["numpy"]},
)
wait(job_id)
status = client.get_job_status(job_id)
logs = client.get_job_logs(job_id)
assert status == JobStatus.FAILED
assert "Failed to merge the Job's runtime env" in logs

# Test raise an exception upon env var conflict
print("Test conflicting env vars")
job_id = client.submit_job(
entrypoint=f"python {driver_script_path} --conflict=env_vars",
runtime_env={
"env_vars": {"A": "1"},
},
)
wait(job_id)
status = client.get_job_status(job_id)
logs = client.get_job_logs(job_id)
assert status == JobStatus.FAILED
assert "Failed to merge the Job's runtime env" in logs
finally:
c.shutdown()


def test_job_driver_inheritance_override(monkeypatch):
monkeypatch.setenv("RAY_OVERRIDE_JOB_RUNTIME_ENV", "1")

try:
c = Cluster()
c.add_node(num_cpus=1)
# If using a remote cluster, replace 127.0.0.1 with the head node's IP address.
client = JobSubmissionClient("http://127.0.0.1:8265")
driver_script_path = _driver_script_path("driver_runtime_env_inheritance.py")
job_id = client.submit_job(
entrypoint=f"python {driver_script_path}",
runtime_env={
"env_vars": {"A": "1", "B": "2"},
"pip": ["requests"],
},
)

def wait(job_id):
wait_until_status(
client,
job_id,
{JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED},
timeout_seconds=60,
)

def get_runtime_env_from_logs(client, job_id):
wait(job_id)
logs = client.get_job_logs(job_id)
print(logs)
assert client.get_job_status(job_id) == JobStatus.SUCCEEDED
return json.loads(logs.strip().split("\n")[-1])

# Test conflict resolution regular field
job_id = client.submit_job(
entrypoint=f"python {driver_script_path} --conflict=pip",
runtime_env={"pip": ["torch"]},
)
runtime_env = get_runtime_env_from_logs(client, job_id)
print(runtime_env)
assert runtime_env["pip"] == {"packages": ["numpy"], "pip_check": False}

# Test raise an exception upon env var conflict
job_id = client.submit_job(
entrypoint=f"python {driver_script_path} --conflict=env_vars",
runtime_env={
"env_vars": {"A": "2"},
},
)
runtime_env = get_runtime_env_from_logs(client, job_id)
print(runtime_env)
assert runtime_env["env_vars"]["A"] == "1"
finally:
c.shutdown()


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
21 changes: 0 additions & 21 deletions dashboard/modules/job/tests/test_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,27 +582,6 @@ async def test_multiple_runtime_envs(self, job_manager):
"{'env_vars': {'TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR': 'JOB_2_VAR'}}" in logs
) # noqa: E501

async def test_env_var_and_driver_job_config_warning(self, job_manager):
"""Ensure we got error message from worker.py and job logs
if user provided runtime_env in both driver script and submit()
"""
job_id = await job_manager.submit_job(
entrypoint=f"python {_driver_script_path('override_env_var.py')}",
runtime_env={
"env_vars": {"TEST_SUBPROCESS_JOB_CONFIG_ENV_VAR": "JOB_1_VAR"}
},
)

await async_wait_for_condition_async_predicate(
check_job_succeeded, job_manager=job_manager, job_id=job_id
)
logs = job_manager.get_job_logs(job_id)
token = (
"Both RAY_JOB_CONFIG_JSON_ENV_VAR and ray.init(runtime_env) are provided"
)
assert token in logs, logs
assert "JOB_1_VAR" in logs

async def test_failed_runtime_env_validation(self, job_manager):
"""Ensure job status is correctly set as failed if job has an invalid
runtime_env.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,10 @@ Now let's try it with a runtime environment that pins the version of the ``reque
# Job 'raysubmit_vGGV4MiP9rYkYUnb' succeeded
# ------------------------------------------
.. warning::
.. note::

When using the Ray Jobs API, the runtime environment should be specified only in the Jobs API (e.g. in `ray job submit --runtime-env=...` or `JobSubmissionClient.submit_job(runtime_env=...)`), not via `ray.init(runtime_env=...)` in the driver script.
If both the Driver and Job specify a runtime environment, Ray tries to merge them and raises an exception if they conflict.
See :ref:`runtime environments <runtime-environments-job-conflict>` for more details.

- The full API reference for the Ray Jobs CLI can be found :ref:`here <ray-job-submission-cli-ref>`.
- The full API reference for the Ray Jobs SDK can be found :ref:`here <ray-job-submission-sdk-ref>`.
Expand Down
79 changes: 74 additions & 5 deletions doc/source/ray-core/handling-dependencies.rst
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,11 @@ You can specify a runtime environment for your whole job, whether running a scri
.. warning::

If using the Ray Jobs API (either the Python SDK or the CLI), specify the ``runtime_env`` argument in the ``submit_job`` call or the ``ray job submit``, not in the ``ray.init()`` call in the entrypoint script (in this example, ``my_ray_script.py``).
Specifying the ``runtime_env`` argument in the ``submit_job`` or ``ray job submit`` call ensures the runtime environment is installed on the cluster before the entrypoint script is run.

This ensures the runtime environment is installed on the cluster before the entrypoint script is run.
If ``runtime_env`` is specified from ``ray.init(runtime_env=...)``, the runtime env is only applied to all children Tasks and Actors, not the entrypoint script (Driver) itself.

If ``runtime_env`` is specified by both ``ray job submit`` and ``ray.init``, the runtime environments are merged. See :ref:`Runtime Environment Specified by Both Job and Driver <runtime-environments-job-conflict>` for more details.

.. note::

Expand Down Expand Up @@ -445,14 +447,81 @@ Caching and Garbage Collection
""""""""""""""""""""""""""""""
Runtime environment resources on each node (such as conda environments, pip packages, or downloaded ``working_dir`` or ``py_modules`` directories) will be cached on the cluster to enable quick reuse across different runtime environments within a job. Each field (``working_dir``, ``py_modules``, etc.) has its own cache whose size defaults to 10 GB. To change this default, you may set the environment variable ``RAY_RUNTIME_ENV_<field>_CACHE_SIZE_GB`` on each node in your cluster before starting Ray e.g. ``export RAY_RUNTIME_ENV_WORKING_DIR_CACHE_SIZE_GB=1.5``.

When the cache size limit is exceeded, resources not currently used by any actor, task or job will be deleted.
When the cache size limit is exceeded, resources not currently used by any Actor, Task or Job are deleted.

.. _runtime-environments-job-conflict:

Runtime Environment Specified by Both Job and Driver
""""""""""""""""""""""""""""""""""""""""""""""""""""

When running an entrypoint script (Driver), the runtime environment can be specified via `ray.init(runtime_env=...)` or `ray job submit --runtime-env` (See :ref:`Specifying a Runtime Environment Per-Job <rte-per-job>` for more details).

- If the runtime environment is specified by ``ray job submit --runtime-env=...``, the runtime environments are applied to the entrypoint script (Driver) and all the tasks and actors created from it.
- If the runtime environment is specified by ``ray.init(runtime_env=...)``, the runtime environments are applied to all the tasks and actors, but not the entrypoint script (Driver) itself.

Since ``ray job submit`` submits a Driver (that calls ``ray.init``), sometimes runtime environments are specified by both of them. When both the Ray Job and Driver specify runtime environments, their runtime environments are merged if there's no conflict.
It means the driver script uses the runtime environment specified by `ray job submit`, and all the tasks and actors are going to use the merged runtime environment.
Ray raises an exception if the runtime environments conflict.

* The ``runtime_env["env_vars"]`` of `ray job submit --runtime-env=...` is merged with the ``runtime_env["env_vars"]`` of `ray.init(runtime_env=...)`.
Note that each individual env_var keys are merged.
If the environment variables conflict, Ray raises an exception.
* Every other field in the ``runtime_env`` will be merged. If any key conflicts, it raises an exception.

Example:

.. testcode::

# `ray job submit --runtime_env=...`
{"pip": ["requests", "chess"],
"env_vars": {"A": "a", "B": "b"}}

# ray.init(runtime_env=...)
{"env_vars": {"C": "c"}}

# Driver's actual `runtime_env` (merged with Job's)
{"pip": ["requests", "chess"],
"env_vars": {"A": "a", "B": "b", "C": "c"}}

Conflict Example:

.. testcode::

# Example 1, env_vars conflicts
# `ray job submit --runtime_env=...`
{"pip": ["requests", "chess"],
"env_vars": {"C": "a", "B": "b"}}

# ray.init(runtime_env=...)
{"env_vars": {"C": "c"}}

# Ray raises an exception because the "C" env var conflicts.

# Example 2, other field (e.g., pip) conflicts
# `ray job submit --runtime_env=...`
{"pip": ["requests", "chess"]}

# ray.init(runtime_env=...)
{"pip": ["torch"]}

# Ray raises an exception because "pip" conflicts.

You can set an environment variable `RAY_OVERRIDE_JOB_RUNTIME_ENV=1`
to avoid raising an exception upon a conflict. In this case, the runtime environments
are inherited in the same way as :ref:`Driver and Task and Actor both specify
runtime environments <runtime-environments-inheritance>`, where ``ray job submit``
is a parent and ``ray.init`` is a child.

.. _runtime-environments-inheritance:

Inheritance
"""""""""""

The runtime environment is inheritable, so it will apply to all tasks/actors within a job and all child tasks/actors of a task or actor once set, unless it is overridden.
.. _runtime-env-driver-to-task-inheritance:

The runtime environment is inheritable, so it applies to all Tasks and Actors within a Job and all child Tasks and Actors of a Task or Actor once set, unless it is overridden.

If an actor or task specifies a new ``runtime_env``, it will override the parent’s ``runtime_env`` (i.e., the parent actor/task's ``runtime_env``, or the job's ``runtime_env`` if there is no parent actor or task) as follows:
If an Actor or Task specifies a new ``runtime_env``, it overrides the parent’s ``runtime_env`` (i.e., the parent Actor's or Task's ``runtime_env``, or the Job's ``runtime_env`` if Actor or Task doesn't have a parent) as follows:

* The ``runtime_env["env_vars"]`` field will be merged with the ``runtime_env["env_vars"]`` field of the parent.
This allows for environment variables set in the parent's runtime environment to be automatically propagated to the child, even if new environment variables are set in the child's runtime environment.
Expand Down
2 changes: 0 additions & 2 deletions doc/source/ray-observability/user-guides/configure-logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,6 @@ ray.get(f.remote("A log message for a task."))
:class: caution
This is an experimental feature. The semantic of the API is subject to change.
It doesn't support [Ray Client](ray-client-ref) yet.
Currently, all the runtime environment passed to a driver (`ray.init(runtime_env={...})`) will be ignored if you specify any runtime environment via [Ray Job Submission](jobs-quickstart) API (`ray job submit --working-dir` or `ray job submit --runtime-env`).
```

Use `worker_process_setup_hook` to apply the new logging configuration to all worker processes within a job.
Expand Down
Loading

0 comments on commit b1356d7

Please sign in to comment.