diff --git a/qiskit_experiments/database_service/db_experiment_data.py b/qiskit_experiments/database_service/db_experiment_data.py
index a30e81f989..c061d5c6a9 100644
--- a/qiskit_experiments/database_service/db_experiment_data.py
+++ b/qiskit_experiments/database_service/db_experiment_data.py
@@ -172,43 +172,21 @@ def _set_service_from_backend(self, backend: Union[Backend, BaseBackend]) -> Non
     def add_data(
         self,
         data: Union[Result, List[Result], Job, List[Job], Dict, List[Dict]],
-        post_processing_callback: Optional[Callable] = None,
         timeout: Optional[float] = None,
-        **kwargs: Any,
     ) -> None:
         """Add experiment data.
 
-        Note:
-            This method is not thread safe and should not be called by the
-            `post_processing_callback` function.
-
-        Note:
-            If `data` is a ``Job``, this method waits for the job to finish
-            and calls the `post_processing_callback` function asynchronously.
-
         Args:
             data: Experiment data to add. Several types are accepted for convenience:
-
                 * Result: Add data from this ``Result`` object.
                 * List[Result]: Add data from the ``Result`` objects.
                 * Job: Add data from the job result.
                 * List[Job]: Add data from the job results.
                 * Dict: Add this data.
                 * List[Dict]: Add this list of data.
-
-            post_processing_callback: Callback function invoked when data is
-                added. If `data` is a ``Job``, the callback is only invoked when
-                the job finishes successfully.
-                The following positional arguments are provided to the callback function:
-
-                * This ``DbExperimentData`` object.
-                * Additional keyword arguments passed to this method.
-
             timeout: Timeout waiting for job to finish, if `data` is a ``Job``.
-            **kwargs: Keyword arguments to be passed to the callback function.
-
         Raises:
             TypeError: If the input data type is invalid.
         """
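
# --- Editor's example (not part of the patch) ---------------------------------
# A minimal sketch of the slimmed-down add_data() call, exercising only the
# documented non-job input types so it runs without a backend. The import path
# is an assumption matching the test module at the end of this patch, and
# "sketch" is an arbitrary experiment_type.
from qiskit_experiments.database_service import DbExperimentData

exp_data = DbExperimentData(experiment_type="sketch")
exp_data.add_data({"counts": {"00": 1024}})  # a single dict is stored directly
exp_data.add_data([{"counts": {"00": 512}}, {"counts": {"11": 512}}])  # a list of dicts
print(len(exp_data.data()))  # -> 3 stored data entries
# -------------------------------------------------------------------------------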
" "The new backend will be used, but " "service is not changed if one already exists.", - data.backend(), + job.backend(), self.backend, ) - self._backend = data.backend() + self._backend = job.backend() if not self._service: self._set_service_from_backend(self._backend) - self._jobs[data.job_id()] = data - job_kwargs = { - "job": data, - "job_done_callback": post_processing_callback, - "timeout": timeout, - **kwargs, - } - self._job_futures.append( - ( - job_kwargs, - self._executor.submit(self._wait_for_job, **job_kwargs), - ) + self._jobs[job.job_id()] = job + + job_kwargs = { + "jobs": job_data, + "timeout": timeout, + } + self._job_futures.append( + ( + job_kwargs, + self._executor.submit(self._add_jobs_result, **job_kwargs), ) - if self.auto_save: - self.save_metadata() - return + ) + if self.auto_save: + self.save_metadata() - if isinstance(data, dict): - self._add_single_data(data) - elif isinstance(data, Result): - self._add_result_data(data) - elif isinstance(data, list): - for dat in data: - self.add_data(dat) + def add_processing_callback( + self, + callback: Callable, + **kwargs: Any, + ): + """Add processing callback for after experiment job has run. + + This method waits for the last set of jobs to finish and calls + the `callback` function asynchronously. + + Note: + This method is not thread safe and should not be called by the + ` callback` function. + + Args: + callback: Callback function invoked when job finishes successfully. + The following positional arguments are provided to the callback function: + * This ``DbExperimentData`` object. + * Additional keyword arguments passed to this method. + **kwargs: Keyword arguments to be passed to the callback function. + """ + # Check if there are no futures to wait on + if not self._job_futures: + callback(self, **kwargs) else: - raise TypeError(f"Invalid data type {type(data)}.") + # Get the last added future and add a done callback + _, future = self._job_futures[-1] + + def future_callback(fut): + if not fut.cancelled(): + callback(self, **kwargs) - if post_processing_callback is not None: - post_processing_callback(self, **kwargs) + future.add_done_callback(future_callback) - def _wait_for_job( + if self.auto_save: + self.save_metadata() + + def _add_jobs_result( self, - job: Union[Job, BaseJob], - job_done_callback: Optional[Callable] = None, + jobs: List[Union[Job, BaseJob]], timeout: Optional[float] = None, - **kwargs: Any, ) -> None: - """Wait for a job to finish. + """Wait for a job to finish and add job result data. Args: - job: Job to wait for. - job_done_callback: Callback function to invoke when job finishes. + jobs: Jobs to wait for. timeout: Timeout waiting for job to finish. - **kwargs: Keyword arguments to be passed to the callback function. Raises: - Exception: If post processing failed. + Exception: If any of the jobs failed. """ - LOG.debug("Waiting for job %s to finish.", job.job_id()) - try: + for job in jobs: + LOG.debug("Waiting for job %s to finish.", job.job_id()) try: - job_result = job.result(timeout=timeout) - except TypeError: # Not all jobs take timeout. - job_result = job.result() - with self._data.lock: - # Hold the lock so we add the block of results together. 
-    def _wait_for_job(
+    def _add_jobs_result(
         self,
-        job: Union[Job, BaseJob],
-        job_done_callback: Optional[Callable] = None,
+        jobs: List[Union[Job, BaseJob]],
         timeout: Optional[float] = None,
-        **kwargs: Any,
     ) -> None:
-        """Wait for a job to finish.
+        """Wait for jobs to finish and add job result data.
 
         Args:
-            job: Job to wait for.
-            job_done_callback: Callback function to invoke when job finishes.
+            jobs: Jobs to wait for.
             timeout: Timeout waiting for job to finish.
-            **kwargs: Keyword arguments to be passed to the callback function.
 
         Raises:
-            Exception: If post processing failed.
+            Exception: If any of the jobs failed.
         """
-        LOG.debug("Waiting for job %s to finish.", job.job_id())
-        try:
-            try:
-                job_result = job.result(timeout=timeout)
-            except TypeError:  # Not all jobs take timeout.
-                job_result = job.result()
-            with self._data.lock:
-                # Hold the lock so we add the block of results together.
-                self._add_result_data(job_result)
-        except Exception:  # pylint: disable=broad-except
-            LOG.warning("Job %s failed:\n%s", job.job_id(), traceback.format_exc())
-            raise
-
-        try:
-            if job_done_callback:
-                job_done_callback(self, **kwargs)
-        except Exception:  # pylint: disable=broad-except
-            LOG.warning("Post processing function failed:\n%s", traceback.format_exc())
-            raise
+        for job in jobs:
+            LOG.debug("Waiting for job %s to finish.", job.job_id())
+            try:
+                try:
+                    job_result = job.result(timeout=timeout)
+                except TypeError:  # Not all jobs take timeout.
+                    job_result = job.result()
+                with self._data.lock:
+                    # Hold the lock so we add the block of results together.
+                    self._add_result_data(job_result)
+            except Exception:  # pylint: disable=broad-except
+                LOG.warning("Job %s failed:\n%s", job.job_id(), traceback.format_exc())
+                raise
 
     def _add_result_data(self, result: Result) -> None:
         """Add data from a Result object
 
@@ -811,12 +816,13 @@ def load(cls, experiment_id: str, service: DatabaseServiceV1) -> "DbExperimentDa
     def cancel_jobs(self) -> None:
         """Cancel any running jobs."""
         for kwargs, fut in self._job_futures.copy():
-            job = kwargs["job"]
-            if not fut.done() and job.status() not in JOB_FINAL_STATES:
-                try:
-                    job.cancel()
-                except Exception as err:  # pylint: disable=broad-except
-                    LOG.info("Unable to cancel job %s: %s", job.job_id(), err)
+            if not fut.done():
+                for job in kwargs["jobs"]:
+                    if job.status() not in JOB_FINAL_STATES:
+                        try:
+                            job.cancel()
+                        except Exception as err:  # pylint: disable=broad-except
+                            LOG.info("Unable to cancel job %s: %s", job.job_id(), err)
 
     def block_for_results(self, timeout: Optional[float] = None) -> "DbExperimentDataV1":
         """Block until all pending jobs and their post processing finish.
 
@@ -827,11 +833,21 @@ def block_for_results(self, timeout: Optional[float] = None) -> "DbExperimentDat
         Returns:
             The experiment data with finished jobs and post-processing.
         """
+        job_ids = []
+        job_futures = []
         for kwargs, fut in self._job_futures.copy():
-            job = kwargs["job"]
-            LOG.info("Waiting for job %s and its post processing to finish.", job.job_id())
+            job_ids += [job.job_id() for job in kwargs["jobs"]]
+            job_futures.append(fut)
+        LOG.info("Waiting for jobs %s and their post processing to finish.", job_ids)
+
+        # The Python concurrency module does not invoke the future done callback
+        # functions when calling Future.result(), so we force them to be run before
+        # returning by explicitly calling Future.set_result(), which does invoke
+        # the done callback functions.
+        for fut in job_futures:
             with contextlib.suppress(Exception):
-                fut.result(timeout)
+                fut_res = fut.result(timeout=timeout)
+                fut.set_result(fut_res)
         return self
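
# --- Editor's note (not part of the patch) ------------------------------------
# Standard-library context for the block_for_results() comment above: done
# callbacks are invoked by the worker thread that completes the future, so
# Future.result() returning in the waiting thread does not guarantee the
# callbacks have already finished. Also note that on Python 3.8+, calling
# set_result() on a finished Future raises concurrent.futures.InvalidStateError,
# which the surrounding contextlib.suppress(Exception) silently swallows.
import concurrent.futures
import time

executor = concurrent.futures.ThreadPoolExecutor()
future = executor.submit(lambda: 42)
future.add_done_callback(lambda fut: (time.sleep(0.1), print("callback done")))
print("result:", future.result())  # may print before "callback done" appears
executor.shutdown(wait=True)
# -------------------------------------------------------------------------------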
+ if hasattr(job, "error_message"): + job_err = ": " + job.error_message() + self._errors.append(f"Job {job.job_id()} failed{job_err}") + + if fut.done(): + self._job_futures[idx] = None + ex = fut.exception() + if ex: + self._errors.append( + f"Post processing for job {job.job_id()} failed: \n" + + "".join( + traceback.format_exception(type(ex), ex, ex.__traceback__) + ) + ) + statuses.add(JobStatus.ERROR) self._job_futures = ThreadSafeList(list(filter(None, self._job_futures))) @@ -946,12 +964,11 @@ def _copy_metadata( # inherits an abstract class. extra_kwargs = {} for key, val in orig_kwargs.items(): - if key not in ["job", "job_done_callback", "timeout"]: + if key not in ["jobs", "timeout"]: extra_kwargs[key] = val new_instance.add_data( - data=orig_kwargs["job"], - post_processing_callback=orig_kwargs["job_done_callback"], + data=orig_kwargs["jobs"], timeout=orig_kwargs["timeout"], **extra_kwargs, ) diff --git a/qiskit_experiments/framework/base_experiment.py b/qiskit_experiments/framework/base_experiment.py index e6161ba1f3..ff0fdd877b 100644 --- a/qiskit_experiments/framework/base_experiment.py +++ b/qiskit_experiments/framework/base_experiment.py @@ -128,22 +128,36 @@ def run( circuits = transpile(self.circuits(backend), backend, **transpile_opts) self._postprocess_transpiled_circuits(circuits, backend, **run_options) - if isinstance(backend, LegacyBackend): - qobj = assemble(circuits, backend=backend, **run_opts) - job = backend.run(qobj) + # Run experiment jobs + max_experiments = getattr(backend.configuration(), "max_experiments", None) + if max_experiments and len(circuits) > max_experiments: + # Split jobs for backends that have a maximum job size + job_circuits = [ + circuits[i : i + max_experiments] for i in range(0, len(circuits), max_experiments) + ] else: - job = backend.run(circuits, **run_opts) - - # Add Job to ExperimentData and add analysis for post processing. - run_analysis = None + # Run as single job + job_circuits = [circuits] + + # Run jobs + jobs = [] + for circs in job_circuits: + if isinstance(backend, LegacyBackend): + qobj = assemble(circs, backend=backend, **run_opts) + job = backend.run(qobj) + else: + job = backend.run(circs, **run_opts) + jobs.append(job) # Add experiment option metadata - self._add_job_metadata(experiment_data, job, **run_opts) + self._add_job_metadata(experiment_data, jobs, **run_opts) - if analysis and self.__analysis_class__ is not None: - run_analysis = self.run_analysis + # Add jobs + experiment_data.add_data(jobs) - experiment_data.add_data(job, post_processing_callback=run_analysis) + # Optionally run analysis + if analysis and self.__analysis_class__: + self.run_analysis(experiment_data) # Return the ExperimentData future return experiment_data @@ -189,7 +203,8 @@ def run_analysis(self, experiment_data, **options) -> ExperimentData: # Run analysis analysis = self.analysis() - analysis.run(experiment_data, **analysis_options) + experiment_data.add_processing_callback(analysis.run, **analysis_options) + return experiment_data @property @@ -365,16 +380,18 @@ def _additional_metadata(self) -> Dict[str, any]: """ return {} - def _add_job_metadata(self, experiment_data: ExperimentData, job: BaseJob, **run_options): + def _add_job_metadata( + self, experiment_data: ExperimentData, jobs: List[BaseJob], **run_options + ): """Add runtime job metadata to ExperimentData. Args: experiment_data: the experiment data container. - job: the job object. + jobs: the job objects. run_options: backend run options for the job. 
""" metadata = { - "job_id": job.job_id(), + "job_ids": [job.job_id() for job in jobs], "experiment_options": copy.copy(self.experiment_options.__dict__), "transpile_options": copy.copy(self.transpile_options.__dict__), "analysis_options": copy.copy(self.analysis_options.__dict__), diff --git a/qiskit_experiments/framework/composite/composite_experiment.py b/qiskit_experiments/framework/composite/composite_experiment.py index b58a642fe4..5c41d0e0f2 100644 --- a/qiskit_experiments/framework/composite/composite_experiment.py +++ b/qiskit_experiments/framework/composite/composite_experiment.py @@ -64,9 +64,9 @@ def component_analysis(self, index): """Return the component experiment Analysis object""" return self.component_experiment(index).analysis() - def _add_job_metadata(self, experiment_data, job, **run_options): + def _add_job_metadata(self, experiment_data, jobs, **run_options): # Add composite metadata - super()._add_job_metadata(experiment_data, job, **run_options) + super()._add_job_metadata(experiment_data, jobs, **run_options) # Add sub-experiment options for i in range(self.num_experiments): @@ -83,7 +83,7 @@ def _add_job_metadata(self, experiment_data, job, **run_options): " are overridden by composite experiment options." ) sub_data = experiment_data.component_experiment_data(i) - sub_exp._add_job_metadata(sub_data, job, **run_options) + sub_exp._add_job_metadata(sub_data, jobs, **run_options) def _postprocess_transpiled_circuits(self, circuits, backend, **run_options): for expr in self._experiments: diff --git a/test/database_service/test_db_experiment_data.py b/test/database_service/test_db_experiment_data.py index 2a429a2b5c..85402b987b 100644 --- a/test/database_service/test_db_experiment_data.py +++ b/test/database_service/test_db_experiment_data.py @@ -148,7 +148,8 @@ def _callback(_exp_data): called_back = False exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") - exp_data.add_data(a_job, post_processing_callback=_callback) + exp_data.add_data(a_job) + exp_data.add_processing_callback(_callback) exp_data.block_for_results() self.assertTrue(called_back) @@ -180,7 +181,8 @@ def _callback(_exp_data): for data, _ in subtests: with self.subTest(data=data): - exp_data.add_data(data, post_processing_callback=_callback) + exp_data.add_data(data) + exp_data.add_processing_callback(_callback) self.assertEqual(len(subtests), called_back_count) @@ -199,7 +201,8 @@ def _callback(_exp_data, **kwargs): called_back = False callback_kwargs = "foo" exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test") - exp_data.add_data(a_job, _callback, foo=callback_kwargs) + exp_data.add_data(a_job) + exp_data.add_processing_callback(_callback, foo=callback_kwargs) exp_data.block_for_results() self.assertTrue(called_back) @@ -216,7 +219,8 @@ def _callback(_exp_data, **kwargs): self.addCleanup(event.set) exp_data = DbExperimentData(experiment_type="qiskit_test") - exp_data.add_data(a_job, _callback, event=event) + exp_data.add_data(a_job) + exp_data.add_processing_callback(_callback, event=event) with self.assertLogs("qiskit_experiments", "WARNING"): exp_data.add_data({"foo": "bar"}) @@ -530,13 +534,14 @@ def test_status_job_pending(self): event = threading.Event() job2 = mock.create_autospec(Job, instance=True) - job2.result = lambda *args, **kwargs: event.wait() + job2.result = lambda *args, **kwargs: event.wait(timeout=2) job2.status.return_value = JobStatus.RUNNING self.addCleanup(event.set) exp_data = 
diff --git a/qiskit_experiments/framework/composite/composite_experiment.py b/qiskit_experiments/framework/composite/composite_experiment.py
index b58a642fe4..5c41d0e0f2 100644
--- a/qiskit_experiments/framework/composite/composite_experiment.py
+++ b/qiskit_experiments/framework/composite/composite_experiment.py
@@ -64,9 +64,9 @@ def component_analysis(self, index):
         """Return the component experiment Analysis object"""
         return self.component_experiment(index).analysis()
 
-    def _add_job_metadata(self, experiment_data, job, **run_options):
+    def _add_job_metadata(self, experiment_data, jobs, **run_options):
         # Add composite metadata
-        super()._add_job_metadata(experiment_data, job, **run_options)
+        super()._add_job_metadata(experiment_data, jobs, **run_options)
 
         # Add sub-experiment options
         for i in range(self.num_experiments):
@@ -83,7 +83,7 @@ def _add_job_metadata(self, experiment_data, job, **run_options):
                 " are overridden by composite experiment options."
             )
             sub_data = experiment_data.component_experiment_data(i)
-            sub_exp._add_job_metadata(sub_data, job, **run_options)
+            sub_exp._add_job_metadata(sub_data, jobs, **run_options)
 
     def _postprocess_transpiled_circuits(self, circuits, backend, **run_options):
         for expr in self._experiments:
diff --git a/test/database_service/test_db_experiment_data.py b/test/database_service/test_db_experiment_data.py
index 2a429a2b5c..85402b987b 100644
--- a/test/database_service/test_db_experiment_data.py
+++ b/test/database_service/test_db_experiment_data.py
@@ -148,7 +148,8 @@ def _callback(_exp_data):
 
         called_back = False
         exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test")
-        exp_data.add_data(a_job, post_processing_callback=_callback)
+        exp_data.add_data(a_job)
+        exp_data.add_processing_callback(_callback)
         exp_data.block_for_results()
         self.assertTrue(called_back)
 
@@ -180,7 +181,8 @@ def _callback(_exp_data):
 
         for data, _ in subtests:
             with self.subTest(data=data):
-                exp_data.add_data(data, post_processing_callback=_callback)
+                exp_data.add_data(data)
+                exp_data.add_processing_callback(_callback)
 
         self.assertEqual(len(subtests), called_back_count)
 
@@ -199,7 +201,8 @@ def _callback(_exp_data, **kwargs):
         called_back = False
         callback_kwargs = "foo"
         exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test")
-        exp_data.add_data(a_job, _callback, foo=callback_kwargs)
+        exp_data.add_data(a_job)
+        exp_data.add_processing_callback(_callback, foo=callback_kwargs)
         exp_data.block_for_results()
         self.assertTrue(called_back)
 
@@ -216,7 +219,8 @@ def _callback(_exp_data, **kwargs):
         self.addCleanup(event.set)
 
         exp_data = DbExperimentData(experiment_type="qiskit_test")
-        exp_data.add_data(a_job, _callback, event=event)
+        exp_data.add_data(a_job)
+        exp_data.add_processing_callback(_callback, event=event)
 
         with self.assertLogs("qiskit_experiments", "WARNING"):
             exp_data.add_data({"foo": "bar"})
@@ -530,13 +534,14 @@ def test_status_job_pending(self):
 
         event = threading.Event()
         job2 = mock.create_autospec(Job, instance=True)
-        job2.result = lambda *args, **kwargs: event.wait()
+        job2.result = lambda *args, **kwargs: event.wait(timeout=2)
         job2.status.return_value = JobStatus.RUNNING
         self.addCleanup(event.set)
 
         exp_data = DbExperimentData(experiment_type="qiskit_test")
         exp_data.add_data(job1)
-        exp_data.add_data(job2, lambda *args, **kwargs: event.wait())
+        exp_data.add_data(job2)
+        exp_data.add_processing_callback(lambda *args, **kwargs: event.wait(timeout=2))
         self.assertEqual("RUNNING", exp_data.status())
 
         # Cleanup
@@ -569,7 +574,8 @@ def test_status_post_processing(self):
 
         exp_data = DbExperimentData(experiment_type="qiskit_test")
         exp_data.add_data(job)
-        exp_data.add_data(job, lambda *args, **kwargs: event.wait())
+        exp_data.add_data(job)
+        exp_data.add_processing_callback(lambda *args, **kwargs: event.wait(timeout=2))
         self.assertEqual("POST_PROCESSING", exp_data.status())
 
     def test_status_post_processing_error(self):
@@ -584,7 +590,8 @@ def _post_processing(*args, **kwargs):
 
         exp_data = DbExperimentData(experiment_type="qiskit_test")
         exp_data.add_data(job)
         with self.assertLogs(logger="qiskit_experiments.database_service", level="WARN") as cm:
-            exp_data.add_data(job, _post_processing)
+            exp_data.add_data(job)
+            exp_data.add_processing_callback(_post_processing)
             exp_data.block_for_results()
         self.assertEqual("ERROR", exp_data.status())
         self.assertIn("Kaboom!", ",".join(cm.output))
@@ -595,7 +602,8 @@ def test_status_done(self):
         job.result.return_value = self._get_job_result(3)
         exp_data = DbExperimentData(experiment_type="qiskit_test")
         exp_data.add_data(job)
-        exp_data.add_data(job, lambda *args, **kwargs: time.sleep(1))
+        exp_data.add_data(job)
+        exp_data.add_processing_callback(lambda *args, **kwargs: time.sleep(1))
         exp_data.block_for_results()
         self.assertEqual("DONE", exp_data.status())
 
@@ -610,7 +618,7 @@ def test_cancel_jobs(self):
         """Test canceling experiment jobs."""
 
         def _job_result():
-            event.wait()
+            event.wait(timeout=2)
             raise ValueError("Job was cancelled.")
 
         exp_data = DbExperimentData(experiment_type="qiskit_test")
@@ -654,7 +662,8 @@ def _post_processing(*args, **kwargs):  # pylint: disable=unused-argument
 
         exp_data = DbExperimentData(experiment_type="qiskit_test")
         with self.assertLogs(logger="qiskit_experiments.database_service", level="WARN") as cm:
-            exp_data.add_data(job1, _post_processing)
+            exp_data.add_data(job1)
+            exp_data.add_processing_callback(_post_processing)
             exp_data.add_data(job2)
             exp_data.block_for_results()
         self.assertEqual("ERROR", exp_data.status())
@@ -680,7 +689,8 @@ def _sleeper(*args, **kwargs):  # pylint: disable=unused-argument
         job = mock.create_autospec(Job, instance=True)
         job.result = _sleeper
         exp_data = DbExperimentData(experiment_type="qiskit_test")
-        exp_data.add_data(job, _sleeper)
+        exp_data.add_data(job)
+        exp_data.add_processing_callback(_sleeper)
         exp_data.block_for_results()
         self.assertEqual(2, sleep_count)
 
@@ -714,11 +724,11 @@ def test_copy_metadata_pending_job(self):
         """Test copy metadata with a pending job."""
 
         def _job1_result():
-            event.wait()
+            event.wait(timeout=2)
             return job_results[0]
 
         def _job2_result():
-            event.wait()
+            event.wait(timeout=2)
             return job_results[1]
 
         exp_data = DbExperimentData(experiment_type="qiskit_test")
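
# --- Editor's note (not part of the patch) ------------------------------------
# Why the tests above now pass timeout=2 to Event.wait(): an Event that is
# never set blocks wait() forever, so a regression could hang the whole suite;
# with a timeout, wait() simply returns False after two seconds instead.
import threading

event = threading.Event()
assert event.wait(timeout=0.1) is False  # not set: times out, returns False
event.set()
assert event.wait(timeout=0.1) is True   # already set: returns True immediately
# -------------------------------------------------------------------------------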