From 13f05fdf472a97e6a80fb5337d691ec55a2380eb Mon Sep 17 00:00:00 2001 From: Cheng Xing Date: Tue, 15 Aug 2023 20:04:16 +0000 Subject: [PATCH] Revert "Integrate StreamManager with run_sweep() (#6233)" (#6242) This reverts commit 86479ae738454a2e6b88f30124a4e699ce22f302. --- cirq-google/cirq_google/engine/engine.py | 36 +- .../cirq_google/engine/engine_client.py | 78 ---- .../cirq_google/engine/engine_client_test.py | 333 +++--------------- cirq-google/cirq_google/engine/engine_job.py | 30 +- .../cirq_google/engine/engine_job_test.py | 48 --- .../engine/engine_processor_test.py | 45 ++- cirq-google/cirq_google/engine/engine_test.py | 100 ++++-- 7 files changed, 160 insertions(+), 510 deletions(-) diff --git a/cirq-google/cirq_google/engine/engine.py b/cirq-google/cirq_google/engine/engine.py index a213dd1eee7..3b889df0378 100644 --- a/cirq-google/cirq_google/engine/engine.py +++ b/cirq-google/cirq_google/engine/engine.py @@ -278,7 +278,7 @@ async def run_sweep_async( job_description: Optional[str] = None, job_labels: Optional[Dict[str, str]] = None, ) -> engine_job.EngineJob: - """Runs the supplied Circuit via Quantum Engine. + """Runs the supplied Circuit via Quantum Engine.Creates In contrast to run, this runs across multiple parameter sweeps, and does not block until a result is returned. @@ -312,35 +312,20 @@ async def run_sweep_async( Raises: ValueError: If no gate set is provided. """ - if not program_id: - program_id = _make_random_id('prog-') - if not job_id: - job_id = _make_random_id('job-') - run_context = self.context._serialize_run_context(params, repetitions) - - stream_job_response_future = self.context.client.run_job_over_stream( - project_id=self.project_id, - program_id=str(program_id), - program_description=program_description, - program_labels=program_labels, - code=self.context._serialize_program(program), - job_id=str(job_id), - processor_ids=processor_ids, - run_context=run_context, - job_description=job_description, - job_labels=job_labels, + engine_program = await self.create_program_async( + program, program_id, description=program_description, labels=program_labels ) - return engine_job.EngineJob( - self.project_id, - str(program_id), - str(job_id), - self.context, - stream_job_response_future=stream_job_response_future, + return await engine_program.run_sweep_async( + job_id=job_id, + params=params, + repetitions=repetitions, + processor_ids=processor_ids, + description=job_description, + labels=job_labels, ) run_sweep = duet.sync(run_sweep_async) - # TODO(#5996) Migrate to stream client async def run_batch_async( self, programs: Sequence[cirq.AbstractCircuit], @@ -421,7 +406,6 @@ async def run_batch_async( run_batch = duet.sync(run_batch_async) - # TODO(#5996) Migrate to stream client async def run_calibration_async( self, layers: List['cirq_google.CalibrationLayer'], diff --git a/cirq-google/cirq_google/engine/engine_client.py b/cirq-google/cirq_google/engine/engine_client.py index 9a92a3b642e..db27266edc9 100644 --- a/cirq-google/cirq_google/engine/engine_client.py +++ b/cirq-google/cirq_google/engine/engine_client.py @@ -38,7 +38,6 @@ from cirq._compat import cached_property from cirq_google.cloud import quantum from cirq_google.engine.asyncio_executor import AsyncioExecutor -from cirq_google.engine import stream_manager _M = TypeVar('_M', bound=proto.Message) _R = TypeVar('_R') @@ -106,10 +105,6 @@ async def make_client(): return self._executor.submit(make_client).result() - @cached_property - def _stream_manager(self) -> stream_manager.StreamManager: - return stream_manager.StreamManager(self.grpc_client) - async def _send_request_async(self, func: Callable[[_M], Awaitable[_R]], request: _M) -> _R: """Sends a request by invoking an asyncio callable.""" return await self._run_retry_async(func, request) @@ -702,79 +697,6 @@ async def get_job_results_async( get_job_results = duet.sync(get_job_results_async) - def run_job_over_stream( - self, - project_id: str, - program_id: str, - code: any_pb2.Any, - job_id: str, - processor_ids: Sequence[str], - run_context: any_pb2.Any, - program_description: Optional[str] = None, - program_labels: Optional[Dict[str, str]] = None, - priority: Optional[int] = None, - job_description: Optional[str] = None, - job_labels: Optional[Dict[str, str]] = None, - ) -> duet.AwaitableFuture[Union[quantum.QuantumResult, quantum.QuantumJob]]: - """Runs a job with the given program and job information over a stream. - - Sends the request over the Quantum Engine QuantumRunStream bidirectional stream, and returns - a future for the stream response. The future will be completed with a `QuantumResult` if - the job is successful; otherwise, it will be completed with a QuantumJob. - - Args: - project_id: A project_id of the parent Google Cloud Project. - program_id: Unique ID of the program within the parent project. - code: Properly serialized program code. - job_id: Unique ID of the job within the parent program. - run_context: Properly serialized run context. - processor_ids: List of processor id for running the program. - program_description: An optional description to set on the program. - program_labels: Optional set of labels to set on the program. - priority: Optional priority to run at, 0-1000. - job_description: Optional description to set on the job. - job_labels: Optional set of labels to set on the job. - - Returns: - A future for the job result, or the job if the job has failed. - - Raises: - ValueError: If the priority is not between 0 and 1000. - """ - # Check program to run and program parameters. - if priority and not 0 <= priority < 1000: - raise ValueError('priority must be between 0 and 1000') - - project_name = _project_name(project_id) - - program_name = _program_name_from_ids(project_id, program_id) - program = quantum.QuantumProgram(name=program_name, code=code) - if program_description: - program.description = program_description - if program_labels: - program.labels.update(program_labels) - - job = quantum.QuantumJob( - name=_job_name_from_ids(project_id, program_id, job_id), - scheduling_config=quantum.SchedulingConfig( - processor_selector=quantum.SchedulingConfig.ProcessorSelector( - processor_names=[ - _processor_name_from_ids(project_id, processor_id) - for processor_id in processor_ids - ] - ) - ), - run_context=run_context, - ) - if priority: - job.scheduling_config.priority = priority - if job_description: - job.description = job_description - if job_labels: - job.labels.update(job_labels) - - return self._stream_manager.submit(project_name, program, job) - async def list_processors_async(self, project_id: str) -> List[quantum.QuantumProcessor]: """Returns a list of Processors that the user has visibility to in the current Engine project. The names of these processors are used to diff --git a/cirq-google/cirq_google/engine/engine_client_test.py b/cirq-google/cirq_google/engine/engine_client_test.py index d26240d171b..b416f75b7d1 100644 --- a/cirq-google/cirq_google/engine/engine_client_test.py +++ b/cirq-google/cirq_google/engine/engine_client_test.py @@ -24,25 +24,18 @@ from google.protobuf.timestamp_pb2 import Timestamp from cirq_google.engine.engine_client import EngineClient, EngineException -import cirq_google.engine.stream_manager as engine_stream_manager from cirq_google.cloud import quantum -def _setup_client_mock(client_constructor): +def setup_mock_(client_constructor): grpc_client = mock.AsyncMock() client_constructor.return_value = grpc_client return grpc_client -def _setup_stream_manager_mock(manager_constructor): - stream_manager = mock.MagicMock() - manager_constructor.return_value = stream_manager - return stream_manager - - @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_create_program(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) result = quantum.QuantumProgram(name='projects/proj/programs/prog') grpc_client.create_quantum_program.return_value = result @@ -102,7 +95,7 @@ def test_create_program(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_get_program(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) result = quantum.QuantumProgram(name='projects/proj/programs/prog') grpc_client.get_quantum_program.return_value = result @@ -121,7 +114,7 @@ def test_get_program(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_list_program(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) results = [ quantum.QuantumProgram(name='projects/proj/programs/prog1'), @@ -169,7 +162,7 @@ def test_list_program(client_constructor): def test_list_program_filters( client_constructor, expected_filter, created_before, created_after, labels ): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) client = EngineClient() client.list_programs( project_id='proj', @@ -188,7 +181,7 @@ def test_list_program_filters_invalid_type(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_set_program_description(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) result = quantum.QuantumProgram(name='projects/proj/programs/prog') grpc_client.update_quantum_program.return_value = result @@ -217,7 +210,7 @@ def test_set_program_description(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_set_program_labels(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) grpc_client.get_quantum_program.return_value = quantum.QuantumProgram( labels={'color': 'red', 'weather': 'sun', 'run': '1'}, label_fingerprint='hash' @@ -252,7 +245,7 @@ def test_set_program_labels(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_add_program_labels(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) existing = quantum.QuantumProgram( labels={'color': 'red', 'weather': 'sun', 'run': '1'}, label_fingerprint='hash' @@ -294,7 +287,7 @@ def test_add_program_labels(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_remove_program_labels(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) existing = quantum.QuantumProgram( labels={'color': 'red', 'weather': 'sun', 'run': '1'}, label_fingerprint='hash' @@ -334,7 +327,7 @@ def test_remove_program_labels(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_delete_program(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) client = EngineClient() assert not client.delete_program('proj', 'prog') @@ -350,7 +343,7 @@ def test_delete_program(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_create_job(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) result = quantum.QuantumJob(name='projects/proj/programs/prog/jobs/job0') grpc_client.create_quantum_job.return_value = result @@ -474,253 +467,9 @@ def test_create_job(client_constructor): ) -@pytest.mark.parametrize( - 'run_job_kwargs, expected_submit_args', - [ - ( - { - 'project_id': 'proj', - 'program_id': 'prog', - 'code': any_pb2.Any(), - 'job_id': 'job0', - 'processor_ids': ['processor0'], - 'run_context': any_pb2.Any(), - 'program_description': 'A program', - 'program_labels': {'hello': 'world'}, - 'priority': 10, - 'job_description': 'A job', - 'job_labels': {'hello': 'world'}, - }, - [ - 'projects/proj', - quantum.QuantumProgram( - name='projects/proj/programs/prog', - code=any_pb2.Any(), - description='A program', - labels={'hello': 'world'}, - ), - quantum.QuantumJob( - name='projects/proj/programs/prog/jobs/job0', - run_context=any_pb2.Any(), - scheduling_config=quantum.SchedulingConfig( - priority=10, - processor_selector=quantum.SchedulingConfig.ProcessorSelector( - processor_names=['projects/proj/processors/processor0'] - ), - ), - description='A job', - labels={'hello': 'world'}, - ), - ], - ), - # Missing program labels - ( - { - 'project_id': 'proj', - 'program_id': 'prog', - 'code': any_pb2.Any(), - 'job_id': 'job0', - 'processor_ids': ['processor0'], - 'run_context': any_pb2.Any(), - 'program_description': 'A program', - 'priority': 10, - 'job_description': 'A job', - 'job_labels': {'hello': 'world'}, - }, - [ - 'projects/proj', - quantum.QuantumProgram( - name='projects/proj/programs/prog', code=any_pb2.Any(), description='A program' - ), - quantum.QuantumJob( - name='projects/proj/programs/prog/jobs/job0', - run_context=any_pb2.Any(), - scheduling_config=quantum.SchedulingConfig( - priority=10, - processor_selector=quantum.SchedulingConfig.ProcessorSelector( - processor_names=['projects/proj/processors/processor0'] - ), - ), - description='A job', - labels={'hello': 'world'}, - ), - ], - ), - # Missing program description and labels - ( - { - 'project_id': 'proj', - 'program_id': 'prog', - 'code': any_pb2.Any(), - 'job_id': 'job0', - 'processor_ids': ['processor0'], - 'run_context': any_pb2.Any(), - 'priority': 10, - 'job_description': 'A job', - 'job_labels': {'hello': 'world'}, - }, - [ - 'projects/proj', - quantum.QuantumProgram(name='projects/proj/programs/prog', code=any_pb2.Any()), - quantum.QuantumJob( - name='projects/proj/programs/prog/jobs/job0', - run_context=any_pb2.Any(), - scheduling_config=quantum.SchedulingConfig( - priority=10, - processor_selector=quantum.SchedulingConfig.ProcessorSelector( - processor_names=['projects/proj/processors/processor0'] - ), - ), - description='A job', - labels={'hello': 'world'}, - ), - ], - ), - # Missing job labels - ( - { - 'project_id': 'proj', - 'program_id': 'prog', - 'code': any_pb2.Any(), - 'job_id': 'job0', - 'processor_ids': ['processor0'], - 'run_context': any_pb2.Any(), - 'program_description': 'A program', - 'program_labels': {'hello': 'world'}, - 'priority': 10, - 'job_description': 'A job', - }, - [ - 'projects/proj', - quantum.QuantumProgram( - name='projects/proj/programs/prog', - code=any_pb2.Any(), - description='A program', - labels={'hello': 'world'}, - ), - quantum.QuantumJob( - name='projects/proj/programs/prog/jobs/job0', - run_context=any_pb2.Any(), - scheduling_config=quantum.SchedulingConfig( - priority=10, - processor_selector=quantum.SchedulingConfig.ProcessorSelector( - processor_names=['projects/proj/processors/processor0'] - ), - ), - description='A job', - ), - ], - ), - # Missing job description and labels - ( - { - 'project_id': 'proj', - 'program_id': 'prog', - 'code': any_pb2.Any(), - 'job_id': 'job0', - 'processor_ids': ['processor0'], - 'run_context': any_pb2.Any(), - 'program_description': 'A program', - 'program_labels': {'hello': 'world'}, - 'priority': 10, - }, - [ - 'projects/proj', - quantum.QuantumProgram( - name='projects/proj/programs/prog', - code=any_pb2.Any(), - description='A program', - labels={'hello': 'world'}, - ), - quantum.QuantumJob( - name='projects/proj/programs/prog/jobs/job0', - run_context=any_pb2.Any(), - scheduling_config=quantum.SchedulingConfig( - priority=10, - processor_selector=quantum.SchedulingConfig.ProcessorSelector( - processor_names=['projects/proj/processors/processor0'] - ), - ), - ), - ], - ), - # Missing job priority, description, and labels - ( - { - 'project_id': 'proj', - 'program_id': 'prog', - 'code': any_pb2.Any(), - 'job_id': 'job0', - 'processor_ids': ['processor0'], - 'run_context': any_pb2.Any(), - 'program_description': 'A program', - 'program_labels': {'hello': 'world'}, - }, - [ - 'projects/proj', - quantum.QuantumProgram( - name='projects/proj/programs/prog', - code=any_pb2.Any(), - description='A program', - labels={'hello': 'world'}, - ), - quantum.QuantumJob( - name='projects/proj/programs/prog/jobs/job0', - run_context=any_pb2.Any(), - scheduling_config=quantum.SchedulingConfig( - processor_selector=quantum.SchedulingConfig.ProcessorSelector( - processor_names=['projects/proj/processors/processor0'] - ) - ), - ), - ], - ), - ], -) -@mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) -@mock.patch.object(engine_stream_manager, 'StreamManager', autospec=True) -def test_run_job_over_stream( - manager_constructor, client_constructor, run_job_kwargs, expected_submit_args -): - _setup_client_mock(client_constructor) - stream_manager = _setup_stream_manager_mock(manager_constructor) - - result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0') - expected_future = duet.AwaitableFuture() - expected_future.try_set_result(result) - stream_manager.submit.return_value = expected_future - client = EngineClient() - - actual_future = client.run_job_over_stream(**run_job_kwargs) - - assert actual_future == expected_future - stream_manager.submit.assert_called_with(*expected_submit_args) - - -@mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) -@mock.patch.object(engine_stream_manager, 'StreamManager', autospec=True) -def test_run_job_over_stream_with_priority_out_of_bound_raises( - manager_constructor, client_constructor -): - _setup_client_mock(client_constructor) - _setup_stream_manager_mock(manager_constructor) - client = EngineClient() - - with pytest.raises(ValueError): - client.run_job_over_stream( - project_id='proj', - program_id='prog', - code=any_pb2.Any(), - job_id='job0', - processor_ids=['processor0'], - run_context=any_pb2.Any(), - priority=9001, - ) - - @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_get_job(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) result = quantum.QuantumJob(name='projects/proj/programs/prog/jobs/job0') grpc_client.get_quantum_job.return_value = result @@ -743,7 +492,7 @@ def test_get_job(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_set_job_description(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) result = quantum.QuantumJob(name='projects/proj/programs/prog/jobs/job0') grpc_client.update_quantum_job.return_value = result @@ -772,7 +521,7 @@ def test_set_job_description(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_set_job_labels(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) grpc_client.get_quantum_job.return_value = quantum.QuantumJob( labels={'color': 'red', 'weather': 'sun', 'run': '1'}, label_fingerprint='hash' @@ -809,7 +558,7 @@ def test_set_job_labels(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_add_job_labels(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) existing = quantum.QuantumJob( labels={'color': 'red', 'weather': 'sun', 'run': '1'}, label_fingerprint='hash' @@ -853,7 +602,7 @@ def test_add_job_labels(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_remove_job_labels(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) existing = quantum.QuantumJob( labels={'color': 'red', 'weather': 'sun', 'run': '1'}, label_fingerprint='hash' @@ -893,7 +642,7 @@ def test_remove_job_labels(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_delete_job(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) client = EngineClient() assert not client.delete_job('proj', 'prog', 'job0') @@ -904,7 +653,7 @@ def test_delete_job(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_cancel_job(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) client = EngineClient() assert not client.cancel_job('proj', 'prog', 'job0') @@ -915,7 +664,7 @@ def test_cancel_job(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_job_results(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0') grpc_client.get_quantum_result.return_value = result @@ -929,7 +678,7 @@ def test_job_results(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_list_jobs(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) results = [ quantum.QuantumJob(name='projects/proj/programs/prog1/jobs/job1'), @@ -1042,7 +791,7 @@ def test_list_jobs_filters( executed_processor_ids, scheduled_processor_ids, ): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) client = EngineClient() client.list_jobs( project_id='proj', @@ -1071,7 +820,7 @@ async def __aiter__(self): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_list_processors(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) results = [ quantum.QuantumProcessor(name='projects/proj/processor/processor0'), @@ -1088,7 +837,7 @@ def test_list_processors(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_get_processor(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) result = quantum.QuantumProcessor(name='projects/proj/processors/processor0') grpc_client.get_quantum_processor.return_value = result @@ -1102,7 +851,7 @@ def test_get_processor(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_list_calibrations(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) results = [ quantum.QuantumCalibration(name='projects/proj/processor/processor0/calibrations/123456'), @@ -1119,7 +868,7 @@ def test_list_calibrations(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_get_calibration(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) result = quantum.QuantumCalibration( name='projects/proj/processors/processor0/calibrations/123456' @@ -1137,7 +886,7 @@ def test_get_calibration(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_get_current_calibration(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) result = quantum.QuantumCalibration( name='projects/proj/processors/processor0/calibrations/123456' @@ -1155,7 +904,7 @@ def test_get_current_calibration(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_get_current_calibration_does_not_exist(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) grpc_client.get_quantum_calibration.side_effect = exceptions.NotFound('not found') @@ -1170,7 +919,7 @@ def test_get_current_calibration_does_not_exist(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_get_current_calibration_error(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) grpc_client.get_quantum_calibration.side_effect = exceptions.BadRequest('boom') @@ -1181,7 +930,7 @@ def test_get_current_calibration_error(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_api_doesnt_retry_not_found_errors(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) grpc_client.get_quantum_program.side_effect = exceptions.NotFound('not found') client = EngineClient() @@ -1192,7 +941,7 @@ def test_api_doesnt_retry_not_found_errors(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_api_retry_5xx_errors(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) grpc_client.get_quantum_program.side_effect = exceptions.ServiceUnavailable('internal error') client = EngineClient(max_retry_delay_seconds=0.3) @@ -1204,7 +953,7 @@ def test_api_retry_5xx_errors(client_constructor): @mock.patch('duet.sleep', return_value=duet.completed_future(None)) @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_api_retry_times(client_constructor, mock_sleep): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) grpc_client.get_quantum_program.side_effect = exceptions.ServiceUnavailable('internal error') client = EngineClient(max_retry_delay_seconds=0.3) @@ -1218,7 +967,7 @@ def test_api_retry_times(client_constructor, mock_sleep): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_create_reservation(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) start = datetime.datetime.fromtimestamp(1000000000) end = datetime.datetime.fromtimestamp(1000003600) users = ['jeff@google.com'] @@ -1244,7 +993,7 @@ def test_create_reservation(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_cancel_reservation(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) name = 'projects/proj/processors/processor0/reservations/papar-party-44' result = quantum.QuantumReservation( name=name, @@ -1263,7 +1012,7 @@ def test_cancel_reservation(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_delete_reservation(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) name = 'projects/proj/processors/processor0/reservations/papar-party-44' result = quantum.QuantumReservation( name=name, @@ -1282,7 +1031,7 @@ def test_delete_reservation(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_get_reservation(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) name = 'projects/proj/processors/processor0/reservations/papar-party-44' result = quantum.QuantumReservation( name=name, @@ -1301,7 +1050,7 @@ def test_get_reservation(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_get_reservation_not_found(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) name = 'projects/proj/processors/processor0/reservations/papar-party-44' grpc_client.get_quantum_reservation.side_effect = exceptions.NotFound('not found') @@ -1314,7 +1063,7 @@ def test_get_reservation_not_found(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_get_reservation_exception(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) grpc_client.get_quantum_reservation.side_effect = exceptions.BadRequest('boom') client = EngineClient() @@ -1324,7 +1073,7 @@ def test_get_reservation_exception(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_list_reservation(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) name = 'projects/proj/processors/processor0/reservations/papar-party-44' results = [ quantum.QuantumReservation( @@ -1348,7 +1097,7 @@ def test_list_reservation(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_update_reservation(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) name = 'projects/proj/processors/processor0/reservations/papar-party-44' result = quantum.QuantumReservation( name=name, @@ -1381,7 +1130,7 @@ def test_update_reservation(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_update_reservation_remove_all_users(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) name = 'projects/proj/processors/processor0/reservations/papar-party-44' result = quantum.QuantumReservation(name=name, whitelisted_users=[]) grpc_client.update_quantum_reservation.return_value = result @@ -1402,7 +1151,7 @@ def test_update_reservation_remove_all_users(client_constructor): @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True) def test_list_time_slots(client_constructor): - grpc_client = _setup_client_mock(client_constructor) + grpc_client = setup_mock_(client_constructor) results = [ quantum.QuantumTimeSlot( processor_name='potofgold', diff --git a/cirq-google/cirq_google/engine/engine_job.py b/cirq-google/cirq_google/engine/engine_job.py index 2e55703ae6a..88b26f3e0b8 100644 --- a/cirq-google/cirq_google/engine/engine_job.py +++ b/cirq-google/cirq_google/engine/engine_job.py @@ -14,7 +14,7 @@ """A helper for jobs that have been created on the Quantum Engine.""" import datetime -from typing import Dict, List, Optional, Sequence, Tuple, TYPE_CHECKING, Union +from typing import Dict, List, Optional, Sequence, Tuple, TYPE_CHECKING import duet from google.protobuf import any_pb2 @@ -69,9 +69,6 @@ def __init__( context: 'engine_base.EngineContext', _job: Optional[quantum.QuantumJob] = None, result_type: ResultType = ResultType.Program, - stream_job_response_future: Optional[ - duet.AwaitableFuture[Union[quantum.QuantumResult, quantum.QuantumJob]] - ] = None, ) -> None: """A job submitted to the engine. @@ -82,10 +79,7 @@ def __init__( context: Engine configuration and context to use. _job: The optional current job state. result_type: What type of results are expected, such as - batched results or the result of a focused calibration. - stream_job_response_future: If set, the job is sent over the Quantum Engine - QuantumRunStream bidirectional stream, and the future is completed when the Engine - responds over the stream. + batched results or the result of a focused calibration. """ self.project_id = project_id self.program_id = program_id @@ -96,7 +90,6 @@ def __init__( self._calibration_results: Optional[Sequence[CalibrationResult]] = None self._batched_results: Optional[Sequence[Sequence[EngineResult]]] = None self.result_type = result_type - self._stream_job_response_future = stream_job_response_future def id(self) -> str: """Returns the job id.""" @@ -286,8 +279,7 @@ async def results_async(self) -> Sequence[EngineResult]: import cirq_google.engine.engine as engine_base if self._results is None: - result_response = await self._await_result_async() - result = result_response.result + result = await self._await_result_async() result_type = result.type_url[len(engine_base.TYPE_PREFIX) :] if ( result_type == 'cirq.google.api.v1.Result' @@ -310,17 +302,6 @@ async def results_async(self) -> Sequence[EngineResult]: return self._results async def _await_result_async(self) -> quantum.QuantumResult: - if self._stream_job_response_future is not None: - response = await self._stream_job_response_future - if isinstance(response, quantum.QuantumResult): - return response - elif isinstance(response, quantum.QuantumJob): - self._job = response - _raise_on_failure(response) - else: - # coverage: ignore - raise ValueError('Internal error: The stream response type is not recognized.') - async with duet.timeout_scope(self.context.timeout): # type: ignore[arg-type] while True: job = await self._refresh_job_async() @@ -331,7 +312,7 @@ async def _await_result_async(self) -> quantum.QuantumResult: response = await self.context.client.get_job_results_async( self.project_id, self.program_id, self.job_id ) - return response + return response.result async def calibration_results_async(self) -> Sequence[CalibrationResult]: """Returns the results of a run_calibration() call. @@ -342,8 +323,7 @@ async def calibration_results_async(self) -> Sequence[CalibrationResult]: import cirq_google.engine.engine as engine_base if self._calibration_results is None: - result_response = await self._await_result_async() - result = result_response.result + result = await self._await_result_async() result_type = result.type_url[len(engine_base.TYPE_PREFIX) :] if result_type != 'cirq.google.api.v2.FocusedCalibrationResult': raise ValueError(f'Did not find calibration results, instead found: {result_type}') diff --git a/cirq-google/cirq_google/engine/engine_job_test.py b/cirq-google/cirq_google/engine/engine_job_test.py index 51898be95fd..82c07398300 100644 --- a/cirq-google/cirq_google/engine/engine_job_test.py +++ b/cirq-google/cirq_google/engine/engine_job_test.py @@ -16,7 +16,6 @@ from unittest import mock import pytest -import duet from google.protobuf import any_pb2, timestamp_pb2 from google.protobuf.text_format import Merge @@ -551,53 +550,6 @@ def test_results_getitem(get_job_results): _ = job[2] -def test_receives_results_via_stream_returns_correct_results(): - qjob = quantum.QuantumJob( - execution_status=quantum.ExecutionStatus(state=quantum.ExecutionStatus.State.SUCCESS), - update_time=UPDATE_TIME, - ) - response_future = duet.AwaitableFuture() - response_future.try_set_result(RESULTS) - - job = cg.EngineJob( - 'a', 'b', 'steve', EngineContext(), _job=qjob, stream_job_response_future=response_future - ) - data = job.results() - - assert len(data) == 2 - assert str(data[0]) == 'q=0110' - assert str(data[1]) == 'q=1010' - - -def test_receives_job_via_stream_raises_and_updates_underlying_job(): - expected_error_code = quantum.ExecutionStatus.Failure.Code.SYSTEM_ERROR - expected_error_message = 'system error' - qjob = quantum.QuantumJob( - execution_status=quantum.ExecutionStatus( - state=quantum.ExecutionStatus.State.SUCCESS, - failure=quantum.ExecutionStatus.Failure( - error_code=expected_error_code, error_message=expected_error_message - ), - ), - update_time=UPDATE_TIME, - ) - response_future = duet.AwaitableFuture() - - job = cg.EngineJob( - 'a', 'b', 'steve', EngineContext(), _job=qjob, stream_job_response_future=response_future - ) - qjob.execution_status.state = quantum.ExecutionStatus.State.FAILURE - response_future.try_set_result(qjob) - - with pytest.raises(RuntimeError): - job.results() - actual_error_code, actual_error_message = job.failure() - - # Checks that the underlying job has been updated by checking failure information. - assert actual_error_code == expected_error_code.name - assert actual_error_message == expected_error_message - - @mock.patch('cirq_google.engine.engine_client.EngineClient.get_job_results_async') def test_batched_results(get_job_results): qjob = quantum.QuantumJob( diff --git a/cirq-google/cirq_google/engine/engine_processor_test.py b/cirq-google/cirq_google/engine/engine_processor_test.py index 83aee9a3c2c..d600cc291a3 100644 --- a/cirq-google/cirq_google/engine/engine_processor_test.py +++ b/cirq-google/cirq_google/engine/engine_processor_test.py @@ -15,7 +15,6 @@ from unittest import mock import datetime -import duet import pytest import freezegun import numpy as np @@ -800,13 +799,22 @@ def test_list_reservations_time_filter_behavior(list_reservations): @mock.patch('cirq_google.engine.engine_client.EngineClient', autospec=True) def test_run_sweep_params(client): + client().create_program_async.return_value = ( + 'prog', + quantum.QuantumProgram(name='projects/proj/programs/prog'), + ) + client().create_job_async.return_value = ( + 'job-id', + quantum.QuantumJob( + name='projects/proj/programs/prog/jobs/job-id', execution_status={'state': 'READY'} + ), + ) client().get_job_async.return_value = quantum.QuantumJob( execution_status={'state': 'SUCCESS'}, update_time=_to_timestamp('2019-07-09T23:39:59Z') ) - expected_result = quantum.QuantumResult(result=util.pack_any(_RESULTS_V2)) - stream_future = duet.AwaitableFuture() - stream_future.try_set_result(expected_result) - client().run_job_over_stream.return_value = stream_future + client().get_job_results_async.return_value = quantum.QuantumResult( + result=util.pack_any(_RESULTS_V2) + ) processor = cg.EngineProcessor('a', 'p', EngineContext()) job = processor.run_sweep( @@ -823,15 +831,18 @@ def test_run_sweep_params(client): assert result.job_finished_time is not None assert results == cirq.read_json(json_text=cirq.to_json(results)) - client().run_job_over_stream.assert_called_once() + client().create_program_async.assert_called_once() + client().create_job_async.assert_called_once() run_context = v2.run_context_pb2.RunContext() - client().run_job_over_stream.call_args[1]['run_context'].Unpack(run_context) + client().create_job_async.call_args[1]['run_context'].Unpack(run_context) sweeps = run_context.parameter_sweeps assert len(sweeps) == 2 for i, v in enumerate([1.0, 2.0]): assert sweeps[i].repetitions == 1 assert sweeps[i].sweep.sweep_function.sweeps[0].single_sweep.points.points == [v] + client().get_job_async.assert_called_once() + client().get_job_results_async.assert_called_once() @mock.patch('cirq_google.engine.engine_client.EngineClient', autospec=True) @@ -932,14 +943,22 @@ def test_run_calibration(client): @mock.patch('cirq_google.engine.engine_client.EngineClient', autospec=True) def test_sampler(client): + client().create_program_async.return_value = ( + 'prog', + quantum.QuantumProgram(name='projects/proj/programs/prog'), + ) + client().create_job_async.return_value = ( + 'job-id', + quantum.QuantumJob( + name='projects/proj/programs/prog/jobs/job-id', execution_status={'state': 'READY'} + ), + ) client().get_job_async.return_value = quantum.QuantumJob( execution_status={'state': 'SUCCESS'}, update_time=_to_timestamp('2019-07-09T23:39:59Z') ) - expected_result = quantum.QuantumResult(result=util.pack_any(_RESULTS_V2)) - stream_future = duet.AwaitableFuture() - stream_future.try_set_result(expected_result) - client().run_job_over_stream.return_value = stream_future - + client().get_job_results_async.return_value = quantum.QuantumResult( + result=util.pack_any(_RESULTS_V2) + ) processor = cg.EngineProcessor('proj', 'mysim', EngineContext()) sampler = processor.get_sampler() results = sampler.run_sweep( @@ -950,7 +969,7 @@ def test_sampler(client): assert results[i].repetitions == 1 assert results[i].params.param_dict == {'a': v} assert results[i].measurements == {'q': np.array([[0]], dtype='uint8')} - assert client().run_job_over_stream.call_args[1]['project_id'] == 'proj' + assert client().create_program_async.call_args[0][0] == 'proj' def test_str(): diff --git a/cirq-google/cirq_google/engine/engine_test.py b/cirq-google/cirq_google/engine/engine_test.py index 3e8258e158f..89d4206e532 100644 --- a/cirq-google/cirq_google/engine/engine_test.py +++ b/cirq-google/cirq_google/engine/engine_test.py @@ -19,7 +19,6 @@ import numpy as np import pytest -import duet from google.protobuf import any_pb2, timestamp_pb2 from google.protobuf.text_format import Merge @@ -349,9 +348,6 @@ def setup_run_circuit_with_result_(client, result): execution_status={'state': 'SUCCESS'}, update_time=_DT ) client().get_job_results_async.return_value = quantum.QuantumResult(result=result) - stream_future = duet.AwaitableFuture() - stream_future.try_set_result(quantum.QuantumResult(result=result)) - client().run_job_over_stream.return_value = stream_future @mock.patch('cirq_google.engine.engine_client.EngineClient', autospec=True) @@ -367,10 +363,10 @@ def test_run_circuit(client): assert result.params.param_dict == {'a': 1} assert result.measurements == {'q': np.array([[0]], dtype='uint8')} client.assert_called_with(service_args={'client_info': 1}, verbose=None) - client().run_job_over_stream.assert_called_once_with( + client().create_program_async.assert_called_once() + client().create_job_async.assert_called_once_with( project_id='proj', program_id='prog', - code=mock.ANY, job_id='job-id', processor_ids=['mysim'], run_context=util.pack_any( @@ -378,11 +374,11 @@ def test_run_circuit(client): parameter_sweeps=[v2.run_context_pb2.ParameterSweep(repetitions=1)] ) ), - program_description=None, - program_labels=None, - job_description=None, - job_labels=None, + description=None, + labels=None, ) + client().get_job_async.assert_called_once_with('proj', 'prog', 'job-id', False) + client().get_job_results_async.assert_called_once_with('proj', 'prog', 'job-id') def test_no_gate_set(): @@ -398,7 +394,17 @@ def test_unsupported_program_type(): @mock.patch('cirq_google.engine.engine_client.EngineClient', autospec=True) def test_run_circuit_failed(client): - failed_job = quantum.QuantumJob( + client().create_program_async.return_value = ( + 'prog', + quantum.QuantumProgram(name='projects/proj/programs/prog'), + ) + client().create_job_async.return_value = ( + 'job-id', + quantum.QuantumJob( + name='projects/proj/programs/prog/jobs/job-id', execution_status={'state': 'READY'} + ), + ) + client().get_job_async.return_value = quantum.QuantumJob( name='projects/proj/programs/prog/jobs/job-id', execution_status={ 'state': 'FAILURE', @@ -406,9 +412,6 @@ def test_run_circuit_failed(client): 'failure': {'error_code': 'SYSTEM_ERROR', 'error_message': 'Not good'}, }, ) - stream_future = duet.AwaitableFuture() - stream_future.try_set_result(failed_job) - client().run_job_over_stream.return_value = stream_future engine = cg.Engine(project_id='proj') with pytest.raises( @@ -421,16 +424,23 @@ def test_run_circuit_failed(client): @mock.patch('cirq_google.engine.engine_client.EngineClient', autospec=True) def test_run_circuit_failed_missing_processor_name(client): - failed_job = quantum.QuantumJob( + client().create_program_async.return_value = ( + 'prog', + quantum.QuantumProgram(name='projects/proj/programs/prog'), + ) + client().create_job_async.return_value = ( + 'job-id', + quantum.QuantumJob( + name='projects/proj/programs/prog/jobs/job-id', execution_status={'state': 'READY'} + ), + ) + client().get_job_async.return_value = quantum.QuantumJob( name='projects/proj/programs/prog/jobs/job-id', execution_status={ 'state': 'FAILURE', 'failure': {'error_code': 'SYSTEM_ERROR', 'error_message': 'Not good'}, }, ) - stream_future = duet.AwaitableFuture() - stream_future.try_set_result(failed_job) - client().run_job_over_stream.return_value = stream_future engine = cg.Engine(project_id='proj') with pytest.raises( @@ -443,12 +453,19 @@ def test_run_circuit_failed_missing_processor_name(client): @mock.patch('cirq_google.engine.engine_client.EngineClient', autospec=True) def test_run_circuit_cancelled(client): - canceled_job = quantum.QuantumJob( + client().create_program_async.return_value = ( + 'prog', + quantum.QuantumProgram(name='projects/proj/programs/prog'), + ) + client().create_job_async.return_value = ( + 'job-id', + quantum.QuantumJob( + name='projects/proj/programs/prog/jobs/job-id', execution_status={'state': 'READY'} + ), + ) + client().get_job_async.return_value = quantum.QuantumJob( name='projects/proj/programs/prog/jobs/job-id', execution_status={'state': 'CANCELLED'} ) - stream_future = duet.AwaitableFuture() - stream_future.try_set_result(canceled_job) - client().run_job_over_stream.return_value = stream_future engine = cg.Engine(project_id='proj') with pytest.raises( @@ -457,6 +474,27 @@ def test_run_circuit_cancelled(client): engine.run(program=_CIRCUIT) +@mock.patch('cirq_google.engine.engine_client.EngineClient', autospec=True) +def test_run_circuit_timeout(client): + client().create_program_async.return_value = ( + 'prog', + quantum.QuantumProgram(name='projects/proj/programs/prog'), + ) + client().create_job_async.return_value = ( + 'job-id', + quantum.QuantumJob( + name='projects/proj/programs/prog/jobs/job-id', execution_status={'state': 'READY'} + ), + ) + client().get_job_async.return_value = quantum.QuantumJob( + name='projects/proj/programs/prog/jobs/job-id', execution_status={'state': 'RUNNING'} + ) + + engine = cg.Engine(project_id='project-id', timeout=1) + with pytest.raises(TimeoutError): + engine.run(program=_CIRCUIT) + + @mock.patch('cirq_google.engine.engine_client.EngineClient', autospec=True) def test_run_sweep_params(client): setup_run_circuit_with_result_(client, _RESULTS) @@ -472,15 +510,18 @@ def test_run_sweep_params(client): assert results[i].params.param_dict == {'a': v} assert results[i].measurements == {'q': np.array([[0]], dtype='uint8')} - client().run_job_over_stream.assert_called_once() + client().create_program_async.assert_called_once() + client().create_job_async.assert_called_once() run_context = v2.run_context_pb2.RunContext() - client().run_job_over_stream.call_args[1]['run_context'].Unpack(run_context) + client().create_job_async.call_args[1]['run_context'].Unpack(run_context) sweeps = run_context.parameter_sweeps assert len(sweeps) == 2 for i, v in enumerate([1.0, 2.0]): assert sweeps[i].repetitions == 1 assert sweeps[i].sweep.sweep_function.sweeps[0].single_sweep.points.points == [v] + client().get_job_async.assert_called_once() + client().get_job_results_async.assert_called_once() @mock.patch('cirq_google.engine.engine_client.EngineClient', autospec=True) @@ -526,13 +567,16 @@ def test_run_sweep_v2(client): assert results[i].repetitions == 1 assert results[i].params.param_dict == {'a': v} assert results[i].measurements == {'q': np.array([[0]], dtype='uint8')} - client().run_job_over_stream.assert_called_once() + client().create_program_async.assert_called_once() + client().create_job_async.assert_called_once() run_context = v2.run_context_pb2.RunContext() - client().run_job_over_stream.call_args[1]['run_context'].Unpack(run_context) + client().create_job_async.call_args[1]['run_context'].Unpack(run_context) sweeps = run_context.parameter_sweeps assert len(sweeps) == 1 assert sweeps[0].repetitions == 1 assert sweeps[0].sweep.single_sweep.points.points == [1, 2] + client().get_job_async.assert_called_once() + client().get_job_results_async.assert_called_once() @mock.patch('cirq_google.engine.engine_client.EngineClient', autospec=True) @@ -688,7 +732,7 @@ def test_bad_program_proto(): engine = cg.Engine( project_id='project-id', proto_version=cg.engine.engine.ProtoVersion.UNDEFINED ) - with pytest.raises(ValueError, match='invalid (program|run context) proto version'): + with pytest.raises(ValueError, match='invalid program proto version'): engine.run_sweep(program=_CIRCUIT) with pytest.raises(ValueError, match='invalid program proto version'): engine.create_program(_CIRCUIT) @@ -773,7 +817,7 @@ def test_sampler(client): assert results[i].repetitions == 1 assert results[i].params.param_dict == {'a': v} assert results[i].measurements == {'q': np.array([[0]], dtype='uint8')} - assert client().run_job_over_stream.call_args[1]['project_id'] == 'proj' + assert client().create_program_async.call_args[0][0] == 'proj' with cirq.testing.assert_deprecated('sampler', deadline='1.0'): _ = engine.sampler(processor_id='tmp')