129 changes: 70 additions & 59 deletions qiskit_experiments/database_service/db_experiment_data.py
@@ -218,25 +218,43 @@ def add_data(
"Not all post-processing has finished. Adding new data "
"may create unexpected analysis results."
)
if not isinstance(data, list):
data = [data]

# Extract job data and directly add non-job data
job_data = []
with self._data.lock:
for datum in data:
if isinstance(datum, (Job, BaseJob)):
job_data.append(datum)
elif isinstance(datum, dict):
self._add_single_data(datum)
elif isinstance(datum, Result):
self._add_result_data(datum)
else:
raise TypeError(f"Invalid data type {type(datum)}.")

if isinstance(data, (Job, BaseJob)):
if self.backend and self.backend.name() != data.backend().name():
# Add futures for job data
for job in job_data:
if self.backend and self.backend.name() != job.backend().name():
LOG.warning(
"Adding a job from a backend (%s) that is different "
"than the current backend (%s). "
"The new backend will be used, but "
"service is not changed if one already exists.",
data.backend(),
job.backend(),
self.backend,
)
self._backend = data.backend()
self._backend = job.backend()
if not self._service:
self._set_service_from_backend(self._backend)

self._jobs[data.job_id()] = data
self._jobs[job.job_id()] = job

if job_data:
job_kwargs = {
"job": data,
"job_done_callback": post_processing_callback,
"jobs": job_data,
"jobs_done_callback": post_processing_callback,
"timeout": timeout,
**kwargs,
}
@@ -246,57 +264,47 @@ def add_data(
self._executor.submit(self._wait_for_job, **job_kwargs),
)
)
if self.auto_save:
self.save_metadata()
return

if isinstance(data, dict):
self._add_single_data(data)
elif isinstance(data, Result):
self._add_result_data(data)
elif isinstance(data, list):
for dat in data:
self.add_data(dat)
else:
raise TypeError(f"Invalid data type {type(data)}.")

if post_processing_callback is not None:
elif post_processing_callback:
post_processing_callback(self, **kwargs)

if self.auto_save:
self.save_metadata()

def _wait_for_job(
self,
job: Union[Job, BaseJob],
job_done_callback: Optional[Callable] = None,
jobs: List[Union[Job, BaseJob]],
jobs_done_callback: Optional[Callable] = None,
timeout: Optional[float] = None,
**kwargs: Any,
) -> None:
"""Wait for a job to finish.

Args:
job: Job to wait for.
job_done_callback: Callback function to invoke when job finishes.
jobs: Jobs to wait for.
jobs_done_callback: Callback function to invoke when jobs finish.
timeout: Timeout waiting for jobs to finish.
**kwargs: Keyword arguments to be passed to the callback function.

Raises:
Exception: If post processing failed.
"""
LOG.debug("Waiting for job %s to finish.", job.job_id())
try:
for job in jobs:
LOG.debug("Waiting for jobs %s to finish.", job.job_id())
try:
job_result = job.result(timeout=timeout)
except TypeError: # Not all jobs take timeout.
job_result = job.result()
with self._data.lock:
# Hold the lock so we add the block of results together.
self._add_result_data(job_result)
except Exception: # pylint: disable=broad-except
LOG.warning("Job %s failed:\n%s", job.job_id(), traceback.format_exc())
raise
try:
job_result = job.result(timeout=timeout)
except TypeError: # Not all jobs take timeout.
job_result = job.result()
with self._data.lock:
# Hold the lock so we add the block of results together.
self._add_result_data(job_result)
except Exception: # pylint: disable=broad-except
LOG.warning("Job %s failed:\n%s", job.job_id(), traceback.format_exc())
raise

try:
if job_done_callback:
job_done_callback(self, **kwargs)
if jobs_done_callback:
jobs_done_callback(self, **kwargs)
except Exception: # pylint: disable=broad-except
LOG.warning("Post processing function failed:\n%s", traceback.format_exc())
raise
@@ -811,12 +819,13 @@ def load(cls, experiment_id: str, service: DatabaseServiceV1) -> "DbExperimentDataV1":
def cancel_jobs(self) -> None:
"""Cancel any running jobs."""
for kwargs, fut in self._job_futures.copy():
job = kwargs["job"]
if not fut.done() and job.status() not in JOB_FINAL_STATES:
try:
job.cancel()
except Exception as err: # pylint: disable=broad-except
LOG.info("Unable to cancel job %s: %s", job.job_id(), err)
if not fut.done():
for job in kwargs["jobs"]:
if job.status() not in JOB_FINAL_STATES:
try:
job.cancel()
except Exception as err: # pylint: disable=broad-except
LOG.info("Unable to cancel job %s: %s", job.job_id(), err)

def block_for_results(self, timeout: Optional[float] = None) -> "DbExperimentDataV1":
"""Block until all pending jobs and their post processing finish.
@@ -828,8 +837,8 @@ def block_for_results(self, timeout: Optional[float] = None) -> "DbExperimentDataV1":
The experiment data with finished jobs and post-processing.
"""
for kwargs, fut in self._job_futures.copy():
job = kwargs["job"]
LOG.info("Waiting for job %s and its post processing to finish.", job.job_id())
job_ids = [job.job_id() for job in kwargs["jobs"]]
LOG.info("Waiting for jobs %s and its post processing to finish.", job_ids)
with contextlib.suppress(Exception):
fut.result(timeout)
return self
@@ -862,21 +871,23 @@ def status(self) -> str:
with self._job_futures.lock:
for idx, item in enumerate(self._job_futures):
kwargs, fut = item
job = kwargs["job"]
job_status = job.status()
statuses.add(job_status)
if job_status == JobStatus.ERROR:
job_err = "."
if hasattr(job, "error_message"):
job_err = ": " + job.error_message()
self._errors.append(f"Job {job.job_id()} failed{job_err}")
jobs = kwargs["jobs"]
for job in jobs:
job_status = job.status()
statuses.add(job_status)
if job_status == JobStatus.ERROR:
job_err = "."
if hasattr(job, "error_message"):
job_err = ": " + job.error_message()
self._errors.append(f"Job {job.job_id()} failed{job_err}")

if fut.done():
self._job_futures[idx] = None
ex = fut.exception()
if ex:
job_ids = [job.job_id() for job in jobs]
self._errors.append(
f"Post processing for job {job.job_id()} failed: \n"
f"Post processing for job {job_ids} failed: \n"
+ "".join(traceback.format_exception(type(ex), ex, ex.__traceback__))
)
statuses.add(JobStatus.ERROR)
@@ -946,12 +957,12 @@ def _copy_metadata(
# inherits an abstract class.
extra_kwargs = {}
for key, val in orig_kwargs.items():
if key not in ["job", "job_done_callback", "timeout"]:
if key not in ["jobs", "jobs_done_callback", "timeout"]:
extra_kwargs[key] = val

new_instance.add_data(
data=orig_kwargs["job"],
post_processing_callback=orig_kwargs["job_done_callback"],
data=orig_kwargs["jobs"],
post_processing_callback=orig_kwargs["jobs_done_callback"],
timeout=orig_kwargs["timeout"],
**extra_kwargs,
)
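
With this change, add_data accepts a single job, a result, a dict, or a list
mixing them, and every job passed in one call is tracked by a single future.
Below is a minimal sketch (not from the PR) of the new list-based calling
convention, using the FakeBackend test helper touched elsewhere in this PR;
the jobs_done callback is a hypothetical stand-in for an experiment's
analysis hook.

from qiskit_experiments.database_service.db_experiment_data import DbExperimentDataV1
from test.fake_backend import FakeBackend

backend = FakeBackend(max_experiments=2)
exp_data = DbExperimentDataV1(backend=backend)

# Three jobs standing in for a split circuit batch.
jobs = [backend.run([]) for _ in range(3)]

def jobs_done(expdata, **kwargs):
    # Invoked once, after every job in the list has finished.
    print("All jobs finished for", expdata.experiment_id)

exp_data.add_data(jobs, post_processing_callback=jobs_done)
exp_data.block_for_results()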
42 changes: 28 additions & 14 deletions qiskit_experiments/framework/base_experiment.py
@@ -128,22 +128,36 @@ def run(
circuits = transpile(self.circuits(backend), backend, **transpile_opts)
self._postprocess_transpiled_circuits(circuits, backend, **run_options)

if isinstance(backend, LegacyBackend):
qobj = assemble(circuits, backend=backend, **run_opts)
job = backend.run(qobj)
# Run experiment jobs
max_experiments = getattr(backend.configuration(), "max_experiments", None)
if max_experiments and len(circuits) > max_experiments:
# Split jobs for backends that have a maximum job size
job_circuits = [
circuits[i : i + max_experiments] for i in range(0, len(circuits), max_experiments)
]
else:
job = backend.run(circuits, **run_opts)

# Add Job to ExperimentData and add analysis for post processing.
run_analysis = None
# Run as single job
job_circuits = [circuits]

# Run jobs
jobs = []
for circs in job_circuits:
if isinstance(backend, LegacyBackend):
qobj = assemble(circs, backend=backend, **run_opts)
job = backend.run(qobj)
else:
job = backend.run(circs, **run_opts)
jobs.append(job)

# Add experiment option metadata
self._add_job_metadata(experiment_data, job, **run_opts)
self._add_job_metadata(experiment_data, jobs, **run_opts)

# Add Job to ExperimentData and add analysis for post processing.
run_analysis = None
if analysis and self.__analysis_class__ is not None:
run_analysis = self.run_analysis

experiment_data.add_data(job, post_processing_callback=run_analysis)
experiment_data.add_data(jobs, post_processing_callback=run_analysis)

# Return the ExperimentData future
return experiment_data
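
The split itself is plain list slicing; a standalone illustration of the
chunking expression used in run() above:

circuits = list(range(8))   # stand-ins for transpiled circuits
max_experiments = 3         # the backend's per-job limit, when configured
job_circuits = [
    circuits[i : i + max_experiments]
    for i in range(0, len(circuits), max_experiments)
]
assert job_circuits == [[0, 1, 2], [3, 4, 5], [6, 7]]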
@@ -167,11 +181,11 @@ def _initialize_experiment_data(

return experiment_data._copy_metadata()

def run_analysis(self, experiment_data, **options) -> ExperimentData:
def run_analysis(self, experiment_data: ExperimentData, **options) -> ExperimentData:
"""Run analysis and update ExperimentData with analysis result.

Args:
experiment_data (ExperimentData): the experiment data to analyze.
experiment_data: the experiment data to analyze.
options: additional analysis options. Any values set here will
override the value from :meth:`analysis_options`
for the current run.
@@ -365,16 +379,16 @@ def _additional_metadata(self) -> Dict[str, any]:
"""
return {}

def _add_job_metadata(self, experiment_data: ExperimentData, job: BaseJob, **run_options):
def _add_job_metadata(self, experiment_data: ExperimentData, jobs: List[BaseJob], **run_options):
"""Add runtime job metadata to ExperimentData.

Args:
experiment_data: the experiment data container.
job: the job object.
jobs: the job objects.
run_options: backend run options for the job.
"""
metadata = {
"job_id": job.job_id(),
"job_ids": [job.job_id() for job in jobs],
"experiment_options": copy.copy(self.experiment_options.__dict__),
"transpile_options": copy.copy(self.transpile_options.__dict__),
"analysis_options": copy.copy(self.analysis_options.__dict__),
@@ -64,9 +64,9 @@ def component_analysis(self, index):
"""Return the component experiment Analysis object"""
return self.component_experiment(index).analysis()

def _add_job_metadata(self, experiment_data, job, **run_options):
def _add_job_metadata(self, experiment_data, jobs, **run_options):
# Add composite metadata
super()._add_job_metadata(experiment_data, job, **run_options)
super()._add_job_metadata(experiment_data, jobs, **run_options)

# Add sub-experiment options
for i in range(self.num_experiments):
@@ -83,7 +83,7 @@ def _add_job_metadata(self, experiment_data, job, **run_options):
" are overridden by composite experiment options."
)
sub_data = experiment_data.component_experiment_data(i)
sub_exp._add_job_metadata(sub_data, job, **run_options)
sub_exp._add_job_metadata(sub_data, jobs, **run_options)

def _postprocess_transpiled_circuits(self, circuits, backend, **run_options):
for expr in self._experiments:
@@ -48,7 +48,7 @@ def __init__(

# Initialize sub experiments
self._components = [
expr.__experiment_data__(expr, backend, job_ids) for expr in experiment._experiments
expr.__experiment_data__(expr, backend) for expr in experiment._experiments
]

def __str__(self):
@@ -102,4 +102,4 @@ def _add_single_data(self, data):
sub_data["counts"] = marginal_counts(data["counts"], composite_clbits[i])
else:
sub_data["counts"] = data["counts"]
self._components[index].add_data(sub_data)
self._components[index]._add_single_data(sub_data)
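
For reference, marginal_counts above is the standard Qiskit result utility;
a small standalone example of the per-component marginalization being
applied:

from qiskit.result import marginal_counts

counts = {"00": 10, "01": 7, "10": 3, "11": 5}
# Keep only clbit 0 (the rightmost bit of each key):
print(marginal_counts(counts, [0]))  # {'0': 13, '1': 12}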
5 changes: 4 additions & 1 deletion qiskit_experiments/test/utils.py
@@ -29,7 +29,10 @@ class FakeJob(Job):

def __init__(self, backend: Union[Backend, BaseBackend], result: Optional[Result] = None):
"""Initialize FakeJob."""
job_id = uuid.uuid4().hex
if result:
job_id = result.job_id
else:
job_id = uuid.uuid4().hex
super().__init__(backend, job_id)
self._result = result
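
A short sketch of what the change above buys in tests: a FakeJob built from
a Result now reports the job ID the result carries (this assumes
Result.job_id is a plain attribute, as in qiskit-terra):

import uuid
from qiskit.result import Result
from qiskit_experiments.test.utils import FakeJob
from test.fake_backend import FakeBackend

result = Result.from_dict({
    "backend_name": "dummy_backend",
    "backend_version": "0",
    "qobj_id": uuid.uuid4().hex,
    "job_id": uuid.uuid4().hex,
    "success": True,
    "results": [],
})
job = FakeJob(FakeBackend(), result=result)
assert job.job_id() == result.job_id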

9 changes: 9 additions & 0 deletions releasenotes/notes/job-splitting-775dc9aed9cf20c2.yaml
@@ -0,0 +1,9 @@
---
features:
- |
Adds automatic job splitting to
:class:`~qiskit_experiments.framework.BaseExperiment` for execution of
experiments with a larger number of circuits than can be run in a single
job on the target backend. This enables running large experiments on
legacy and non-IBM backends that don't handle job splitting
automatically.
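
A hedged usage sketch of the feature described in this note (MyExperiment is
hypothetical, standing in for any BaseExperiment subclass that produces more
circuits than the backend's per-job limit):

# FakeBackend is the test backend extended in this PR.
backend = FakeBackend(max_experiments=100)
exp_data = MyExperiment(qubit=0).run(backend)
exp_data.block_for_results()  # waits on all split jobs and their analysis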
8 changes: 5 additions & 3 deletions test/fake_backend.py
@@ -11,6 +11,7 @@
# that they have been altered from the originals.

"""Fake backend class for tests."""
import uuid
from qiskit.providers.backend import BackendV1
from qiskit.providers.models import QasmBackendConfiguration

@@ -25,7 +26,7 @@ class FakeBackend(BackendV1):
Fake backend for test purposes only.
"""

def __init__(self):
def __init__(self, max_experiments=None):
configuration = QasmBackendConfiguration(
backend_name="dummy_backend",
backend_version="0",
@@ -38,6 +39,7 @@ def __init__(self):
open_pulse=False,
memory=False,
max_shots=int(1e6),
max_experiments=max_experiments,
coupling_map=None,
)
super().__init__(configuration)
@@ -50,8 +52,8 @@ def run(self, run_input, **options):
result = {
"backend_name": "Dummmy backend",
"backend_version": "0",
"qobj_id": 0,
"job_id": 0,
"qobj_id": uuid.uuid4().hex,
"job_id": uuid.uuid4().hex,
"success": True,
"results": [],
}