# Imports

In [1]:
from functools import wraps

import numpy as np
import qcodes as qc

from typing import Callable, Dict, Generator

from labcore.measurement import *

from configuration import QMConfig
from qm.qua import *
from qm.QuantumMachinesManager import QuantumMachinesManager

from plottr.data import datadict_storage as dds, datadict as dd

# Module-level variable holding the OPX configuration. RecordOPX passes it to open_qm() when the
# measurement is started, so it must be assigned before a sweep is run.
global_config = None


# Counter placed in the name of the saved DDH5 file, so that different test runs can be told apart.
test_name_counter = 0


# Base Decorator

In [2]:
class BackgroundRecordingBase:
    """
    Base class decorator for recording data that an instrument produces asynchronously.

    Decorate a measurement function with a subclass of this class and pass the result to
    create_background_sweep to obtain a Sweep that collects data from an external device which runs the experiment
    independently of the measurement PC, i.e. the measurement itself is not driven by a Sweep but by the external
    device (e.g. the OPX).
    Each instrument should provide its own start_wrapper (see the start_wrapper docstring for more info) and its own
    collector. Auxiliary functions used by start_wrapper and collector should also live in the subclass.

    :param *specs: The DataSpecs used to record the data produced.
    """

    def __init__(self, *specs):
        # Shared state between the start function and the collector.
        self.communicator = {}
        self.specs = specs

    def __call__(self, fun) -> Callable:
        """
        Decorates the experiment function. The decorated name becomes a factory that, when called, returns a Sweep
        made of two parts: the start Sweep and the collector Sweep.
        """

        def sweep(**collector_kwargs) -> Sweep:
            """
            Builds the combined Sweep: a start part followed by a collector part.
            The start part performs any setup and launches the actual experiment; it is executed only once.
            The collector part is iterated repeatedly to gather all of the data generated by the instrument.

            :param collector_kwargs: Any arguments that the collector needs.
            """

            launch_once = once(self.start_wrapper(fun))
            recorder = record_as(self.collector(**collector_kwargs), *self.specs)
            return launch_once + Sweep(recorder)

        return sweep

    def start_wrapper(self, fun: Callable) -> Callable:
        """
        Wraps the start function. An implementation of start_wrapper should define an inner function decorated with
        @wraps(fun) and return it.
        The inner function should accept the *args and **kwargs of fun and place anything returned by fun in the
        communicator.

        :param fun: The measurement function. In the case of the OPX this would be the function that returns the QUA
                    code with any arguments that it might use.
        """

        @wraps(fun)
        def launch(*args, **kwargs) -> None:
            """
            Runs fun and stores its return value in the communicator under 'setup_return' so that the collector
            can use it.

            :param args: Any args that fun needs.
            :param kwargs: Any kwargs that fun needs.
            """
            self.communicator['setup_return'] = fun(*args, **kwargs)

        return launch

    def collector(self, **kwargs) -> Generator[Dict, None, None]:
        """
        Data collection generator. An implementation should contain all the logic of waiting for the asynchronous
        data and yield dictionaries that map the names of the DataSpecs to numpy arrays with the values collected from
        the instrument. The generator should exhaust itself once all the data produced by the measurement has been
        yielded.

        This base implementation yields a single empty dictionary.

        :param kwargs: Any kwargs necessary for the specific implementation of the collector.
        """
        yield {}


def create_background_sweep(decorated_measurement_function: Callable, **collector_kwargs) -> Sweep:
    """
    Builds the Sweep for a measurement function that was decorated with an implementation of
    BackgroundRecordingBase, by calling the decorated function with the collector arguments.

    :param decorated_measurement_function: Measurement function decorated with
                                           a BackgroundRecordingBase class decorator.
    :param collector_kwargs: Any kwargs that the collector needs.
    :return: Whatever the decorated function returns, i.e. the combined start + collector Sweep.
    """
    return decorated_measurement_function(**collector_kwargs)


# Specific OPX Implementation

