diff --git a/qiskit_experiments/database_service/db_experiment_data.py b/qiskit_experiments/database_service/db_experiment_data.py index a30e81f989..92b3f4eb7c 100644 --- a/qiskit_experiments/database_service/db_experiment_data.py +++ b/qiskit_experiments/database_service/db_experiment_data.py @@ -218,25 +218,43 @@ def add_data( "Not all post-processing has finished. Adding new data " "may create unexpected analysis results." ) + if not isinstance(data, list): + data = [data] + + # Extract job data and directly add non-job data + job_data = [] + with self._data.lock: + for datum in data: + if isinstance(datum, (Job, BaseJob)): + job_data.append(datum) + elif isinstance(datum, dict): + self._add_single_data(datum) + elif isinstance(datum, Result): + self._add_result_data(datum) + else: + raise TypeError(f"Invalid data type {type(datum)}.") - if isinstance(data, (Job, BaseJob)): - if self.backend and self.backend.name() != data.backend().name(): + # Add futures for job data + for job in job_data: + if self.backend and self.backend.name() != job.backend().name(): LOG.warning( "Adding a job from a backend (%s) that is different " "than the current backend (%s). 
" "The new backend will be used, but " "service is not changed if one already exists.", - data.backend(), + job.backend(), self.backend, ) - self._backend = data.backend() + self._backend = job.backend() if not self._service: self._set_service_from_backend(self._backend) - self._jobs[data.job_id()] = data + self._jobs[job.job_id()] = job + + if job_data: job_kwargs = { - "job": data, - "job_done_callback": post_processing_callback, + "jobs": job_data, + "jobs_done_callback": post_processing_callback, "timeout": timeout, **kwargs, } @@ -246,57 +264,47 @@ def add_data( self._executor.submit(self._wait_for_job, **job_kwargs), ) ) - if self.auto_save: - self.save_metadata() - return - - if isinstance(data, dict): - self._add_single_data(data) - elif isinstance(data, Result): - self._add_result_data(data) - elif isinstance(data, list): - for dat in data: - self.add_data(dat) - else: - raise TypeError(f"Invalid data type {type(data)}.") - - if post_processing_callback is not None: + elif post_processing_callback: post_processing_callback(self, **kwargs) + if self.auto_save: + self.save_metadata() + def _wait_for_job( self, - job: Union[Job, BaseJob], - job_done_callback: Optional[Callable] = None, + jobs: List[Union[Job, BaseJob]], + jobs_done_callback: Optional[Callable] = None, timeout: Optional[float] = None, **kwargs: Any, ) -> None: """Wait for a job to finish. Args: - job: Job to wait for. - job_done_callback: Callback function to invoke when job finishes. + jobs: Jobs to wait for. + jobs_done_callback: Callback function to invoke when jobs finish. timeout: Timeout waiting for job to finish. **kwargs: Keyword arguments to be passed to the callback function. Raises: Exception: If post processing failed. """ - LOG.debug("Waiting for job %s to finish.", job.job_id()) - try: + for job in jobs: + LOG.debug("Waiting for jobs %s to finish.", job.job_id()) try: - job_result = job.result(timeout=timeout) - except TypeError: # Not all jobs take timeout. 
- job_result = job.result() - with self._data.lock: - # Hold the lock so we add the block of results together. - self._add_result_data(job_result) - except Exception: # pylint: disable=broad-except - LOG.warning("Job %s failed:\n%s", job.job_id(), traceback.format_exc()) - raise + try: + job_result = job.result(timeout=timeout) + except TypeError: # Not all jobs take timeout. + job_result = job.result() + with self._data.lock: + # Hold the lock so we add the block of results together. + self._add_result_data(job_result) + except Exception: # pylint: disable=broad-except + LOG.warning("Job %s failed:\n%s", job.job_id(), traceback.format_exc()) + raise try: - if job_done_callback: - job_done_callback(self, **kwargs) + if jobs_done_callback: + jobs_done_callback(self, **kwargs) except Exception: # pylint: disable=broad-except LOG.warning("Post processing function failed:\n%s", traceback.format_exc()) raise @@ -811,12 +819,13 @@ def load(cls, experiment_id: str, service: DatabaseServiceV1) -> "DbExperimentDa def cancel_jobs(self) -> None: """Cancel any running jobs.""" for kwargs, fut in self._job_futures.copy(): - job = kwargs["job"] - if not fut.done() and job.status() not in JOB_FINAL_STATES: - try: - job.cancel() - except Exception as err: # pylint: disable=broad-except - LOG.info("Unable to cancel job %s: %s", job.job_id(), err) + if not fut.done(): + for job in kwargs["jobs"]: + if job.status() not in JOB_FINAL_STATES: + try: + job.cancel() + except Exception as err: # pylint: disable=broad-except + LOG.info("Unable to cancel job %s: %s", job.job_id(), err) def block_for_results(self, timeout: Optional[float] = None) -> "DbExperimentDataV1": """Block until all pending jobs and their post processing finish. @@ -828,8 +837,8 @@ def block_for_results(self, timeout: Optional[float] = None) -> "DbExperimentDat The experiment data with finished jobs and post-processing. 
""" for kwargs, fut in self._job_futures.copy(): - job = kwargs["job"] - LOG.info("Waiting for job %s and its post processing to finish.", job.job_id()) + job_ids = [job.job_id() for job in kwargs["jobs"]] + LOG.info("Waiting for jobs %s and its post processing to finish.", job_ids) with contextlib.suppress(Exception): fut.result(timeout) return self @@ -862,21 +871,23 @@ def status(self) -> str: with self._job_futures.lock: for idx, item in enumerate(self._job_futures): kwargs, fut = item - job = kwargs["job"] - job_status = job.status() - statuses.add(job_status) - if job_status == JobStatus.ERROR: - job_err = "." - if hasattr(job, "error_message"): - job_err = ": " + job.error_message() - self._errors.append(f"Job {job.job_id()} failed{job_err}") + jobs = kwargs["jobs"] + for job in jobs: + job_status = job.status() + statuses.add(job_status) + if job_status == JobStatus.ERROR: + job_err = "." + if hasattr(job, "error_message"): + job_err = ": " + job.error_message() + self._errors.append(f"Job {job.job_id()} failed{job_err}") if fut.done(): self._job_futures[idx] = None ex = fut.exception() if ex: + job_ids = [job.job_id() for job in jobs] self._errors.append( - f"Post processing for job {job.job_id()} failed: \n" + f"Post processing for job {job_ids} failed: \n" + "".join(traceback.format_exception(type(ex), ex, ex.__traceback__)) ) statuses.add(JobStatus.ERROR) @@ -946,12 +957,12 @@ def _copy_metadata( # inherits an abstract class. 
extra_kwargs = {} for key, val in orig_kwargs.items(): - if key not in ["job", "job_done_callback", "timeout"]: + if key not in ["jobs", "jobs_done_callback", "timeout"]: extra_kwargs[key] = val new_instance.add_data( - data=orig_kwargs["job"], - post_processing_callback=orig_kwargs["job_done_callback"], + data=orig_kwargs["jobs"], + post_processing_callback=orig_kwargs["jobs_done_callback"], timeout=orig_kwargs["timeout"], **extra_kwargs, ) diff --git a/qiskit_experiments/framework/base_experiment.py b/qiskit_experiments/framework/base_experiment.py index e6161ba1f3..04ac321fcf 100644 --- a/qiskit_experiments/framework/base_experiment.py +++ b/qiskit_experiments/framework/base_experiment.py @@ -128,22 +128,36 @@ def run( circuits = transpile(self.circuits(backend), backend, **transpile_opts) self._postprocess_transpiled_circuits(circuits, backend, **run_options) - if isinstance(backend, LegacyBackend): - qobj = assemble(circuits, backend=backend, **run_opts) - job = backend.run(qobj) + # Run experiment jobs + max_experiments = getattr(backend.configuration(), "max_experiments", None) + if max_experiments and len(circuits) > max_experiments: + # Split jobs for backends that have a maximum job size + job_circuits = [ + circuits[i : i + max_experiments] for i in range(0, len(circuits), max_experiments) + ] else: - job = backend.run(circuits, **run_opts) - - # Add Job to ExperimentData and add analysis for post processing. - run_analysis = None + # Run as single job + job_circuits = [circuits] + + # Run jobs + jobs = [] + for circs in job_circuits: + if isinstance(backend, LegacyBackend): + qobj = assemble(circs, backend=backend, **run_opts) + job = backend.run(qobj) + else: + job = backend.run(circs, **run_opts) + jobs.append(job) # Add experiment option metadata - self._add_job_metadata(experiment_data, job, **run_opts) + self._add_job_metadata(experiment_data, jobs, **run_opts) + # Add Job to ExperimentData and add analysis for post processing. 
+ run_analysis = None if analysis and self.__analysis_class__ is not None: run_analysis = self.run_analysis - experiment_data.add_data(job, post_processing_callback=run_analysis) + experiment_data.add_data(jobs, post_processing_callback=run_analysis) # Return the ExperimentData future return experiment_data @@ -167,11 +181,11 @@ def _initialize_experiment_data( return experiment_data._copy_metadata() - def run_analysis(self, experiment_data, **options) -> ExperimentData: + def run_analysis(self, experiment_data: ExperimentData, **options) -> ExperimentData: """Run analysis and update ExperimentData with analysis result. Args: - experiment_data (ExperimentData): the experiment data to analyze. + experiment_data: the experiment data to analyze. options: additional analysis options. Any values set here will override the value from :meth:`analysis_options` for the current run. @@ -365,16 +379,16 @@ def _additional_metadata(self) -> Dict[str, any]: """ return {} - def _add_job_metadata(self, experiment_data: ExperimentData, job: BaseJob, **run_options): + def _add_job_metadata(self, experiment_data: ExperimentData, jobs: BaseJob, **run_options): """Add runtime job metadata to ExperimentData. Args: experiment_data: the experiment data container. - job: the job object. + jobs: the job objects. run_options: backend run options for the job. 
""" metadata = { - "job_id": job.job_id(), + "job_ids": [job.job_id() for job in jobs], "experiment_options": copy.copy(self.experiment_options.__dict__), "transpile_options": copy.copy(self.transpile_options.__dict__), "analysis_options": copy.copy(self.analysis_options.__dict__), diff --git a/qiskit_experiments/framework/composite/composite_experiment.py b/qiskit_experiments/framework/composite/composite_experiment.py index b58a642fe4..5c41d0e0f2 100644 --- a/qiskit_experiments/framework/composite/composite_experiment.py +++ b/qiskit_experiments/framework/composite/composite_experiment.py @@ -64,9 +64,9 @@ def component_analysis(self, index): """Return the component experiment Analysis object""" return self.component_experiment(index).analysis() - def _add_job_metadata(self, experiment_data, job, **run_options): + def _add_job_metadata(self, experiment_data, jobs, **run_options): # Add composite metadata - super()._add_job_metadata(experiment_data, job, **run_options) + super()._add_job_metadata(experiment_data, jobs, **run_options) # Add sub-experiment options for i in range(self.num_experiments): @@ -83,7 +83,7 @@ def _add_job_metadata(self, experiment_data, job, **run_options): " are overridden by composite experiment options." 
) sub_data = experiment_data.component_experiment_data(i) - sub_exp._add_job_metadata(sub_data, job, **run_options) + sub_exp._add_job_metadata(sub_data, jobs, **run_options) def _postprocess_transpiled_circuits(self, circuits, backend, **run_options): for expr in self._experiments: diff --git a/qiskit_experiments/framework/composite/composite_experiment_data.py b/qiskit_experiments/framework/composite/composite_experiment_data.py index 8667e30e7c..20e35f3752 100644 --- a/qiskit_experiments/framework/composite/composite_experiment_data.py +++ b/qiskit_experiments/framework/composite/composite_experiment_data.py @@ -48,7 +48,7 @@ def __init__( # Initialize sub experiments self._components = [ - expr.__experiment_data__(expr, backend, job_ids) for expr in experiment._experiments + expr.__experiment_data__(expr, backend) for expr in experiment._experiments ] def __str__(self): @@ -102,4 +102,4 @@ def _add_single_data(self, data): sub_data["counts"] = marginal_counts(data["counts"], composite_clbits[i]) else: sub_data["counts"] = data["counts"] - self._components[index].add_data(sub_data) + self._components[index]._add_single_data(sub_data) diff --git a/qiskit_experiments/test/utils.py b/qiskit_experiments/test/utils.py index 297e737d86..a3700083cd 100644 --- a/qiskit_experiments/test/utils.py +++ b/qiskit_experiments/test/utils.py @@ -29,7 +29,10 @@ class FakeJob(Job): def __init__(self, backend: Union[Backend, BaseBackend], result: Optional[Result] = None): """Initialize FakeJob.""" - job_id = uuid.uuid4().hex + if result: + job_id = result.job_id + else: + job_id = uuid.uuid4().hex super().__init__(backend, job_id) self._result = result diff --git a/releasenotes/notes/job-splitting-775dc9aed9cf20c2.yaml b/releasenotes/notes/job-splitting-775dc9aed9cf20c2.yaml new file mode 100644 index 0000000000..cac2e8eada --- /dev/null +++ b/releasenotes/notes/job-splitting-775dc9aed9cf20c2.yaml @@ -0,0 +1,9 @@ +--- +features: + - | + Adds automatic job splitting to + 
:class:`~qiskit_experiments.framework.BaseExperiment` for execution of + experiments with a larger number of circuits than can be run in a single + job on the target backend. This enables running large experiments on + legacy and non-IBM backends that don't handle job splitting + automatically. diff --git a/test/fake_backend.py b/test/fake_backend.py index 7493ccac88..53dfb32be3 100644 --- a/test/fake_backend.py +++ b/test/fake_backend.py @@ -11,6 +11,7 @@ # that they have been altered from the originals. """Fake backend class for tests.""" +import uuid from qiskit.providers.backend import BackendV1 from qiskit.providers.models import QasmBackendConfiguration @@ -25,7 +26,7 @@ class FakeBackend(BackendV1): Fake backend for test purposes only. """ - def __init__(self): + def __init__(self, max_experiments=None): configuration = QasmBackendConfiguration( backend_name="dummy_backend", backend_version="0", @@ -38,6 +39,7 @@ def __init__(self): open_pulse=False, memory=False, max_shots=int(1e6), + max_experiments=max_experiments, coupling_map=None, ) super().__init__(configuration) @@ -50,8 +52,8 @@ def run(self, run_input, **options): result = { "backend_name": "Dummmy backend", "backend_version": "0", - "qobj_id": 0, - "job_id": 0, + "qobj_id": uuid.uuid4().hex, + "job_id": uuid.uuid4().hex, "success": True, "results": [], } diff --git a/test/test_framework.py b/test/test_framework.py new file mode 100644 index 0000000000..51085ecbd4 --- /dev/null +++ b/test/test_framework.py @@ -0,0 +1,55 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. 
+#
+# Any modifications or derivative works of this code must retain this
+# copyright notice, and modified files need to carry a notice indicating
+# that they have been altered from the originals.
+
+"""Tests for base experiment framework."""
+
+from test.fake_backend import FakeBackend
+from test.fake_experiment import FakeExperiment
+
+import ddt
+
+from qiskit import QuantumCircuit
+from qiskit.test import QiskitTestCase
+
+
+@ddt.ddt
+class TestFramework(QiskitTestCase):
+    """Test Base Experiment"""
+
+    @ddt.data(None, 1, 2, 3)
+    def test_job_splitting(self, max_experiments):
+        """Test job splitting"""
+
+        num_circuits = 10
+        backend = FakeBackend(max_experiments=max_experiments)
+
+        class Experiment(FakeExperiment):
+            """Fake Experiment to test job splitting"""
+
+            def circuits(self, backend=None):
+                """Generate fake circuits"""
+                qc = QuantumCircuit(1)
+                qc.measure_all()
+                return num_circuits * [qc]
+
+        exp = Experiment(0)
+        expdata = exp.run(backend)
+        job_ids = expdata.job_ids
+
+        # Compute expected number of jobs
+        if max_experiments is None:
+            num_jobs = 1
+        else:
+            num_jobs = num_circuits // max_experiments
+            if num_circuits % max_experiments:
+                num_jobs += 1
+        self.assertEqual(len(job_ids), num_jobs)