Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fix protobuf breaking change by adding a compat layer. #43172

Merged
merged 6 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .buildkite/core.rayci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ steps:
tags:
- python
- oss
- skip-on-premerge
instance_type: medium
commands:
# validate minimal installation
Expand Down
2 changes: 1 addition & 1 deletion dashboard/modules/actor/actor_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def actor_table_data_to_dict(message):
"parentTaskId",
"sourceActorId",
},
including_default_value_fields=True,
always_print_fields_with_no_presence=True,
)
# The complete schema for actor table is here:
# src/ray/protobuf/gcs.proto
Expand Down
2 changes: 1 addition & 1 deletion dashboard/modules/actor/tests/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def actor_table_data_to_dict(message):
"sourceActorId",
"placementGroupId",
},
including_default_value_fields=False,
always_print_fields_with_no_presence=False,
)

non_state_keys = ("actorId", "jobId")
Expand Down
4 changes: 2 additions & 2 deletions dashboard/modules/node/node_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

def gcs_node_info_to_dict(message):
return dashboard_utils.message_to_dict(
message, {"nodeId"}, including_default_value_fields=True
message, {"nodeId"}, always_print_fields_with_no_presence=True
)


Expand Down Expand Up @@ -80,7 +80,7 @@ def node_stats_to_dict(message):
result = dashboard_utils.message_to_dict(message, decode_keys)
result["coreWorkersStats"] = [
dashboard_utils.message_to_dict(
m, decode_keys, including_default_value_fields=True
m, decode_keys, always_print_fields_with_no_presence=True
)
for m in core_workers_stats
]
Expand Down
11 changes: 6 additions & 5 deletions dashboard/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import aiosignal # noqa: F401

from google.protobuf.json_format import MessageToDict
import ray._private.protobuf_compat
from frozenlist import FrozenList # noqa: F401

from ray._private.utils import binary_to_hex, check_dashboard_dependencies_installed
Expand Down Expand Up @@ -217,12 +217,13 @@ def _decode_keys(d):
d[k] = v
return d

d = ray._private.protobuf_compat.message_to_dict(
message, use_integers_for_enums=False, **kwargs
)
if decode_keys:
return _decode_keys(
MessageToDict(message, use_integers_for_enums=False, **kwargs)
)
return _decode_keys(d)
else:
return MessageToDict(message, use_integers_for_enums=False, **kwargs)
return d


class SignalManager:
Expand Down
7 changes: 4 additions & 3 deletions python/ray/_private/event/event_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
from typing import Dict, Optional
from datetime import datetime

from google.protobuf.json_format import MessageToDict, Parse
from google.protobuf.json_format import Parse

from ray.core.generated.event_pb2 import Event
from ray._private.protobuf_compat import message_to_dict

global_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -91,9 +92,9 @@ def _emit(self, severity: Event.Severity, message: str, **kwargs):