In [3]:
class RecordOPX(BackgroundRecordingBase):
    """
    Implementation of BackgroundRecordingBase for use with the OPX machine.

    The start function executes the QUA program on the OPX and stores the job's result handles in the communicator.
    The collector then repeatedly fetches the data-points that were added to the result streams since the previous
    iteration. The names of the result streams of the QUA program must match the names of the DataSpecs.
    """

    def start_wrapper(self, fun: Callable) -> Callable:
        """
        start_wrapper for the OPX machine. Wraps the startup function.
        Returns the actual startup function to be executed when the sweep is iterated through.

        :param fun: Function that returns the QUA program.
        """

        @wraps(fun)
        def startup(*args, **kwargs) -> None:
            """
            Establishes connection with the OPX and starts the measurement. The config of the OPX is passed through
            the module variable global_config. It saves the result handles and the initial bookkeeping values to the
            communicator dictionary.

            :param args: Any args that fun needs.
            :param kwargs: Any kwargs that fun needs.
            """
            # Start the measurement in the OPX.
            qmachine_mgr = QuantumMachinesManager()
            qmachine = qmachine_mgr.open_qm(global_config)
            job = qmachine.execute(fun(*args, **kwargs))

            # Save the result handles and create the initial parameters used by the collector.
            self.communicator['result_handles'] = job.result_handles
            self.communicator['active'] = True  # Turned False once the OPX stops processing.
            self.communicator['counter'] = 0  # Number of data-points already collected.

        return startup

    def _wait_for_data(self, batchsize: int) -> None:
        """
        Blocks until every result stream holds at least batchsize data-points that have not been collected yet, or
        until the OPX is no longer processing. In the latter case communicator['active'] is turned False so that the
        collector exhausts itself after gathering whatever data is left.

        :param batchsize: Size of batch. How many data-points is the minimum for the sweep to get in an iteration.
                          e.g. if 5, _wait_for_data will keep running until at least 5 new data-points
                          are available for collection.
        """

        # Collect necessary values from communicator.
        res_handle = self.communicator['result_handles']
        counter = self.communicator['counter']

        while True:
            # The processing state is checked once per poll and independently of the streams, so the loop also
            # terminates when the program has no result streams at all.
            if not res_handle.is_processing():
                self.communicator['active'] = False
                return

            # Every stream must have enough new data-points, not just the last one that was inspected.
            if all(handle.count_so_far() - counter >= batchsize for _, handle in res_handle):
                return

    def collector(self, batchsize: int) -> Generator[Dict, None, None]:
        """
        Implementation of collector for the OPX. Collects new data-points from the OPX and yields them in a dictionary
        with the names of the recorded variables as keywords and numpy arrays with the values. Yields None instead
        of a dictionary when no new data-points are available.

        :param batchsize: Size of batch. How many data-points is the minimum for the sweep to get in an iteration.
                          e.g. if 5, _wait_for_data will keep running until at least 5 new data-points
                          are available for collection.
        :raises ValueError: If a stream inside the QUA program has a name that is not a recorded variable, or if
                            the number of recorded variables and the number of streams are different.
        """

        # Get the result_handles from the communicator.
        result_handle = self.communicator['result_handles']

        # Get the names of all variables from the specs.
        data_specs_names = [x.name for x in self.specs]

        # Check that the stream names are present in the DataSpecs.
        stream_count = 0
        for name, _ in result_handle:
            if name not in data_specs_names:
                raise ValueError(f'{name} is not a recorded variable')
            stream_count += 1

        # Check that the number of recorded variables and streams are the same.
        if stream_count != len(data_specs_names):
            raise ValueError(
                f'Number of recorded variables ({len(data_specs_names)}) '
                f'does not match number of variables gathered from the OPX ({stream_count})')

        while self.communicator['active']:
            # Restart values for each iteration.
            data = {}
            counter = self.communicator['counter']  # Previous iteration data-point number.
            first = True
            current = 0
            no_new_data = False

            # Waits until new data-points are ready to be gathered.
            self._wait_for_data(batchsize)

            for name, handle in result_handle:

                # To ensure that we get the same number of data-points from every variable only get the current count
                # for the first variable in the stream.
                if first:
                    current = handle.count_so_far()  # Current data-point number
                    first = False

                    # if the current data-point number is the same as the previous data-point number, no new data
                    # has been gathered, so there is nothing to fetch.
                    if current == counter:
                        no_new_data = True
                        break

                # Make sure that the OPX has actually measured the current value for all variables and fetch the
                # new data lines.
                handle.wait_for_values(current)
                data_temp = np.array(handle.fetch(slice(counter, current)))

                # If the trace is a raw measurement, we need to go through its shape to properly convert it
                # TODO: add the extra variable dependence
                #  (add a special kind of variable called OPXRaw to indicate that it needs to be added.
                if name.startswith('raw_'):
                    rows = [[sample.astype(float) for sample in trace] for trace in data_temp]
                    converted_data_temp = np.squeeze(rows)
                    if len(rows) == 1:
                        # squeeze removed the data-point axis of a single-trace batch; wrap it to put it back.
                        converted_data_temp = [converted_data_temp]
                else:
                    # data comes from the OPX as numpy.void. Converts array to contain floats instead.
                    converted_data_temp = data_temp.astype(float)
                data[name] = converted_data_temp

            if no_new_data:
                # Signal that this iteration produced nothing, without also emitting an empty record.
                yield None
                continue

            self.communicator['counter'] = current
            yield data

