Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][client] Remove protocol version, instead just compare Ray and Python versions. #42760

Merged
merged 7 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,10 @@ def check_version_info(self):

if not cluster_metadata:
return
ray._private.utils.check_version_info(cluster_metadata)
node_ip_address = ray._private.services.get_node_ip_address()
ray._private.utils.check_version_info(
cluster_metadata, f"node {node_ip_address}"
)

def _register_shutdown_hooks(self):
# Register the atexit handler. In this case, we shouldn't call sys.exit
Expand Down
72 changes: 59 additions & 13 deletions python/ray/_private/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1516,30 +1516,76 @@ def get_directory_size_bytes(path: Union[str, Path] = ".") -> int:
return total_size_bytes


def check_version_info(cluster_metadata):
def check_version_info(
cluster_metadata,
this_process_address,
raise_on_mismatch=True,
python_version_match_level="patch",
):
"""Check if the Python and Ray versions stored in GCS matches this process.
Args:
cluster_metadata: Ray cluster metadata from GCS.

this_process_address: Informational only. The address of this process.
e.g. "node address:port" or "Ray Client".
raise_on_mismatch: Raise an exception on True, log a warning otherwise.
python_version_match_level: "minor" or "patch". To which python version level we
try to match. Note if "minor" and the patch is different, we will still log
a warning.

Behavior:
- We raise or log a warning, based on raise_on_mismatch, if:
- Ray versions do not match; OR
- Python (major, minor) versions do not match,
if python_version_match_level == 'minor'; OR
- Python (major, minor, patch) versions do not match,
if python_version_match_level == 'patch'.
- We also log a warning if:
- Python (major, minor) versions match, AND
- Python patch versions do not match, AND
- python_version_match_level == 'minor' AND
- raise_on_mismatch == False.
Raises:
Exception: An exception is raised if there is a version mismatch.
"""
cluster_version_info = (
cluster_metadata["ray_version"],
cluster_metadata["python_version"],
)
version_info = compute_version_info()
if version_info != cluster_version_info:
node_ip_address = ray._private.services.get_node_ip_address()
error_message = (
"Version mismatch: The cluster was started with:\n"
" Ray: " + cluster_version_info[0] + "\n"
" Python: " + cluster_version_info[1] + "\n"
"This process on node " + node_ip_address + " was started with:" + "\n"
" Ray: " + version_info[0] + "\n"
" Python: " + version_info[1] + "\n"
my_version_info = compute_version_info()

# Calculate: ray_matches, python_matches, python_full_matches
ray_matches = cluster_version_info[0] == my_version_info[0]
python_full_matches = cluster_version_info[1] == my_version_info[1]
if python_version_match_level == "patch":
python_matches = cluster_version_info[1] == my_version_info[1]
elif python_version_match_level == "minor":
my_python_versions = my_version_info[1].split(".")
cluster_python_versions = cluster_version_info[1].split(".")
python_matches = my_python_versions[:2] == cluster_python_versions[:2]
else:
raise ValueError(
f"Invalid python_version_match_level: {python_version_match_level}, "
"want: 'minor' or 'patch'"
)
raise RuntimeError(error_message)

mismatch_msg = (
"The cluster was started with:\n"
f" Ray: {cluster_version_info[0]}\n"
f" Python: {cluster_version_info[1]}\n"
f"This process on {this_process_address} was started with:\n"
f" Ray: {my_version_info[0]}\n"
f" Python: {my_version_info[1]}\n"
)

if ray_matches and python_matches:
if not python_full_matches:
logger.warning(f"Python patch version mismatch: {mismatch_msg}")
else:
error_message = f"Version mismatch: {mismatch_msg}"
if raise_on_mismatch:
raise RuntimeError(error_message)
else:
logger.warning(error_message)


def get_runtime_env_info(
Expand Down
4 changes: 0 additions & 4 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1166,17 +1166,13 @@ class RayContext(BaseContext, Mapping):
python_version: str
ray_version: str
ray_commit: str
protocol_version: Optional[str]

def __init__(self, address_info: Dict[str, Optional[str]]):
super().__init__()
self.dashboard_url = get_dashboard_url()
self.python_version = "{}.{}.{}".format(*sys.version_info[:3])
self.ray_version = ray.__version__
self.ray_commit = ray.__commit__
# No client protocol version since this driver was intiialized
# directly
self.protocol_version = None
self.address_info = address_info

def __getitem__(self, key):
Expand Down
6 changes: 3 additions & 3 deletions python/ray/client_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,17 @@
class ClientContext(BaseContext):
"""
Basic context manager for a ClientBuilder connection.

`protocol_version` is no longer used.
"""

dashboard_url: Optional[str]
python_version: str
ray_version: str
ray_commit: str
protocol_version: Optional[str]
_num_clients: int
_context_to_restore: Optional[ray.util.client.RayAPIStub]
protocol_version: Optional[str] = None # Deprecated

def __enter__(self) -> "ClientContext":
self._swap_context()
Expand Down Expand Up @@ -185,7 +187,6 @@ def connect(self) -> ClientContext:
python_version=client_info_dict["python_version"],
ray_version=client_info_dict["ray_version"],
ray_commit=client_info_dict["ray_commit"],
protocol_version=client_info_dict["protocol_version"],
_num_clients=client_info_dict["num_clients"],
_context_to_restore=ray.util.client.ray.get_context(),
)
Expand Down Expand Up @@ -308,7 +309,6 @@ def connect(self) -> ClientContext:
),
ray_version=ray.__version__,
ray_commit=ray.__commit__,
protocol_version=None,
_num_clients=1,
_context_to_restore=None,
)
Expand Down
2 changes: 0 additions & 2 deletions python/ray/tests/test_client_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ def test_connect_to_cluster(ray_start_regular_shared):
assert client_context.python_version == python_version
assert client_context.ray_version == ray.__version__
assert client_context.ray_commit == ray.__commit__
protocol_version = ray.util.client.CURRENT_PROTOCOL_VERSION
assert client_context.protocol_version == protocol_version

