Skip to content

Commit

Permalink
[SPARK-47035][SS][CONNECT][FOLLOWUP] Additional protobuf change to cl…
Browse files Browse the repository at this point in the history
…ient side listener

### What changes were proposed in this pull request?

Followup of previous protocol change apache#45091. Add the request proto `Command` and response proto message to `ExecutePlanResponse`

### Why are the changes needed?

Continuation of client side listener for spark connect

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Will be tested in subsequent PR, the proto change itself doesn't do any harm

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#45444 from WweiL/SPARK-47035-protocol-followup.

Authored-by: Wei Liu <wei.liu@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
WweiL authored and jpcorreia99 committed Mar 12, 2024
1 parent 4d59796 commit 3364211
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,9 @@ message ExecutePlanResponse {
// Response for commands on the streaming query manager.
StreamingQueryManagerCommandResult streaming_query_manager_command_result = 11;

// Response for commands on the client side streaming query listener.
StreamingQueryListenerEventsResult streaming_query_listener_events_result = 16;

// Response type informing if the stream is complete in reattachable execution.
ResultComplete result_complete = 14;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ message Command {
GetResourcesCommand get_resources_command = 8;
StreamingQueryManagerCommand streaming_query_manager_command = 9;
CommonInlineUserDefinedTableFunction register_table_function = 10;
StreamingQueryListenerBusCommand streaming_query_listener_bus_command = 11;

// This field is used to mark extensions to the protocol. When plugins generate arbitrary
// Commands they can add them here. During the planning the correct resolution is done.
Expand Down Expand Up @@ -456,6 +457,7 @@ message StreamingQueryListenerEvent {

message StreamingQueryListenerEventsResult {
repeated StreamingQueryListenerEvent events = 1;
optional bool listener_bus_listener_added = 2;
}

// Command to get the output of 'SparkContext.resources'
Expand Down
204 changes: 102 additions & 102 deletions python/pyspark/sql/connect/proto/base_pb2.py

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions python/pyspark/sql/connect/proto/base_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1410,6 +1410,7 @@ class ExecutePlanResponse(google.protobuf.message.Message):
STREAMING_QUERY_COMMAND_RESULT_FIELD_NUMBER: builtins.int
GET_RESOURCES_COMMAND_RESULT_FIELD_NUMBER: builtins.int
STREAMING_QUERY_MANAGER_COMMAND_RESULT_FIELD_NUMBER: builtins.int
STREAMING_QUERY_LISTENER_EVENTS_RESULT_FIELD_NUMBER: builtins.int
RESULT_COMPLETE_FIELD_NUMBER: builtins.int
EXTENSION_FIELD_NUMBER: builtins.int
METRICS_FIELD_NUMBER: builtins.int
Expand Down Expand Up @@ -1456,6 +1457,11 @@ class ExecutePlanResponse(google.protobuf.message.Message):
) -> pyspark.sql.connect.proto.commands_pb2.StreamingQueryManagerCommandResult:
"""Response for commands on the streaming query manager."""
@property
def streaming_query_listener_events_result(
self,
) -> pyspark.sql.connect.proto.commands_pb2.StreamingQueryListenerEventsResult:
"""Response for commands on the client side streaming query listener."""
@property
def result_complete(self) -> global___ExecutePlanResponse.ResultComplete:
"""Response type informing if the stream is complete in reattachable execution."""
@property
Expand Down Expand Up @@ -1493,6 +1499,8 @@ class ExecutePlanResponse(google.protobuf.message.Message):
| None = ...,
streaming_query_manager_command_result: pyspark.sql.connect.proto.commands_pb2.StreamingQueryManagerCommandResult
| None = ...,
streaming_query_listener_events_result: pyspark.sql.connect.proto.commands_pb2.StreamingQueryListenerEventsResult
| None = ...,
result_complete: global___ExecutePlanResponse.ResultComplete | None = ...,
extension: google.protobuf.any_pb2.Any | None = ...,
metrics: global___ExecutePlanResponse.Metrics | None = ...,
Expand Down Expand Up @@ -1521,6 +1529,8 @@ class ExecutePlanResponse(google.protobuf.message.Message):
b"sql_command_result",
"streaming_query_command_result",
b"streaming_query_command_result",
"streaming_query_listener_events_result",
b"streaming_query_listener_events_result",
"streaming_query_manager_command_result",
b"streaming_query_manager_command_result",
"write_stream_operation_start_result",
Expand Down Expand Up @@ -1558,6 +1568,8 @@ class ExecutePlanResponse(google.protobuf.message.Message):
b"sql_command_result",
"streaming_query_command_result",
b"streaming_query_command_result",
"streaming_query_listener_events_result",
b"streaming_query_listener_events_result",
"streaming_query_manager_command_result",
b"streaming_query_manager_command_result",
"write_stream_operation_start_result",
Expand All @@ -1574,6 +1586,7 @@ class ExecutePlanResponse(google.protobuf.message.Message):
"streaming_query_command_result",
"get_resources_command_result",
"streaming_query_manager_command_result",
"streaming_query_listener_events_result",
"result_complete",
"extension",
]
Expand Down
180 changes: 90 additions & 90 deletions python/pyspark/sql/connect/proto/commands_pb2.py

Large diffs are not rendered by default.

40 changes: 39 additions & 1 deletion python/pyspark/sql/connect/proto/commands_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class Command(google.protobuf.message.Message):
GET_RESOURCES_COMMAND_FIELD_NUMBER: builtins.int
STREAMING_QUERY_MANAGER_COMMAND_FIELD_NUMBER: builtins.int
REGISTER_TABLE_FUNCTION_FIELD_NUMBER: builtins.int
STREAMING_QUERY_LISTENER_BUS_COMMAND_FIELD_NUMBER: builtins.int
EXTENSION_FIELD_NUMBER: builtins.int
@property
def register_function(
Expand All @@ -124,6 +125,8 @@ class Command(google.protobuf.message.Message):
self,
) -> pyspark.sql.connect.proto.relations_pb2.CommonInlineUserDefinedTableFunction: ...
@property
def streaming_query_listener_bus_command(self) -> global___StreamingQueryListenerBusCommand: ...
@property
def extension(self) -> google.protobuf.any_pb2.Any:
"""This field is used to mark extensions to the protocol. When plugins generate arbitrary
Commands they can add them here. During the planning the correct resolution is done.
Expand All @@ -143,6 +146,8 @@ class Command(google.protobuf.message.Message):
streaming_query_manager_command: global___StreamingQueryManagerCommand | None = ...,
register_table_function: pyspark.sql.connect.proto.relations_pb2.CommonInlineUserDefinedTableFunction
| None = ...,
streaming_query_listener_bus_command: global___StreamingQueryListenerBusCommand
| None = ...,
extension: google.protobuf.any_pb2.Any | None = ...,
) -> None: ...
def HasField(
Expand All @@ -164,6 +169,8 @@ class Command(google.protobuf.message.Message):
b"sql_command",
"streaming_query_command",
b"streaming_query_command",
"streaming_query_listener_bus_command",
b"streaming_query_listener_bus_command",
"streaming_query_manager_command",
b"streaming_query_manager_command",
"write_operation",
Expand Down Expand Up @@ -193,6 +200,8 @@ class Command(google.protobuf.message.Message):
b"sql_command",
"streaming_query_command",
b"streaming_query_command",
"streaming_query_listener_bus_command",
b"streaming_query_listener_bus_command",
"streaming_query_manager_command",
b"streaming_query_manager_command",
"write_operation",
Expand All @@ -217,6 +226,7 @@ class Command(google.protobuf.message.Message):
"get_resources_command",
"streaming_query_manager_command",
"register_table_function",
"streaming_query_listener_bus_command",
"extension",
]
| None
Expand Down Expand Up @@ -1938,18 +1948,46 @@ class StreamingQueryListenerEventsResult(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

EVENTS_FIELD_NUMBER: builtins.int
LISTENER_BUS_LISTENER_ADDED_FIELD_NUMBER: builtins.int
@property
def events(
self,
) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
global___StreamingQueryListenerEvent
]: ...
listener_bus_listener_added: builtins.bool
def __init__(
self,
*,
events: collections.abc.Iterable[global___StreamingQueryListenerEvent] | None = ...,
listener_bus_listener_added: builtins.bool | None = ...,
) -> None: ...
def HasField(
self,
field_name: typing_extensions.Literal[
"_listener_bus_listener_added",
b"_listener_bus_listener_added",
"listener_bus_listener_added",
b"listener_bus_listener_added",
],
) -> builtins.bool: ...
def ClearField(
self,
field_name: typing_extensions.Literal[
"_listener_bus_listener_added",
b"_listener_bus_listener_added",
"events",
b"events",
"listener_bus_listener_added",
b"listener_bus_listener_added",
],
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["events", b"events"]) -> None: ...
def WhichOneof(
self,
oneof_group: typing_extensions.Literal[
"_listener_bus_listener_added", b"_listener_bus_listener_added"
],
) -> typing_extensions.Literal["listener_bus_listener_added"] | None: ...

global___StreamingQueryListenerEventsResult = StreamingQueryListenerEventsResult

Expand Down

0 comments on commit 3364211

Please sign in to comment.