From 47f9f2d0835803dfae246ff188087dff8caae3b9 Mon Sep 17 00:00:00 2001 From: Benjamin Papajewski Date: Mon, 8 Mar 2021 17:01:10 +0100 Subject: [PATCH 1/2] New awg abstraction based on features + implementation for tabor + tests --- .gitignore | 3 + qupulse/_program/tabor.py | 5 +- qupulse/hardware/feature_awg/__init__.py | 0 qupulse/hardware/feature_awg/base.py | 187 +++ qupulse/hardware/feature_awg/base_features.py | 98 ++ .../feature_awg/channel_tuple_wrapper.py | 55 + qupulse/hardware/feature_awg/features.py | 265 ++++ qupulse/hardware/feature_awg/tabor.py | 1384 +++++++++++++++++ tests/hardware/feature_awg/__init__.py | 0 .../feature_awg/awg_new_driver_base_tests.py | 283 ++++ .../tabor_new_driver_clock_tests.py | 135 ++ .../tabor_new_driver_dummy_based_tests.py | 809 ++++++++++ .../feature_awg/tabor_new_driver_exex_test.py | 136 ++ .../tabor_new_driver_simulator_based_tests.py | 283 ++++ .../feature_awg/tabor_new_driver_tests.py | 43 + 15 files changed, 3684 insertions(+), 2 deletions(-) create mode 100644 qupulse/hardware/feature_awg/__init__.py create mode 100644 qupulse/hardware/feature_awg/base.py create mode 100644 qupulse/hardware/feature_awg/base_features.py create mode 100644 qupulse/hardware/feature_awg/channel_tuple_wrapper.py create mode 100644 qupulse/hardware/feature_awg/features.py create mode 100644 qupulse/hardware/feature_awg/tabor.py create mode 100644 tests/hardware/feature_awg/__init__.py create mode 100644 tests/hardware/feature_awg/awg_new_driver_base_tests.py create mode 100644 tests/hardware/feature_awg/tabor_new_driver_clock_tests.py create mode 100644 tests/hardware/feature_awg/tabor_new_driver_dummy_based_tests.py create mode 100644 tests/hardware/feature_awg/tabor_new_driver_exex_test.py create mode 100644 tests/hardware/feature_awg/tabor_new_driver_simulator_based_tests.py create mode 100644 tests/hardware/feature_awg/tabor_new_driver_tests.py diff --git a/.gitignore b/.gitignore index aef5c6c97..7124aaa3e 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,6 @@ doc/source/examples/.ipynb_checkpoints/* *.orig MATLAB/+qc/personalPaths.mat /doc/source/_autosummary/* +.idea/ +.mypy_cache/* +tests/hardware/WX2184C.exe diff --git a/qupulse/_program/tabor.py b/qupulse/_program/tabor.py index 96359b34c..64002d969 100644 --- a/qupulse/_program/tabor.py +++ b/qupulse/_program/tabor.py @@ -11,7 +11,6 @@ from qupulse.utils.types import ChannelID, TimeType from qupulse.hardware.awgs.base import ProgramEntry from qupulse.hardware.util import get_sample_times, voltage_to_uint16 -from qupulse.pulses.parameters import Parameter from qupulse._program.waveforms import Waveform from qupulse._program._loop import Loop from qupulse._program.volatile import VolatileRepetitionCount, VolatileProperty @@ -389,7 +388,8 @@ def __init__(self, offsets: Tuple[float, float], voltage_transformations: Tuple[Optional[callable], Optional[callable]], sample_rate: TimeType, - mode: TaborSequencing = None + mode: TaborSequencing = None, + repetition_mode: str = "infinite", ): if len(channels) != device_properties['chan_per_part']: raise TaborException('TaborProgram only supports {} channels'.format(device_properties['chan_per_part'])) @@ -420,6 +420,7 @@ def __init__(self, self._parsed_program = None # type: Optional[ParsedProgram] self._mode = None self._device_properties = device_properties + self._repetition_mode = repetition_mode assert mode in (TaborSequencing.ADVANCED, TaborSequencing.SINGLE), "Invalid mode" if mode == TaborSequencing.SINGLE: diff --git a/qupulse/hardware/feature_awg/__init__.py b/qupulse/hardware/feature_awg/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/qupulse/hardware/feature_awg/base.py b/qupulse/hardware/feature_awg/base.py new file mode 100644 index 000000000..259442345 --- /dev/null +++ b/qupulse/hardware/feature_awg/base.py @@ -0,0 +1,187 @@ +from abc import ABC, abstractmethod +from typing import Optional +import weakref + +from qupulse.hardware.feature_awg import channel_tuple_wrapper +from qupulse.hardware.feature_awg.base_features import Feature, FeatureAble +from qupulse.utils.types import Collection + +__all__ = ["AWGDevice", "AWGChannelTuple", "AWGChannel", "AWGMarkerChannel", "AWGDeviceFeature", "AWGChannelFeature", + "AWGChannelTupleFeature"] + + +class AWGDeviceFeature(Feature): + """Base class for features that are used for `AWGDevice`s""" + def __init__(self): + super().__init__(AWGDevice) + + +class AWGChannelFeature(Feature): + """Base class for features that are used for `AWGChannel`s""" + def __init__(self): + super().__init__(_BaseAWGChannel) + + +class AWGChannelTupleFeature(Feature): + """Base class for features that are used for `AWGChannelTuple`s""" + def __init__(self, channel_tuple: 'AWGChannelTuple'): + super().__init__(AWGChannelTuple) + self._channel_tuple = weakref.proxy(channel_tuple) + + +class AWGDevice(FeatureAble[AWGDeviceFeature], ABC): + """Base class for all drivers of all arbitrary waveform generators""" + + def __init__(self, name: str): + """ + Args: + name: The name of the device as a String + """ + super().__init__() + self._name = name + + @property + def name(self) -> str: + """Returns the name of a Device as a String""" + return self._name + + @abstractmethod + def cleanup(self) -> None: + """Function for cleaning up the dependencies of the device""" + raise NotImplementedError() + + @property + @abstractmethod + def channels(self) -> Collection["AWGChannel"]: + """Returns a list of all channels of a Device""" + raise NotImplementedError() + + @property + @abstractmethod + def marker_channels(self) -> Collection["AWGMarkerChannel"]: + """Returns a list of all marker channels of a device. The collection may be empty""" + raise NotImplementedError() + + @property + @abstractmethod + def channel_tuples(self) -> Collection["AWGChannelTuple"]: + """Returns a list of all channel tuples of a list""" + raise NotImplementedError() + + +class AWGChannelTuple(FeatureAble[AWGChannelTupleFeature], ABC): + """Base class for all groups of synchronized channels of an AWG""" + + def __init__(self, idn: int): + """ + Args: + idn: The identification number of a channel tuple + """ + super().__init__() + + self._idn = idn + + @property + @abstractmethod + def channel_tuple_adapter(self) -> channel_tuple_wrapper: + pass + + @property + def idn(self) -> int: + """Returns the identification number of a channel tuple""" + return self._idn + + @property + def name(self) -> str: + """Returns the name of a channel tuple""" + return "{dev}_CT{idn}".format(dev=self.device.name, idn=self.idn) + + @property + @abstractmethod + def sample_rate(self) -> float: + """Returns the sample rate of a channel tuple as a float""" + raise NotImplementedError() + + @property + @abstractmethod + def device(self) -> AWGDevice: + """Returns the device which the channel tuple belong to""" + raise NotImplementedError() + + @property + @abstractmethod + def channels(self) -> Collection["AWGChannel"]: + """Returns a list of all channels of the channel tuple""" + raise NotImplementedError() + + @property + @abstractmethod + def marker_channels(self) -> Collection["AWGMarkerChannel"]: + """Returns a list of all marker channels of the channel tuple. The collection may be empty""" + raise NotImplementedError() + + +class _BaseAWGChannel(FeatureAble[AWGChannelFeature], ABC): + """Base class for a single channel of an AWG""" + + def __init__(self, idn: int): + """ + Args: + idn: The identification number of a channel + """ + super().__init__() + self._idn = idn + + @property + def idn(self) -> int: + """Returns the identification number of a channel""" + return self._idn + + @property + @abstractmethod + def device(self) -> AWGDevice: + """Returns the device which the channel belongs to""" + raise NotImplementedError() + + @property + @abstractmethod + def channel_tuple(self) -> Optional[AWGChannelTuple]: + """Returns the channel tuple which a channel belongs to""" + raise NotImplementedError() + + @abstractmethod + def _set_channel_tuple(self, channel_tuple) -> None: + """ + Sets the channel tuple which a channel belongs to + + Args: + channel_tuple: reference to the channel tuple + """ + raise NotImplementedError() + + +class AWGChannel(_BaseAWGChannel, ABC): + """Base class for a single channel of an AWG""" + @property + def name(self) -> str: + """Returns the name of a channel""" + return "{dev}_C{idn}".format(dev=self.device.name, idn=self.idn) + + +class AWGMarkerChannel(_BaseAWGChannel, ABC): + """Base class for a single marker channel of an AWG""" + @property + def name(self) -> str: + """Returns the name of a marker channel""" + return "{dev}_M{idn}".format(dev=self.device.name, idn=self.idn) + + +class ProgramOverwriteException(RuntimeError): + + def __init__(self, name) -> None: + super().__init__() + self.name = name + + def __str__(self) -> str: + return "A program with the given name '{}' is already present on the device." \ + " Use force to overwrite.".format(self.name) diff --git a/qupulse/hardware/feature_awg/base_features.py b/qupulse/hardware/feature_awg/base_features.py new file mode 100644 index 000000000..754504ff3 --- /dev/null +++ b/qupulse/hardware/feature_awg/base_features.py @@ -0,0 +1,98 @@ +from types import MappingProxyType +from typing import Callable, Generic, Mapping, Optional, Type, TypeVar +from abc import ABC + +__all__ = ["Feature", "FeatureAble"] + + +class Feature: + """ + Base class for features of `FeatureAble`s. + """ + def __init__(self, target_type: Type["FeatureAble"]): + self._target_type = target_type + + @property + def target_type(self) -> Type["FeatureAble"]: + return self._target_type + + +FeatureType = TypeVar("FeatureType", bound=Feature) +GetItemFeatureType = TypeVar("GetItemFeatureType", bound=Feature) + + +class FeatureAble(Generic[FeatureType]): + """ + Base class for all types that are able to handle features. The features are saved in a dictionary and the methods + can be called with the __getitem__-operator. + """ + + def __init__(self): + super().__init__() + self._features = {} + + def __getitem__(self, feature_type: Type[GetItemFeatureType]) -> GetItemFeatureType: + if isinstance(feature_type, str): + return self._features[feature_type] + if not isinstance(feature_type, type): + raise TypeError("Expected type-object as key, got \"{ftt}\" instead".format( + ftt=type(feature_type).__name__)) + key_type = _get_base_feature_type(feature_type) + if key_type is None: + raise TypeError("Unexpected type of feature: {ft}".format(ft=feature_type.__name__)) + if key_type not in self._features: + raise KeyError("Could not get feature for type: {ft}".format(ft=feature_type.__name__)) + return self._features[key_type] + + def add_feature(self, feature: FeatureType) -> None: + """ + The method adds the feature to a Dictionary with all features + + Args: + feature: A certain feature which functions should be added to the dictionary _features + """ + feature_type = _get_base_feature_type(type(feature)) + if feature_type is None: + raise TypeError("Unexpected type of feature: {ft}".format(ft=type(feature).__name__)) + if not isinstance(self, feature.target_type): + raise TypeError("Features with type \"{ft}\" belong to \"{tt}\"-objects".format( + ft=type(feature).__name__, tt=feature.target_type.__name__)) + if feature_type in self._features: + raise KeyError(self, "Feature with type \"{ft}\" already exists".format(ft=feature_type.__name__)) + self._features[feature_type] = feature + # Also adding the feature with the string as the key. With this you can you the name as a string for __getitem__ + self._features[feature_type.__name__] = feature + + @property + def features(self) -> Mapping[FeatureType, Callable]: + """Returns the dictionary with all features of a FeatureAble""" + return MappingProxyType(self._features) + + +def _get_base_feature_type(feature_type: Type[Feature]) -> Type[Optional[Feature]]: + """ + This function searches for the second inheritance level under `Feature` (i.e. level under `AWGDeviceFeature`, + `AWGChannelFeature` or `AWGChannelTupleFeature`). This is done to ensure, that nobody adds the same feature + twice, but with a type of a different inheritance level as key. + + Args: + feature_type: Type of the feature + + Returns: + Base type of the feature_type, two inheritance levels under `Feature` + """ + if not issubclass(feature_type, Feature): + return type(None) + + # Search for base class on the inheritance line of Feature + for base in feature_type.__bases__: + if issubclass(base, Feature): + result_type = base + break + else: + return type(None) + + if Feature in result_type.__bases__: + return feature_type + else: + return _get_base_feature_type(result_type) diff --git a/qupulse/hardware/feature_awg/channel_tuple_wrapper.py b/qupulse/hardware/feature_awg/channel_tuple_wrapper.py new file mode 100644 index 000000000..ad8d6597c --- /dev/null +++ b/qupulse/hardware/feature_awg/channel_tuple_wrapper.py @@ -0,0 +1,55 @@ +from typing import Tuple, Optional, Callable, Set + +from qupulse._program._loop import Loop +from qupulse.hardware.feature_awg.base import AWGChannelTuple +from qupulse.hardware.awgs.base import AWG + + +class ChannelTupleAdapter(AWG): + """ + This class serves as an adapter between the old Class AWG and the new driver abstraction. It routes all the methods + the AWG class to the corresponding methods of the new driver. + """ + def __copy__(self) -> None: + pass + + def __init__(self, channel_tuple: AWGChannelTuple): + self._channel_tuple = channel_tuple + + def identifier(self) -> str: + return self._channel_tuple.name + + def num_channels(self) -> int: + return self._channel_tuple.num_channels + + def num_markers(self) -> int: + return self._channel_tuple.num_markers + + def upload(self, name: str, + program: Loop, + channels: Tuple[Optional["ChannelID"], ...], + markers: Tuple[Optional["ChannelID"], ...], + voltage_transformation: Tuple[Optional[Callable], ...], + force: bool = False) -> None: + from qupulse.hardware.feature_awg.tabor import ProgramManagement + return self._channel_tuple[ProgramManagement].upload(name, program, channels, markers, + voltage_transformation, force) + + def remove(self, name: str) -> None: + from qupulse.hardware.feature_awg.tabor import ProgramManagement + return self._channel_tuple[ProgramManagement].remove(name) + + def clear(self) -> None: + from qupulse.hardware.feature_awg.tabor import ProgramManagement + return self._channel_tuple[ProgramManagement].clear() + + def arm(self, name: Optional[str]) -> None: + from qupulse.hardware.feature_awg.tabor import ProgramManagement + return self._channel_tuple[ProgramManagement].arm(name) + + def programs(self) -> Set[str]: + from qupulse.hardware.feature_awg.tabor import ProgramManagement + return self._channel_tuple[ProgramManagement].programs + + def sample_rate(self) -> float: + return self._channel_tuple.sample_rate diff --git a/qupulse/hardware/feature_awg/features.py b/qupulse/hardware/feature_awg/features.py new file mode 100644 index 000000000..5a88fba40 --- /dev/null +++ b/qupulse/hardware/feature_awg/features.py @@ -0,0 +1,265 @@ +from abc import ABC, abstractmethod +from typing import Callable, Optional, Set, Tuple, Dict, Union, Any, Mapping +from numbers import Real +from enum import Enum + +from qupulse._program._loop import Loop +from qupulse.hardware.feature_awg.base import AWGDeviceFeature, AWGChannelFeature, AWGChannelTupleFeature,\ + AWGChannelTuple +from qupulse.utils.types import ChannelID + +import pyvisa + + +######################################################################################################################## +# device features +######################################################################################################################## +class SCPI(AWGDeviceFeature, ABC): + """Represents the ability to communicate via SCPI. + + https://en.wikipedia.org/wiki/Standard_Commands_for_Programmable_Instruments + """ + + def __init__(self, visa: pyvisa.resources.MessageBasedResource): + super().__init__() + self._socket = visa + + def send_cmd(self, cmd_str): + self._socket.write(cmd_str) + + def send_query(self, query_str): + self._socket.query(query_str) + + def close(self): + self._socket.close() + + +class ChannelSynchronization(AWGDeviceFeature, ABC): + """This Feature is used to synchronise a certain ammount of channels""" + + @abstractmethod + def synchronize_channels(self, group_size: int) -> None: + """ + Synchronize in groups of `group_size` channels. Groups of synchronized channels will be provided as + AWGChannelTuples. + + Args: + group_size: Number of channels per channel tuple + """ + raise NotImplementedError() + + +class DeviceControl(AWGDeviceFeature, ABC): + """This feature is used for basic communication with a AWG""" + + @abstractmethod + def reset(self) -> None: + """ + Resetting the whole device. A command for resetting is send to the Device, the device is initialized again and + all channel tuples are cleared. + """ + raise NotImplementedError() + + @abstractmethod + def trigger(self) -> None: + """ + This method triggers a device remotely. + """ + raise NotImplementedError() + + +class StatusTable(AWGDeviceFeature, ABC): + @abstractmethod + def get_status_table(self) -> Dict[str, Union[str, float, int]]: + """ + Send a lot of queries to the AWG about its settings. A good way to visualize is using pandas.DataFrame + + Returns: + An ordered dictionary with the results + """ + raise NotImplementedError() + + +######################################################################################################################## +# channel tuple features +######################################################################################################################## + +class ReadProgram(AWGChannelTupleFeature, ABC): + """Read the currently armed and uploaded program from the device. The returned object is highly device specific.""" + + @abstractmethod + def read_complete_program(self) -> Any: + raise NotImplementedError() + + +class VolatileParameters(AWGChannelTupleFeature, ABC): + """Ability to set the values of parameters which were marked as volatile on program creation.""" + @abstractmethod + def set_volatile_parameters(self, program_name: str, parameters: Mapping[str, Real]) -> None: + """Set the values of parameters which were marked as volatile on program creation.""" + raise NotImplementedError() + + +class RepetitionMode(Enum): + """Some devices support playing a program indefinitely or only once.""" + # Arm once, trigger once -> infinite repetitions + INFINITE = "infinite" + # Arm once, trigger N times -> N playbacks + AUTO_REARM = "auto_rearm" + # Arm once, trigger N times -> 1 playback + ONCE = "once" + + +class ProgramManagement(AWGChannelTupleFeature, ABC): + def __init__(self, channel_tuple: 'AWGChannelTuple'): + super().__init__(channel_tuple=channel_tuple) + self._default_repetition_mode = RepetitionMode.ONCE + + @abstractmethod + def upload(self, name: str, + program: Loop, + channels: Tuple[Optional[ChannelID], ...], + marker_channels: Tuple[Optional[ChannelID], ...], + voltage_transformation: Tuple[Optional[Callable], ...], + repetition_mode: Union[RepetitionMode, str] = None, + force: bool = False) -> None: + """ + Upload a program to the AWG. + + Physically uploads all waveforms required by the program - excluding those already present - + to the device and sets up playback sequences accordingly. + This method should be cheap for program already on the device and can therefore be used + for syncing. Programs that are uploaded should be fast(~1 sec) to arm. + + Raises: + ValueError: if one of channels, marker_channels, voltage_transformation or repetition_mode is invalid + + Args: + name: A name for the program on the AWG. + program: The program (a sequence of instructions) to upload. + channels: Tuple of length num_channels that ChannelIDs of in the program to use. Position in the list corresponds to the AWG channel + marker_channels: List of channels in the program to use. Position in the List in the list corresponds to the AWG channel + voltage_transformation: transformations applied to the waveforms extracted rom the program. Position + in the list corresponds to the AWG channel + repetition_mode: how often the program should be played + force: If a different sequence is already present with the same name, it is + overwritten if force is set to True. (default = False) + """ + raise NotImplementedError() + + @abstractmethod + def remove(self, name: str) -> None: + """ + Remove a program from the AWG. + + Also discards all waveforms referenced only by the program identified by name. + + Args: + name: The name of the program to remove. + """ + raise NotImplementedError() + + @abstractmethod + def clear(self) -> None: + """ + Removes all programs and waveforms from the AWG. + + Caution: This affects all programs and waveforms on the AWG, not only those uploaded using qupulse! + """ + raise NotImplementedError() + + @abstractmethod + def arm(self, name: Optional[str]) -> None: + """ + Load the program 'name' and arm the device for running it. If name is None the awg will "dearm" its current + program. + """ + raise NotImplementedError() + + @property + @abstractmethod + def programs(self) -> Set[str]: + """The set of program names that can currently be executed on the hardware AWG.""" + raise NotImplementedError() + + @abstractmethod + def run_current_program(self) -> None: + """This method starts running the active program""" + raise NotImplementedError() + + @property + @abstractmethod + def supported_repetition_modes(self) -> Set[RepetitionMode]: + """Return set of supported repetition modes in the current configuration.""" + raise NotImplementedError() + + @property + def default_repetition_mode(self) -> RepetitionMode: + return self._default_repetition_mode + + @default_repetition_mode.setter + def default_repetition_mode(self, repetition_mode: RepetitionMode): + repetition_mode = RepetitionMode(repetition_mode) + if repetition_mode not in self.supported_repetition_modes: + raise ValueError(f"The repetition mode {repetition_mode} is not supported by {self._channel_tuple}") + self._default_repetition_mode = repetition_mode + + +######################################################################################################################## +# channel features +######################################################################################################################## + +class AmplitudeOffsetHandling(Enum): + IGNORE_OFFSET = 'ignore_offset' # Offset is ignored. + CONSIDER_OFFSET = 'consider_offset' # Offset is discounted from the waveforms. + + +class VoltageRange(AWGChannelFeature): + @property + @abstractmethod + def offset(self) -> float: + """Get offset of AWG channel""" + raise NotImplementedError() + + @property + @abstractmethod + def amplitude(self) -> float: + """Get amplitude of AWG channel""" + raise NotImplementedError() + + @property + @abstractmethod + def amplitude_offset_handling(self) -> AmplitudeOffsetHandling: + """ + Gets the amplitude and offset handling of this channel. The amplitude-offset controls if the amplitude and + offset settings are constant or if these should be optimized by the driver + """ + raise NotImplementedError() + + @amplitude_offset_handling.setter + @abstractmethod + def amplitude_offset_handling(self, amp_offs_handling: Union[str, AmplitudeOffsetHandling]) -> None: + """ + amp_offs_handling: See possible values at `AWGAmplitudeOffsetHandling` + """ + raise NotImplementedError() + + +class ActivatableChannels(AWGChannelFeature): + @property + @abstractmethod + def enabled(self) -> bool: + """ + Returns the the state a channel has at the moment. A channel is either activated or deactivated + """ + raise NotImplementedError() + + @abstractmethod + def enable(self): + """Enables the output of a certain channel""" + raise NotImplementedError() + + @abstractmethod + def disable(self): + """Disables the output of a certain channel""" + raise NotImplementedError() diff --git a/qupulse/hardware/feature_awg/tabor.py b/qupulse/hardware/feature_awg/tabor.py new file mode 100644 index 000000000..09c997218 --- /dev/null +++ b/qupulse/hardware/feature_awg/tabor.py @@ -0,0 +1,1384 @@ +import functools +import logging +import numbers +import sys +import weakref +from typing import List, Tuple, Set, Callable, Optional, Any, cast, Union, Dict, Mapping, NamedTuple, Iterable +from collections import OrderedDict +import numpy as np +from qupulse import ChannelID +from qupulse._program._loop import Loop, make_compatible + +from qupulse.hardware.feature_awg.channel_tuple_wrapper import ChannelTupleAdapter +from qupulse.hardware.feature_awg.features import ChannelSynchronization, AmplitudeOffsetHandling, VoltageRange, \ + ProgramManagement, ActivatableChannels, DeviceControl, StatusTable, SCPI, VolatileParameters, \ + ReadProgram, RepetitionMode +from qupulse.hardware.util import voltage_to_uint16, find_positions + +from qupulse.utils.types import Collection, TimeType +from qupulse.hardware.feature_awg.base import AWGChannelTuple, AWGChannel, AWGDevice, AWGMarkerChannel +from typing import Sequence +from qupulse._program.tabor import TaborSegment, TaborException, TaborProgram, PlottableProgram, TaborSequencing, \ + make_combined_wave +import pyvisa +import warnings + +# Provided by Tabor electronics for python 2.7 +# a python 3 version is in a private repository on https://git.rwth-aachen.de/qutech +# Beware of the string encoding change! +import teawg + +assert (sys.byteorder == "little") + +__all__ = ["TaborDevice", "TaborChannelTuple", "TaborChannel"] + +TaborProgramMemory = NamedTuple("TaborProgramMemory", [("waveform_to_segment", np.ndarray), + ("program", TaborProgram)]) + + +def with_configuration_guard(function_object: Callable[["TaborChannelTuple", Any], Any]) -> Callable[ + ["TaborChannelTuple"], Any]: + """This decorator assures that the AWG is in configuration mode while the decorated method runs.""" + + @functools.wraps(function_object) + def guarding_method(channel_pair: "TaborChannelTuple", *args, **kwargs) -> Any: + + if channel_pair._configuration_guard_count == 0: + channel_pair._enter_config_mode() + channel_pair._configuration_guard_count += 1 + + try: + return function_object(channel_pair, *args, **kwargs) + finally: + channel_pair._configuration_guard_count -= 1 + if channel_pair._configuration_guard_count == 0: + channel_pair._exit_config_mode() + + return guarding_method + + +def with_select(function_object: Callable[["TaborChannelTuple", Any], Any]) -> Callable[["TaborChannelTuple"], Any]: + """Asserts the channel pair is selcted when the wrapped function is called""" + + @functools.wraps(function_object) + def selector(channel_tuple: "TaborChannelTuple", *args, **kwargs) -> Any: + channel_tuple._select() + return function_object(channel_tuple, *args, **kwargs) + + return selector + + +######################################################################################################################## +# Device +######################################################################################################################## +# Features +class TaborSCPI(SCPI): + def __init__(self, device: "TaborDevice", visa: pyvisa.resources.MessageBasedResource): + super().__init__(visa) + + self._parent = weakref.ref(device) + + def send_cmd(self, cmd_str, paranoia_level=None): + for instr in self._parent().all_devices: + instr.send_cmd(cmd_str=cmd_str, paranoia_level=paranoia_level) + + def send_query(self, query_str, query_mirrors=False) -> Any: + if query_mirrors: + return tuple(instr.send_query(query_str) for instr in self._parent().all_devices) + else: + return self._parent().main_instrument.send_query(query_str) + + def _send_cmd(self, cmd_str, paranoia_level=None) -> Any: + """Overwrite send_cmd for paranoia_level > 3""" + if paranoia_level is None: + paranoia_level = self._parent().paranoia_level + + if paranoia_level < 3: + self._parent().super().send_cmd(cmd_str=cmd_str, paranoia_level=paranoia_level) # pragma: no cover + else: + cmd_str = cmd_str.rstrip() + + if len(cmd_str) > 0: + ask_str = cmd_str + "; *OPC?; :SYST:ERR?" + else: + ask_str = "*OPC?; :SYST:ERR?" + + *answers, opc, error_code_msg = self._parent()._visa_inst.ask(ask_str).split(";") + + error_code, error_msg = error_code_msg.split(",") + error_code = int(error_code) + if error_code != 0: + _ = self._parent()._visa_inst.ask("*CLS; *OPC?") + + if error_code == -450: + # query queue overflow + self.send_cmd(cmd_str) + else: + raise RuntimeError("Cannot execute command: {}\n{}: {}".format(cmd_str, error_code, error_msg)) + + assert len(answers) == 0 + + +class TaborChannelSynchronization(ChannelSynchronization): + """This Feature is used to synchronise a certain ammount of channels""" + + def __init__(self, device: "TaborDevice"): + super().__init__() + self._parent = weakref.ref(device) + + def synchronize_channels(self, group_size: int) -> None: + """ + Synchronize in groups of `group_size` channels. Groups of synchronized channels will be provided as + AWGChannelTuples. The channel_size must be evenly dividable by the number of channels + + Args: + group_size: Number of channels per channel tuple + """ + if group_size == 2: + self._parent()._channel_tuples = [] + for i in range((int)(len(self._parent().channels) / group_size)): + self._parent()._channel_tuples.append( + TaborChannelTuple((i + 1), + self._parent(), + self._parent().channels[(i * group_size):((i * group_size) + group_size)], + self._parent().marker_channels[(i * group_size):((i * group_size) + group_size)]) + ) + self._parent()[SCPI].send_cmd(":INST:COUP:STAT OFF") + elif group_size == 4: + self._parent()._channel_tuples = [TaborChannelTuple(1, + self._parent(), + self._parent().channels, + self._parent().marker_channels)] + self._parent()[SCPI].send_cmd(":INST:COUP:STAT ON") + else: + raise TaborException("Invalid group size") + + +class TaborDeviceControl(DeviceControl): + """This feature is used for basic communication with a AWG""" + + def __init__(self, device: "TaborDevice"): + super().__init__() + self._parent = weakref.ref(device) + + def reset(self) -> None: + """ + Resetting the whole device. A command for resetting is send to the Device, the device is initialized again and + all channel tuples are cleared. + """ + self._parent()[SCPI].send_cmd(":RES") + self._parent()._coupled = None + + self._parent()._initialize() + for channel_tuple in self._parent().channel_tuples: + channel_tuple[TaborProgramManagement].clear() + + def trigger(self) -> None: + """ + This method triggers a device remotely. + """ + self._parent()[SCPI].send_cmd(":TRIG") + + +class TaborStatusTable(StatusTable): + def __init__(self, device: "TaborDevice"): + super().__init__() + self._parent = device + + def get_status_table(self) -> Dict[str, Union[str, float, int]]: + """ + Send a lot of queries to the AWG about its settings. A good way to visualize is using pandas.DataFrame + + Returns: + An ordered dictionary with the results + """ + name_query_type_list = [("channel", ":INST:SEL?", int), + ("coupling", ":OUTP:COUP?", str), + ("volt_dc", ":SOUR:VOLT:LEV:AMPL:DC?", float), + ("volt_hv", ":VOLT:HV?", float), + ("offset", ":VOLT:OFFS?", float), + ("outp", ":OUTP?", str), + ("mode", ":SOUR:FUNC:MODE?", str), + ("shape", ":SOUR:FUNC:SHAPE?", str), + ("dc_offset", ":SOUR:DC?", float), + ("freq_rast", ":FREQ:RAST?", float), + + ("gated", ":INIT:GATE?", str), + ("continuous", ":INIT:CONT?", str), + ("continuous_enable", ":INIT:CONT:ENAB?", str), + ("continuous_source", ":INIT:CONT:ENAB:SOUR?", str), + ("marker_source", ":SOUR:MARK:SOUR?", str), + ("seq_jump_event", ":SOUR:SEQ:JUMP:EVEN?", str), + ("seq_adv_mode", ":SOUR:SEQ:ADV?", str), + ("aseq_adv_mode", ":SOUR:ASEQ:ADV?", str), + + ("marker", ":SOUR:MARK:SEL?", int), + ("marker_high", ":MARK:VOLT:HIGH?", str), + ("marker_low", ":MARK:VOLT:LOW?", str), + ("marker_width", ":MARK:WIDT?", int), + ("marker_state", ":MARK:STAT?", str)] + + data = OrderedDict((name, []) for name, *_ in name_query_type_list) + for ch in (1, 2, 3, 4): + self._parent.channels[ch - 1]._select() + self._parent.marker_channels[(ch - 1) % 2]._select() + for name, query, dtype in name_query_type_list: + data[name].append(dtype(self._parent[SCPI].send_query(query))) + return data + + +# Implementation +class TaborDevice(AWGDevice): + def __init__(self, device_name: str, instr_addr=None, paranoia_level=1, external_trigger=False, reset=False, + mirror_addresses=()): + """ + Constructor for a Tabor device + + Args: + device_name (str): Name of the device + instr_addr: Instrument address that is forwarded to teawag + paranoia_level (int): Paranoia level that is forwarded to teawg + external_trigger (bool): Not supported yet + reset (bool): + mirror_addresses: list of devices on which the same things as on the main device are done. + For example you can a simulator and a real Device at once + """ + super().__init__(device_name) + self._instr = teawg.TEWXAwg(instr_addr, paranoia_level) + self._mirrors = tuple(teawg.TEWXAwg(address, paranoia_level) for address in mirror_addresses) + self._coupled = None + self._clock_marker = [0, 0, 0, 0] + + self.add_feature(TaborSCPI(self, self.main_instrument._visa_inst)) + self.add_feature(TaborDeviceControl(self)) + self.add_feature(TaborStatusTable(self)) + + if reset: + self[SCPI].send_cmd(":RES") + + # Channel + self._channels = [TaborChannel(i + 1, self) for i in range(4)] + + # MarkerChannels + self._marker_channels = [TaborMarkerChannel(i + 1, self) for i in range(4)] + + self._initialize() + + # ChannelTuple + self._channel_tuples = [] + + self.add_feature(TaborChannelSynchronization(self)) + self[TaborChannelSynchronization].synchronize_channels(2) + + if external_trigger: + raise NotImplementedError() # pragma: no cover + + def enable(self) -> None: + """ + This method immediately generates the selected output waveform, if the device is in continuous and armed + repetition mode. + """ + self[SCPI].send_cmd(":ENAB") + + def abort(self) -> None: + """ + With abort you can terminate the current generation of the output waveform. When the output waveform is + terminated the output starts generating an idle waveform. + """ + self[SCPI].send_cmd(":ABOR") + + def set_coupled(self, coupled: bool) -> None: + """ + Thats the coupling of the device to 'coupled' + """ + if coupled: + self[SCPI].send_cmd("INST:COUP:STAT ON") + else: + self[SCPI].send_cmd("INST:COUP:STAT OFF") + + def _is_coupled(self) -> bool: + """ + Returns true if the coupling of the device is 'coupled' otherwise false + """ + if self._coupled is None: + return self[SCPI].send_query(":INST:COUP:STAT?") == "ON" + else: + return self._coupled + + def cleanup(self) -> None: + for channel_tuple in self.channel_tuples: + channel_tuple.cleanup() + + @property + def channels(self) -> Collection["TaborChannel"]: + """Returns a list of all channels of a Device""" + return self._channels + + @property + def marker_channels(self) -> Collection["TaborMarkerChannel"]: + """Returns a list of all marker channels of a device. The collection may be empty""" + return self._marker_channels + + @property + def channel_tuples(self) -> Collection["TaborChannelTuple"]: + """Returns a list of all channel tuples of a list""" + return self._channel_tuples + + @property + def main_instrument(self) -> teawg.TEWXAwg: + return self._instr + + @property + def mirrored_instruments(self) -> Sequence[teawg.TEWXAwg]: + return self._mirrors + + @property + def all_devices(self) -> Sequence[teawg.TEWXAwg]: + return (self._instr,) + self._mirrors + + @property + def _paranoia_level(self) -> int: + return self._instr.paranoia_level + + @_paranoia_level.setter + def _paranoia_level(self, val): + for instr in self.all_devices: + instr.paranoia_level = val + + @property + def dev_properties(self) -> dict: + return self._instr.dev_properties + + def _send_binary_data(self, pref, bin_dat, paranoia_level=None): + for instr in self.all_devices: + instr.send_binary_data(pref, bin_dat=bin_dat, paranoia_level=paranoia_level) + + def _download_segment_lengths(self, seg_len_list, pref=":SEGM:DATA", paranoia_level=None): + for instr in self.all_devices: + instr.download_segment_lengths(seg_len_list, pref=pref, paranoia_level=paranoia_level) + + def _download_sequencer_table(self, seq_table, pref=":SEQ:DATA", paranoia_level=None): + for instr in self.all_devices: + instr.download_sequencer_table(seq_table, pref=pref, paranoia_level=paranoia_level) + + def _download_adv_seq_table(self, seq_table, pref=":ASEQ:DATA", paranoia_level=None): + for instr in self.all_devices: + instr.download_adv_seq_table(seq_table, pref=pref, paranoia_level=paranoia_level) + + make_combined_wave = staticmethod(teawg.TEWXAwg.make_combined_wave) + + def _initialize(self) -> None: + # 1. Select channel + # 2. Turn off gated mode + # 3. Turn on continous mode + # 4. Armed mode (only generate waveforms after enab command) + # 5. Expect enable signal from (USB / LAN / GPIB) + # 6. Use arbitrary waveforms as marker source + # 7. Expect jump command for sequencing from (USB / LAN / GPIB) + + setup_command = ( + ":INIT:GATE OFF; :INIT:CONT ON; " + ":INIT:CONT:ENAB ARM; :INIT:CONT:ENAB:SOUR BUS;" + ":SOUR:MARK:SOUR USER; :SOUR:SEQ:JUMP:EVEN BUS ") + self[SCPI].send_cmd(":INST:SEL 1") + self[SCPI].send_cmd(setup_command) + self[SCPI].send_cmd(":INST:SEL 3") + self[SCPI].send_cmd(setup_command) + + def _get_readable_device(self, simulator=True) -> teawg.TEWXAwg: + """ + A method to get the first readable device out of all devices. + A readable device is a device which you can read data from like a simulator. + + Returns: + The first readable device out of all devices + + Throws: + TaborException: this exception is thrown if there is no readable device in the list of all devices + """ + for device in self.all_devices: + if device.fw_ver >= 3.0: + if simulator: + if device.is_simulator: + return device + else: + return device + raise TaborException("No device capable of device data read") + + +######################################################################################################################## +# Channel +######################################################################################################################## +# Features +class TaborVoltageRange(VoltageRange): + def __init__(self, channel: "TaborChannel"): + super().__init__() + self._parent = weakref.ref(channel) + + @property + @with_select + def offset(self) -> float: + """Get offset of AWG channel""" + return float( + self._parent().device[SCPI].send_query(":VOLT:OFFS?".format(channel=self._parent().idn))) + + @property + @with_select + def amplitude(self) -> float: + """Get amplitude of AWG channel""" + coupling = self._parent().device[SCPI].send_query(":OUTP:COUP?") + if coupling == "DC": + return float(self._parent().device[SCPI].send_query(":VOLT?")) + elif coupling == "HV": + return float(self._parent().device[SCPI].send_query(":VOLT:HV?")) + else: + raise TaborException("Unknown coupling: {}".format(coupling)) + + @property + def amplitude_offset_handling(self) -> AmplitudeOffsetHandling: + """ + Gets the amplitude and offset handling of this channel. The amplitude-offset controls if the amplitude and + offset settings are constant or if these should be optimized by the driver + """ + return self._parent()._amplitude_offset_handling + + @amplitude_offset_handling.setter + def amplitude_offset_handling(self, amp_offs_handling: Union[AmplitudeOffsetHandling, str]) -> None: + """ + amp_offs_handling: See possible values at `AWGAmplitudeOffsetHandling` + """ + amp_offs_handling = AmplitudeOffsetHandling(AmplitudeOffsetHandling) + self._parent()._amplitude_offset_handling = amp_offs_handling + + def _select(self) -> None: + self._parent()._select() + + +class TaborActivatableChannels(ActivatableChannels): + def __init__(self, channel: "TaborChannel"): + super().__init__() + self._parent = weakref.ref(channel) + + @property + def enabled(self) -> bool: + """ + Returns the the state a channel has at the moment. A channel is either activated or deactivated + True stands for activated and false for deactivated + """ + return self._parent().device[SCPI].send_query(":OUTP ?") == "ON" + + @with_select + def enable(self): + """Enables the output of a certain channel""" + command_string = ":OUTP ON".format(ch_id=self._parent().idn) + self._parent().device[SCPI].send_cmd(command_string) + + @with_select + def disable(self): + """Disables the output of a certain channel""" + command_string = ":OUTP OFF".format(ch_id=self._parent().idn) + self._parent().device[SCPI].send_cmd(command_string) + + def _select(self) -> None: + self._parent()._select() + +# Implementation +class TaborChannel(AWGChannel): + def __init__(self, idn: int, device: TaborDevice): + super().__init__(idn) + + self._device = weakref.ref(device) + self._amplitude_offset_handling = AmplitudeOffsetHandling.IGNORE_OFFSET + + # adding Features + self.add_feature(TaborVoltageRange(self)) + self.add_feature(TaborActivatableChannels(self)) + + @property + def device(self) -> TaborDevice: + """Returns the device that the channel belongs to""" + return self._device() + + @property + def channel_tuple(self) -> "TaborChannelTuple": + """Returns the channel tuple that this channel belongs to""" + return self._channel_tuple() + + def _set_channel_tuple(self, channel_tuple: "TaborChannelTuple") -> None: + """ + The channel tuple "channel_tuple" is assigned to this channel + + Args: + channel_tuple (TaborChannelTuple): the channel tuple that this channel belongs to + """ + self._channel_tuple = weakref.ref(channel_tuple) + + def _select(self) -> None: + self.device[SCPI].send_cmd(":INST:SEL {channel}".format(channel=self.idn)) + + +######################################################################################################################## +# ChannelTuple +######################################################################################################################## +# Features +class TaborProgramManagement(ProgramManagement): + def __init__(self, channel_tuple: "TaborChannelTuple"): + super().__init__(channel_tuple) + self._programs = {} + self._armed_program = None + + self._idle_sequence_table = [(1, 1, 0), (1, 1, 0), (1, 1, 0)] + self._trigger_source = 'BUS' + + def get_repetition_mode(self, program_name: str) -> str: + """ + Returns the default repetition mode of a certain program + Args: + program_name (str): name of the program whose repetition mode should be returned + """ + return self._channel_tuple._known_programs[program_name].program._repetition_mode + + def set_repetition_mode(self, program_name: str, repetition_mode: str) -> None: + """ + Changes the default repetition mode of a certain program + + Args: + program_name (str): name of the program whose repetition mode should be changed + + Throws: + ValueError: this Exception is thrown when an invalid repetition mode is given + """ + if repetition_mode is "infinite" or repetition_mode is "once": + self._channel_tuple._known_programs[program_name].program._repetition_mode = repetition_mode + else: + raise ValueError("{} is no vaild repetition mode".format(repetition_mode)) + + @property + def supported_repetition_modes(self) -> Set[RepetitionMode]: + return {RepetitionMode.INFINITE} + + @with_configuration_guard + @with_select + def upload(self, name: str, + program: Loop, + channels: Tuple[Optional[ChannelID], Optional[ChannelID]], + marker_channels: Tuple[Optional[ChannelID], Optional[ChannelID]], + voltage_transformation: Tuple[Callable, Callable], + repetition_mode: str = None, + force: bool = False) -> None: + """ + Upload a program to the AWG. + + The policy is to prefer amending the unknown waveforms to overwriting old ones. + """ + + if repetition_mode is None: + repetition_mode = self._default_repetition_mode + else: + repetition_mode = RepetitionMode(repetition_mode) + + if repetition_mode not in self.supported_repetition_modes: + raise ValueError(f"{repetition_mode} is not supported on {self._channel_tuple}") + if len(channels) != len(self._channel_tuple.channels): + raise ValueError("Wrong number of channels") + if len(marker_channels) != len(self._channel_tuple.marker_channels): + raise ValueError("Wrong number of marker") + if len(voltage_transformation) != len(self._channel_tuple.channels): + raise ValueError("Wrong number of voltage transformations") + + # adjust program to fit criteria + sample_rate = self._channel_tuple.device.channel_tuples[0].sample_rate + make_compatible(program, + minimal_waveform_length=192, + waveform_quantum=16, + sample_rate=sample_rate / 10 ** 9) + + if name in self._channel_tuple._known_programs: + if force: + self._channel_tuple.free_program(name) + else: + raise ValueError('{} is already known on {}'.format(name, self._channel_tuple.idn)) + + # They call the peak to peak range amplitude + + ranges = tuple(ch[VoltageRange].amplitude for ch in self._channel_tuple.channels) + + voltage_amplitudes = tuple(range / 2 for range in ranges) + + voltage_offsets = [] + for channel in self._channel_tuple.channels: + if channel._amplitude_offset_handling == AmplitudeOffsetHandling.IGNORE_OFFSET: + voltage_offsets.append(0) + elif channel._amplitude_offset_handling == AmplitudeOffsetHandling.CONSIDER_OFFSET: + voltage_offsets.append(channel[VoltageRange].offset) + else: + raise NotImplementedError( + '{} is invalid as AWGAmplitudeOffsetHandling'.format(channel._amplitude_offset_handling)) + voltage_offsets = tuple(voltage_offsets) + + # parse to tabor program + tabor_program = TaborProgram(program, + channels=tuple(channels), + markers=marker_channels, + device_properties=self._channel_tuple.device.dev_properties, + sample_rate=sample_rate / 10 ** 9, + amplitudes=voltage_amplitudes, + offsets=voltage_offsets, + voltage_transformations=voltage_transformation) + + segments, segment_lengths = tabor_program.get_sampled_segments() + + waveform_to_segment, to_amend, to_insert = self._channel_tuple._find_place_for_segments_in_memory(segments, + segment_lengths) + + self._channel_tuple._segment_references[waveform_to_segment[waveform_to_segment >= 0]] += 1 + + for wf_index in np.flatnonzero(to_insert > 0): + segment_index = to_insert[wf_index] + self._channel_tuple._upload_segment(to_insert[wf_index], segments[wf_index]) + waveform_to_segment[wf_index] = segment_index + + if np.any(to_amend): + segments_to_amend = [segments[idx] for idx in np.flatnonzero(to_amend)] + waveform_to_segment[to_amend] = self._channel_tuple._amend_segments(segments_to_amend) + + self._channel_tuple._known_programs[name] = TaborProgramMemory(waveform_to_segment=waveform_to_segment, + program=tabor_program) + + # set the default repetionmode for a programm + self.set_repetition_mode(program_name=name, repetition_mode=repetition_mode) + + def remove(self, name: str) -> None: + """ + Remove a program from the AWG. + + Also discards all waveforms referenced only by the program identified by name. + + Args: + name (str): The name of the program to remove. + """ + self._channel_tuple.free_program(name) + self._channel_tuple.cleanup() + + def clear(self) -> None: + """ + Removes all programs and waveforms from the AWG. + + Caution: This affects all programs and waveforms on the AWG, not only those uploaded using qupulse! + """ + + self._channel_tuple.device.channels[0]._select() + self._channel_tuple.device[SCPI].send_cmd(":TRAC:DEL:ALL") + self._channel_tuple.device[SCPI].send_cmd(":SOUR:SEQ:DEL:ALL") + self._channel_tuple.device[SCPI].send_cmd(":ASEQ:DEL") + + self._channel_tuple.device[SCPI].send_cmd(":TRAC:DEF 1, 192") + self._channel_tuple.device[SCPI].send_cmd(":TRAC:SEL 1") + self._channel_tuple.device[SCPI].send_cmd(":TRAC:MODE COMB") + self._channel_tuple.device._send_binary_data(pref=":TRAC:DATA", bin_dat=self._channel_tuple._idle_segment.get_as_binary()) + + self._channel_tuple._segment_lengths = 192 * np.ones(1, dtype=np.uint32) + self._channel_tuple._segment_capacity = 192 * np.ones(1, dtype=np.uint32) + self._channel_tuple._segment_hashes = np.ones(1, dtype=np.int64) * hash(self._channel_tuple._idle_segment) + self._channel_tuple._segment_references = np.ones(1, dtype=np.uint32) + + self._channel_tuple._advanced_sequence_table = [] + self._channel_tuple._sequencer_tables = [] + + self._channel_tuple._known_programs = dict() + self._change_armed_program(None) + + @with_select + def arm(self, name: Optional[str]) -> None: + """ + Load the program 'name' and arm the device for running it. + + Args: + name (str): the program the device should change to + """ + if self._channel_tuple._current_program == name: + self._channel_tuple.device[SCPI].send_cmd("SEQ:SEL 1") + else: + self._change_armed_program(name) + + @property + def programs(self) -> Set[str]: + """The set of program names that can currently be executed on the hardware AWG.""" + return set(program.name for program in self._channel_tuple._known_programs.keys()) + + @with_select + def run_current_program(self) -> None: + """ + This method starts running the active program + + Throws: + RuntimeError: This exception is thrown if there is no active program for this device + """ + if (self._channel_tuple.device._is_coupled()): + # channel tuple is the first channel tuple + if (self._channel_tuple.device._channel_tuples[0] == self): + if self._channel_tuple._current_program: + repetition_mode = self._channel_tuple._known_programs[ + self._channel_tuple._current_program].program._repetition_mode + if repetition_mode is "infinite": + self._cont_repetition_mode() + self._channel_tuple.device[SCPI].send_cmd(':TRIG', + paranoia_level=self._channel_tuple.internal_paranoia_level) + else: + raise ValueError("{} is no vaild repetition mode".format(repetition_mode)) + else: + raise RuntimeError("No program active") + else: + warnings.warn( + "TaborWarning - run_current_program() - the device is coupled - runthe program via the first channel tuple") + + else: + if self._channel_tuple._current_program: + repetition_mode = self._channel_tuple._known_programs[ + self._channel_tuple._current_program].program._repetition_mode + if repetition_mode is "infinite": + self._cont_repetition_mode() + self._channel_tuple.device[SCPI].send_cmd(':TRIG', paranoia_level=self._channel_tuple.internal_paranoia_level) + else: + raise ValueError("{} is no vaild repetition mode".format(repetition_mode)) + else: + raise RuntimeError("No program active") + + @with_select + @with_configuration_guard + def _change_armed_program(self, name: Optional[str]) -> None: + """The armed program of the channel tuple is changed to the program with the name 'name'""" + if name is None: + sequencer_tables = [self._idle_sequence_table] + advanced_sequencer_table = [(1, 1, 0)] + else: + waveform_to_segment_index, program = self._channel_tuple._known_programs[name] + waveform_to_segment_number = waveform_to_segment_index + 1 + + # translate waveform number to actual segment + sequencer_tables = [[(rep_count, waveform_to_segment_number[wf_index], jump_flag) + for ((rep_count, wf_index, jump_flag), _) in sequencer_table] + for sequencer_table in program.get_sequencer_tables()] + + # insert idle sequence + sequencer_tables = [self._idle_sequence_table] + sequencer_tables + + # adjust advanced sequence table entries by idle sequence table offset + advanced_sequencer_table = [(rep_count, seq_no + 1, jump_flag) + for rep_count, seq_no, jump_flag in program.get_advanced_sequencer_table()] + + if program.waveform_mode == TaborSequencing.SINGLE: + assert len(advanced_sequencer_table) == 1 + assert len(sequencer_tables) == 2 + + while len(sequencer_tables[1]) < self._channel_tuple.device.dev_properties["min_seq_len"]: + assert advanced_sequencer_table[0][0] == 1 + sequencer_tables[1].append((1, 1, 0)) + + # insert idle sequence in advanced sequence table + advanced_sequencer_table = [(1, 1, 0)] + advanced_sequencer_table + + while len(advanced_sequencer_table) < self._channel_tuple.device.dev_properties["min_aseq_len"]: + advanced_sequencer_table.append((1, 1, 0)) + + self._channel_tuple.device[SCPI].send_cmd("SEQ:DEL:ALL", paranoia_level=self._channel_tuple.internal_paranoia_level) + self._channel_tuple._sequencer_tables = [] + self._channel_tuple.device[SCPI].send_cmd("ASEQ:DEL", paranoia_level=self._channel_tuple.internal_paranoia_level) + self._channel_tuple._advanced_sequence_table = [] + + # download all sequence tables + for i, sequencer_table in enumerate(sequencer_tables): + self._channel_tuple.device[SCPI].send_cmd("SEQ:SEL {}".format(i + 1), + paranoia_level=self._channel_tuple.internal_paranoia_level) + self._channel_tuple.device._download_sequencer_table(sequencer_table) + self._channel_tuple._sequencer_tables = sequencer_tables + self._channel_tuple.device[SCPI].send_cmd("SEQ:SEL 1", paranoia_level=self._channel_tuple.internal_paranoia_level) + + self._channel_tuple.device._download_adv_seq_table(advanced_sequencer_table) + self._channel_tuple._advanced_sequence_table = advanced_sequencer_table + + self._channel_tuple._current_program = name + + def _select(self): + self._channel_tuple.channels[0]._select() + + @property + def _configuration_guard_count(self): + return self._channel_tuple._configuration_guard_count + + @_configuration_guard_count.setter + def _configuration_guard_count(self, configuration_guard_count): + self._channel_tuple._configuration_guard_count = configuration_guard_count + + def _enter_config_mode(self): + self._channel_tuple._enter_config_mode() + + def _exit_config_mode(self): + self._channel_tuple._exit_config_mode() + + @with_select + def _cont_repetition_mode(self): + """Changes the run mode of this channel tuple to continous mode""" + self._channel_tuple.device[SCPI].send_cmd(f":TRIG:SOUR:ADV EXT") + self._channel_tuple.device[SCPI].send_cmd( + f":INIT:GATE OFF; :INIT:CONT ON; :INIT:CONT:ENAB ARM; :INIT:CONT:ENAB:SOUR {self._trigger_source}") + + +class TaborVolatileParameters(VolatileParameters): + def __init__(self, channel_tuple: "TaborChannelTuple", ): + super().__init__(channel_tuple=channel_tuple) + + def set_volatile_parameters(self, program_name: str, parameters: Mapping[str, numbers.Number]) -> None: + """ Set the values of parameters which were marked as volatile on program creation. Sets volatile parameters + in program memory and device's (adv.) sequence tables if program is current program. + + If set_volatile_parameters needs to run faster, set CONFIG_MODE_PARANOIA_LEVEL to 0 which causes the device to + enter the configuration mode with paranoia level 0 (Note: paranoia level 0 does not work for the simulator) + and set device._is_coupled. + + Args: + program_name: Name of program which should be changed. + parameters: Names of volatile parameters and respective values to which they should be set. + """ + waveform_to_segment_index, program = self._channel_tuple._known_programs[program_name] + modifications = program.update_volatile_parameters(parameters) + + self._channel_tuple.logger.debug("parameter modifications: %r" % modifications) + + if not modifications: + self._channel_tuple.logger.info( + "There are no volatile parameters to update. Either there are no volatile parameters with " + "these names,\nthe respective repetition counts already have the given values or the " + "volatile parameters were dropped during upload.") + return + + if program_name == self._channel_tuple._current_program: + commands = [] + + for position, entry in modifications.items(): + if not entry.repetition_count > 0: + raise ValueError("Repetition must be > 0") + + if isinstance(position, int): + commands.append(":ASEQ:DEF {},{},{},{}".format(position + 1, entry.element_number + 1, + entry.repetition_count, entry.jump_flag)) + else: + table_num, step_num = position + commands.append(":SEQ:SEL {}".format(table_num + 2)) + commands.append(":SEQ:DEF {},{},{},{}".format(step_num, + waveform_to_segment_index[entry.element_id] + 1, + entry.repetition_count, entry.jump_flag)) + self._channel_tuple._execute_multiple_commands_with_config_guard(commands) + + # Wait until AWG is finished + _ = self._channel_tuple.device.main_instrument._visa_inst.query("*OPC?") + + +class TaborReadProgram(ReadProgram): + def __init__(self, channel_tuple: "TaborChannelTuple", ): + super().__init__(channel_tuple=channel_tuple) + + def read_complete_program(self): + return PlottableProgram.from_read_data(self._channel_tuple.read_waveforms(), + self._channel_tuple.read_sequence_tables(), + self._channel_tuple.read_advanced_sequencer_table()) + + +# Implementation +class TaborChannelTuple(AWGChannelTuple): + CONFIG_MODE_PARANOIA_LEVEL = None + + def __init__(self, idn: int, device: TaborDevice, channels: Iterable["TaborChannel"], + marker_channels: Iterable["TaborMarkerChannel"]): + super().__init__(idn) + self._device = weakref.ref(device) + + self._configuration_guard_count = 0 + self._is_in_config_mode = False + + self._channels = tuple(channels) + self._marker_channels = tuple(marker_channels) + + # the channel and channel marker are assigned to this channel tuple + for channel in self.channels: + channel._set_channel_tuple(self) + for marker_ch in self.marker_channels: + marker_ch._set_channel_tuple(self) + + # adding Features + self.add_feature(TaborProgramManagement(self)) + self.add_feature(TaborVolatileParameters(self)) + + self._idle_segment = TaborSegment.from_sampled(voltage_to_uint16(voltage=np.zeros(192), + output_amplitude=0.5, + output_offset=0., resolution=14), + voltage_to_uint16(voltage=np.zeros(192), + output_amplitude=0.5, + output_offset=0., resolution=14), + None, None) + + self._known_programs = dict() # type: Dict[str, TaborProgramMemory] + self._current_program = None + + self._segment_lengths = None + self._segment_capacity = None + self._segment_hashes = None + self._segment_references = None + + self._sequencer_tables = None + self._advanced_sequence_table = None + + self._internal_paranoia_level = 0 + + self[TaborProgramManagement].clear() + + self._channel_tuple_adapter: ChannelTupleAdapter + + @property + def internal_paranoia_level(self) -> Optional[int]: + return self._internal_paranoia_level + + @property + def logger(self): + return logging.getLogger("qupulse.tabor") + + @property + def channel_tuple_adapter(self) -> ChannelTupleAdapter: + if self._channel_tuple_adapter is None: + self._channel_tuple_adapter = ChannelTupleAdapter(self) + return self._channel_tuple_adapter + + def _select(self) -> None: + """The channel tuple is selected, which means that the first channel of the channel tuple is selected""" + self.channels[0]._select() + + @property + def device(self) -> TaborDevice: + """Returns the device that the channel tuple belongs to""" + return self._device() + + @property + def channels(self) -> Collection["TaborChannel"]: + """Returns all channels of the channel tuple""" + return self._channels + + @property + def marker_channels(self) -> Collection["TaborMarkerChannel"]: + """Returns all marker channels of the channel tuple""" + return self._marker_channels + + @property + @with_select + def sample_rate(self) -> TimeType: + """Returns the sample rate that the channels of a channel tuple have""" + return TimeType.from_float( + float(self.device[SCPI].send_query(":FREQ:RAST?".format(channel=self.channels[0].idn)))) + + @property + def total_capacity(self) -> int: + return int(self.device.dev_properties["max_arb_mem"]) // 2 + + def free_program(self, name: str) -> TaborProgramMemory: + if name is None: + raise TaborException("Removing 'None' program is forbidden.") + program = self._known_programs.pop(name) + self._segment_references[program.waveform_to_segment] -= 1 + if self._current_program == name: + self[TaborProgramManagement]._change_armed_program(None) + return program + + @property + def _segment_reserved(self) -> np.ndarray: + return self._segment_references > 0 + + @property + def _free_points_in_total(self) -> int: + return self.total_capacity - np.sum(self._segment_capacity[self._segment_reserved]) + + @property + def _free_points_at_end(self) -> int: + reserved_index = np.flatnonzero(self._segment_reserved) + if len(reserved_index): + return self.total_capacity - np.sum(self._segment_capacity[:reserved_index[-1]]) + else: + return self.total_capacity + + @with_select + def read_waveforms(self) -> List[np.ndarray]: + device = self.device._get_readable_device(simulator=True) + + old_segment = device.send_query(":TRAC:SEL?") + waveforms = [] + uploaded_waveform_indices = np.flatnonzero( + self._segment_references) + 1 + + for segment in uploaded_waveform_indices: + device.send_cmd(":TRAC:SEL {}".format(segment), paranoia_level=self.internal_paranoia_level) + waveforms.append(device.read_act_seg_dat()) + device.send_cmd(":TRAC:SEL {}".format(old_segment), paranoia_level=self.internal_paranoia_level) + return waveforms + + @with_select + def read_sequence_tables(self) -> List[Tuple[np.ndarray, np.ndarray, np.ndarray]]: + device = self.device._get_readable_device(simulator=True) + + old_sequence = device.send_query(":SEQ:SEL?") + sequences = [] + uploaded_sequence_indices = np.arange(len(self._sequencer_tables)) + 1 + for sequence in uploaded_sequence_indices: + device.send_cmd(":SEQ:SEL {}".format(sequence), paranoia_level=self.internal_paranoia_level) + sequences.append(device.read_sequencer_table()) + device.send_cmd(":SEQ:SEL {}".format(old_sequence), paranoia_level=self.internal_paranoia_level) + return sequences + + @with_select + def read_advanced_sequencer_table(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: + return self.device._get_readable_device(simulator=True).read_adv_seq_table() + + def read_complete_program(self) -> PlottableProgram: + return PlottableProgram.from_read_data(self.read_waveforms(), + self.read_sequence_tables(), + self.read_advanced_sequencer_table()) + + def _find_place_for_segments_in_memory(self, segments: Sequence, segment_lengths: np.ndarray) -> \ + Tuple[np.ndarray, np.ndarray, np.ndarray]: + # TODO: comment was not finished + """ + 1. Find known segments + 2. Find empty spaces with fitting length + 3. Find empty spaces with bigger length + 4. Amend remaining segments + + Args: + segments (Sequence): + segment_length (Sequence): + + Returns: + + """ + segment_hashes = np.fromiter((hash(segment) for segment in segments), count=len(segments), dtype=np.int64) + + waveform_to_segment = find_positions(self._segment_hashes, segment_hashes) + + # separate into known and unknown + unknown = (waveform_to_segment == -1) + known = ~unknown + + known_pos_in_memory = waveform_to_segment[known] + + assert len(known_pos_in_memory) == 0 or np.all( + self._segment_hashes[known_pos_in_memory] == segment_hashes[known]) + + new_reference_counter = self._segment_references.copy() + new_reference_counter[known_pos_in_memory] += 1 + + to_upload_size = np.sum(segment_lengths[unknown] + 16) + free_points_in_total = self.total_capacity - np.sum(self._segment_capacity[self._segment_references > 0]) + if free_points_in_total < to_upload_size: + raise MemoryError("Not enough free memory", + free_points_in_total, + to_upload_size, + self._free_points_in_total) + + to_amend = cast(np.ndarray, unknown) + to_insert = np.full(len(segments), fill_value=-1, dtype=np.int64) + + reserved_indices = np.flatnonzero(new_reference_counter > 0) + first_free = reserved_indices[-1] + 1 if len(reserved_indices) else 0 + + free_segments = new_reference_counter[:first_free] == 0 + free_segment_count = np.sum(free_segments) + + # look for a free segment place with the same length + for segment_idx in np.flatnonzero(to_amend): + if free_segment_count == 0: + break + + pos_of_same_length = np.logical_and(free_segments, + segment_lengths[segment_idx] == self._segment_capacity[:first_free]) + idx_same_length = np.argmax(pos_of_same_length) + if pos_of_same_length[idx_same_length]: + free_segments[idx_same_length] = False + free_segment_count -= 1 + + to_amend[segment_idx] = False + to_insert[segment_idx] = idx_same_length + + # try to find places that are larger than the segments to fit in starting with the large segments and large + # free spaces + segment_indices = np.flatnonzero(to_amend)[np.argsort(segment_lengths[to_amend])[::-1]] + capacities = self._segment_capacity[:first_free] + for segment_idx in segment_indices: + free_capacities = capacities[free_segments] + free_segments_indices = np.flatnonzero(free_segments)[np.argsort(free_capacities)[::-1]] + + if len(free_segments_indices) == 0: + break + + fitting_segment = np.argmax((free_capacities >= segment_lengths[segment_idx])[::-1]) + fitting_segment = free_segments_indices[fitting_segment] + if self._segment_capacity[fitting_segment] >= segment_lengths[segment_idx]: + free_segments[fitting_segment] = False + to_amend[segment_idx] = False + to_insert[segment_idx] = fitting_segment + + free_points_at_end = self.total_capacity - np.sum(self._segment_capacity[:first_free]) + if np.sum(segment_lengths[to_amend] + 16) > free_points_at_end: + raise MemoryError("Fragmentation does not allow upload.", + np.sum(segment_lengths[to_amend] + 16), + free_points_at_end, + self._free_points_at_end) + + return waveform_to_segment, to_amend, to_insert + + @with_select + @with_configuration_guard + def _upload_segment(self, segment_index: int, segment: TaborSegment) -> None: + if self._segment_references[segment_index] > 0: + raise ValueError("Reference count not zero") + if segment.num_points > self._segment_capacity[segment_index]: + raise ValueError("Cannot upload segment here.") + + segment_no = segment_index + 1 + + self.device[TaborSCPI].send_cmd(":TRAC:DEF {}, {}".format(segment_no, segment.num_points), + paranoia_level=self.internal_paranoia_level) + self._segment_lengths[segment_index] = segment.num_points + + self.device[TaborSCPI].send_cmd(":TRAC:SEL {}".format(segment_no), + paranoia_level=self.internal_paranoia_level) + + self.device[TaborSCPI].send_cmd(":TRAC:MODE COMB", + paranoia_level=self.internal_paranoia_level) + wf_data = segment.get_as_binary() + + self.device._send_binary_data(pref=":TRAC:DATA", bin_dat=wf_data) + self._segment_references[segment_index] = 1 + self._segment_hashes[segment_index] = hash(segment) + + @with_select + @with_configuration_guard + def _amend_segments(self, segments: List[TaborSegment]) -> np.ndarray: + new_lengths = np.asarray([s.num_points for s in segments], dtype=np.uint32) + + wf_data = make_combined_wave(segments) + trac_len = len(wf_data) // 2 + + segment_index = len(self._segment_capacity) + first_segment_number = segment_index + 1 + + self.device[TaborSCPI].send_cmd(":TRAC:DEF {},{}".format(first_segment_number, trac_len), + paranoia_level=self.internal_paranoia_level) + self.device[TaborSCPI].send_cmd(":TRAC:SEL {}".format(first_segment_number), + paranoia_level=self.internal_paranoia_level) + self.device[TaborSCPI].send_cmd(":TRAC:MODE COMB", + paranoia_level=self.internal_paranoia_level) + self.device._send_binary_data(pref=":TRAC:DATA", bin_dat=wf_data) + + old_to_update = np.count_nonzero(self._segment_capacity != self._segment_lengths) + segment_capacity = np.concatenate((self._segment_capacity, new_lengths)) + segment_lengths = np.concatenate((self._segment_lengths, new_lengths)) + segment_references = np.concatenate((self._segment_references, np.ones(len(segments), dtype=int))) + segment_hashes = np.concatenate((self._segment_hashes, [hash(s) for s in segments])) + if len(segments) < old_to_update: + for i, segment in enumerate(segments): + current_segment_number = first_segment_number + i + self.device[TaborSCPI].send_cmd(":TRAC:DEF {},{}".format(current_segment_number, segment.num_points), + paranoia_level=self.internal_paranoia_level) + else: + # flush the capacity + self.device._download_segment_lengths(segment_capacity) + + # update non fitting lengths + for i in np.flatnonzero(segment_capacity != segment_lengths): + self.device[SCPI].send_cmd(":TRAC:DEF {},{}".format(i + 1, segment_lengths[i])) + + self._segment_capacity = segment_capacity + self._segment_lengths = segment_lengths + self._segment_hashes = segment_hashes + self._segment_references = segment_references + + return segment_index + np.arange(len(segments), dtype=np.int64) + + @with_select + @with_configuration_guard + def cleanup(self) -> None: + """Discard all segments after the last which is still referenced""" + reserved_indices = np.flatnonzero(self._segment_references > 0) + old_end = len(self._segment_lengths) + new_end = reserved_indices[-1] + 1 if len(reserved_indices) else 0 + self._segment_lengths = self._segment_lengths[:new_end] + self._segment_capacity = self._segment_capacity[:new_end] + self._segment_hashes = self._segment_hashes[:new_end] + self._segment_references = self._segment_references[:new_end] + + try: + # send max 10 commands at once + chunk_size = 10 + for chunk_start in range(new_end, old_end, chunk_size): + self.device[SCPI].send_cmd("; ".join("TRAC:DEL {}".format(i + 1) + for i in range(chunk_start, min(chunk_start + chunk_size, old_end)))) + except Exception as e: + raise TaborUndefinedState("Error during cleanup. Device is in undefined state.", device=self) from e + + @with_configuration_guard + def _execute_multiple_commands_with_config_guard(self, commands: List[str]) -> None: + """ Joins the given commands into one and executes it with configuration guard. + + Args: + commands: Commands that should be executed. + """ + cmd_str = ";".join(commands) + self.device[TaborSCPI].send_cmd(cmd_str, paranoia_level=self.internal_paranoia_level) + + def _enter_config_mode(self) -> None: + """ + Enter the configuration mode if not already in. All outputs are set to the DC offset of the device and the + sequencing is disabled. The manual states this speeds up sequence validation when uploading multiple sequences. + When entering and leaving the configuration mode the AWG outputs a small (~60 mV in 4 V mode) blip. + """ + if self._is_in_config_mode is False: + + # 1. Selct channel pair + # 2. Select DC as function shape + # 3. Select build-in waveform mode + + if self.device._is_coupled(): + out_cmd = ":OUTP:ALL OFF" + else: + out_cmd = "" + for channel in self.channels: + out_cmd = out_cmd + ":INST:SEL {ch_id}; :OUTP OFF;".format(ch_id=channel.idn) + + marker_0_cmd = ":SOUR:MARK:SEL 1;:SOUR:MARK:SOUR USER;:SOUR:MARK:STAT OFF" + marker_1_cmd = ":SOUR:MARK:SEL 2;:SOUR:MARK:SOUR USER;:SOUR:MARK:STAT OFF" + + wf_mode_cmd = ":SOUR:FUNC:MODE FIX" + + cmd = ";".join([marker_0_cmd, marker_1_cmd, wf_mode_cmd]) + cmd = out_cmd + cmd + self.device[TaborSCPI].send_cmd(cmd, paranoia_level=self.CONFIG_MODE_PARANOIA_LEVEL) + self._is_in_config_mode = True + + @with_select + def _exit_config_mode(self) -> None: + """Leave the configuration mode. Enter advanced sequence mode and turn on all outputs""" + + if self.device._is_coupled(): + # Coupled -> switch all channels at once + other_channel_tuple: TaborChannelTuple + if self.channels == self.device.channel_tuples[0].channels: + other_channel_tuple = self.device.channel_tuples[1] + else: + other_channel_tuple = self.device.channel_tuples[0] + + if not other_channel_tuple._is_in_config_mode: + self.device[SCPI].send_cmd(":SOUR:FUNC:MODE ASEQ") + self.device[SCPI].send_cmd(":SEQ:SEL 1") + self.device[SCPI].send_cmd(":OUTP:ALL ON") + + else: + self.device[SCPI].send_cmd(":SOUR:FUNC:MODE ASEQ") + self.device[SCPI].send_cmd(":SEQ:SEL 1") + + for channel in self.channels: + channel[ActivatableChannels].enable() + + for marker_ch in self.marker_channels: + marker_ch[ActivatableChannels].enable() + + self._is_in_config_mode = False + + +######################################################################################################################## +# Marker Channel +######################################################################################################################## +# Features + +class TaborActivatableMarkerChannels(ActivatableChannels): + def __init__(self, marker_channel: "TaborMarkerChannel"): + super().__init__() + self._parent = weakref.ref(marker_channel) + + @property + def enabled(self) -> bool: + """ + Returns the the state a marker channel has at the moment. A channel is either activated or deactivated + True stands for activated and false for deactivated + """ + return self._parent().device[SCPI].send_query(":MARK:STAT ?") == "ON" + + @with_select + def enable(self): + """Enables the output of a certain marker channel""" + command_string = "SOUR:MARK:SOUR USER; :SOUR:MARK:STAT ON" + command_string = command_string.format( + channel=self._parent().channel_tuple.channels[0].idn, + marker=self._parent().channel_tuple.marker_channels.index(self._parent()) + 1) + self._parent().device[SCPI].send_cmd(command_string) + + @with_select + def disable(self): + """Disable the output of a certain marker channel""" + command_string = ":SOUR:MARK:SOUR USER; :SOUR:MARK:STAT OFF" + command_string = command_string.format( + channel=self._parent().channel_tuple.channels[0].idn, + marker=self._parent().channel_tuple.marker_channels.index(self._parent()) + 1) + self._parent().device[SCPI].send_cmd(command_string) + + def _select(self) -> None: + self._parent()._select() + + +# Implementation +class TaborMarkerChannel(AWGMarkerChannel): + def __init__(self, idn: int, device: TaborDevice): + super().__init__(idn) + self._device = weakref.ref(device) + + # adding Features + self.add_feature(TaborActivatableMarkerChannels(self)) + + @property + def device(self) -> TaborDevice: + """Returns the device that this marker channel belongs to""" + return self._device() + + @property + def channel_tuple(self) -> TaborChannelTuple: + """Returns the channel tuple that this marker channel belongs to""" + return self._channel_tuple() + + def _set_channel_tuple(self, channel_tuple: TaborChannelTuple) -> None: + """ + The channel tuple 'channel_tuple' is assigned to this marker channel + + Args: + channel_tuple (TaborChannelTuple): the channel tuple that this marker channel belongs to + """ + self._channel_tuple = weakref.ref(channel_tuple) + + def _select(self) -> None: + """ + This marker channel is selected and is now the active channel marker of the device + """ + self.device.channels[int((self.idn - 1) / 2)]._select() + self.device[SCPI].send_cmd(":SOUR:MARK:SEL {marker}".format(marker=(((self.idn - 1) % 2) + 1))) + + +class TaborUndefinedState(TaborException): + """ + If this exception is raised the attached tabor device is in an undefined state. + It is highly recommended to call reset it.f + """ + + def __init__(self, *args, device: Union[TaborDevice, TaborChannelTuple]): + super().__init__(*args) + self.device = device + + def reset_device(self): + if isinstance(self.device, TaborDevice): + self.device[TaborDeviceControl].reset() + elif isinstance(self.device, TaborChannelTuple): + self.device.cleanup() + self.device[TaborProgramManagement].clear() diff --git a/tests/hardware/feature_awg/__init__.py b/tests/hardware/feature_awg/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/hardware/feature_awg/awg_new_driver_base_tests.py b/tests/hardware/feature_awg/awg_new_driver_base_tests.py new file mode 100644 index 000000000..c3dbadf7f --- /dev/null +++ b/tests/hardware/feature_awg/awg_new_driver_base_tests.py @@ -0,0 +1,283 @@ +from typing import Callable, Iterable, Optional, Set, Tuple +import unittest +import warnings + +from qupulse import ChannelID +from qupulse._program._loop import Loop +from qupulse.hardware.feature_awg import channel_tuple_wrapper +from qupulse.hardware.feature_awg.base import AWGDevice, AWGChannel, AWGChannelTuple, AWGMarkerChannel +from qupulse.hardware.feature_awg.features import ChannelSynchronization, ProgramManagement, VoltageRange, \ + AmplitudeOffsetHandling, RepetitionMode +from qupulse.utils.types import Collection + + +######################################################################################################################## +# Example Features +######################################################################################################################## + +class TestSynchronizeChannelsFeature(ChannelSynchronization): + def __init__(self, device: "TestAWGDevice"): + super().__init__() + self._parent = device + + def synchronize_channels(self, group_size: int) -> None: + """Forwarding call to TestAWGDevice""" + self._parent.synchronize_channels(group_size) + + +class TestVoltageRangeFeature(VoltageRange): + def __init__(self, channel: "TestAWGChannel"): + super().__init__() + self._parent = channel + + @property + def offset(self) -> float: + """Get offset of TestAWGChannel""" + return self._parent._offset + + @offset.setter + def offset(self, offset: float) -> None: + """Set offset of TestAWGChannel""" + self._parent._offset = offset + + @property + def amplitude(self) -> float: + """Get amplitude of TestAWGChannel""" + return self._parent._amplitude + + @amplitude.setter + def amplitude(self, amplitude: float) -> None: + """Set amplitude of TestAWGChannel""" + self._parent._amplitude = amplitude + + @property + def amplitude_offset_handling(self) -> str: + """Get amplitude-offset-handling of TestAWGChannel""" + return self._parent._ampl_offs_handling + + @amplitude_offset_handling.setter + def amplitude_offset_handling(self, ampl_offs_handling: str) -> None: + """Set amplitude-offset-handling of TestAWGChannel""" + self._parent._ampl_offs_handling = ampl_offs_handling + + +class TestProgramManagementFeature(ProgramManagement): + def __init__(self, channel_tuple): + super().__init__(channel_tuple=channel_tuple) + self._programs = {} + self._armed_program = None + self._supported_repetition_modes = {RepetitionMode.INFINITE, RepetitionMode.ONCE} + + def upload(self, name: str, program: Loop, channels: Tuple[Optional[ChannelID], ...], + marker_channels: Tuple[Optional[ChannelID], ...], voltage_transformation: Tuple[Optional[Callable], ...], + force: bool = False) -> None: + if name in self._programs: + raise KeyError("Program with name \"{}\" is already on the instrument.".format(name)) + self._programs[name] = program + + def remove(self, name: str) -> None: + if self._armed_program == name: + raise RuntimeError("Cannot remove program, when it is armed.") + if name not in self._programs: + raise KeyError("Unknown program: {}".format(name)) + del self._programs[name] + + def clear(self) -> None: + if self._armed_program is not None: + raise RuntimeError("Cannot clear programs, with an armed program.") + self._programs.clear() + + def arm(self, name: Optional[str]) -> None: + self._armed_program = name + + @property + def programs(self) -> Set[str]: + return set(self._programs.keys()) + + def run_current_program(self) -> None: + if self._armed_program: + print("Run Program:", self._armed_program) + print(self.programs[self._armed_program]) + else: + print("No program armed") + + def supported_repetition_modes(self) -> Set[RepetitionMode]: + return self._supported_repetition_modes + + +######################################################################################################################## +# Device & Channels +######################################################################################################################## + +class TestAWGDevice(AWGDevice): + def __init__(self, name: str): + super().__init__(name) + + # Add feature to this object (self) + # During this call, the function of the feature is dynamically added to this object + self.add_feature(TestSynchronizeChannelsFeature(self)) + + self._channels = [TestAWGChannel(i, self) for i in range(8)] # 8 channels + self._channel_tuples = [] + + # Call the feature function, with the feature's signature + # Default channel synchronization with a group size of 2 + self[ChannelSynchronization].synchronize_channels(2) + + def cleanup(self) -> None: + """This will be called automatically in __del__""" + self._channels.clear() + self._channel_tuples.clear() + + @property + def channels(self) -> Collection["TestAWGChannel"]: + return self._channels + + @property + def marker_channels(self) -> Collection[AWGMarkerChannel]: + return [] + + @property + def channel_tuples(self) -> Collection["TestAWGChannelTuple"]: + return self._channel_tuples + + def synchronize_channels(self, group_size: int) -> None: + """Implementation of the feature's , but you can also call it directly""" + if group_size not in [2, 4, 8]: # Allowed group sizes + raise ValueError("Invalid group size for channel synchronization") + + self._channel_tuples.clear() + tmp_channel_tuples = [[] for i in range(len(self._channels) // group_size)] + + # Preparing the channel structure + for i, channel in enumerate(self._channels): + tmp_channel_tuples[i // group_size].append(channel) + + # Create channel tuples with its belonging channels and refer to their parent tuple + for i, tmp_channel_tuple in enumerate(tmp_channel_tuples): + channel_tuple = TestAWGChannelTuple(i, self, tmp_channel_tuple) + self._channel_tuples.append(channel_tuple) + for channel in tmp_channel_tuple: + channel._set_channel_tuple(channel_tuple) + + +class TestAWGChannelTuple(AWGChannelTuple): + def __init__(self, idn: int, device: TestAWGDevice, channels: Iterable["TestAWGChannel"]): + super().__init__(idn) + + # Add feature to this object (self) + # During this call, the function of the feature is dynamically added to this object + self.add_feature(TestProgramManagementFeature(channel_tuple=self)) + + self._device = device + self._channels = tuple(channels) + self._sample_rate = 12.456 # default value + + @property + def channel_tuple_adapter(self) -> channel_tuple_wrapper: + pass + + @property + def sample_rate(self) -> float: + return self._sample_rate + + @sample_rate.setter + def sample_rate(self, sample_rate: float) -> None: + self._sample_rate = sample_rate + + @property + def device(self) -> TestAWGDevice: + return self._device + + @property + def channels(self) -> Collection["TestAWGChannel"]: + return self._channels + + @property + def marker_channels(self) -> Collection[AWGMarkerChannel]: + return [] + + +class TestAWGChannel(AWGChannel): + def __init__(self, idn: int, device: TestAWGDevice): + super().__init__(idn) + + # Add feature to this object (self) + # During this call, all functions of the feature are dynamically added to this object + self.add_feature(TestVoltageRangeFeature(self)) + + self._device = device + self._channel_tuple = None + self._offset = 0.0 + self._amplitude = 5.0 + self._ampl_offs_handling = AmplitudeOffsetHandling.IGNORE_OFFSET + + @property + def device(self) -> TestAWGDevice: + return self._device + + @property + def channel_tuple(self) -> Optional[TestAWGChannelTuple]: + return self._channel_tuple + + def _set_channel_tuple(self, channel_tuple: TestAWGChannelTuple) -> None: + self._channel_tuple = channel_tuple + + +class TestBaseClasses(unittest.TestCase): + def setUp(self): + self.device_name = "My device" + self.device = TestAWGDevice(self.device_name) + + def test_device(self): + self.assertEqual(self.device.name, self.device_name, "Invalid name for device") + self.assertEqual(len(self.device.channels), 8, "Invalid number of channels") + self.assertEqual(len(self.device.marker_channels), 0, "Invalid number of marker channels") + self.assertEqual(len(self.device.channel_tuples), 4, "Invalid default channel tuples for device") + + def test_channels(self): + for i, channel in enumerate(self.device.channels): + self.assertEqual(channel.idn, i), "Invalid channel id" + self.assertEqual(channel[VoltageRange].offset, 0, "Invalid default offset for channel {}".format(i)) + self.assertEqual(channel[VoltageRange].amplitude, 5.0, + "Invalid default amplitude for channel {}".format(i)) + + offs = -0.1 * i + ampl = 0.5 + 3 * i + channel[VoltageRange].offset = offs + channel[VoltageRange].amplitude = ampl + + self.assertEqual(channel[VoltageRange].offset, offs, "Invalid offset for channel {}".format(i)) + self.assertEqual(channel[VoltageRange].amplitude, ampl, "Invalid amplitude for channel {}".format(i)) + + def test_channel_tuples(self): + for group_size in [2, 4, 8]: + self.device[ChannelSynchronization].synchronize_channels(group_size) + + self.assertEqual(len(self.device.channel_tuples), 8 // group_size, "Invalid number of channel tuples") + + # Check if channels and channel tuples are connected right + for i, channel in enumerate(self.device.channels): + self.assertEqual(channel.channel_tuple.idn, i // group_size, + "Invalid channel tuple {} for channel {}".format(channel.channel_tuple.idn, i)) + self.assertTrue(channel in channel.channel_tuple.channels, + "Channel {} not in its parent channel tuple {}".format(i, channel.channel_tuple.idn)) + + self.assertEqual(len(self.device.channel_tuples), 1, "Invalid number of channel tuples") + + def test_error_handling(self): + with self.assertRaises(ValueError): + self.device[ChannelSynchronization].synchronize_channels(3) + + with self.assertRaises(KeyError): + self.device.add_feature(TestSynchronizeChannelsFeature(self.device)) + + with self.assertRaises(TypeError): + self.device.add_feature(TestProgramManagementFeature()) + + with self.assertRaises(TypeError): + self.device.features[ChannelSynchronization] = None + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/hardware/feature_awg/tabor_new_driver_clock_tests.py b/tests/hardware/feature_awg/tabor_new_driver_clock_tests.py new file mode 100644 index 000000000..fe3ae7a3c --- /dev/null +++ b/tests/hardware/feature_awg/tabor_new_driver_clock_tests.py @@ -0,0 +1,135 @@ +import unittest + + +with_alazar = True + +def get_pulse(): + from qupulse.pulses import TablePulseTemplate as TPT, SequencePulseTemplate as SPT, RepetitionPulseTemplate as RPT + + ramp = TPT(identifier='ramp', channels={'out', 'trigger'}) + ramp.add_entry(0, 'start', channel='out') + ramp.add_entry('duration', 'stop', 'linear', channel='out') + + ramp.add_entry(0, 1, channel='trigger') + ramp.add_entry('duration', 1, 'hold', channel='trigger') + + ramp.add_measurement_declaration('meas', 0, 'duration') + + base = SPT([(ramp, dict(start='min', stop='max', duration='tau/3'), dict(meas='A')), + (ramp, dict(start='max', stop='max', duration='tau/3'), dict(meas='B')), + (ramp, dict(start='max', stop='min', duration='tau/3'), dict(meas='C'))], {'min', 'max', 'tau'}) + + repeated = RPT(base, 'n') + + root = SPT([repeated, repeated, repeated], {'min', 'max', 'tau', 'n'}) + + return root + + +def get_alazar_config(): + from atsaverage import alazar + from atsaverage.config import ScanlineConfiguration, CaptureClockConfiguration, EngineTriggerConfiguration,\ + TRIGInputConfiguration, InputConfiguration + + trig_level = int((5 + 0.4) / 10. * 255) + assert 0 <= trig_level < 256 + + config = ScanlineConfiguration() + config.triggerInputConfiguration = TRIGInputConfiguration(triggerRange=alazar.TriggerRangeID.etr_5V) + config.triggerConfiguration = EngineTriggerConfiguration(triggerOperation=alazar.TriggerOperation.J, + triggerEngine1=alazar.TriggerEngine.J, + triggerSource1=alazar.TriggerSource.external, + triggerSlope1=alazar.TriggerSlope.positive, + triggerLevel1=trig_level, + triggerEngine2=alazar.TriggerEngine.K, + triggerSource2=alazar.TriggerSource.disable, + triggerSlope2=alazar.TriggerSlope.positive, + triggerLevel2=trig_level) + config.captureClockConfiguration = CaptureClockConfiguration(source=alazar.CaptureClockType.internal_clock, + samplerate=alazar.SampleRateID.rate_100MSPS) + config.inputConfiguration = 4*[InputConfiguration(input_range=alazar.InputRangeID.range_1_V)] + config.totalRecordSize = 0 + + assert config.totalRecordSize == 0 + + return config + +def get_operations(): + from atsaverage.operations import Downsample + + return [Downsample(identifier='DS_A', maskID='A'), + Downsample(identifier='DS_B', maskID='B'), + Downsample(identifier='DS_C', maskID='C'), + Downsample(identifier='DS_D', maskID='D')] + +def get_window(card): + from atsaverage.gui import ThreadedStatusWindow + window = ThreadedStatusWindow(card) + window.start() + return window + + +class TaborTests(unittest.TestCase): + @unittest.skip + def test_all(self): + from qupulse.hardware.feature_awg.tabor import TaborChannelTuple, TaborDevice + #import warnings + tawg = TaborDevice(r'USB0::0x168C::0x2184::0000216488::INSTR') + tchannelpair = TaborChannelTuple(tawg, (1, 2), 'TABOR_AB') + tawg.paranoia_level = 2 + + #warnings.simplefilter('error', Warning) + + from qupulse.hardware.setup import HardwareSetup, PlaybackChannel, MarkerChannel + hardware_setup = HardwareSetup() + + hardware_setup.set_channel('TABOR_A', PlaybackChannel(tchannelpair, 0)) + hardware_setup.set_channel('TABOR_B', PlaybackChannel(tchannelpair, 1)) + hardware_setup.set_channel('TABOR_A_MARKER', MarkerChannel(tchannelpair, 0)) + hardware_setup.set_channel('TABOR_B_MARKER', MarkerChannel(tchannelpair, 1)) + + if with_alazar: + from qupulse.hardware.dacs.alazar import AlazarCard + import atsaverage.server + + if not atsaverage.server.Server.default_instance.running: + atsaverage.server.Server.default_instance.start(key=b'guest') + + import atsaverage.core + + alazar = AlazarCard(atsaverage.core.getLocalCard(1, 1)) + alazar.register_mask_for_channel('A', 0) + alazar.register_mask_for_channel('B', 0) + alazar.register_mask_for_channel('C', 0) + alazar.config = get_alazar_config() + + alazar.register_operations('test', get_operations()) + window = get_window(atsaverage.core.getLocalCard(1, 1)) + hardware_setup.register_dac(alazar) + + repeated = get_pulse() + + from qupulse.pulses.sequencing import Sequencer + + sequencer = Sequencer() + sequencer.push(repeated, + parameters=dict(n=1000, min=-0.5, max=0.5, tau=192*3), + channel_mapping={'out': 'TABOR_A', 'trigger': 'TABOR_A_MARKER'}, + window_mapping=dict(A='A', B='B', C='C')) + instruction_block = sequencer.build() + + hardware_setup.register_program('test', instruction_block) + + if with_alazar: + from atsaverage.masks import PeriodicMask + m = PeriodicMask() + m.identifier = 'D' + m.begin = 0 + m.end = 1 + m.period = 1 + m.channel = 0 + alazar._registered_programs['test'].masks.append(m) + + hardware_setup.arm_program('test') + + d = 1 diff --git a/tests/hardware/feature_awg/tabor_new_driver_dummy_based_tests.py b/tests/hardware/feature_awg/tabor_new_driver_dummy_based_tests.py new file mode 100644 index 000000000..2ea52ab36 --- /dev/null +++ b/tests/hardware/feature_awg/tabor_new_driver_dummy_based_tests.py @@ -0,0 +1,809 @@ +import sys +import unittest +from unittest import mock +from unittest.mock import patch, MagicMock + +from typing import List, Tuple, Optional, Any +from copy import copy, deepcopy + +import numpy as np + +from qupulse.hardware.awgs.base import AWGAmplitudeOffsetHandling +from qupulse.hardware.feature_awg.tabor import TaborProgram, TaborProgramMemory +from qupulse.utils.types import TimeType +from qupulse._program.tabor import TableDescription, TimeType, TableEntry +from tests.hardware.dummy_modules import import_package + + +class DummyTaborProgramClass: + def __init__(self, segments=None, segment_lengths=None, + sequencer_tables=None, advanced_sequencer_table=None, waveform_mode=None): + self.program = None + self.device_properties = None + self.channels = None + self.markers = None + + self.segment_lengths = segment_lengths + self.segments = segments + + self.sequencer_tables = sequencer_tables + self.advanced_sequencer_table = advanced_sequencer_table + self.waveform_mode = waveform_mode + + self.created = [] + + def __call__(self, program, device_properties, channels, markers): + self.program = program + self.device_properties = device_properties + self.channels = channels + self.markers = markers + + class DummyTaborProgram: + def __init__(self, class_obj: DummyTaborProgramClass): + self.sampled_segments_calls = [] + self.class_obj = class_obj + self.waveform_mode = class_obj.waveform_mode + def sampled_segments(self, sample_rate, voltage_amplitude, voltage_offset, voltage_transformation): + self.sampled_segments_calls.append((sample_rate, voltage_amplitude, voltage_offset, voltage_transformation)) + return self.class_obj.segments, self.class_obj.segment_lengths + def get_sequencer_tables(self): + return self.class_obj.sequencer_tables + def get_advanced_sequencer_table(self): + return self.class_obj.advanced_sequencer_table + self.created.append(DummyTaborProgram(self)) + return self.created[-1] + + +class TaborDummyBasedTest(unittest.TestCase): + to_unload = ['pytabor', 'pyvisa', 'visa', 'teawg', 'qupulse', 'tests.pulses.sequencing_dummies'] + backup_modules = dict() + + @classmethod + def unload_package(cls, package_name): + modules_to_delete = [module_name for module_name in sys.modules if module_name.startswith(package_name)] + + for module_name in modules_to_delete: + del sys.modules[module_name] + + @classmethod + def backup_package(cls, package_name): + cls.backup_modules[package_name] = [(module_name, module) + for module_name, module in sys.modules.items() + if module_name.startswith(package_name)] + + @classmethod + def restore_packages(cls): + for package, module_list in cls.backup_modules.items(): + for module_name, module in module_list: + sys.modules[module_name] = module + + @classmethod + def setUpClass(cls): + for u in cls.to_unload: + cls.backup_package(u) + + for u in cls.to_unload: + cls.unload_package(u) + + import_package('pytabor') + import_package('pyvisa') + import_package('teawg') + + @classmethod + def tearDownClass(cls): + for u in cls.to_unload: + cls.unload_package(u) + + cls.restore_packages() + + def setUp(self): + from qupulse.hardware.awgs.tabor import TaborAWGRepresentation + self.instrument = TaborAWGRepresentation('main_instrument', + reset=True, + paranoia_level=2, + mirror_addresses=['mirror_instrument']) + self.instrument.main_instrument.visa_inst.answers[':OUTP:COUP'] = 'DC' + self.instrument.main_instrument.visa_inst.answers[':VOLT'] = '1.0' + self.instrument.main_instrument.visa_inst.answers[':FREQ:RAST'] = '1e9' + self.instrument.main_instrument.visa_inst.answers[':VOLT:HV'] = '0.7' + + @property + def awg_representation(self): + return self.instrument + + @property + def channel_pair(self): + return self.awg_representation.channel_pair_AB + + def reset_instrument_logs(self): + for device in self.instrument.all_devices: + device.logged_commands = [] + device._send_binary_data_calls = [] + device._download_adv_seq_table_calls = [] + device._download_sequencer_table_calls = [] + + def assertAllCommandLogsEqual(self, expected_log: List): + for device in self.instrument.all_devices: + self.assertEqual(device.logged_commands, expected_log) + + +class TaborAWGRepresentationDummyBasedTests(TaborDummyBasedTest): + def test_send_cmd(self): + self.reset_instrument_logs() + + self.instrument.send_cmd('bleh', paranoia_level=3) + + self.assertAllCommandLogsEqual([((), dict(paranoia_level=3, cmd_str='bleh'))]) + + self.instrument.send_cmd('bleho') + self.assertAllCommandLogsEqual([((), dict(paranoia_level=3, cmd_str='bleh')), + ((), dict(cmd_str='bleho', paranoia_level=None))]) + + def test_trigger(self): + self.reset_instrument_logs() + self.instrument.trigger() + + self.assertAllCommandLogsEqual([((), dict(cmd_str=':TRIG', paranoia_level=None))]) + + def test_paranoia_level(self): + self.assertEqual(self.instrument.paranoia_level, self.instrument.main_instrument.paranoia_level) + self.instrument.paranoia_level = 30 + for device in self.instrument.all_devices: + self.assertEqual(device.paranoia_level, 30) + + def test_enable(self): + self.reset_instrument_logs() + self.instrument.enable() + + expected_commands = [':ENAB'] + expected_log = [((), dict(cmd_str=cmd, paranoia_level=None)) + for cmd in expected_commands] + self.assertAllCommandLogsEqual(expected_log) + + +class TaborChannelPairTests(TaborDummyBasedTest): + @staticmethod + def to_new_sequencer_tables(sequencer_tables: List[List[Tuple[int, int, int]]] + ) -> List[List[Tuple[TableDescription, Optional[Any]]]]: + return [[(TableDescription(*entry), None) for entry in sequencer_table] + for sequencer_table in sequencer_tables] + + @staticmethod + def to_new_advanced_sequencer_table(advanced_sequencer_table: List[Tuple[int, int, int]]) -> List[TableDescription]: + return [TableDescription(*entry) for entry in advanced_sequencer_table] + + @classmethod + def setUpClass(cls): + super().setUpClass() + + from qupulse.hardware.awgs.tabor import TaborChannelPair, TaborProgramMemory, TaborSegment, TaborSequencing + from qupulse.pulses.table_pulse_template import TableWaveform + from qupulse.pulses.interpolation import HoldInterpolationStrategy + from qupulse._program._loop import Loop + + from tests.pulses.sequencing_dummies import DummyWaveform + + from qupulse._program.tabor import make_combined_wave + + cls.DummyWaveform = DummyWaveform + cls.TaborChannelPair = TaborChannelPair + cls.TaborProgramMemory = TaborProgramMemory + cls.TableWaveform = TableWaveform + cls.HoldInterpolationStrategy = HoldInterpolationStrategy + cls.Loop = Loop + cls.TaborSegment = TaborSegment + cls.make_combined_wave = staticmethod(make_combined_wave) + cls.TaborSequencing = TaborSequencing + + def setUp(self): + super().setUp() + + def test__execute_multiple_commands_with_config_guard(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + # prevent entering and exiting configuration mode + channel_pair._configuration_guard_count = 2 + + given_commands = [':ASEQ:DEF 2,2,5,0', ':SEQ:SEL 2', ':SEQ:DEF 1,2,10,0'] + expected_command = ':ASEQ:DEF 2,2,5,0;:SEQ:SEL 2;:SEQ:DEF 1,2,10,0' + with mock.patch.object(channel_pair.device, 'send_cmd') as send_cmd: + channel_pair._execute_multiple_commands_with_config_guard(given_commands) + send_cmd.assert_called_once_with(expected_command, paranoia_level=channel_pair.internal_paranoia_level) + + def test_set_volatile_parameters(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + + parameters = {'var': 2} + modifications = {1: TableEntry(repetition_count=5, element_number=1, jump_flag=0), + (0, 1): TableDescription(repetition_count=10, element_id=0, jump_flag=0)} + invalid_modification = {1: TableEntry(repetition_count=0, element_number=1, jump_flag=0)} + no_modifications = {} + + program_mock = mock.Mock(TaborProgram) + program_memory = TaborProgramMemory(waveform_to_segment=np.array([1, 4]), program=program_mock) + + expected_commands = {':ASEQ:DEF 2,2,5,0', ':SEQ:SEL 2', ':SEQ:DEF 1,2,10,0'} + + channel_pair._known_programs['active_program'] = program_memory + channel_pair._known_programs['other_program'] = program_memory + channel_pair._current_program = 'active_program' + + with mock.patch.object(program_mock, 'update_volatile_parameters', return_value=modifications) as update_prog: + with mock.patch.object(channel_pair, '_execute_multiple_commands_with_config_guard') as ex_com: + with mock.patch.object(channel_pair.device.main_instrument._visa_inst, 'query'): + channel_pair.set_volatile_parameters('other_program', parameters) + ex_com.assert_not_called() + update_prog.assert_called_once_with(parameters) + + channel_pair.set_volatile_parameters('active_program', parameters) + self.assertEqual(1, ex_com.call_count) + actual_commands, = ex_com.call_args[0] + self.assertEqual(expected_commands, set(actual_commands)) + self.assertEqual(len(expected_commands), len(actual_commands)) + + assert update_prog.call_count == 2 + update_prog.assert_called_with(parameters) + + with mock.patch.object(program_mock, 'update_volatile_parameters', return_value=no_modifications) as update_prog: + with mock.patch.object(channel_pair, '_execute_multiple_commands_with_config_guard') as ex_com: + channel_pair.set_volatile_parameters('active_program', parameters) + + ex_com.assert_not_called() + update_prog.assert_called_once_with(parameters) + + with mock.patch.object(program_mock, 'update_volatile_parameters', return_value=invalid_modification) as update_prog: + with mock.patch.object(channel_pair, '_execute_multiple_commands_with_config_guard') as ex_com: + with self.assertRaises(ValueError): + channel_pair.set_volatile_parameters('active_program', parameters) + + ex_com.assert_not_called() + update_prog.assert_called_once_with(parameters) + + def test_copy(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + with self.assertRaises(NotImplementedError): + copy(channel_pair) + with self.assertRaises(NotImplementedError): + deepcopy(channel_pair) + + def test_init(self): + with self.assertRaises(ValueError): + self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 3)) + + def test_free_program(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + + with self.assertRaises(KeyError): + channel_pair.free_program('test') + + program = self.TaborProgramMemory(np.array([1, 2], dtype=np.int64), None) + + channel_pair._segment_references = np.array([1, 3, 1, 0]) + channel_pair._known_programs['test'] = program + self.assertIs(channel_pair.free_program('test'), program) + + np.testing.assert_equal(channel_pair._segment_references, np.array([1, 2, 0, 0])) + + def test_upload_exceptions(self): + + wv = self.TableWaveform(1, [(0, 0.1, self.HoldInterpolationStrategy()), + (192, 0.1, self.HoldInterpolationStrategy())]) + + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + + program = self.Loop(waveform=wv) + with self.assertRaises(ValueError): + channel_pair.upload('test', program, (1, 2, 3), (5, 6), (lambda x: x, lambda x: x)) + with self.assertRaises(ValueError): + channel_pair.upload('test', program, (1, 2), (5, 6, 'a'), (lambda x: x, lambda x: x)) + with self.assertRaises(ValueError): + channel_pair.upload('test', program, (1, 2), (3, 4), (lambda x: x,)) + + old = channel_pair._amplitude_offset_handling + with self.assertRaises(ValueError): + channel_pair._amplitude_offset_handling = 'invalid' + channel_pair.upload('test', program, (1, None), (None, None), (lambda x: x, lambda x: x)) + channel_pair._amplitude_offset_handling = old + + channel_pair._known_programs['test'] = self.TaborProgramMemory(np.array([0]), None) + with self.assertRaises(ValueError): + channel_pair.upload('test', program, (1, 2), (3, 4), (lambda x: x, lambda x: x)) + + def test_upload(self): + segments = np.array([1, 2, 3, 4, 5]) + segment_lengths = np.array([0, 16, 0, 16, 0], dtype=np.uint16).tolist() + + segment_references = np.array([1, 1, 2, 0, 1], dtype=np.uint32) + + w2s = np.array([-1, -1, 1, 2, -1], dtype=np.int64) + ta = np.array([True, False, False, False, True]) + ti = np.array([-1, 3, -1, -1, -1]) + + channels = (1, None) + markers = (None, None) + voltage_transformations = (lambda x: x, lambda x: x) + sample_rate = TimeType.from_fraction(1, 1) + + with mock.patch('qupulse.hardware.awgs.tabor.TaborProgram', specs=TaborProgram) as DummyTaborProgram: + tabor_program = DummyTaborProgram.return_value + tabor_program.get_sampled_segments.return_value = (segments, segment_lengths) + + program = self.Loop(waveform=self.DummyWaveform(duration=192)) + + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + channel_pair._segment_references = segment_references + + def dummy_find_place(segments_, segement_lengths_): + self.assertIs(segments_, segments) + self.assertIs(segment_lengths, segement_lengths_) + return w2s, ta, ti + + def dummy_upload_segment(segment_index, segment): + self.assertEqual(segment_index, 3) + self.assertEqual(segment, 2) + + def dummy_amend_segments(segments_): + np.testing.assert_equal(segments_, np.array([1, 5])) + return np.array([5, 6], dtype=np.int64) + + channel_pair._find_place_for_segments_in_memory = dummy_find_place + channel_pair._upload_segment = dummy_upload_segment + channel_pair._amend_segments = dummy_amend_segments + + channel_pair.upload('test', program, channels, markers, voltage_transformations) + + DummyTaborProgram.assert_called_once_with( + program, + channels=tuple(channels), + markers=markers, + device_properties=channel_pair.device.dev_properties, + sample_rate=sample_rate, + amplitudes=(.5, .5), + offsets=(0., 0.), + voltage_transformations=voltage_transformations + ) + + # the other references are increased in amend and upload segment method + np.testing.assert_equal(channel_pair._segment_references, np.array([1, 2, 3, 0, 1])) + + self.assertEqual(len(channel_pair._known_programs), 1) + np.testing.assert_equal(channel_pair._known_programs['test'].waveform_to_segment, + np.array([5, 3, 1, 2, 6], dtype=np.int64)) + + def test_upload_offset_handling(self): + + program = self.Loop(waveform=self.TableWaveform(1, [(0, 0.1, self.HoldInterpolationStrategy()), + (192, 0.1, self.HoldInterpolationStrategy())])) + + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + + channels = (1, None) + markers = (None, None) + + tabor_program_kwargs = dict( + channels=channels, + markers=markers, + device_properties=channel_pair.device.dev_properties) + + test_sample_rate = TimeType.from_fraction(1, 1) + test_amplitudes = (channel_pair.device.amplitude(channel_pair._channels[0]) / 2, + channel_pair.device.amplitude(channel_pair._channels[1]) / 2) + test_offset = 0.1 + test_transform = (lambda x: x, lambda x: x) + + with patch('qupulse.hardware.awgs.tabor.TaborProgram', wraps=TaborProgram) as tabor_program_mock: + with patch.object(self.instrument, 'offset', return_value=test_offset) as offset_mock: + tabor_program_mock.get_sampled_segments = mock.Mock(wraps=tabor_program_mock.get_sampled_segments) + + channel_pair.amplitude_offset_handling = AWGAmplitudeOffsetHandling.CONSIDER_OFFSET + channel_pair.upload('test1', program, channels, markers, test_transform) + + tabor_program_mock.assert_called_once_with(program, **tabor_program_kwargs, + sample_rate=test_sample_rate, + amplitudes=test_amplitudes, + offsets=(test_offset, test_offset), + voltage_transformations=test_transform) + self.assertEqual([mock.call(1), mock.call(2)], offset_mock.call_args_list) + offset_mock.reset_mock() + tabor_program_mock.reset_mock() + + channel_pair.amplitude_offset_handling = AWGAmplitudeOffsetHandling.IGNORE_OFFSET + channel_pair.upload('test2', program, (1, None), (None, None), test_transform) + + tabor_program_mock.assert_called_once_with(program, **tabor_program_kwargs, + sample_rate=test_sample_rate, + amplitudes=test_amplitudes, + offsets=(0., 0.), + voltage_transformations=test_transform) + self.assertEqual([], offset_mock.call_args_list) + + def test_find_place_for_segments_in_memory(self): + def hash_based_on_dir(ch): + hash_list = [] + for d in dir(ch): + o = getattr(ch, d) + if isinstance(o, np.ndarray): + hash_list.append(hash(o.tobytes())) + else: + try: + hash_list.append(hash(o)) + except TypeError: + pass + return hash(tuple(hash_list)) + + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + + # empty + segments = np.asarray([-5, -6, -7, -8, -9]) + segment_lengths = 192 + np.asarray([32, 16, 64, 32, 16]) + + hash_before = hash_based_on_dir(channel_pair) + + w2s, ta, ti = channel_pair._find_place_for_segments_in_memory(segments, segment_lengths) + self.assertEqual(w2s.tolist(), [-1, -1, -1, -1, -1]) + self.assertEqual(ta.tolist(), [True, True, True, True, True]) + self.assertEqual(ti.tolist(), [-1, -1, -1, -1, -1]) + self.assertEqual(hash_before, hash_based_on_dir(channel_pair)) + + # all new segments + channel_pair._segment_capacity = 192 + np.asarray([0, 16, 32, 16, 0], dtype=np.uint32) + channel_pair._segment_hashes = np.asarray([1, 2, 3, 4, 5], dtype=np.int64) + channel_pair._segment_references = np.asarray([1, 1, 1, 2, 1], dtype=np.int32) + hash_before = hash_based_on_dir(channel_pair) + + w2s, ta, ti = channel_pair._find_place_for_segments_in_memory(segments, segment_lengths) + self.assertEqual(w2s.tolist(), [-1, -1, -1, -1, -1]) + self.assertEqual(ta.tolist(), [True, True, True, True, True]) + self.assertEqual(ti.tolist(), [-1, -1, -1, -1, -1]) + self.assertEqual(hash_before, hash_based_on_dir(channel_pair)) + + # some known segments + channel_pair._segment_capacity = 192 + np.asarray([0, 16, 32, 64, 0, 16], dtype=np.uint32) + channel_pair._segment_hashes = np.asarray([1, 2, 3, -7, 5, -9], dtype=np.int64) + channel_pair._segment_references = np.asarray([1, 1, 1, 2, 1, 3], dtype=np.int32) + hash_before = hash_based_on_dir(channel_pair) + + w2s, ta, ti = channel_pair._find_place_for_segments_in_memory(segments, segment_lengths) + self.assertEqual(w2s.tolist(), [-1, -1, 3, -1, 5]) + self.assertEqual(ta.tolist(), [True, True, False, True, False]) + self.assertEqual(ti.tolist(), [-1, -1, -1, -1, -1]) + self.assertEqual(hash_before, hash_based_on_dir(channel_pair)) + + # insert some segments with same length + channel_pair._segment_capacity = 192 + np.asarray([0, 16, 32, 64, 0, 16], dtype=np.uint32) + channel_pair._segment_hashes = np.asarray([1, 2, 3, 4, 5, 6], dtype=np.int64) + channel_pair._segment_references = np.asarray([1, 0, 1, 0, 1, 3], dtype=np.int32) + hash_before = hash_based_on_dir(channel_pair) + + w2s, ta, ti = channel_pair._find_place_for_segments_in_memory(segments, segment_lengths) + self.assertEqual(w2s.tolist(), [-1, -1, -1, -1, -1]) + self.assertEqual(ta.tolist(), [True, False, False, True, True]) + self.assertEqual(ti.tolist(), [-1, 1, 3, -1, -1]) + self.assertEqual(hash_before, hash_based_on_dir(channel_pair)) + + # insert some segments with smaller length + channel_pair._segment_capacity = 192 + np.asarray([0, 80, 32, 64, 96, 16], dtype=np.uint32) + channel_pair._segment_hashes = np.asarray([1, 2, 3, 4, 5, 6], dtype=np.int64) + channel_pair._segment_references = np.asarray([1, 0, 1, 1, 0, 3], dtype=np.int32) + hash_before = hash_based_on_dir(channel_pair) + + w2s, ta, ti = channel_pair._find_place_for_segments_in_memory(segments, segment_lengths) + self.assertEqual(w2s.tolist(), [-1, -1, -1, -1, -1]) + self.assertEqual(ta.tolist(), [True, True, False, False, True]) + self.assertEqual(ti.tolist(), [-1, -1, 4, 1, -1]) + self.assertEqual(hash_before, hash_based_on_dir(channel_pair)) + + # mix everything + segments = np.asarray([-5, -6, -7, -8, -9, -10, -11]) + segment_lengths = 192 + np.asarray([32, 16, 64, 32, 16, 0, 0]) + + channel_pair._segment_capacity = 192 + np.asarray([0, 80, 32, 64, 32, 16], dtype=np.uint32) + channel_pair._segment_hashes = np.asarray([1, 2, 3, 4, -8, 6], dtype=np.int64) + channel_pair._segment_references = np.asarray([1, 0, 1, 0, 1, 0], dtype=np.int32) + hash_before = hash_based_on_dir(channel_pair) + + w2s, ta, ti = channel_pair._find_place_for_segments_in_memory(segments, segment_lengths) + self.assertEqual(w2s.tolist(), [-1, -1, -1, 4, -1, -1, -1]) + self.assertEqual(ta.tolist(), [False, True, False, False, True, True, True]) + self.assertEqual(ti.tolist(), [1, -1, 3, -1, -1, -1, -1]) + self.assertEqual(hash_before, hash_based_on_dir(channel_pair)) + + def test_upload_segment(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + + self.reset_instrument_logs() + + channel_pair._segment_references = np.array([1, 2, 0, 1], dtype=np.uint32) + channel_pair._segment_capacity = 192 + np.array([0, 16, 32, 32], dtype=np.uint32) + channel_pair._segment_lengths = channel_pair._segment_capacity.copy() + + channel_pair._segment_hashes = np.array([1, 2, 3, 4], dtype=np.int64) + + # prevent entering and exiting configuration mode + channel_pair._configuration_guard_count = 2 + + segment = self.TaborSegment.from_sampled(np.ones(192+16, dtype=np.uint16), np.zeros(192+16, dtype=np.uint16), None, None) + segment_binary = segment.get_as_binary() + with self.assertRaises(ValueError): + channel_pair._upload_segment(3, segment) + + with self.assertRaises(ValueError): + channel_pair._upload_segment(0, segment) + + channel_pair._upload_segment(2, segment) + np.testing.assert_equal(channel_pair._segment_capacity, 192 + np.array([0, 16, 32, 32], dtype=np.uint32)) + np.testing.assert_equal(channel_pair._segment_lengths, 192 + np.array([0, 16, 16, 32], dtype=np.uint32)) + np.testing.assert_equal(channel_pair._segment_hashes, np.array([1, 2, hash(segment), 4], dtype=np.int64)) + + expected_commands = [':INST:SEL 1', ':INST:SEL 1', ':INST:SEL 1', + ':TRAC:DEF 3, 208', + ':TRAC:SEL 3', + ':TRAC:MODE COMB'] + expected_log = [((), dict(cmd_str=cmd, paranoia_level=channel_pair.internal_paranoia_level)) + for cmd in expected_commands] + self.assertAllCommandLogsEqual(expected_log) + + expected_send_binary_data_log = [(':TRAC:DATA', segment_binary, None)] + for device in self.instrument.all_devices: + np.testing.assert_equal(device._send_binary_data_calls, expected_send_binary_data_log) + + def test_amend_segments_flush(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + # prevent entering and exiting configuration mode + channel_pair._configuration_guard_count = 2 + + self.instrument.main_instrument.paranoia_level = 0 + self.instrument.main_instrument.logged_commands = [] + self.instrument.main_instrument.logged_queries = [] + self.instrument.main_instrument._send_binary_data_calls = [] + self.reset_instrument_logs() + + channel_pair._segment_references = np.array([1, 2, 0, 1], dtype=np.uint32) + channel_pair._segment_capacity = 192 + np.array([0, 16, 32, 32], dtype=np.uint32) + channel_pair._segment_lengths = 192 + np.array([0, 16, 16, 32], dtype=np.uint32) + + channel_pair._segment_hashes = np.array([1, 2, 3, 4], dtype=np.int64) + + data = np.ones(192, dtype=np.uint16) + segments = [self.TaborSegment.from_sampled(0*data, 1*data, None, None), + self.TaborSegment.from_sampled(1*data, 2*data, None, None)] + + channel_pair._amend_segments(segments) + + expected_references = np.array([1, 2, 0, 1, 1, 1], dtype=np.uint32) + expected_capacities = 192 + np.array([0, 16, 32, 32, 0, 0], dtype=np.uint32) + expected_lengths = 192 + np.array([0, 16, 16, 32, 0, 0], dtype=np.uint32) + expected_hashes = np.array([1, 2, 3, 4, hash(segments[0]), hash(segments[1])], dtype=np.int64) + + np.testing.assert_equal(channel_pair._segment_references, expected_references) + np.testing.assert_equal(channel_pair._segment_capacity, expected_capacities) + np.testing.assert_equal(channel_pair._segment_lengths, expected_lengths) + np.testing.assert_equal(channel_pair._segment_hashes, expected_hashes) + + expected_commands = [':INST:SEL 1', + ':TRAC:DEF 5,{}'.format(2 * 192 + 16), + ':TRAC:SEL 5', + ':TRAC:MODE COMB', + ':TRAC:DEF 3,208'] + expected_log = [((), dict(cmd_str=cmd, paranoia_level=channel_pair.internal_paranoia_level)) + for cmd in expected_commands] + self.assertAllCommandLogsEqual(expected_log) + #self.assertEqual(expected_log, instrument.main_instrument.logged_commands) + + expected_download_segment_calls = [(expected_capacities, ':SEGM:DATA', None)] + np.testing.assert_equal(self.instrument.main_instrument._download_segment_lengths_calls, expected_download_segment_calls) + + expected_bin_blob = self.make_combined_wave(segments) + expected_send_binary_data_log = [(':TRAC:DATA', expected_bin_blob, None)] + np.testing.assert_equal(self.instrument.main_instrument._send_binary_data_calls, expected_send_binary_data_log) + + def test_amend_segments_iter(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + # prevent entering and exiting configuration mode + channel_pair._configuration_guard_count = 2 + + self.instrument.paranoia_level = 0 + self.reset_instrument_logs() + + channel_pair._segment_references = np.array([1, 2, 0, 1], dtype=np.uint32) + channel_pair._segment_capacity = 192 + np.array([0, 16, 32, 32], dtype=np.uint32) + channel_pair._segment_lengths = 192 + np.array([0, 0, 16, 16], dtype=np.uint32) + + channel_pair._segment_hashes = np.array([1, 2, 3, 4], dtype=np.int64) + + data = np.ones(192, dtype=np.uint16) + segments = [self.TaborSegment.from_sampled(0*data, 1*data, None, None), + self.TaborSegment.from_sampled(1*data, 2*data, None, None)] + + indices = channel_pair._amend_segments(segments) + + expected_references = np.array([1, 2, 0, 1, 1, 1], dtype=np.uint32) + expected_capacities = 192 + np.array([0, 16, 32, 32, 0, 0], dtype=np.uint32) + expected_lengths = 192 + np.array([0, 0, 16, 16, 0, 0], dtype=np.uint32) + expected_hashes = np.array([1, 2, 3, 4, hash(segments[0]), hash(segments[1])], dtype=np.int64) + + np.testing.assert_equal(channel_pair._segment_references, expected_references) + np.testing.assert_equal(channel_pair._segment_capacity, expected_capacities) + np.testing.assert_equal(channel_pair._segment_lengths, expected_lengths) + np.testing.assert_equal(channel_pair._segment_hashes, expected_hashes) + + np.testing.assert_equal(indices, np.array([4, 5], dtype=np.int64)) + + expected_commands = [':INST:SEL 1', + ':TRAC:DEF 5,{}'.format(2 * 192 + 16), + ':TRAC:SEL 5', + ':TRAC:MODE COMB', + ':TRAC:DEF 5,192', + ':TRAC:DEF 6,192'] + expected_log = [((), dict(cmd_str=cmd, paranoia_level=channel_pair.internal_paranoia_level)) + for cmd in expected_commands] + self.assertAllCommandLogsEqual(expected_log) + + expected_download_segment_calls = [] + for device in self.instrument.all_devices: + self.assertEqual(device._download_segment_lengths_calls, expected_download_segment_calls) + + expected_bin_blob = self.make_combined_wave(segments) + expected_send_binary_data_log = [(':TRAC:DATA', expected_bin_blob, None)] + for device in self.instrument.all_devices: + np.testing.assert_equal(device._send_binary_data_calls, expected_send_binary_data_log) + + def test_cleanup(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + + self.instrument.paranoia_level = 0 + self.instrument.logged_commands = [] + self.instrument.logged_queries = [] + self.instrument._send_binary_data_calls = [] + + channel_pair._segment_references = np.array([1, 2, 0, 1], dtype=np.uint32) + channel_pair._segment_capacity = 192 + np.array([0, 16, 32, 32], dtype=np.uint32) + channel_pair._segment_lengths = 192 + np.array([0, 0, 16, 16], dtype=np.uint32) + channel_pair._segment_hashes = np.array([1, 2, 3, 4], dtype=np.int64) + + channel_pair.cleanup() + np.testing.assert_equal(channel_pair._segment_references, np.array([1, 2, 0, 1], dtype=np.uint32)) + np.testing.assert_equal(channel_pair._segment_capacity, 192 + np.array([0, 16, 32, 32], dtype=np.uint32)) + np.testing.assert_equal(channel_pair._segment_lengths, 192 + np.array([0, 0, 16, 16], dtype=np.uint32)) + np.testing.assert_equal(channel_pair._segment_hashes, np.array([1, 2, 3, 4], dtype=np.int64)) + + channel_pair._segment_references = np.array([1, 2, 0, 1, 0], dtype=np.uint32) + channel_pair._segment_capacity = 192 + np.array([0, 16, 32, 32, 32], dtype=np.uint32) + channel_pair._segment_lengths = 192 + np.array([0, 0, 16, 16, 0], dtype=np.uint32) + channel_pair._segment_hashes = np.array([1, 2, 3, 4, 5], dtype=np.int64) + + channel_pair.cleanup() + np.testing.assert_equal(channel_pair._segment_references, np.array([1, 2, 0, 1], dtype=np.uint32)) + np.testing.assert_equal(channel_pair._segment_capacity, 192 + np.array([0, 16, 32, 32], dtype=np.uint32)) + np.testing.assert_equal(channel_pair._segment_lengths, 192 + np.array([0, 0, 16, 16], dtype=np.uint32)) + np.testing.assert_equal(channel_pair._segment_hashes, np.array([1, 2, 3, 4], dtype=np.int64)) + + def test_remove(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + + calls = [] + + program_name = 'test' + def dummy_free_program(name): + self.assertIs(name, program_name) + calls.append('free_program') + + def dummy_cleanup(): + calls.append('cleanup') + + channel_pair.cleanup = dummy_cleanup + channel_pair.free_program = dummy_free_program + + channel_pair.remove(program_name) + self.assertEqual(calls, ['free_program', 'cleanup']) + + def test_change_armed_program_single_sequence(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + # prevent entering and exiting configuration mode + channel_pair._configuration_guard_count = 2 + + self.instrument.paranoia_level = 0 + self.instrument.logged_commands = [] + self.instrument.logged_queries = [] + self.reset_instrument_logs() + + advanced_sequencer_table = [(2, 1, 0)] + sequencer_tables = [[(3, 0, 0), (2, 1, 0), (1, 0, 0), (1, 2, 0), (1, 3, 0)]] + w2s = np.array([2, 5, 3, 1]) + + sequencer_tables = self.to_new_sequencer_tables(sequencer_tables) + advanced_sequencer_table = self.to_new_advanced_sequencer_table(advanced_sequencer_table) + + expected_sequencer_table = [(3, 3, 0), (2, 6, 0), (1, 3, 0), (1, 4, 0), (1, 2, 0)] + + program = DummyTaborProgramClass(advanced_sequencer_table=advanced_sequencer_table, + sequencer_tables=sequencer_tables, + waveform_mode=self.TaborSequencing.SINGLE)(None, None, None, None) + + channel_pair._known_programs['test'] = self.TaborProgramMemory(w2s, program) + + channel_pair.change_armed_program('test') + + expected_adv_seq_table_log = [([(1, 1, 1), (2, 2, 0), (1, 1, 0)], ':ASEQ:DATA', None)] + expected_sequencer_table_log = [((sequencer_table,), dict(pref=':SEQ:DATA', paranoia_level=None)) + for sequencer_table in [channel_pair._idle_sequence_table, + expected_sequencer_table]] + + for device in self.instrument.all_devices: + self.assertEqual(device._download_adv_seq_table_calls, expected_adv_seq_table_log) + self.assertEqual(device._download_sequencer_table_calls, expected_sequencer_table_log) + + def test_change_armed_program_single_waveform(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + # prevent entering and exiting configuration mode + channel_pair._configuration_guard_count = 2 + + self.instrument.paranoia_level = 0 + self.instrument.logged_commands = [] + self.instrument.logged_queries = [] + self.reset_instrument_logs() + + advanced_sequencer_table = [(1, 1, 0)] + sequencer_tables = [[(10, 0, 0)]] + w2s = np.array([4]) + + sequencer_tables = self.to_new_sequencer_tables(sequencer_tables) + advanced_sequencer_table = self.to_new_advanced_sequencer_table(advanced_sequencer_table) + + expected_sequencer_table = [(10, 5, 0), (1, 1, 0), (1, 1, 0)] + + program = DummyTaborProgramClass(advanced_sequencer_table=advanced_sequencer_table, + sequencer_tables=sequencer_tables, + waveform_mode=self.TaborSequencing.SINGLE)(None, None, None, None) + + channel_pair._known_programs['test'] = self.TaborProgramMemory(w2s, program) + + channel_pair.change_armed_program('test') + + expected_adv_seq_table_log = [([(1, 1, 1), (1, 2, 0), (1, 1, 0)], ':ASEQ:DATA', None)] + expected_sequencer_table_log = [((sequencer_table,), dict(pref=':SEQ:DATA', paranoia_level=None)) + for sequencer_table in [channel_pair._idle_sequence_table, + expected_sequencer_table]] + + for device in self.instrument.all_devices: + self.assertEqual(device._download_adv_seq_table_calls, expected_adv_seq_table_log) + self.assertEqual(device._download_sequencer_table_calls, expected_sequencer_table_log) + + def test_change_armed_program_advanced_sequence(self): + channel_pair = self.TaborChannelPair(self.instrument, identifier='asd', channels=(1, 2)) + # prevent entering and exiting configuration mode + channel_pair._configuration_guard_count = 2 + + self.instrument.paranoia_level = 0 + self.instrument.logged_commands = [] + self.instrument.logged_queries = [] + self.instrument._send_binary_data_calls = [] + + self.reset_instrument_logs() + + advanced_sequencer_table = [(2, 1, 0), (3, 2, 0)] + sequencer_tables = [[(3, 0, 0), (2, 1, 0), (1, 0, 0), (1, 2, 0), (1, 3, 0)], + [(4, 1, 0), (2, 1, 0), (1, 0, 0), (1, 2, 0), (1, 3, 0)]] + wf_idx2seg_idx = np.array([2, 5, 3, 1]) + + sequencer_tables = self.to_new_sequencer_tables(sequencer_tables) + advanced_sequencer_table = self.to_new_advanced_sequencer_table(advanced_sequencer_table) + + expected_sequencer_tables = [[(3, 3, 0), (2, 6, 0), (1, 3, 0), (1, 4, 0), (1, 2, 0)], + [(4, 6, 0), (2, 6, 0), (1, 3, 0), (1, 4, 0), (1, 2, 0)]] + + program = DummyTaborProgramClass(advanced_sequencer_table=advanced_sequencer_table, + sequencer_tables=sequencer_tables, + waveform_mode=self.TaborSequencing.ADVANCED)(None, None, None, None) + + channel_pair._known_programs['test'] = self.TaborProgramMemory(wf_idx2seg_idx, program) + + channel_pair.change_armed_program('test') + + expected_adv_seq_table_log = [([(1, 1, 1), (2, 2, 0), (3, 3, 0)], ':ASEQ:DATA', None)] + expected_sequencer_table_log = [((sequencer_table,), dict(pref=':SEQ:DATA', paranoia_level=None)) + for sequencer_table in [channel_pair._idle_sequence_table] + + expected_sequencer_tables] + + for device in self.instrument.all_devices: + self.assertEqual(device._download_adv_seq_table_calls, expected_adv_seq_table_log) + self.assertEqual(device._download_sequencer_table_calls, expected_sequencer_table_log) diff --git a/tests/hardware/feature_awg/tabor_new_driver_exex_test.py b/tests/hardware/feature_awg/tabor_new_driver_exex_test.py new file mode 100644 index 000000000..c4cf4489e --- /dev/null +++ b/tests/hardware/feature_awg/tabor_new_driver_exex_test.py @@ -0,0 +1,136 @@ +import unittest + + +with_alazar = True + +def get_pulse(): + from qupulse.pulses import TablePulseTemplate as TPT, SequencePulseTemplate as SPT, RepetitionPulseTemplate as RPT + + ramp = TPT(identifier='ramp', channels={'out', 'trigger'}) + ramp.add_entry(0, 'start', channel='out') + ramp.add_entry('duration', 'stop', 'linear', channel='out') + + ramp.add_entry(0, 1, channel='trigger') + ramp.add_entry('duration', 1, 'hold', channel='trigger') + + ramp.add_measurement_declaration('meas', 0, 'duration') + + base = SPT([(ramp, dict(start='min', stop='max', duration='tau/3'), dict(meas='A')), + (ramp, dict(start='max', stop='max', duration='tau/3'), dict(meas='B')), + (ramp, dict(start='max', stop='min', duration='tau/3'), dict(meas='C'))], {'min', 'max', 'tau'}) + + repeated = RPT(base, 'n') + + root = SPT([repeated, repeated, repeated], {'min', 'max', 'tau', 'n'}) + + return root + + +def get_alazar_config(): + from atsaverage import alazar + from atsaverage.config import ScanlineConfiguration, CaptureClockConfiguration, EngineTriggerConfiguration,\ + TRIGInputConfiguration, InputConfiguration + + trig_level = int((5 + 0.4) / 10. * 255) + assert 0 <= trig_level < 256 + + config = ScanlineConfiguration() + config.triggerInputConfiguration = TRIGInputConfiguration(triggerRange=alazar.TriggerRangeID.etr_5V) + config.triggerConfiguration = EngineTriggerConfiguration(triggerOperation=alazar.TriggerOperation.J, + triggerEngine1=alazar.TriggerEngine.J, + triggerSource1=alazar.TriggerSource.external, + triggerSlope1=alazar.TriggerSlope.positive, + triggerLevel1=trig_level, + triggerEngine2=alazar.TriggerEngine.K, + triggerSource2=alazar.TriggerSource.disable, + triggerSlope2=alazar.TriggerSlope.positive, + triggerLevel2=trig_level) + config.captureClockConfiguration = CaptureClockConfiguration(source=alazar.CaptureClockType.internal_clock, + samplerate=alazar.SampleRateID.rate_100MSPS) + config.inputConfiguration = 4*[InputConfiguration(input_range=alazar.InputRangeID.range_1_V)] + config.totalRecordSize = 0 + + assert config.totalRecordSize == 0 + + return config + +def get_operations(): + from atsaverage.operations import Downsample + + return [Downsample(identifier='DS_A', maskID='A'), + Downsample(identifier='DS_B', maskID='B'), + Downsample(identifier='DS_C', maskID='C'), + Downsample(identifier='DS_D', maskID='D')] + +def get_window(card): + from atsaverage.gui import ThreadedStatusWindow + window = ThreadedStatusWindow(card) + window.start() + return window + + +class TaborTests(unittest.TestCase): + @unittest.skip + def test_all(self): + from qupulse.hardware.feature_awg.tabor import TaborChannelTuple, TaborDevice + #import warnings + tawg = TaborDevice(r'USB0::0x168C::0x2184::0000216488::INSTR') + tchannelpair = TaborChannelTuple(tawg, (1, 2), 'TABOR_AB') + tawg.paranoia_level = 2 + + #warnings.simplefilter('error', Warning) + + from qupulse.hardware.setup import HardwareSetup, PlaybackChannel, MarkerChannel + hardware_setup = HardwareSetup() + + hardware_setup.set_channel('TABOR_A', PlaybackChannel(tchannelpair, 0)) + hardware_setup.set_channel('TABOR_B', PlaybackChannel(tchannelpair, 1)) + hardware_setup.set_channel('TABOR_A_MARKER', MarkerChannel(tchannelpair, 0)) + hardware_setup.set_channel('TABOR_B_MARKER', MarkerChannel(tchannelpair, 1)) + + if with_alazar: + from qupulse.hardware.dacs.alazar import AlazarCard + import atsaverage.server + + if not atsaverage.server.Server.default_instance.running: + atsaverage.server.Server.default_instance.start(key=b'guest') + + import atsaverage.core + + alazar = AlazarCard(atsaverage.core.getLocalCard(1, 1)) + alazar.register_mask_for_channel('A', 0) + alazar.register_mask_for_channel('B', 0) + alazar.register_mask_for_channel('C', 0) + alazar.config = get_alazar_config() + + alazar.register_operations('test', get_operations()) + window = get_window(atsaverage.core.getLocalCard(1, 1)) + hardware_setup.register_dac(alazar) + + repeated = get_pulse() + + from qupulse.pulses.sequencing import Sequencer + + sequencer = Sequencer() + sequencer.push(repeated, + parameters=dict(n=1000, min=-0.5, max=0.5, tau=192*3), + channel_mapping={'out': 'TABOR_A', 'trigger': 'TABOR_A_MARKER'}, + window_mapping=dict(A='A', B='B', C='C')) + instruction_block = sequencer.build() + + hardware_setup.register_program('test', instruction_block) + + if with_alazar: + from atsaverage.masks import PeriodicMask + m = PeriodicMask() + m.identifier = 'D' + m.begin = 0 + m.end = 1 + m.period = 1 + m.channel = 0 + alazar._registered_programs['test'].masks.append(m) + + hardware_setup.arm_program('test') + + d = 1 + diff --git a/tests/hardware/feature_awg/tabor_new_driver_simulator_based_tests.py b/tests/hardware/feature_awg/tabor_new_driver_simulator_based_tests.py new file mode 100644 index 000000000..3f2200245 --- /dev/null +++ b/tests/hardware/feature_awg/tabor_new_driver_simulator_based_tests.py @@ -0,0 +1,283 @@ +import unittest +import subprocess +import time +import platform +import os +from typing import List, Tuple, Optional, Any + +import pytabor +import numpy as np + +from qupulse._program.tabor import TableDescription, TableEntry +from qupulse.hardware.feature_awg.features import DeviceControl, VoltageRange, ProgramManagement, SCPI, VolatileParameters +from qupulse.hardware.feature_awg.tabor import TaborDevice, TaborSegment +from qupulse.utils.types import TimeType + + +class TaborSimulatorManager: + def __init__(self, + simulator_executable='WX2184C.exe', + simulator_path=os.path.realpath(os.path.dirname(__file__))): + self.simulator_executable = simulator_executable + self.simulator_path = simulator_path + + self.started_simulator = False + + self.simulator_process = None + self.instrument: TaborDevice = None + + def kill_running_simulators(self): + command = 'Taskkill', '/IM {simulator_executable}'.format(simulator_executable=self.simulator_executable) + try: + subprocess.run([command], + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + except FileNotFoundError: + pass + + @property + def simulator_full_path(self): + return os.path.join(self.simulator_path, self.simulator_executable) + + def start_simulator(self, try_connecting_to_existing_simulator=True, max_wait_time=30): + if try_connecting_to_existing_simulator: + if pytabor.open_session('127.0.0.1') is not None: + return + + if not os.path.isfile(self.simulator_full_path): + raise RuntimeError('Cannot locate simulator executable.') + + self.kill_running_simulators() + + self.simulator_process = subprocess.Popen([self.simulator_full_path, '/switch-on', '/gui-in-tray']) + + start = time.time() + while pytabor.open_session('127.0.0.1') is None: + if self.simulator_process.returncode: + raise RuntimeError('Simulator exited with return code {}'.format(self.simulator_process.returncode)) + if time.time() - start > max_wait_time: + raise RuntimeError('Could not connect to simulator') + time.sleep(0.1) + + def connect(self) -> TaborDevice: + self.instrument = TaborDevice("testDevice", + "127.0.0.1", + reset=True, + paranoia_level=2) + + if self.instrument.main_instrument.visa_inst is None: + raise RuntimeError('Could not connect to simulator') + return self.instrument + + def disconnect(self): + for device in self.instrument.all_devices: + device.close() + self.instrument = None + + def __del__(self): + if self.started_simulator and self.simulator_process: + self.simulator_process.kill() + + +@unittest.skipIf(platform.system() != 'Windows', "Simulator currently only available on Windows :(") +class TaborSimulatorBasedTest(unittest.TestCase): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.instrument: TaborDevice = None + + @classmethod + def setUpClass(cls): + cls.simulator_manager = TaborSimulatorManager('WX2184C.exe', os.path.dirname(__file__)) + try: + cls.simulator_manager.start_simulator() + except RuntimeError as err: + raise unittest.SkipTest(*err.args) from err + + @classmethod + def tearDownClass(cls): + del cls.simulator_manager + + def setUp(self): + self.instrument = self.simulator_manager.connect() + + def tearDown(self): + self.instrument[DeviceControl].reset() + self.simulator_manager.disconnect() + + @staticmethod + def to_new_sequencer_tables(sequencer_tables: List[List[Tuple[int, int, int]]] + ) -> List[List[Tuple[TableDescription, Optional[Any]]]]: + return [[(TableDescription(*entry), None) for entry in sequencer_table] + for sequencer_table in sequencer_tables] + + @staticmethod + def to_new_advanced_sequencer_table(advanced_sequencer_table: List[Tuple[int, int, int]]) -> List[TableDescription]: + return [TableDescription(*entry) for entry in advanced_sequencer_table] + + +class TaborAWGRepresentationTests(TaborSimulatorBasedTest): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def test_sample_rate(self): + for ch_tuple in self.instrument.channel_tuples: + self.assertIsInstance(ch_tuple.sample_rate, TimeType) + + self.instrument[SCPI].send_cmd(':INST:SEL 1') + self.instrument[SCPI].send_cmd(':FREQ:RAST 2.3e9') + + self.assertEqual(2300000000, self.instrument.channel_tuples[0].sample_rate) + + def test_amplitude(self): + for channel in self.instrument.channels: + self.assertIsInstance(channel[VoltageRange].amplitude, float) + + self.instrument[SCPI].send_cmd(':INST:SEL 1; :OUTP:COUP DC') + self.instrument[SCPI].send_cmd(':VOLT 0.7') + + self.assertAlmostEqual(.7, self.instrument.channels[0][VoltageRange].amplitude) + + def test_select_marker(self): + with self.assertRaises(IndexError): + self.instrument.marker_channels[6]._select() + + self.instrument.marker_channels[1]._select() + selected = self.instrument[SCPI].send_query(':SOUR:MARK:SEL?') + self.assertEqual(selected, '2') + + self.instrument.marker_channels[0]._select() + selected = self.instrument[SCPI].send_query(':SOUR:MARK:SEL?') + self.assertEqual(selected, '1') + + def test_select_channel(self): + with self.assertRaises(IndexError): + self.instrument.channels[6]._select() + + self.instrument.channels[0]._select() + self.assertEqual(self.instrument[SCPI].send_query(':INST:SEL?'), '1') + + self.instrument.channels[3]._select() + self.assertEqual(self.instrument[SCPI].send_query(':INST:SEL?'), '4') + + +class TaborMemoryReadTests(TaborSimulatorBasedTest): + def setUp(self): + super().setUp() + + ramp_up = np.linspace(0, 2**14-1, num=192, dtype=np.uint16) + ramp_down = ramp_up[::-1] + zero = np.ones(192, dtype=np.uint16) * 2**13 + sine = ((np.sin(np.linspace(0, 2*np.pi, 192+64)) + 1) / 2 * (2**14 - 1)).astype(np.uint16) + + self.segments = [TaborSegment.from_sampled(ramp_up, ramp_up, None, None), + TaborSegment.from_sampled(ramp_down, zero, None, None), + TaborSegment.from_sampled(sine, sine, None, None)] + + self.zero_segment = TaborSegment.from_sampled(zero, zero, None, None) + + # program 1 + self.sequence_tables_raw = [[(10, 0, 0), (10, 1, 0), (10, 0, 0), (10, 1, 0)], + [(1, 0, 0), (1, 1, 0), (1, 0, 0), (1, 1, 0)]] + self.advanced_sequence_table = [(1, 1, 0), (1, 2, 0)] + + self.sequence_tables = self.to_new_sequencer_tables(self.sequence_tables_raw) + self.advanced_sequence_table = self.to_new_advanced_sequencer_table(self.advanced_sequence_table) + + self.channel_pair = self.instrument.channel_tuples[0] + + def arm_program(self, sequencer_tables, advanced_sequencer_table, mode, waveform_to_segment_index): + class DummyProgram: + @staticmethod + def get_sequencer_tables(): + return sequencer_tables + + @staticmethod + def get_advanced_sequencer_table(): + return advanced_sequencer_table + + @staticmethod + def update_volatile_parameters(parameters): + modifications = {1: TableEntry(repetition_count=5, element_number=2, jump_flag=0), + (0, 1): TableDescription(repetition_count=50, element_id=1, jump_flag=0)} + return modifications + + markers = (None, None) + channels = (1, 2) + + waveform_mode = mode + + self.channel_pair._known_programs['dummy_program'] = (waveform_to_segment_index, DummyProgram) + self.channel_pair[ProgramManagement]._change_armed_program('dummy_program') + + def test_read_waveforms(self): + self.channel_pair._amend_segments(self.segments) + + waveforms = self.channel_pair.read_waveforms() + + segments = [TaborSegment.from_binary_segment(waveform) + for waveform in waveforms] + + expected = [self.zero_segment, *self.segments] + + for ex, r in zip(expected, segments): + ex1, ex2 = ex.data_a, ex.data_b + r1, r2 = r.data_a, r.data_b + np.testing.assert_equal(ex1, r1) + np.testing.assert_equal(ex2, r2) + + self.assertEqual(expected, segments) + + def test_read_sequence_tables(self): + self.channel_pair._amend_segments(self.segments) + self.arm_program(self.sequence_tables, self.advanced_sequence_table, None, np.asarray([1, 2])) + + sequence_tables = self.channel_pair.read_sequence_tables() + + actual_sequence_tables = [self.channel_pair[ProgramManagement]._idle_sequence_table] + [[(rep, index+2, jump) + for rep, index, jump in table] + for table in self.sequence_tables_raw] + + expected = list(tuple(np.asarray(d) + for d in zip(*table)) + for table in actual_sequence_tables) + + np.testing.assert_equal(sequence_tables, expected) + + def test_read_advanced_sequencer_table(self): + self.channel_pair._amend_segments(self.segments) + self.arm_program(self.sequence_tables, self.advanced_sequence_table, None, np.asarray([1, 2])) + + actual_advanced_table = [(1, 1, 0)] + [(rep, idx + 1, jmp) for rep, idx, jmp in self.advanced_sequence_table] + + expected = list(np.asarray(d) + for d in zip(*actual_advanced_table)) + + advanced_table = self.channel_pair.read_advanced_sequencer_table() + np.testing.assert_equal(advanced_table, expected) + + def test_set_volatile_parameter(self): + self.channel_pair._amend_segments(self.segments) + self.arm_program(self.sequence_tables, self.advanced_sequence_table, None, np.asarray([1, 2])) + + para = {'a': 5} + actual_sequence_tables = [self.channel_pair[ProgramManagement]._idle_sequence_table] + [[(rep, index + 2, jump) + for rep, index, jump in table] + for table in self.sequence_tables_raw] + + actual_advanced_table = [(1, 1, 0)] + [(rep, idx + 1, jmp) for rep, idx, jmp in self.advanced_sequence_table] + + self.channel_pair[VolatileParameters].set_volatile_parameters('dummy_program', parameters=para) + + actual_sequence_tables[1][1] = (50, 3, 0) + actual_advanced_table[2] = (5, 3, 0) + + sequence_table = self.channel_pair.read_sequence_tables() + expected = list(tuple(np.asarray(d) + for d in zip(*table)) + for table in actual_sequence_tables) + np.testing.assert_equal(sequence_table, expected) + + advanced_table = self.channel_pair.read_advanced_sequencer_table() + expected = list(np.asarray(d) + for d in zip(*actual_advanced_table)) + np.testing.assert_equal(advanced_table, expected) diff --git a/tests/hardware/feature_awg/tabor_new_driver_tests.py b/tests/hardware/feature_awg/tabor_new_driver_tests.py new file mode 100644 index 000000000..85421b042 --- /dev/null +++ b/tests/hardware/feature_awg/tabor_new_driver_tests.py @@ -0,0 +1,43 @@ +import unittest + +from qupulse.hardware.feature_awg.tabor import with_configuration_guard + + +class ConfigurationGuardTest(unittest.TestCase): + class DummyChannelPair: + def __init__(self, test_obj: unittest.TestCase): + self.test_obj = test_obj + self._configuration_guard_count = 0 + self.is_in_config_mode = False + + def _enter_config_mode(self): + self.test_obj.assertFalse(self.is_in_config_mode) + self.test_obj.assertEqual(self._configuration_guard_count, 0) + self.is_in_config_mode = True + + def _exit_config_mode(self): + self.test_obj.assertTrue(self.is_in_config_mode) + self.test_obj.assertEqual(self._configuration_guard_count, 0) + self.is_in_config_mode = False + + @with_configuration_guard + def guarded_method(self, counter=5, throw=False): + self.test_obj.assertTrue(self.is_in_config_mode) + if counter > 0: + return self.guarded_method(counter - 1, throw) + 1 + if throw: + raise RuntimeError() + return 0 + + def test_config_guard(self): + channel_pair = ConfigurationGuardTest.DummyChannelPair(self) + + for i in range(5): + self.assertEqual(channel_pair.guarded_method(i), i) + + with self.assertRaises(RuntimeError): + channel_pair.guarded_method(1, True) + + self.assertFalse(channel_pair.is_in_config_mode) + + From 26e94ea8a943d421a445084ee691f6b5d31e79bc Mon Sep 17 00:00:00 2001 From: Simon Humpohl Date: Mon, 8 Mar 2021 18:19:32 +0100 Subject: [PATCH 2/2] Fix imports, tests and add missing for adapter to old interface --- qupulse/hardware/feature_awg/base.py | 6 +- .../feature_awg/channel_tuple_wrapper.py | 35 ++++++---- .../feature_awg/awg_new_driver_base_tests.py | 9 ++- .../channel_tuple_wrapper_tests.py | 67 +++++++++++++++++++ 4 files changed, 98 insertions(+), 19 deletions(-) create mode 100644 tests/hardware/feature_awg/channel_tuple_wrapper_tests.py diff --git a/qupulse/hardware/feature_awg/base.py b/qupulse/hardware/feature_awg/base.py index 259442345..8d35a6f8e 100644 --- a/qupulse/hardware/feature_awg/base.py +++ b/qupulse/hardware/feature_awg/base.py @@ -2,7 +2,7 @@ from typing import Optional import weakref -from qupulse.hardware.feature_awg import channel_tuple_wrapper +from qupulse.hardware.awgs.base import AWG from qupulse.hardware.feature_awg.base_features import Feature, FeatureAble from qupulse.utils.types import Collection @@ -83,8 +83,8 @@ def __init__(self, idn: int): @property @abstractmethod - def channel_tuple_adapter(self) -> channel_tuple_wrapper: - pass + def channel_tuple_adapter(self) -> AWG: + """Return old interface adapter object. See channel_tuple_wrapper for details.""" @property def idn(self) -> int: diff --git a/qupulse/hardware/feature_awg/channel_tuple_wrapper.py b/qupulse/hardware/feature_awg/channel_tuple_wrapper.py index ad8d6597c..10212153b 100644 --- a/qupulse/hardware/feature_awg/channel_tuple_wrapper.py +++ b/qupulse/hardware/feature_awg/channel_tuple_wrapper.py @@ -1,7 +1,9 @@ from typing import Tuple, Optional, Callable, Set +from qupulse import ChannelID from qupulse._program._loop import Loop from qupulse.hardware.feature_awg.base import AWGChannelTuple +from qupulse.hardware.feature_awg.features import ProgramManagement, VolatileParameters from qupulse.hardware.awgs.base import AWG @@ -14,42 +16,47 @@ def __copy__(self) -> None: pass def __init__(self, channel_tuple: AWGChannelTuple): + super().__init__(channel_tuple.name) self._channel_tuple = channel_tuple - def identifier(self) -> str: - return self._channel_tuple.name - + @property def num_channels(self) -> int: - return self._channel_tuple.num_channels + return len(self._channel_tuple.channels) + @property def num_markers(self) -> int: - return self._channel_tuple.num_markers + return len(self._channel_tuple.marker_channels) def upload(self, name: str, program: Loop, - channels: Tuple[Optional["ChannelID"], ...], - markers: Tuple[Optional["ChannelID"], ...], + channels: Tuple[Optional[ChannelID], ...], + markers: Tuple[Optional[ChannelID], ...], voltage_transformation: Tuple[Optional[Callable], ...], force: bool = False) -> None: - from qupulse.hardware.feature_awg.tabor import ProgramManagement - return self._channel_tuple[ProgramManagement].upload(name, program, channels, markers, - voltage_transformation, force) + return self._channel_tuple[ProgramManagement].upload(name=name, program=program, + channels=channels, + marker_channels=markers, + voltage_transformation=voltage_transformation, + repetition_mode=None, + force=force) def remove(self, name: str) -> None: - from qupulse.hardware.feature_awg.tabor import ProgramManagement return self._channel_tuple[ProgramManagement].remove(name) def clear(self) -> None: - from qupulse.hardware.feature_awg.tabor import ProgramManagement return self._channel_tuple[ProgramManagement].clear() def arm(self, name: Optional[str]) -> None: - from qupulse.hardware.feature_awg.tabor import ProgramManagement return self._channel_tuple[ProgramManagement].arm(name) + @property def programs(self) -> Set[str]: - from qupulse.hardware.feature_awg.tabor import ProgramManagement return self._channel_tuple[ProgramManagement].programs + @property def sample_rate(self) -> float: return self._channel_tuple.sample_rate + + def set_volatile_parameters(self, program_name: str, parameters): + self._channel_tuple[VolatileParameters].set_volatile_parameters(program_name, parameters) + diff --git a/tests/hardware/feature_awg/awg_new_driver_base_tests.py b/tests/hardware/feature_awg/awg_new_driver_base_tests.py index c3dbadf7f..b64eca876 100644 --- a/tests/hardware/feature_awg/awg_new_driver_base_tests.py +++ b/tests/hardware/feature_awg/awg_new_driver_base_tests.py @@ -7,7 +7,7 @@ from qupulse.hardware.feature_awg import channel_tuple_wrapper from qupulse.hardware.feature_awg.base import AWGDevice, AWGChannel, AWGChannelTuple, AWGMarkerChannel from qupulse.hardware.feature_awg.features import ChannelSynchronization, ProgramManagement, VoltageRange, \ - AmplitudeOffsetHandling, RepetitionMode + AmplitudeOffsetHandling, RepetitionMode, VolatileParameters from qupulse.utils.types import Collection @@ -25,6 +25,11 @@ def synchronize_channels(self, group_size: int) -> None: self._parent.synchronize_channels(group_size) +class TestVolatileParameters(VolatileParameters): + def set_volatile_parameters(self, program_name: str, parameters) -> None: + raise NotImplementedError() + + class TestVoltageRangeFeature(VoltageRange): def __init__(self, channel: "TestAWGChannel"): super().__init__() @@ -175,7 +180,7 @@ def __init__(self, idn: int, device: TestAWGDevice, channels: Iterable["TestAWGC @property def channel_tuple_adapter(self) -> channel_tuple_wrapper: - pass + return channel_tuple_wrapper.ChannelTupleAdapter(self) @property def sample_rate(self) -> float: diff --git a/tests/hardware/feature_awg/channel_tuple_wrapper_tests.py b/tests/hardware/feature_awg/channel_tuple_wrapper_tests.py new file mode 100644 index 000000000..7c36ad4bc --- /dev/null +++ b/tests/hardware/feature_awg/channel_tuple_wrapper_tests.py @@ -0,0 +1,67 @@ +from unittest import TestCase, mock + +from qupulse.hardware.feature_awg.features import ProgramManagement, VolatileParameters + +from tests.hardware.feature_awg.awg_new_driver_base_tests import TestAWGChannelTuple, TestAWGDevice, TestAWGChannel, TestVolatileParameters + + +class ChannelTupleAdapterTest(TestCase): + def setUp(self): + self.device = TestAWGDevice("device") + self.channels = [TestAWGChannel(0, self.device), TestAWGChannel(1, self.device)] + self.tuple = TestAWGChannelTuple(0, device=self.device, channels=self.channels) + + def test_simple_properties(self): + adapter = self.tuple.channel_tuple_adapter + self.assertEqual(adapter.num_channels, len(self.channels)) + self.assertEqual(adapter.num_markers, 0) + self.assertEqual(adapter.identifier, self.tuple.name) + self.assertEqual(adapter.sample_rate, self.tuple.sample_rate) + + def test_upload(self): + adapter = self.tuple.channel_tuple_adapter + + upload_kwargs = dict(name="upload_test", + program=mock.Mock(), + channels=('A', None), + voltage_transformation=(lambda x:x, lambda x:x**2), + markers=(), force=True) + + expected_kwargs = {**upload_kwargs, 'repetition_mode': None} + expected_kwargs['marker_channels'] = expected_kwargs.pop('markers') + + with mock.patch.object(self.tuple[ProgramManagement], 'upload') as upload_mock: + adapter.upload(**upload_kwargs) + upload_mock.assert_called_once_with(**expected_kwargs) + + def test_arm(self): + adapter = self.tuple.channel_tuple_adapter + with mock.patch.object(self.tuple[ProgramManagement], 'arm') as arm_mock: + adapter.arm('test_prog') + arm_mock.assert_called_once_with('test_prog') + + def test_remove(self): + adapter = self.tuple.channel_tuple_adapter + with mock.patch.object(self.tuple[ProgramManagement], 'remove') as remove_mock: + adapter.remove('test_prog') + remove_mock.assert_called_once_with('test_prog') + + def test_clear(self): + adapter = self.tuple.channel_tuple_adapter + with mock.patch.object(self.tuple[ProgramManagement], 'clear') as clear_mock: + adapter.clear() + clear_mock.assert_called_once_with() + + def test_programs(self): + adapter = self.tuple.channel_tuple_adapter + with mock.patch.object(type(self.tuple[ProgramManagement]), 'programs', new_callable=mock.PropertyMock): + self.assertIs(self.tuple[ProgramManagement].programs, adapter.programs) + + def test_set_volatile_parameters(self): + adapter = self.tuple.channel_tuple_adapter + + self.tuple.add_feature(TestVolatileParameters(self.tuple)) + + with mock.patch.object(self.tuple[VolatileParameters], 'set_volatile_parameters') as set_volatile_parameters_mock: + adapter.set_volatile_parameters('wurst', {'a': 5.}) + set_volatile_parameters_mock.assert_called_once_with('wurst', {'a': 5.})