diff --git a/qiskit_experiments/database_service/db_experiment_data.py b/qiskit_experiments/database_service/db_experiment_data.py
index 5b302798cb..963c5c6501 100644
--- a/qiskit_experiments/database_service/db_experiment_data.py
+++ b/qiskit_experiments/database_service/db_experiment_data.py
@@ -13,6 +13,8 @@
 """Stored data class."""
 
 import logging
+import dataclasses
+import threading
 import uuid
 from typing import Optional, List, Any, Union, Callable, Dict
 import copy
@@ -36,6 +38,7 @@
     save_data,
     qiskit_version,
     plot_to_svg_bytes,
+    combined_timeout,
     ThreadSafeOrderedDict,
     ThreadSafeList,
 )
@@ -65,6 +68,17 @@ def service_exception_to_warning():
         LOG.warning("Experiment service operation failed: %s", traceback.format_exc())
 
 
+@dataclasses.dataclass
+class CallbackStatus:
+    """Dataclass for analysis callback status."""
+
+    callback: Callable
+    kwargs: Dict = dataclasses.field(default_factory=dict)
+    status: JobStatus = JobStatus.QUEUED
+    event: threading.Event = dataclasses.field(default_factory=threading.Event)
+    error_msg: Optional[str] = None
+
+
 class DbExperimentData:
     """Base common type for all versioned DbExperimentData classes.
 
@@ -147,7 +161,8 @@ def __init__(
         self._jobs = ThreadSafeOrderedDict(job_ids or [])
         self._job_futures = ThreadSafeList()
-        self._errors = []
+        self._callback_statuses = ThreadSafeOrderedDict()
+        self._callback_future = None
 
         self._data = ThreadSafeList()
         self._figures = ThreadSafeOrderedDict(figure_names or [])
@@ -172,52 +187,29 @@ def _set_service_from_backend(self, backend: Union[Backend, BaseBackend]) -> Non
     def add_data(
         self,
         data: Union[Result, List[Result], Job, List[Job], Dict, List[Dict]],
-        post_processing_callback: Optional[Callable] = None,
         timeout: Optional[float] = None,
-        **kwargs: Any,
     ) -> None:
         """Add experiment data.
 
-        Note:
-            This method is not thread safe and should not be called by the
-            `post_processing_callback` function.
-
-        Note:
-            If `data` is a ``Job``, this method waits for the job to finish
-            and calls the `post_processing_callback` function asynchronously.
-
         Args:
-            data: Experiment data to add.
-                Several types are accepted for convenience:
-
-                * Result: Add data from this ``Result`` object.
-                * List[Result]: Add data from the ``Result`` objects.
-                * Job: Add data from the job result.
-                * List[Job]: Add data from the job results.
-                * Dict: Add this data.
-                * List[Dict]: Add this list of data.
-
-            post_processing_callback: Callback function invoked when data is
-                added. If `data` is a ``Job``, the callback is only invoked when
-                the job finishes successfully.
-                The following positional arguments are provided to the callback function:
-
-                * This ``DbExperimentData`` object.
-                * Additional keyword arguments passed to this method.
-
+            data: Experiment data to add. Several types are accepted for convenience:
+                * Result: Add data from this ``Result`` object.
+                * List[Result]: Add data from the ``Result`` objects.
+                * Job: Add data from the job result.
+                * List[Job]: Add data from the job results.
+                * Dict: Add this data.
+                * List[Dict]: Add this list of data.
            timeout: Timeout waiting for job to finish, if `data` is a ``Job``.
-            **kwargs: Keyword arguments to be passed to the callback function.
-
         Raises:
             TypeError: If the input data type is invalid.
         """
-        with self._job_futures.lock:
-            if any(not fut.done() for _, fut in self._job_futures):
-                LOG.warning(
-                    "Not all post-processing has finished. Adding new data "
-                    "may create unexpected analysis results."
-                )
+        if any(not status.event.is_set() for status in self._callback_statuses.values()):
+            LOG.warning(
+                "Not all post-processing has finished. Adding new data "
+                "may create unexpected analysis results."
+            )
+
         if not isinstance(data, list):
             data = [data]
@@ -254,45 +246,98 @@ def add_data(
         if job_data:
             job_kwargs = {
                 "jobs": job_data,
-                "jobs_done_callback": post_processing_callback,
                 "timeout": timeout,
-                **kwargs,
             }
             self._job_futures.append(
-                (
-                    job_kwargs,
-                    self._executor.submit(self._wait_for_job, **job_kwargs),
-                )
+                (job_kwargs, self._executor.submit(self._add_jobs_data, **job_kwargs))
             )
-        elif post_processing_callback:
-            post_processing_callback(self, **kwargs)
 
         if self.auto_save:
             self.save_metadata()
 
-    def _wait_for_job(
+    def add_analysis_callback(self, callback: Callable, **kwargs: Any):
+        """Add an analysis callback to run after the experiment data jobs finish.
+
+        This method adds the `callback` function to a queue to be run
+        asynchronously after completion of any running jobs, or immediately
+        if no jobs are running. If this method is called multiple times the
+        callback functions will be executed in the order they were
+        added.
+
+        Args:
+            callback: Callback function invoked when jobs finish successfully.
+                The callback function will be called as
+                ``callback(expdata, **kwargs)`` where `expdata` is this
+                ``DbExperimentData`` object, and `kwargs` are any additional
+                keyword arguments passed to this method.
+            **kwargs: Keyword arguments to be passed to the callback function.
+        """
+        callback_id = uuid.uuid4()
+        self._callback_statuses[callback_id] = CallbackStatus(callback, kwargs=kwargs)
+
+        # Wrap the callback function to handle reporting status and catching
+        # any exceptions and their error messages
+        def _wrapped_callback():
+            try:
+                self._callback_statuses[callback_id].status = JobStatus.RUNNING
+                callback(self, **kwargs)
+                self._callback_statuses[callback_id].status = JobStatus.DONE
+            except Exception as ex:  # pylint: disable=broad-except
+                self._callback_statuses[callback_id].status = JobStatus.ERROR
+                error_msg = f"Analysis callback {callback} failed: \n" + "".join(
+                    traceback.format_exception(type(ex), ex, ex.__traceback__)
+                )
+                self._callback_statuses[callback_id].error_msg = error_msg
+                LOG.warning("Analysis callback %s failed:\n%s", callback, traceback.format_exc())
+            self._callback_statuses[callback_id].event.set()
+
+        with self._job_futures.lock:
+            # Determine if a future is running that we need to add the callback to
+            fut_done = True
+            if self._job_futures:
+                _, fut = self._job_futures[-1]
+                fut_done = fut.done()
+            if fut_done and self._callback_future is not None:
+                fut = self._callback_future
+                fut_done = fut.done()
+            if fut_done:
+                fut = None
+
+            if fut_done:
+                # Submit a future so analysis can run async even if there are no
+                # running jobs or running analysis.
+                self._callback_future = self._executor.submit(_wrapped_callback)
+            else:
+                # Wrap the wrapped function in the format expected by Python's
+                # Future.add_done_callback
+                def _done_callback(fut):
+                    if fut.cancelled():
+                        self._callback_statuses[callback_id].status = JobStatus.CANCELLED
+                        self._callback_statuses[callback_id].event.set()
+                    else:
+                        _wrapped_callback()
+
+                fut.add_done_callback(_done_callback)
+
+    def _add_jobs_data(
         self,
         jobs: List[Union[Job, BaseJob]],
-        jobs_done_callback: Optional[Callable] = None,
         timeout: Optional[float] = None,
-        **kwargs: Any,
     ) -> None:
-        """Wait for a job to finish.
+        """Wait for jobs to finish and add their result data.
 
         Args:
             jobs: Jobs to wait for.
-            jobs_done_callback: Callback function to invoke when jobs finish.
             timeout: Timeout waiting for job to finish.
-            **kwargs: Keyword arguments to be passed to the callback function.
 
         Raises:
-            Exception: If post processing failed.
+            Exception: If any of the jobs failed.
         """
         for job in jobs:
-            LOG.debug("Waiting for jobs %s to finish.", job.job_id())
+            LOG.debug("Waiting for job %s to finish.", job.job_id())
             try:
                 try:
-                    job_result = job.result(timeout=timeout)
+                    job_result, timeout = combined_timeout(job.result, timeout)
                 except TypeError:  # Not all jobs take timeout.
                     job_result = job.result()
                 with self._data.lock:
@@ -302,13 +347,6 @@ def _wait_for_job(
                 LOG.warning("Job %s failed:\n%s", job.job_id(), traceback.format_exc())
                 raise
 
-            try:
-                if jobs_done_callback:
-                    jobs_done_callback(self, **kwargs)
-            except Exception:  # pylint: disable=broad-except
-                LOG.warning("Post processing function failed:\n%s", traceback.format_exc())
-                raise
-
     def _add_result_data(self, result: Result) -> None:
         """Add data from a Result object
 
@@ -833,7 +871,7 @@ def cancel_jobs(self) -> None:
                 LOG.info("Unable to cancel job %s: %s", job.job_id(), err)
 
     def block_for_results(self, timeout: Optional[float] = None) -> "DbExperimentDataV1":
-        """Block until all pending jobs and their post processing finish.
+        """Block until all pending jobs and analysis callbacks finish.
 
         Args:
             timeout: Timeout waiting for results.
@@ -841,11 +879,51 @@ def block_for_results(self, timeout: Optional[float] = None) -> "DbExperimentDat
         Returns:
             The experiment data with finished jobs and post-processing.
         """
+        # Wait for jobs to finish
         for kwargs, fut in self._job_futures.copy():
-            job_ids = [job.job_id() for job in kwargs["jobs"]]
-            LOG.info("Waiting for jobs %s and its post processing to finish.", job_ids)
-            with contextlib.suppress(Exception):
-                fut.result(timeout)
+            job_ids = [job.job_id() for job in kwargs["jobs"]]
+            LOG.info("Waiting for data jobs %s to finish.", job_ids)
+            try:
+                _, timeout = combined_timeout(fut.result, timeout)
+            except futures.TimeoutError:
+                LOG.warning(
+                    "Possibly incomplete experiment data: Retrieving job results timed out."
+                )
+            except Exception:  # pylint: disable=broad-except
+                LOG.warning(
+                    "Possibly incomplete experiment data: Retrieving job results"
+                    " raised an exception."
+                )
+
+        # Check job status and show a warning if cancelled or errored
+        jobs_status = self._job_status()
+        if jobs_status == "CANCELLED":
+            LOG.warning("Possibly incomplete experiment data: a job was cancelled.")
+        elif jobs_status == "ERROR":
+            LOG.warning("Possibly incomplete experiment data: a job returned an error.")
+
+        # Wait for analysis callbacks to finish
+        if self._callback_statuses:
+            for status in self._callback_statuses.values():
+                LOG.info("Waiting for analysis callback %s to finish.", status.callback)
+                finished, timeout = combined_timeout(status.event.wait, timeout)
+                if not finished:
+                    LOG.warning(
+                        "Possibly incomplete analysis results: analysis"
+                        " callback %s timed out.",
+                        status.callback,
+                    )
+
+        # Check analysis status and show a warning if cancelled or errored
+        callback_status = self._callback_status()
+        if callback_status == "CANCELLED":
+            LOG.warning("Possibly incomplete analysis results: an analysis callback was cancelled.")
+        elif callback_status == "ERROR":
+            LOG.warning(
+                "Possibly incomplete analysis results: an analysis callback raised an error."
+            )
+
         return self
 
     def status(self) -> str:
@@ -854,64 +932,126 @@ def status(self) -> str:
         If the experiment consists of multiple jobs, the returned status is mapped
         in the following order:
 
-        * INITIALIZING - if any job is being initialized.
-        * VALIDATING - if any job is being validated.
-        * QUEUED - if any job is queued.
-        * RUNNING - if any job is still running.
         * ERROR - if any job incurred an error.
         * CANCELLED - if any job is cancelled.
-        * POST_PROCESSING - if any of the post-processing functions is still running.
-        * DONE - if all jobs and their post-processing functions finished.
+        * RUNNING - if any job is still running.
+        * QUEUED - if any job is queued.
+        * VALIDATING - if any job is being validated.
+        * INITIALIZING - if any job is being initialized.
+        * POST_PROCESSING - if any analysis callbacks are still running.
+        * DONE - if all jobs and analysis callbacks are finished.
+
+        .. note::
+
+            If an experiment has status ERROR or CANCELLED there may still
+            be pending or running jobs. In these cases it may be beneficial
+            to call :meth:`cancel_jobs` to terminate these remaining jobs.
 
         Returns:
             Data processing status.
         """
         if all(
             len(container) == 0
-            for container in [self._data, self._jobs, self._figures, self._analysis_results]
+            for container in [
+                self._data,
+                self._jobs,
+                self._callback_statuses,
+                self._figures,
+                self._analysis_results,
+            ]
         ):
             return "INITIALIZING"
 
+        job_status = self._job_status()
+        if job_status != "DONE":
+            return job_status
+
+        callback_status = self._callback_status()
+        if callback_status in ["QUEUED", "RUNNING"]:
+            return "POST_PROCESSING"
+
+        return callback_status
+
+    def _job_status(self) -> str:
+        """Return the experiment job execution status.
+
+        If the experiment consists of multiple jobs, the returned status is mapped
+        in the following order:
+
+        * ERROR - if any job incurred an error.
+        * CANCELLED - if any job is cancelled.
+        * RUNNING - if any job is still running.
+        * QUEUED - if any job is queued.
+        * VALIDATING - if any job is being validated.
+        * INITIALIZING - if any job is being initialized.
+        * DONE - if all jobs are finished.
+
+        Returns:
+            Job execution status.
+        """
+        # Backend jobs
         statuses = set()
         with self._job_futures.lock:
-            for idx, item in enumerate(self._job_futures):
-                kwargs, fut = item
-                jobs = kwargs["jobs"]
-                for job in jobs:
+            for idx, (kwargs, fut) in enumerate(self._job_futures):
+                all_jobs_done = True
+                for job in kwargs["jobs"]:
                     job_status = job.status()
                     statuses.add(job_status)
-                    if job_status == JobStatus.ERROR:
-                        job_err = "."
-                        if hasattr(job, "error_message"):
-                            job_err = ": " + job.error_message()
-                        self._errors.append(f"Job {job.job_id()} failed{job_err}")
-
+                    if job_status != JobStatus.DONE:
+                        all_jobs_done = False
                 if fut.done():
-                    self._job_futures[idx] = None
-                    ex = fut.exception()
-                    if ex:
-                        job_ids = [job.job_id() for job in jobs]
-                        self._errors.append(
-                            f"Post processing for job {job_ids} failed: \n"
-                            + "".join(traceback.format_exception(type(ex), ex, ex.__traceback__))
-                        )
+                    if fut.exception():
                         statuses.add(JobStatus.ERROR)
-
+                    elif all_jobs_done:
+                        # If all jobs ran successfully we can remove the future
+                        self._job_futures[idx] = None
             self._job_futures = ThreadSafeList(list(filter(None, self._job_futures)))
 
         for stat in [
-            JobStatus.INITIALIZING,
-            JobStatus.VALIDATING,
-            JobStatus.QUEUED,
-            JobStatus.RUNNING,
             JobStatus.ERROR,
             JobStatus.CANCELLED,
+            JobStatus.RUNNING,
+            JobStatus.QUEUED,
+            JobStatus.VALIDATING,
+            JobStatus.INITIALIZING,
         ]:
             if stat in statuses:
                 return stat.name
 
-        if self._job_futures:
-            return "POST_PROCESSING"
-
         return "DONE"
+
+    def _callback_status(self) -> str:
+        """Return the data analysis callback post-processing status.
+
+        If the experiment consists of multiple analysis callbacks, the returned
+        status is mapped in the following order:
+
+        * ERROR - if any analysis callback incurred an error.
+        * CANCELLED - if any analysis callback is cancelled.
+        * RUNNING - if any analysis callback is still running.
+        * QUEUED - if any analysis callback is queued.
+        * DONE - if all analysis callbacks are finished.
+
+        Returns:
+            Analysis callback status.
+        """
+        statuses = set()
+        for status in self._callback_statuses.values():
+            statuses.add(status.status)
+
+        # Remove the analysis future if it is done, since all statuses are
+        # stored in the _callback_statuses container.
+        if self._callback_future is not None and self._callback_future.done():
+            self._callback_future = None
+
+        for stat in [
+            JobStatus.ERROR,
+            JobStatus.CANCELLED,
+            JobStatus.RUNNING,
+            JobStatus.QUEUED,
+        ]:
+            if stat in statuses:
+                return stat.name
+
+        return "DONE"
@@ -921,8 +1061,32 @@ def errors(self) -> str:
 
         Returns:
             Experiment errors.
         """
-        self.status()  # Collect new errors.
-        return "\n".join(self._errors)
+        errors = []
+        # Get any future errors
+        for fut_kwargs, fut in self._job_futures:
+            if fut.done():
+                ex = fut.exception()
+                if ex:
+                    job_ids = [job.job_id() for job in fut_kwargs["jobs"]]
+                    errors.append(
+                        f"Jobs {job_ids} failed: \n"
+                        + "".join(traceback.format_exception(type(ex), ex, ex.__traceback__))
+                    )
+
+        # Get any job errors
+        for job in self._jobs.values():
+            if job and job.status() == JobStatus.ERROR:
+                job_err = "."
+                if hasattr(job, "error_message"):
+                    job_err = ": " + job.error_message()
+                errors.append(f"Job {job.job_id()} failed{job_err}")
+
+        # Get any analysis callback errors
+        for status in self._callback_statuses.values():
+            if status.error_msg is not None:
+                errors.append(status.error_msg)
+
+        return "\n".join(errors)
 
     def _copy_metadata(
         self, new_instance: Optional["DbExperimentDataV1"] = None
@@ -951,7 +1115,6 @@ def _copy_metadata(
         new_instance._auto_save = self._auto_save
         new_instance._service = self._service
         new_instance._extra_data = self._extra_data
-        new_instance._errors = self._errors
 
         with self._data.lock:  # Hold the lock so no new data can be added.
             new_instance._data = self._data.copy_object()
@@ -962,12 +1125,11 @@
             # inherits an abstract class.
                 extra_kwargs = {}
                 for key, val in orig_kwargs.items():
-                    if key not in ["jobs", "jobs_done_callback", "timeout"]:
+                    if key not in ["jobs", "timeout"]:
                         extra_kwargs[key] = val
                 new_instance.add_data(
                     data=orig_kwargs["jobs"],
-                    post_processing_callback=orig_kwargs["jobs_done_callback"],
                     timeout=orig_kwargs["timeout"],
                     **extra_kwargs,
                 )
diff --git a/qiskit_experiments/database_service/utils.py b/qiskit_experiments/database_service/utils.py
index c377cdcff9..4dcb381c03 100644
--- a/qiskit_experiments/database_service/utils.py
+++ b/qiskit_experiments/database_service/utils.py
@@ -14,6 +14,7 @@
 
 import io
 import logging
+import time
 import threading
 import traceback
 from abc import ABC, abstractmethod
@@ -105,6 +106,27 @@ def plot_to_svg_bytes(figure: "pyplot.Figure") -> bytes:
     return figure_data
 
 
+def combined_timeout(
+    func: Callable, timeout: Optional[float] = None
+) -> Tuple[Any, Union[float, None]]:
+    """Call ``func(timeout)`` and return the reduced timeout for subsequent calls.
+
+    Args:
+        func: A function with signature ``func(timeout)``.
+        timeout: The time to wait for the function call.
+
+    Returns:
+        A pair of the function's return value and the updated timeout,
+        i.e. the remaining time left to wait for other functions.
+    """
+    time_start = time.time()
+    ret = func(timeout)
+    time_stop = time.time()
+    if timeout is not None:
+        timeout = max(0, timeout + time_start - time_stop)
+    return ret, timeout
+
+
 def save_data(
     is_new: bool,
     new_func: Callable,
diff --git a/qiskit_experiments/framework/base_experiment.py b/qiskit_experiments/framework/base_experiment.py
index 04ac321fcf..2799cdef2a 100644
--- a/qiskit_experiments/framework/base_experiment.py
+++ b/qiskit_experiments/framework/base_experiment.py
@@ -152,12 +152,12 @@ def run(
         # Add experiment option metadata
         self._add_job_metadata(experiment_data, jobs, **run_opts)
 
-        # Add Job to ExperimentData and add analysis for post processing.
-        run_analysis = None
-        if analysis and self.__analysis_class__ is not None:
-            run_analysis = self.run_analysis
+        # Add jobs
+        experiment_data.add_data(jobs)
 
-        experiment_data.add_data(jobs, post_processing_callback=run_analysis)
+        # Optionally run analysis
+        if analysis and self.__analysis_class__:
+            experiment_data.add_analysis_callback(self.run_analysis)
 
         # Return the ExperimentData future
         return experiment_data
diff --git a/releasenotes/notes/add-analysis-callback-018da6edf8a5a27f.yaml b/releasenotes/notes/add-analysis-callback-018da6edf8a5a27f.yaml
new file mode 100644
index 0000000000..133b14d81e
--- /dev/null
+++ b/releasenotes/notes/add-analysis-callback-018da6edf8a5a27f.yaml
@@ -0,0 +1,16 @@
+---
+features:
+  - |
+    Adds the
+    :meth:`~qiskit_experiments.database_service.DbExperimentData.add_analysis_callback`
+    method to :class:`~qiskit_experiments.database_service.DbExperimentData`
+    for adding a post-processing analysis function to run as a callback after
+    currently executing experiment jobs are finished.
+upgrade:
+  - |
+    The ``post_processing_callback`` and ``**kwargs`` arguments have been removed
+    from :meth:`~qiskit_experiments.database_service.DbExperimentData.add_data`.
+    To add a callback function to run after experiment jobs have finished
+    executing, use the
+    :meth:`~qiskit_experiments.database_service.DbExperimentData.add_analysis_callback`
+    method instead.
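For reference, a minimal sketch of the new flow this patch enables (not part of the diff; the two callbacks and the dict payload below are made-up placeholders, while `DbExperimentData`, `add_data`, `add_analysis_callback`, `block_for_results`, and `status` are the APIs the patch touches):

```python
from qiskit_experiments.database_service import DbExperimentData

def run_analysis(expdata):
    # Hypothetical analysis step; any callable taking the experiment
    # data as its first positional argument works here.
    print("analyzing experiment:", expdata.experiment_type)

def notify(expdata, channel=None):
    # Hypothetical follow-up step, showing **kwargs forwarding.
    print("analysis finished, notifying via", channel)

exp_data = DbExperimentData(experiment_type="example")
exp_data.add_data({"counts": {"00": 512, "11": 512}})    # data only; no callback kwarg
exp_data.add_analysis_callback(run_analysis)             # queued to run asynchronously
exp_data.add_analysis_callback(notify, channel="email")  # runs after run_analysis
exp_data.block_for_results()
print(exp_data.status())  # "DONE" once both callbacks finish successfully
```

Because each callback is tracked by its own `CallbackStatus` entry, the second callback is chained onto the first via `Future.add_done_callback`, preserving insertion order.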
diff --git a/test/database_service/test_db_experiment_data.py b/test/database_service/test_db_experiment_data.py
index 2a429a2b5c..b2364df7cb 100644
--- a/test/database_service/test_db_experiment_data.py
+++ b/test/database_service/test_db_experiment_data.py
@@ -148,7 +148,8 @@ def _callback(_exp_data):
         called_back = False
         exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test")
-        exp_data.add_data(a_job, post_processing_callback=_callback)
+        exp_data.add_data(a_job)
+        exp_data.add_analysis_callback(_callback)
         exp_data.block_for_results()
         self.assertTrue(called_back)
 
@@ -180,7 +181,9 @@ def _callback(_exp_data):
 
         for data, _ in subtests:
             with self.subTest(data=data):
-                exp_data.add_data(data, post_processing_callback=_callback)
+                exp_data.add_data(data)
+                exp_data.add_analysis_callback(_callback)
+                exp_data.block_for_results()
 
         self.assertEqual(len(subtests), called_back_count)
 
@@ -199,7 +202,8 @@ def _callback(_exp_data, **kwargs):
         called_back = False
         callback_kwargs = "foo"
         exp_data = DbExperimentData(backend=self.backend, experiment_type="qiskit_test")
-        exp_data.add_data(a_job, _callback, foo=callback_kwargs)
+        exp_data.add_data(a_job)
+        exp_data.add_analysis_callback(_callback, foo=callback_kwargs)
         exp_data.block_for_results()
         self.assertTrue(called_back)
 
@@ -216,7 +220,8 @@ def _callback(_exp_data, **kwargs):
         self.addCleanup(event.set)
 
         exp_data = DbExperimentData(experiment_type="qiskit_test")
-        exp_data.add_data(a_job, _callback, event=event)
+        exp_data.add_analysis_callback(_callback, event=event)
+        exp_data.add_data(a_job)
         with self.assertLogs("qiskit_experiments", "WARNING"):
             exp_data.add_data({"foo": "bar"})
 
@@ -530,13 +535,14 @@ def test_status_job_pending(self):
         event = threading.Event()
         job2 = mock.create_autospec(Job, instance=True)
-        job2.result = lambda *args, **kwargs: event.wait()
+        job2.result = lambda *args, **kwargs: event.wait(timeout=15)
         job2.status.return_value = JobStatus.RUNNING
         self.addCleanup(event.set)
 
         exp_data = DbExperimentData(experiment_type="qiskit_test")
         exp_data.add_data(job1)
-        exp_data.add_data(job2, lambda *args, **kwargs: event.wait())
+        exp_data.add_data(job2)
+        exp_data.add_analysis_callback(lambda *args, **kwargs: event.wait(timeout=15))
         self.assertEqual("RUNNING", exp_data.status())
 
         # Cleanup
@@ -569,8 +575,9 @@ def test_status_post_processing(self):
 
         exp_data = DbExperimentData(experiment_type="qiskit_test")
         exp_data.add_data(job)
-        exp_data.add_data(job, lambda *args, **kwargs: event.wait())
-        self.assertEqual("POST_PROCESSING", exp_data.status())
+        exp_data.add_analysis_callback(lambda *args, **kwargs: event.wait(timeout=15))
+        status = exp_data.status()
+        self.assertEqual("POST_PROCESSING", status)
 
     def test_status_post_processing_error(self):
         """Test experiment status when post processing failed."""
@@ -584,7 +591,8 @@ def _post_processing(*args, **kwargs):
         exp_data = DbExperimentData(experiment_type="qiskit_test")
         exp_data.add_data(job)
         with self.assertLogs(logger="qiskit_experiments.database_service", level="WARN") as cm:
-            exp_data.add_data(job, _post_processing)
+            exp_data.add_data(job)
+            exp_data.add_analysis_callback(_post_processing)
         exp_data.block_for_results()
         self.assertEqual("ERROR", exp_data.status())
         self.assertIn("Kaboom!", ",".join(cm.output))
 
@@ -595,7 +603,8 @@ def test_status_done(self):
         job.result.return_value = self._get_job_result(3)
         exp_data = DbExperimentData(experiment_type="qiskit_test")
         exp_data.add_data(job)
-        exp_data.add_data(job, lambda *args, **kwargs: time.sleep(1))
+        exp_data.add_data(job)
+        exp_data.add_analysis_callback(lambda *args, **kwargs: time.sleep(1))
         exp_data.block_for_results()
         self.assertEqual("DONE", exp_data.status())
 
@@ -610,7 +619,7 @@ def test_cancel_jobs(self):
         """Test canceling experiment jobs."""
 
         def _job_result():
-            event.wait()
+            event.wait(timeout=15)
             raise ValueError("Job was cancelled.")
 
         exp_data = DbExperimentData(experiment_type="qiskit_test")
@@ -654,12 +663,13 @@ def _post_processing(*args, **kwargs):  # pylint: disable=unused-argument
 
         exp_data = DbExperimentData(experiment_type="qiskit_test")
         with self.assertLogs(logger="qiskit_experiments.database_service", level="WARN") as cm:
-            exp_data.add_data(job1, _post_processing)
+            exp_data.add_data(job1)
+            exp_data.add_analysis_callback(_post_processing)
             exp_data.add_data(job2)
             exp_data.block_for_results()
         self.assertEqual("ERROR", exp_data.status())
-        self.assertTrue(re.match(r".*1234.*Kaboom!.*5678", exp_data.errors(), re.DOTALL))
         self.assertIn("Kaboom", ",".join(cm.output))
+        self.assertTrue(re.match(r".*5678.*Kaboom!", exp_data.errors(), re.DOTALL))
 
     def test_source(self):
         """Test getting experiment source."""
@@ -680,7 +690,8 @@ def _sleeper(*args, **kwargs):  # pylint: disable=unused-argument
         job = mock.create_autospec(Job, instance=True)
         job.result = _sleeper
         exp_data = DbExperimentData(experiment_type="qiskit_test")
-        exp_data.add_data(job, _sleeper)
+        exp_data.add_data(job)
+        exp_data.add_analysis_callback(_sleeper)
         exp_data.block_for_results()
         self.assertEqual(2, sleep_count)
 
@@ -714,11 +725,11 @@ def test_copy_metadata_pending_job(self):
         """Test copy metadata with a pending job."""
 
         def _job1_result():
-            event.wait()
+            event.wait(timeout=15)
             return job_results[0]
 
         def _job2_result():
-            event.wait()
+            event.wait(timeout=15)
             return job_results[1]
 
         exp_data = DbExperimentData(experiment_type="qiskit_test")
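The `combined_timeout` helper added in `utils.py` is what lets `block_for_results` treat its `timeout` argument as a single budget shared across every job wait and callback wait, rather than a per-call limit. A standalone sketch of that behavior (the `slow_wait` stand-in is invented for illustration; the helper body mirrors the one in this diff):

```python
import time

def combined_timeout(func, timeout=None):
    """Call func(timeout) and return (result, remaining timeout budget)."""
    time_start = time.time()
    ret = func(timeout)
    time_stop = time.time()
    if timeout is not None:
        # Subtract the elapsed time from the budget, clamping at zero.
        timeout = max(0, timeout + time_start - time_stop)
    return ret, timeout

def slow_wait(timeout):
    # Stand-in for a blocking call such as job.result or event.wait.
    time.sleep(min(1.0, timeout))
    return "done"

budget = 2.5
_, budget = combined_timeout(slow_wait, budget)  # sleeps 1.0s, ~1.5s left
_, budget = combined_timeout(slow_wait, budget)  # sleeps 1.0s, ~0.5s left
_, budget = combined_timeout(slow_wait, budget)  # sleeps 0.5s, ~0.0s left
print(f"remaining budget: {budget:.2f}s")
```

Each successive blocking call sees only what is left of the original 2.5-second budget, which is why a single `timeout` passed to `block_for_results` bounds the total wait across all jobs and analysis callbacks.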