Skip to content

Commit

Permalink
Addressed maffoo's nits
Browse files Browse the repository at this point in the history
  • Loading branch information
verult committed Sep 7, 2023
1 parent a722cbb commit 7734b59
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions cirq-google/cirq_google/engine/stream_manager.py
Expand Up @@ -107,8 +107,6 @@ class StreamManager:
"""

_STOP_SIGNAL = None

def __init__(self, grpc_client: quantum.QuantumEngineServiceAsyncClient):
self._grpc_client = grpc_client
# Used to determine whether the stream coroutine is actively running, and provides a way to
Expand All @@ -123,11 +121,13 @@ def __init__(self, grpc_client: quantum.QuantumEngineServiceAsyncClient):
self._next_available_message_id = 0
# Construct queue in AsyncioExecutor to ensure it binds to the correct event loop, since it
# is used by asyncio coroutines.
self._request_queue: asyncio.Queue[
Optional[quantum.QuantumRunStreamRequest]
] = self._executor.submit(self._make_request_queue).result()
self._request_queue = self._executor.submit(self._make_request_queue).result()

async def _make_request_queue(self) -> asyncio.Queue[Optional[quantum.QuantumRunStreamRequest]]:
"""Returns a queue used to back the request iterator passed to the stream.
If `None` is put into the queue, the request iterator will stop.
"""
return asyncio.Queue()

def submit(
Expand Down Expand Up @@ -214,11 +214,11 @@ async def _manage_stream(
async for response in response_iterable:
self._response_demux.publish(response)
except asyncio.CancelledError:
await request_queue.put(StreamManager._STOP_SIGNAL)
await request_queue.put(None)
break
except BaseException as e:
# Note: the message ID counter is not reset upon a new stream.
await request_queue.put(StreamManager._STOP_SIGNAL)
await request_queue.put(None)
self._response_demux.publish_exception(e) # Raise to all request tasks

async def _manage_execution(
Expand Down

0 comments on commit 7734b59

Please sign in to comment.