# Proposal for Base Running and Saving

In [4]:
def _create_datadict_structure(sweep: Sweep) -> dd.DataDict:
    """
    Builds an empty, validated DataDict whose fields mirror the DataSpecs of a Sweep.

    :param sweep: Sweep object from which the DataDict is created.
    :return: DataDict with one entry per DataSpec, carrying its axes and unit when they are set.
    """

    specs = sweep.get_data_specs()
    structure = dd.DataDict()
    for spec in specs:
        axes, unit = spec.depends_on, spec.unit

        # Only the fields that actually carry information are written; None fields are left out.
        fields = {}
        if axes is not None:
            fields['axes'] = axes
        if unit is not None:
            fields['unit'] = unit
        structure[spec.name] = fields

    structure.validate()

    return structure


def _check_none(line: Dict) -> bool:
    """
    Checks if the values in a Dict are all None. Returns True if all values are None, False otherwise.
    """
    for arg in line.keys():
        if line[arg] is not None:
            return False
    return True


def run_and_save_sweep(sweep: Sweep, data_dir: str, name: str, prt: bool = False) -> None:
    """
    Runs a sweep to completion and writes every record it produces to a DDH5 file called <name> inside the
    <data_dir> directory. Records whose values are all None are skipped.

    :param sweep: Sweep object to iterate through.
    :param data_dir: Directory of file location
    :param name: name of the file
    :param prt: Bool, if True, the function will print every result coming from the sweep. Default, False.
    """
    structure = _create_datadict_structure(sweep)
    with dds.DDH5Writer(data_dir, structure, name=name) as writer:
        for record in sweep:
            # Nothing to store for this iteration.
            if _check_none(record):
                continue

            if prt:
                print(record)

            writer.add_data(**record)

    print('The measurement has finished and all of the data has been saved.')

# QUA experiment with implemented decorator

In [5]:
# The DataSpec names below must match the names given to the streams in stream_processing(); the collector
# raises a ValueError otherwise.
@RecordOPX(
        independent('repetition', type='array'),
        dependent('V', depends_on=['repetition'], type='array'),
        dependent('tracker', depends_on=['repetition'], type='array'),
        dependent('raw_values', depends_on=['repetition'], type='array'))
