New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add StreamManager #6199
Add StreamManager #6199
Conversation
Btw, if there's a way to set a PR as diffbase, please let me know! |
My recommendation is to review methods with long docstrings first - they are the most important pieces. |
3dacd5a
to
7635c9c
Compare
Fixed a race involving The lingering cancellation failure is because the latest fixes in duet, which contains cancellation fixes, is not yet released. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First pass % tests. Really great overall, mostly nits and edge cases to consider. Thanks for all the attention to detail!
|
||
def __init__(self, grpc_client: quantum.QuantumEngineServiceAsyncClient): | ||
self._grpc_client = grpc_client | ||
self._request_queue: asyncio.Queue = asyncio.Queue() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like lines 95-100 duplicate the contents of _reset()
. Could we just call that instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used that approach at first, but I got an error somewhere. I think it was a mypy error about self._response_demux
being uninitialized before being read. Seems like assignments in __init__()
is regarded as special here.
# TODO(#5996) add exponential backoff | ||
raise e | ||
|
||
# Either when this request is canceled or the _manage_stream() loop is canceled. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
either when this request is canceled or...
What's the client interface to cancel a request?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clarified in the class and submit()
docstrings.
|
||
If it is, returns the next stream request to send upon retry. Otherwise, raises an error. | ||
""" | ||
if error.code == Code.PROGRAM_DOES_NOT_EXIST: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have Python 3.10 for match/case statements? https://docs.python.org/3/whatsnew/3.10.html#pep-634-structural-pattern-matching
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good to know this feature is in Python now! Unfortunately Cirq still supports 3.9.
parent=project_name, quantum_program=program, quantum_job=job | ||
), | ||
) | ||
create_job_request = quantum.QuantumRunStreamRequest( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this get used as a retry request type somewhere? I didn't find it ctrl+f'ing around.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's used in _decide_retry_request_or_raise()
when there is a StreamError
.
current_request.create_quantum_program_and_job.quantum_program.name | ||
) | ||
elif error.code == Code.JOB_DOES_NOT_EXIST: | ||
if 'get_quantum_result' in current_request: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you talk through this error mechanism? The best cause I can imagine is that the job was deleted, but then we shouldn't create a new one.
If that's true, then we wouldn't call create_job_request
anywhere, which also makes thePROGRAM_DOES_NOT_EXIST
error handling obsolete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. What about the case where CreateProgramAndJobRequest
isn't sent properly before the stream breaks? A GetResultRequest
will be issued but the job doesn't exist. Unfortunately this is indistinguishable from the case where the job was deleted. I would say it's OK for the job to be recreated because if a user truly wants to stop a job execution, they could cancel the future returned by submit()
. WDYT?
b42424c
to
861752e
Compare
861752e
to
13d5cef
Compare
Addressed Will's comments and rebased with master. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM % some smallish suggestions. Thanks!!! 🙌
google.api_core.exceptions.GoogleAPICallError if the stream breaks with a non-retryable | ||
error. | ||
""" | ||
if self._manage_stream_loop_future is None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't look like this is updated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some minor comments, but mostly looks good. I still have some concerns about both the duet and asyncio threads mutating the StreamManager without synchronization, but I think we can address those issues in future PRs.
|
||
There is at most a single instance of this coroutine running. | ||
""" | ||
self._request_queue = asyncio.Queue() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This coroutine runs in the AsyncioExecutor in a separate thread, so I think it should not touch self
, since that is also accessed separately in the duet thread. Or at the very least, any access to these fields should be protected with a lock so we don't have multiple threads accessing and modifying self
without synchronization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot for the catch. I also noticed that response_demux
is written to in both the duet and asyncio threads. With that and your comment here, I'm getting a better understanding why having local scope is preferred, especially in a multithreaded environment. Thanks!
while self._request_queue is None: | ||
# Wait for the stream coroutine to start. | ||
# Ignoring coverage since this is rarely triggered. | ||
await asyncio.sleep(1) # coverage: ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels awkward. I think a better interface would be not to set _request_stream
to a value or None
and then check for None
here, but rather to use a duet.AwaitableFuture[asyncio.Queue[...]]
. Then can just await this future here. This will avoid extra time from an arbitrary pause here, and it also helps with the problem of _manage_stream
mutating self
, because it can just complete a future instead, which is safe to do from a different thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using duet.AwaitableFuture
failed with "task got bad yield", probably because it's written and read in the asyncio thread. I could try resolving this issue again after moving the queue local to the asyncio scope in a follow-up PR.
# TODO(#5996) Consider moving the request future cancellation logic into a future | ||
# done callback, so that the the cancellation caller can wait for it to complete. | ||
# TODO(#5996) Check the condition that response_future is not done before | ||
# cancelling, once request cancellation is moved to a callback. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we want to use callbacks just to get the semantics that we can wait for server-side cancellation. If we want such semantics, I think it would be better to subclass AwaitableFuture
so that the future completion can be delayed compared to the cancel
call. One has to be careful though when the actual cancellation involves sending a request to the server, since that is always a fallible operation and we can't wait arbitrarily long.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we want to use callbacks just to get the semantics that we can wait for server-side cancellation. If we want such semantics, I think it would be better to subclass AwaitableFuture so that the future completion can be delayed compared to the cancel call.
Just for my understanding: why do you think the latter approach is better? Is it because cancel()
is a blocking call? I can also see that executing the cancellation routine in the same event loop as the main routine that fulfills the future would be very convenient.
def _to_create_job_request( | ||
create_program_and_job_request: quantum.QuantumRunStreamRequest, | ||
) -> quantum.QuantumRunStreamRequest: | ||
"""Converted the QuantumRunStreamRequest from a CreateQuantumProgramAndJobRequest to a | ||
CreateQuantumJobRequest. | ||
""" | ||
program = create_program_and_job_request.create_quantum_program_and_job.quantum_program | ||
job = create_program_and_job_request.create_quantum_program_and_job.quantum_job | ||
return quantum.QuantumRunStreamRequest( | ||
parent=create_program_and_job_request.parent, | ||
create_quantum_job=quantum.CreateQuantumJobRequest(parent=program.name, quantum_job=job), | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: call the arg just request
to ease readability (the rest of the name duplicates info from the type)
def _to_create_job_request( | |
create_program_and_job_request: quantum.QuantumRunStreamRequest, | |
) -> quantum.QuantumRunStreamRequest: | |
"""Converted the QuantumRunStreamRequest from a CreateQuantumProgramAndJobRequest to a | |
CreateQuantumJobRequest. | |
""" | |
program = create_program_and_job_request.create_quantum_program_and_job.quantum_program | |
job = create_program_and_job_request.create_quantum_program_and_job.quantum_job | |
return quantum.QuantumRunStreamRequest( | |
parent=create_program_and_job_request.parent, | |
create_quantum_job=quantum.CreateQuantumJobRequest(parent=program.name, quantum_job=job), | |
) | |
def _to_create_job_request( | |
request: quantum.QuantumRunStreamRequest, | |
) -> quantum.QuantumRunStreamRequest: | |
"""Converts the QuantumRunStreamRequest from a CreateQuantumProgramAndJobRequest to a | |
CreateQuantumJobRequest. | |
""" | |
program = request.create_quantum_program_and_job.quantum_program | |
job = request.create_quantum_program_and_job.quantum_job | |
return quantum.QuantumRunStreamRequest( | |
parent=request.parent, | |
create_quantum_job=quantum.CreateQuantumJobRequest(parent=program.name, quantum_job=job), | |
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had this name here because the type is actually QuantumRunStreamRequest
.
def _to_get_result_request( | ||
create_program_and_job_request: quantum.QuantumRunStreamRequest, | ||
) -> quantum.QuantumRunStreamRequest: | ||
"""Converted the QuantumRunStreamRequest from a CreateQuantumProgramAndJobRequest to a | ||
GetQuantumResultRequest. | ||
""" | ||
job = create_program_and_job_request.create_quantum_program_and_job.quantum_job | ||
return quantum.QuantumRunStreamRequest( | ||
parent=create_program_and_job_request.parent, | ||
get_quantum_result=quantum.GetQuantumResultRequest(parent=job.name), | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here too, I'd suggest calling the arg just request
def _to_get_result_request( | |
create_program_and_job_request: quantum.QuantumRunStreamRequest, | |
) -> quantum.QuantumRunStreamRequest: | |
"""Converted the QuantumRunStreamRequest from a CreateQuantumProgramAndJobRequest to a | |
GetQuantumResultRequest. | |
""" | |
job = create_program_and_job_request.create_quantum_program_and_job.quantum_job | |
return quantum.QuantumRunStreamRequest( | |
parent=create_program_and_job_request.parent, | |
get_quantum_result=quantum.GetQuantumResultRequest(parent=job.name), | |
) | |
def _to_get_result_request( | |
request: quantum.QuantumRunStreamRequest, | |
) -> quantum.QuantumRunStreamRequest: | |
"""Converts the QuantumRunStreamRequest from a CreateQuantumProgramAndJobRequest to a | |
GetQuantumResultRequest. | |
""" | |
job = request.create_quantum_program_and_job.quantum_job | |
return quantum.QuantumRunStreamRequest( | |
parent=request.parent, | |
get_quantum_result=quantum.GetQuantumResultRequest(parent=job.name), | |
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
…t/stream-manager-asyncio-heavy
Codecov ReportPatch coverage:
Additional details and impacted files@@ Coverage Diff @@
## master #6199 +/- ##
========================================
Coverage 97.37% 97.37%
========================================
Files 1116 1116
Lines 95708 96042 +334
========================================
+ Hits 93191 93520 +329
- Misses 2517 2522 +5
☔ View full report in Codecov by Sentry. |
…t/stream-manager-asyncio-heavy
The full StreamManager implementation prototyped in #6145.
Some tasks are deferred:
This PR depends on the latest version of duet which has not been released. It contains fixes for
AwaitableFuture
cancellations.I'm planning to add a README with design and sequence diagrams. Please let me know if you'd like to hold off on reviewing this PR until that's ready.
Only the last commit needs to be reviewed. All other commits are from #6190.
@maffoo @wcourtney