self.logger.info(
json.dumps(
MessageToDict(
message_to_dict(
event,
including_default_value_fields=True,
always_print_fields_with_no_presence=True,
preserving_proto_field_name=True,
use_integers_for_enums=False,
)
Expand Down
41 changes: 41 additions & 0 deletions python/ray/_private/protobuf_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from google.protobuf.json_format import MessageToDict
import inspect

"""
This module provides a compatibility layer for different versions of the protobuf
library.
"""


def rename_always_print_fields_with_no_presence(kwargs):
"""
Protobuf version 5.26.0rc2 renamed argument for `MessageToDict`:
`including_default_value_fields` -> `always_print_fields_with_no_presence`.
See https://github.com/protocolbuffers/protobuf/commit/06e7caba58ede0220b110b89d08f329e5f8a7537#diff-8de817c14d6a087981503c9aea38730b1b3e98f4e306db5ff9d525c7c304f234L129 # noqa: E501
We choose to always use the new argument name. If user used the old arg, we raise an
error.
If protobuf does not have the new arg name but have the old arg name, we rename our
arg to the old one.
"""
old_arg_name = "including_default_value_fields"
new_arg_name = "always_print_fields_with_no_presence"
if old_arg_name in kwargs:
raise ValueError(f"{old_arg_name} is deprecated, please use {new_arg_name}")
if new_arg_name not in kwargs:
return kwargs

params = inspect.signature(MessageToDict).parameters
if new_arg_name in params:
return kwargs
if old_arg_name in params:
kwargs[old_arg_name] = kwargs.pop(new_arg_name)
return kwargs
# Neither args are in the signature, do nothing.
return kwargs


def message_to_dict(*args, **kwargs):
kwargs = rename_always_print_fields_with_no_presence(kwargs)
return MessageToDict(*args, **kwargs)
4 changes: 2 additions & 2 deletions python/ray/_private/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from collections import defaultdict
from typing import Dict

from google.protobuf.json_format import MessageToDict
from ray._private.protobuf_compat import message_to_dict

import ray
from ray._private.client_mode_hook import client_mode_hook
Expand Down Expand Up @@ -346,7 +346,7 @@ def get_strategy(strategy):
"bundles": {
# The value here is needs to be dictionarified
# otherwise, the payload becomes unserializable.
bundle.bundle_id.bundle_index: MessageToDict(bundle)["unitResources"]
bundle.bundle_id.bundle_index: message_to_dict(bundle)["unitResources"]
for bundle in placement_group_info.bundles
},
"bundles_to_node_id": {
Expand Down
13 changes: 6 additions & 7 deletions python/ray/autoscaler/v2/instance_manager/reconciler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
from collections import defaultdict
from typing import Dict, List, Optional, Set, Tuple

from google.protobuf.json_format import MessageToDict

from ray._private.protobuf_compat import message_to_dict
from ray.autoscaler.v2.instance_manager.common import InstanceUtil
from ray.autoscaler.v2.instance_manager.instance_manager import InstanceManager
from ray.autoscaler.v2.instance_manager.node_provider import (
Expand Down Expand Up @@ -255,7 +254,7 @@ def _handle_cloud_instance_allocation(
"Updating {}({}) with {}".format(
instance.instance_id,
IMInstance.InstanceStatus.Name(instance.status),
MessageToDict(update_event),
message_to_dict(update_event),
)
)
updates[instance.instance_id] = update_event
Expand Down Expand Up @@ -345,7 +344,7 @@ def _handle_ray_install_failed(
"Updating {}({}) with {}".format(
instance_id,
IMInstance.InstanceStatus.Name(instance.status),
MessageToDict(updates[instance_id]),
message_to_dict(updates[instance_id]),
)
)

Expand Down Expand Up @@ -394,7 +393,7 @@ def _handle_cloud_instance_terminated(
"Updating {}({}) with {}".format(
instance.instance_id,
IMInstance.InstanceStatus.Name(instance.status),
MessageToDict(updates[instance.instance_id]),
message_to_dict(updates[instance.instance_id]),
)
)

Expand Down Expand Up @@ -447,7 +446,7 @@ def _handle_cloud_instance_termination_errors(
"Updating {}({}) with {}".format(
instance.instance_id,
IMInstance.InstanceStatus.Name(instance.status),
MessageToDict(updates[instance.instance_id]),
message_to_dict(updates[instance.instance_id]),
)
)

Expand Down Expand Up @@ -553,7 +552,7 @@ def _handle_ray_status_transition(
"Updating {}({}) with {}.".format(
im_instance.instance_id,
IMInstance.InstanceStatus.Name(im_instance.status),
MessageToDict(updates[im_instance.instance_id]),
message_to_dict(updates[im_instance.instance_id]),
)
)

Expand Down
9 changes: 5 additions & 4 deletions python/ray/autoscaler/v2/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
from enum import Enum
from typing import Dict, List, Optional, Tuple

from google.protobuf.json_format import MessageToDict

from ray._private.protobuf_compat import message_to_dict
from ray.autoscaler._private.resource_demand_scheduler import UtilizationScore
from ray.autoscaler.v2.instance_manager.common import InstanceUtil
from ray.autoscaler.v2.instance_manager.config import NodeTypeConfig
Expand Down Expand Up @@ -242,15 +241,17 @@ def __repr__(self) -> str:
instance_id=self.im_instance_id,
ray_node_id=self.ray_node_id,
idle_duration_ms=self.idle_duration_ms,
termination_request=str(MessageToDict(self.termination_request))
termination_request=str(message_to_dict(self.termination_request))
if self.termination_request
else None,
status=self.status,
total_resources=self.total_resources,
available_resources=self.available_resources,
labels=self.labels,
launch_reason=self.launch_reason,
sched_requests="|".join(str(MessageToDict(r)) for r in self.sched_requests),
sched_requests="|".join(
str(message_to_dict(r)) for r in self.sched_requests
),
)


Expand Down
2 changes: 1 addition & 1 deletion python/ray/util/state/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1506,7 +1506,7 @@ def protobuf_message_to_dict(
return dashboard_utils.message_to_dict(
message,
fields_to_decode,
including_default_value_fields=True,
always_print_fields_with_no_presence=True,
preserving_proto_field_name=preserving_proto_field_name,
)

Expand Down
Loading