Skip to content

Conversation

@verult
Copy link
Collaborator

@verult verult commented Jul 8, 2023

ResponseDemux implementation, from the prototype #6145

This is just a single dict, but with restricted access patterns. This data structure hopefully makes the broader streaming client code easy to reason about and more robust, with its dedicated unit tests.

This version is different from the prototype in that the caller is expected to generate the message ID. This is because the unsubscribe method in the prototype is broken: it cannot unsubscribe with just the caller-provided request because the message ID is missing.

Open to a better class name!

@maffoo @wcourtney

@verult verult requested a review from maffoo July 8, 2023 00:39
@verult verult requested review from a team, cduck, vtomole and wcourtney as code owners July 8, 2023 00:39
@CirqBot CirqBot added the size: M 50< lines changed <250 label Jul 8, 2023
Copy link
Contributor

@maffoo maffoo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comments, then LGTM

Returns:
A future for the response, to be fulfilled when publish is called.
"""
response_future: asyncio.Future = asyncio.get_running_loop().create_future()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: prefer to pass in a loop to __init__ and store it as a class attribute. This makes explicit that the class should be used with a single event loop (e.g. in a single thread).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this object is currently not initialized inside an asyncio coroutine (see https://github.com/quantumlib/Cirq/pull/6145/files#diff-064e02971da25c80aed74456243f329557be04f837822bfbdda8dfc403e23432R144). It makes sense to me to consider limiting the scope of ResponseDemux to just the _manage_stream() coroutine, but my preference is to do that in a follow-up PR.

for future in self._subscribers.values():
if not future.done():
future.set_exception(exception)
self._subscribers = {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider call clear rather than creating a new dict:

Suggested change
self._subscribers = {}
self._subscribers.clear()

Comment on lines 49 to 54
if response.message_id not in self._subscribers:
return

future = self._subscribers.pop(response.message_id)
if not future.done():
future.set_result(response)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can pass a default to pop to avoid a separate membership check:

Suggested change
if response.message_id not in self._subscribers:
return
future = self._subscribers.pop(response.message_id)
if not future.done():
future.set_result(response)
future = self._subscribers.pop(response.message_id, None)
if future and not future.done():
future.set_result(response)

Copy link
Collaborator

@wcourtney wcourtney left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initial comments on core logic, not yet tests. I think I need some clarification about the usage. Thanks!

"""A event demultiplexer for QuantumRunStreamResponses, as part of the async reactor pattern."""

def __init__(self):
self._subscribers: Dict[str, asyncio.Future] = {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you documentation for this field, e.g. to describe the expected use/meaning of the keys and values?

async def test_no_subscribers_does_not_throw(self, demux):
demux.publish(RESPONSE0)

# expect no exceptions
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not? This sounds like it would typically be an error.

Copy link
Collaborator Author

@verult verult Jul 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a response comes back from the server and the caller no longer wants it, it can be dropped. This scenario could happen and shouldn't be an error condition IMO.

Updated publish() docstring to reflect this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is that this could also represent a bug that we would suppress, e.g. if either we forget to subscribe after sending the request or if the server sends an invalid message id. Can we disambiguate the legit and erroneous cases? I don't want to de-rail; could it be left as a TODO with an issue filed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think even if we caught the erroneous cases, because this is part of client code, the error message isn't actionable for the user. It would be better to catch those cases through tests, and the scenarios you described will be covered by StreamManager tests via timeouts.

@verult verult requested a review from wcourtney July 12, 2023 23:11
Copy link
Collaborator

@wcourtney wcourtney left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM % some thing in the cancellation/unsubscription semantics it would be nice to resolve.

async def test_no_subscribers_does_not_throw(self, demux):
demux.publish(RESPONSE0)

# expect no exceptions
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is that this could also represent a bug that we would suppress, e.g. if either we forget to subscribe after sending the request or if the server sends an invalid message id. Can we disambiguate the legit and erroneous cases? I don't want to de-rail; could it be left as a TODO with an issue filed?

del self._subscribers[message_id]
"""Indicates that the caller is no longer waiting for the response matching message_id.
This helps ResponseDemux free up resources.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds like something that the client should not be responsible for unless it's a real issue and is unresolvable without client intervention.

Copy link
Collaborator Author

@verult verult Jul 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out this also isn't necessary after all. The only time unsubscribe is called is when the stream fails, at which point the entire ResponseDemux is cleared after publishing the exception all subscribers. Removing!

@verult verult mentioned this pull request Jul 15, 2023
@verult verult requested a review from wcourtney July 18, 2023 18:37
Copy link
Collaborator

@wcourtney wcourtney left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

@verult verult enabled auto-merge (squash) July 19, 2023 23:53
@verult verult merged commit 6e562e6 into quantumlib:master Jul 20, 2023
@verult verult mentioned this pull request Aug 4, 2023
8 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size: M 50< lines changed <250

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants