Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async methods to AbstractEngine and AbstractJob #5555

Merged
merged 15 commits into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
59 changes: 48 additions & 11 deletions cirq-core/cirq/work/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
# limitations under the License.
"""Abstract base class for things sampling quantum circuits."""

import abc
import collections
from typing import Dict, FrozenSet, List, Optional, Sequence, Tuple, TYPE_CHECKING, Union

import duet
import pandas as pd

from cirq import ops, protocols, study
from cirq import ops, protocols, study, value
from cirq.work.observable_measurement import (
measure_observables,
RepetitionsStoppingCriteria,
Expand All @@ -31,7 +31,7 @@
import cirq


class Sampler(metaclass=abc.ABCMeta):
class Sampler(metaclass=value.ABCMetaImplementAnyOneOf):
"""Something capable of sampling quantum circuits. Simulator or hardware."""

def run(
Expand Down Expand Up @@ -177,7 +177,19 @@ def sample(

return pd.concat(results)

@abc.abstractmethod
def _run_sweep_impl(
self, program: 'cirq.AbstractCircuit', params: 'cirq.Sweepable', repetitions: int = 1
) -> Sequence['cirq.Result']:
"""Implements run_sweep using run_sweep_async"""
return duet.run(self.run_sweep_async, program, params, repetitions)

async def _run_sweep_async_impl(
self, program: 'cirq.AbstractCircuit', params: 'cirq.Sweepable', repetitions: int = 1
) -> Sequence['cirq.Result']:
"""Implements run_sweep_async using run_sweep"""
return self.run_sweep(program, params=params, repetitions=repetitions)

@value.alternative(requires='run_sweep_async', implementation=_run_sweep_impl)
def run_sweep(
self, program: 'cirq.AbstractCircuit', params: 'cirq.Sweepable', repetitions: int = 1
) -> Sequence['cirq.Result']:
Expand All @@ -200,6 +212,7 @@ def run_sweep(
Result list for this run; one for each possible parameter resolver.
"""

@value.alternative(requires='run_sweep', implementation=_run_sweep_async_impl)
async def run_sweep_async(
self, program: 'cirq.AbstractCircuit', params: 'cirq.Sweepable', repetitions: int = 1
) -> Sequence['cirq.Result']:
Expand All @@ -217,13 +230,12 @@ async def run_sweep_async(
Returns:
Result list for this run; one for each possible parameter resolver.
"""
return self.run_sweep(program, params=params, repetitions=repetitions)

def run_batch(
self,
programs: Sequence['cirq.AbstractCircuit'],
params_list: Optional[List['cirq.Sweepable']] = None,
repetitions: Union[int, List[int]] = 1,
params_list: Optional[Sequence['cirq.Sweepable']] = None,
repetitions: Union[int, Sequence[int]] = 1,
) -> Sequence[Sequence['cirq.Result']]:
"""Runs the supplied circuits.

Expand Down Expand Up @@ -263,6 +275,34 @@ def run_batch(
ValueError: If length of `programs` is not equal to the length
of `params_list` or the length of `repetitions`.
"""
params_list, repetitions = self._normalize_batch_args(programs, params_list, repetitions)
return [
self.run_sweep(circuit, params=params, repetitions=repetitions)
for circuit, params, repetitions in zip(programs, params_list, repetitions)
]

async def run_batch_async(
self,
programs: Sequence['cirq.AbstractCircuit'],
params_list: Optional[Sequence['cirq.Sweepable']] = None,
repetitions: Union[int, Sequence[int]] = 1,
) -> Sequence[Sequence['cirq.Result']]:
"""Runs the supplied circuits.

This is an asynchronous version of `run_batch`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document args / return value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My idea here was just to refer to run_batch rather than repeat the same docs here. I've added a note to make this more explicit.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems OK since these are on the same page in the docs site. Would be nice if we could link, but I'm not 100% certain whether cirq.Sampler.run_batch would auto-link properly with the tfdocs tools.

"""
params_list, repetitions = self._normalize_batch_args(programs, params_list, repetitions)
return [
await self.run_sweep_async(circuit, params=params, repetitions=repetitions)
for circuit, params, repetitions in zip(programs, params_list, repetitions)
]

def _normalize_batch_args(
self,
programs: Sequence['cirq.AbstractCircuit'],
params_list: Optional[Sequence['cirq.Sweepable']] = None,
repetitions: Union[int, Sequence[int]] = 1,
) -> Tuple[Sequence['cirq.Sweepable'], Sequence[int]]:
if params_list is None:
params_list = [None] * len(programs)
if len(programs) != len(params_list):
Expand All @@ -277,10 +317,7 @@ def run_batch(
'len(programs) and len(repetitions) must match. '
f'Got {len(programs)} and {len(repetitions)}.'
)
return [
self.run_sweep(circuit, params=params, repetitions=repetitions)
for circuit, params, repetitions in zip(programs, params_list, repetitions)
]
return params_list, repetitions

def sample_expectation_values(
self,
Expand Down
36 changes: 36 additions & 0 deletions cirq-core/cirq/work/sampler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,42 @@ def run_sweep(self, program, params, repetitions: int = 1):
await FailingSampler().run_sweep_async(cirq.Circuit(), repetitions=1, params=None)


def test_run_sweep_impl():
"""Test run_sweep implemented in terms of run_sweep_async."""

class AsyncSampler(cirq.Sampler):
async def run_sweep_async(self, program, params, repetitions: int = 1):
await duet.sleep(0.001)
return cirq.Simulator().run_sweep(program, params, repetitions)

results = AsyncSampler().run_sweep(
cirq.Circuit(cirq.measure(cirq.GridQubit(0, 0), key='m')),
cirq.Linspace('foo', 0, 1, 10),
repetitions=10,
)
assert len(results) == 10
for result in results:
np.testing.assert_equal(result.records['m'], np.zeros((10, 1, 1)))


@duet.sync
async def test_run_sweep_async_impl():
"""Test run_sweep_async implemented in terms of run_sweep."""

class SyncSampler(cirq.Sampler):
def run_sweep(self, program, params, repetitions: int = 1):
return cirq.Simulator().run_sweep(program, params, repetitions)

results = await SyncSampler().run_sweep_async(
cirq.Circuit(cirq.measure(cirq.GridQubit(0, 0), key='m')),
cirq.Linspace('foo', 0, 1, 10),
repetitions=10,
)
assert len(results) == 10
for result in results:
np.testing.assert_equal(result.records['m'], np.zeros((10, 1, 1)))


def test_sampler_sample_multiple_params():
a, b = cirq.LineQubit.range(2)
s = sympy.Symbol('s')
Expand Down
2 changes: 1 addition & 1 deletion cirq-core/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# functools.cached_property was introduced in python 3.8
backports.cached_property~=1.0.1; python_version < '3.8'

duet~=0.2.6
duet~=0.2.7
matplotlib~=3.0
networkx~=2.4
numpy~=1.16
Expand Down
14 changes: 11 additions & 3 deletions cirq-google/cirq_google/engine/abstract_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import abc
from typing import Dict, Iterator, List, Optional, overload, Sequence, Tuple, TYPE_CHECKING

import duet

import cirq
from cirq_google.cloud import quantum
from cirq_google.engine.engine_result import EngineResult
Expand Down Expand Up @@ -162,25 +164,31 @@ def delete(self) -> Optional[bool]:
"""Deletes the job and result, if any."""

@abc.abstractmethod
def batched_results(self) -> Sequence[Sequence[EngineResult]]:
async def batched_results_async(self) -> Sequence[Sequence[EngineResult]]:
"""Returns the job results, blocking until the job is complete.

This method is intended for batched jobs. Instead of flattening
results into a single list, this will return a List[Result]
for each circuit in the batch.
"""

batched_results = duet.sync(batched_results_async)
95-martin-orion marked this conversation as resolved.
Show resolved Hide resolved

@abc.abstractmethod
def results(self) -> Sequence[EngineResult]:
async def results_async(self) -> Sequence[EngineResult]:
"""Returns the job results, blocking until the job is complete."""

results = duet.sync(results_async)

@abc.abstractmethod
def calibration_results(self) -> Sequence['calibration_result.CalibrationResult']:
async def calibration_results_async(self) -> Sequence['calibration_result.CalibrationResult']:
"""Returns the results of a run_calibration() call.

This function will fail if any other type of results were returned.
"""

calibration_results = duet.sync(calibration_results_async)

def __iter__(self) -> Iterator[cirq.Result]:
yield from self.results()

Expand Down
10 changes: 4 additions & 6 deletions cirq-google/cirq_google/engine/abstract_job_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,13 @@ def cancel(self) -> None:
def delete(self) -> None:
pass

def batched_results(self):
async def batched_results_async(self):
pass

def results(self):
return list(
cirq.ResultDict(params={}, measurements={'a': np.asarray([t])}) for t in range(5)
)
async def results_async(self):
return [cirq.ResultDict(params={}, measurements={'a': np.asarray([t])}) for t in range(5)]

def calibration_results(self):
async def calibration_results_async(self):
pass


Expand Down
9 changes: 3 additions & 6 deletions cirq-google/cirq_google/engine/abstract_local_engine_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,13 @@ def health(self, *args, **kwargs):
def list_calibrations(self, *args, **kwargs):
pass

def run(self, *args, **kwargs):
async def run_batch_async(self, *args, **kwargs):
pass

def run_batch(self, *args, **kwargs):
async def run_calibration_async(self, *args, **kwargs):
pass

def run_calibration(self, *args, **kwargs):
pass

def run_sweep(self, *args, **kwargs):
async def run_sweep_async(self, *args, **kwargs):
pass

def get_sampler(self, *args, **kwargs):
Expand Down
6 changes: 3 additions & 3 deletions cirq-google/cirq_google/engine/abstract_local_job_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ def cancel(self) -> None:
def delete(self) -> None:
pass

def batched_results(self) -> Sequence[Sequence[EngineResult]]:
async def batched_results_async(self) -> Sequence[Sequence[EngineResult]]:
return [] # coverage: ignore

def results(self) -> Sequence[EngineResult]:
async def results_async(self) -> Sequence[EngineResult]:
return [] # coverage: ignore

def calibration_results(self) -> Sequence[CalibrationResult]:
async def calibration_results_async(self) -> Sequence[CalibrationResult]:
return [] # coverage: ignore


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,13 @@ def health(self, *args, **kwargs):
def list_calibrations(self, *args, **kwargs):
pass

def run(self, *args, **kwargs):
async def run_batch_async(self, *args, **kwargs):
pass

def run_batch(self, *args, **kwargs):
async def run_calibration_async(self, *args, **kwargs):
pass

def run_calibration(self, *args, **kwargs):
pass

def run_sweep(self, *args, **kwargs):
async def run_sweep_async(self, *args, **kwargs):
pass

def get_sampler(self, *args, **kwargs):
Expand Down
32 changes: 26 additions & 6 deletions cirq-google/cirq_google/engine/abstract_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@

import abc
import datetime

from typing import Dict, Iterable, List, Optional, Sequence, TYPE_CHECKING, Union

import cirq
import duet

import cirq
from cirq_google.api import v2
from cirq_google.cloud import quantum
from cirq_google.engine import calibration, util
Expand Down Expand Up @@ -53,7 +53,7 @@ class AbstractProcessor(abc.ABC):
This is an abstract class. Inheritors should implement abstract methods.
"""

def run(
async def run_async(
self,
program: cirq.Circuit,
program_id: Optional[str] = None,
Expand Down Expand Up @@ -88,9 +88,23 @@ def run(
Returns:
A single Result for this run.
"""
job = await self.run_sweep_async(
program=program,
program_id=program_id,
job_id=job_id,
params=[param_resolver or cirq.ParamResolver({})],
repetitions=repetitions,
program_description=program_description,
program_labels=program_labels,
job_description=job_description,
job_labels=job_labels,
)
return job.results()[0]

run = duet.sync(run_async)
95-martin-orion marked this conversation as resolved.
Show resolved Hide resolved

@abc.abstractmethod
def run_sweep(
async def run_sweep_async(
self,
program: cirq.AbstractCircuit,
program_id: Optional[str] = None,
Expand Down Expand Up @@ -129,8 +143,10 @@ def run_sweep(
`cirq.Result`, one for each parameter sweep.
"""

run_sweep = duet.sync(run_sweep_async)

@abc.abstractmethod
def run_batch(
async def run_batch_async(
self,
programs: Sequence[cirq.AbstractCircuit],
program_id: Optional[str] = None,
Expand Down Expand Up @@ -180,8 +196,10 @@ def run_batch(
parameter sweep.
"""

run_batch = duet.sync(run_batch_async)

@abc.abstractmethod
def run_calibration(
async def run_calibration_async(
self,
layers: List['cg.CalibrationLayer'],
program_id: Optional[str] = None,
Expand Down Expand Up @@ -223,6 +241,8 @@ def run_calibration(
calibration_results().
"""

run_calibration = duet.sync(run_calibration_async)

@abc.abstractmethod
def get_sampler(self) -> 'cg.ProcessorSampler':
"""Returns a sampler backed by the processor."""
Expand Down