server.stop(0)
subprocess.check_output("ray stop --force", shell=True)
Expand Down
35 changes: 1 addition & 34 deletions python/ray/tests/test_client_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import ray.util.client.server.server as ray_client_server
import ray.core.generated.ray_client_pb2 as ray_client_pb2

from ray.util.client import _ClientContext, CURRENT_PROTOCOL_VERSION
from ray.util.client import _ClientContext
from ray.cluster_utils import cluster_not_supported

import ray
Expand Down Expand Up @@ -144,7 +144,6 @@ def get_job_id(api):
assert isinstance(info3["ray_version"], str), info3
assert isinstance(info3["ray_commit"], str), info3
assert isinstance(info3["python_version"], str), info3
assert isinstance(info3["protocol_version"], str), info3
api3.disconnect()


Expand All @@ -164,38 +163,6 @@ def mock_connection_response():
python_version="2.7.12",
ray_version="",
ray_commit="",
protocol_version=CURRENT_PROTOCOL_VERSION,
)

# inject mock connection function
server_handle.data_servicer._build_connection_response = mock_connection_response

ray = _ClientContext()
with pytest.raises(RuntimeError):
_ = ray.connect("localhost:50051")

ray = _ClientContext()
info3 = ray.connect("localhost:50051", ignore_version=True)
assert info3["num_clients"] == 1, info3
ray.disconnect()


def test_protocol_version(init_and_serve):
server_handle = init_and_serve
ray = _ClientContext()
info1 = ray.connect("localhost:50051")
local_py_version = ".".join([str(x) for x in list(sys.version_info)[:3]])
assert info1["protocol_version"] == CURRENT_PROTOCOL_VERSION, info1
ray.disconnect()
time.sleep(1)

def mock_connection_response():
return ray_client_pb2.ConnectionInfoResponse(
num_clients=1,
python_version=local_py_version,
ray_version="",
ray_commit="",
protocol_version="2050-01-01", # from the future
)

# inject mock connection function
Expand Down
37 changes: 10 additions & 27 deletions python/ray/util/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging
import os
import sys
import threading
from typing import Any, Dict, List, Optional, Tuple

Expand All @@ -12,12 +11,10 @@
from ray._private.ray_logging import setup_logger
from ray.job_config import JobConfig
from ray.util.annotations import DeveloperAPI
from ray._private.utils import check_version_info

logger = logging.getLogger(__name__)

# This version string is incremented to indicate breaking changes in the
# protocol that require upgrading the client version.
CURRENT_PROTOCOL_VERSION = "2023-06-27"
logger = logging.getLogger(__name__)


class _ClientContext:
Expand Down Expand Up @@ -119,28 +116,14 @@ def _register_serializers(self):
ray.util.serialization_addons.apply(ctx)

def _check_versions(self, conn_info: Dict[str, Any], ignore_version: bool) -> None:
local_major_minor = f"{sys.version_info[0]}.{sys.version_info[1]}"
if not conn_info["python_version"].startswith(local_major_minor):
version_str = f"{local_major_minor}.{sys.version_info[2]}"
msg = (
"Python minor versions differ between client and server:"
+ f" client is {version_str},"
+ f" server is {conn_info['python_version']}"
)
if ignore_version or "RAY_IGNORE_VERSION_MISMATCH" in os.environ:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we removing behavior for RAY_IGNORE_VERSION_MISMATCH intentionally?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added back. Thanks for spotting it!

logger.warning(msg)
else:
raise RuntimeError(msg)
if CURRENT_PROTOCOL_VERSION != conn_info["protocol_version"]:
msg = (
"Client Ray installation incompatible with server:"
+ f" client is {CURRENT_PROTOCOL_VERSION},"
+ f" server is {conn_info['protocol_version']}"
)
if ignore_version or "RAY_IGNORE_VERSION_MISMATCH" in os.environ:
logger.warning(msg)
else:
raise RuntimeError(msg)
# conn_info has "python_version" and "ray_version" so it can be used to compare.
ignore_version = ignore_version or ("RAY_IGNORE_VERSION_MISMATCH" in os.environ)
check_version_info(
conn_info,
"Ray Client",
raise_on_mismatch=not ignore_version,
python_version_match_level="minor",
)

def disconnect(self):
"""Disconnect the Ray Client."""
Expand Down
2 changes: 0 additions & 2 deletions python/ray/util/client/server/dataservicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
_propagate_error_in_context,
OrderedResponseCache,
)
from ray.util.client import CURRENT_PROTOCOL_VERSION
from ray.util.debug import log_once
from ray._private.client_mode_hook import disable_client_hook

Expand Down Expand Up @@ -412,5 +411,4 @@ def _build_connection_response(self):
),
ray_version=ray.__version__,
ray_commit=ray.__commit__,
protocol_version=CURRENT_PROTOCOL_VERSION,
)
1 change: 0 additions & 1 deletion python/ray/util/client/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,6 @@ def connection_info(self):
"python_version": data.python_version,
"ray_version": data.ray_version,
"ray_commit": data.ray_commit,
"protocol_version": data.protocol_version,
}

def register_callback(
Expand Down
1 change: 1 addition & 0 deletions src/ray/protobuf/ray_client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ message ConnectionInfoResponse {
// The Python version (e.g., "3.7.2").
string python_version = 4;
// The protocol version of the server (e.g., "2020-02-01").
// DEPRECATED. Not used anywhere. Will remove.
string protocol_version = 5;
}

Expand Down