def my_qua_experiment(n_reps=1000):
    """
    Example QUA experiment: builds and returns a QUA program that loops n_reps times and, on every repetition,
    saves the loop index, a measured value, a tracker value and the raw ADC stream.

    Because of the RecordOPX decorator, the name my_qua_experiment refers to the sweep factory created by the
    decorator rather than to this function; use it through create_background_sweep.

    :param n_reps: Number of repetitions of the measurement loop.
    :return: The QUA program.
    """
    with program() as qua_measurement:
        # One stream per recorded variable. raw_stream is declared with adc_trace=True to carry the raw ADC data.
        raw_stream = declare_stream(adc_trace=True)
        v_stream = declare_stream()
        tracker_stream = declare_stream()
        i_stream = declare_stream()

        i = declare(int)
        v = declare(fixed)
        tracker = declare(int, value=0)

        with for_(i, 0, i<n_reps, i+1):
            # Repetition index (the independent variable).
            save(i, i_stream)
            
            # Measurement: the raw data goes to raw_stream and the "box_sin" output is stored in v.
            # NOTE(review): presumably plays the 'box' pulse on the 'readout' element and demodulates with the
            # "box_sin" integration weights - confirm against the OPX configuration.
            measure('box', 'readout', raw_stream, ("box_sin",v))
            save(v, v_stream)

            # tracker grows by 2 on every repetition.
            assign(tracker, tracker+2)
            save(tracker, tracker_stream)

            play('box', "readout")
            # NOTE(review): delay between repetitions; presumably expressed in OPX clock cycles - confirm.
            wait(1000000)

        with stream_processing():
            i_stream.save_all('repetition')
            v_stream.save_all('V')
            tracker_stream.save_all('tracker')
            # The 'raw_' prefix of the name is what makes the collector treat this stream as a raw trace.
            raw_stream.input1().timestamps().save_all('raw_values')

    return qua_measurement

In [6]:
# Directory handed to run_and_save_sweep as the location for the saved data.
DATADIR = './data/'
# Load the OPX configuration and publish it through the module variable global_config, which is what
# RecordOPX passes to open_qm() when the measurement is started.
config = QMConfig()
global_config = config.config()

# Build the sweep. batchsize is forwarded to the collector: it is the minimum number of new data-points
# the collector waits for before it yields a record.
sweep = create_background_sweep(my_qua_experiment, batchsize=5)

In [7]:
# NOTE(review): presumably makes the sweep call my_qua_experiment with n_reps=100 instead of the default of
# 1000 - confirm against the labcore Sweep.set_action_opts documentation.
sweep.set_action_opts(
    my_qua_experiment=dict(n_reps=100)
)

In [8]:
# Run the measurement: print every record (prt=True) and save the data in DATADIR under a name that contains
# test_name_counter.
run_and_save_sweep(sweep, DATADIR, f'OPX test standard #{test_name_counter}', prt=True)

Data location:  ./data/2021-07-30\2021-07-30_0010_OPX test standard #0\2021-07-30_0010_OPX test standard #0.ddh5
2021-07-30 20:16:21,413 - qm - INFO - Performing health check
2021-07-30 20:16:21,415 - qm - INFO - Health check passed
2021-07-30 20:16:21,729 - qm - INFO - Flags: 
2021-07-30 20:16:21,730 - qm - INFO - Executing high level program
{'repetition': array([ 0.,  1.,  2.,  3.,  4.,  5.,  6.,  7.,  8.,  9., 10., 11., 12.]), 'V': array([-0.19561355, -0.19559618, -0.19558213, -0.19560778, -0.19556068,
       -0.19560808, -0.19555848, -0.19557359, -0.19554396, -0.19557497,
       -0.19551774, -0.19554113, -0.19553179]), 'tracker': array([ 2.,  4.,  6.,  8., 10., 12., 14., 16., 18., 20., 22., 24., 26.]), 'raw_values': array([[4.0000000e+02, 4.0100000e+02, 4.0200000e+02, ..., 1.0397000e+04,
        1.0398000e+04, 1.0399000e+04],
       [4.0204000e+06, 4.0204010e+06, 4.0204020e+06, ..., 4.0303970e+06,
        4.0303980e+06, 4.0303990e+06],
       [8.0404000e+06, 8.0404010e+06, 8.04040

# Notes

* Right now, to indicate that a variable is saving the raw trace, the variable needs to start with 'raw_'.
* The run_and_save_sweep function should work with any sweep, so it could be added to the general labcore package.
* If a measurement fails to execute, the writer still creates a file, but the file is corrupted and plottr just raises a warning.
* I'll start working on properly documenting the code tomorrow

