In [1]:
import configparser
import typing as t
import re
from functools import wraps

from zhinst.toolkit import Session
from zhinst.toolkit.nodetree.node import Node

In [144]:
session = Session('localhost')
device = session.connect_device('dev12018')

In [None]:
IGNORED_NODES = {
    'GENERAL': {},
    'MODULES': {
        'AWG': {
            "/ELF/CHECKSUM"
        }
    }
}

# wave needs x_name, x_unit time and value

In [27]:
def general_settings(config, device):
    section = 'General settings'
    config.add_section(section)
    config.set(section, "name", f'Zurich Instruments {device.device_type}MH')
    config.set(section, "version", f'{device.system.fwrevision()}')
    config.set(section, "driver_path", f'Zurich_Instruments_{device.device_type}MH')
    config.set(section, "interface", f'Other')
    config.set(section, "startup", f'Do nothing')

In [164]:
def enum_description(value: str) -> t.Tuple[str, str]:
    v = value.split(': ')
    if len(v) > 1:
        v2 = v[0].split(',')
        return v2[0].strip('"'), v[-1]
    return "", v[0]

def to_labber_format(obj):
    TYPE_MAP = {
        str: 'STRING',
        int: 'DOUBLE',
        bool: 'BOOLEAN',
        Node: 'STRING',
        float: 'DOUBLE',
        dict: 'PATH'
    }
    if 'enum' in str(obj):
        return 'COMBO'
    str_obj = str(obj)
    if 'typing' in str_obj:
        if 'dict' in str_obj.lower():
            return 'PATH'
        if 'list' in str_obj.lower():
            return 'PATH'
    try:
        return TYPE_MAP[obj]
    except KeyError:
        return 'NONE'

def _replace_characters(s: str) -> str:
    return s.replace('\n', ' ').replace('\r', '').replace('"', '`').replace("'", '`').replace(";", ':').replace("%", " percent")

def labber_delimiter(*args: str) -> str:
    return ' - '.join(args).upper()

def _to_html_list(x: t.List[str]) -> str:
    html_list = "<ul>"
    for item in x:
        item_cleaned = _replace_characters(item)
        html_list += f"<li>{item_cleaned}</li>"
    html_list += "</ul>"
    return html_list

def tooltip(desc, node = None, enum = None):
    desc_cleaned = _replace_characters(desc)
    desc = f'<p>{desc_cleaned}</p>'
    enum = f'<p>{_to_html_list(enum)}</p>' if enum else ''
    node_path = f'<p><b>{node.strip()}</b></p>' if node else ''
    return '<html><body>' + desc + enum + node_path + '</body></html>'

In [165]:
import inspect
from docstring_parser import parse

def function_to_group(func, section):
    """Native Python function."""
    items = []
    group = func.__name__
    signature = inspect.signature(func)
    docstring = parse(func.__doc__)

    for k, v in signature.parameters.items():
        if k == 'self':
            continue
        item = {}
        item['label'] = k.upper()
        item['datatype'] = to_labber_format(v.annotation)
        item['group'] = labber_delimiter(section.upper(), group.upper())
        item['section'] = section.upper() if section else 'DEVICE'
        if v.default != inspect._empty:
            print(v, v.default, type(v.default))
            if 'enum' in str(type(v.default)):
                item['def_value'] = str(v.default.name)
            else:
                item['def_value'] = str(v.default)
        item['permission'] = 'WRITE'
        if item['datatype'] == 'COMBO':
            for idx, enum in enumerate(v.annotation, 1):
                item[f'cmd_def_{idx}'] = str(enum.value)
                item[f'combo_def_{idx}'] = str(enum.name)

        for param in docstring.params:
            if k == param.arg_name:
                item['tooltip'] = tooltip(param.description)
        items.append(item)
    items.append(
        {
            'label': 'EXECUTEFUNC', 
            'datatype': 'BUTTON', 
            'group': labber_delimiter(section.upper(), group.upper()),
            'section': section.upper() if section else 'DEVICE',
            'tooltip': tooltip(docstring.short_description),
        }
    )
    return items

In [166]:
class CustomLabel:
    def __init__(self, name, section, group):
        self._name = name
        self._section = section
        self._group = group
        self._items = {}
        self._combos = []
        self._tt_desc = ''
        self._tt_node = ''
        self._tt_enums: t.List[str] = []

    def __setitem__(self, key, value):
        self._items[key] = value

    def __getitem__(self, key):
        return self._items[key]

    def add_enum(self, cmd_def, combo_def, desc):
        self._combos.append((cmd_def, combo_def, desc))

    def tooltip(self, desc: str, node: str = None):
        self._tt_desc = desc
        self._tt_node = node

    def items(self):
        return self._items

    def combos(self):
        d = {}
        for idx,c in enumerate(self._combos, 1):
            d[f'cmd_def{idx}'] = c[0]
            d[f'combo_def{idx}'] = c[1]
            self._tt_enums.append(f'{c[1]}: {c[2]}')
        return d

    def to_config(self, config: configparser.ConfigParser):
        config.add_section(self._name)
        config.set(self._name, 'group', self._group)
        config.set(self._name, 'section', self._section)
        for k, v in self._items.items():
            config.set(self._name, k, v)

        for idx,c in enumerate(self._combos, 1):
            config.set(self._name, f'cmd_def{idx}', c[0])
            config.set(self._name, f'combo_def{idx}', c[1])
            self._tt_enums.append(f'{c[1]}: {c[2]}')
        config.set(self._name, 'tooltip', tooltip(self._tt_desc, self._tt_node, self._tt_enums))

In [165]:
config = configparser.ConfigParser()

a = CustomLabel(name='Waveform - input', section='Waveforms', group='configure')
a['datatype'] = 'PATH'
a.tooltip('Example', 'foobar/bar')
a.add_enum('input', 'input', 'Configure waveform 1')

b = CustomLabel(name='Waveform - num_samples', section='Waveforms', group='configure')
b['datatype'] = 'DOUBLE'
b.tooltip('Example', 'foobar/bar')
b.add_enum('wavef2', 'wavef_set', 'Configure num samples')

c = CustomLabel(name='Waveform - trigger_input', section='Waveforms', group='configure')
c['datatype'] = 'DOUBLE'
c.tooltip('Example path node', 'foobar/bar/path')


a.to_config(config)
b.to_config(config)
c.to_config(config)

with open('tester_custom.ini', "w") as config_file:
    config.write(config_file)

In [163]:
labber_delimiter(*['asd', 'bar'])

'ASD - BAR'

In [167]:
class NodeSection:
    def __init__(self, node: dict):
        self.node = node
        self.node.setdefault('Options', {})
        self._node_path = node["Node"].upper()
        self._properties = self.node["Properties"].lower()
        self.replaced = {}

    def replacer(key):
        def replacer(func):
            @wraps(func)
            def wraps_(self, *args, **kwargs):
                try:
                    return self.replaced[self._delete_root_node(self._node_path)][key]
                except KeyError:
                    return func(self, *args, **kwargs)
            return wraps_
        return replacer

    def permission(self) -> str:
        if "read" in self._properties and "write" in self._properties:
            return "BOTH"
        if "read" in self._properties:
            return "READ"
        if "write" in self._properties:
            return "WRITE"
        return "NONE"

    def show_in_measurement_dlg(self) -> t.Optional[str]:
        if self.node["Type"] in ["VECTOR", "COMPLEX", "VECTOR_COMPLEX"]:
            return "True"

    def section(self) -> str:
        parsed = self._node_path.split("/")
        try:
            if parsed[3].isnumeric():
                return labber_delimiter(*parsed[2:4])
                # return " - ".join(parsed[2:4])
            return labber_delimiter(*parsed[2:3]) #" - ".join(parsed[2:3])
        except IndexError:
            return parsed[2]

    def group(self) -> str:
        node_path = self._delete_root_node(self._node_path)
        path = [x for x in node_path.split("/") if not x.isnumeric()]
        if len(path) > 1:
            return labber_delimiter(*path[:-1])
            # return " - ".join(path[:-1])
        return labber_delimiter(*path)
        # return " - ".join(path)

    def label(self) -> str:
        node_path = self._delete_root_node(self._node_path)
        path = node_path.split("/")
        return labber_delimiter(*path)
        # return " - ".join(path)

    def combo_def(self) -> t.List[dict]:
        if "enumerated" in self.node["Type"].lower():
            if "READ" == self.permission():
                return []
        opt = self.node['Options']
        combos = []
        for idx, (k, v) in enumerate(opt.items(), 1):
            value, _ = enum_description(v)
            res = {
                f"cmd_def_{idx}": value if value else str(k),
                f"combo_def_{idx}": value if value else str(k),
            }
            combos.append(res)
        return combos

    def _to_html_list(self, x: t.List[str]) -> str:
        html_list = "<ul>"
        for item in x:
            html_list += f"<li>{item}</li>"
        html_list += "</ul>"
        return html_list

    def tooltip(self) -> str:
        node_path = "/DEV..." + "/" + "/".join(self._node_path.split("/")[2:])
        items = []
        for k, v in self.node["Options"].items():
            value, desc = enum_description(v)
            items.append(f"{value if value else k}: {desc}")
        return tooltip(self.node["Description"], enum=items, node=node_path)

    def unit(self) -> t.Optional[str]:
        if self.node["Unit"] == "None":
            return None
        unit = self.node["Unit"].replace("%", " percent").replace("'", "")
        # Remove degree signs etc.
        return unit.encode("ascii", "ignore").decode()

    @replacer("datatype")
    def datatype(self) -> t.Optional[str]:
        unit = self.node["Type"]
        if "enumerated" in unit.lower():
            if not "READ" == self.permission():
                return "COMBO"
        boolean_nodes = ["ENABLE", "SINGLE", "ON"]
        if self.node["Node"].split("/")[-1].upper() in boolean_nodes:
            return "BOOLEAN"
        if unit == "Double" or "integer" in unit.lower():
            return "DOUBLE"
        if unit == "Complex":
            return unit.upper()
        if unit == "ZIVectorData":
            return "VECTOR"
        if unit == "String":
            return "STRING"

    def _delete_root_node(self, path: str) -> str:
        return re.sub(r"/DEV(\d+)", "", path)[1:]

    def set_cmd(self) -> t.Optional[str]:
        if "read" in self._properties:
            return self._delete_root_node(self._node_path)

    def get_cmd(self) -> t.Optional[str]:
        if "write" in self._properties:
            return self._delete_root_node(self._node_path)

    def as_dict(self):
        d = {
            'section': self.section(),
            'group': self.group(),
            'label': self.label(),
            'datatype': self.datatype(),
            'unit': self.unit(),
            'tooltip': self.tooltip(),
            
        }
        for item in self.combo_def():
            for k, v in item.items():
                d[k] = v
        d['permission'] = self.permission()
        d['set_cmd'] = self.set_cmd()
        d['show_in_measurement_dlg'] = self.show_in_measurement_dlg()
        return d

    def to_config(self, config):
        sec_namex = self.label()
        config.add_section(sec_namex)
        config.set(sec_namex, "section", self.section())
        config.set(sec_namex, "group", self.group())
        config.set(sec_namex, "label", self.label())
        config.set(sec_namex, "datatype", self.datatype()) if self.datatype() else ...
        config.set(sec_namex, "unit", self.unit()) if self.unit() else ...
        config.set(sec_namex, "tooltip", self.tooltip())

        for item in self.combo_def():
            for k, v in item.items():
                config.set(sec_namex, k, v)

        config.set(sec_namex, "permission", self.permission()) if self.permission() else ...
        config.set(sec_namex, "set_cmd", self.set_cmd()) if self.set_cmd() else ...
        config.set(sec_namex, "get_cmd", self.get_cmd()) if self.get_cmd() else ...
        config.set(sec_namex, "show_in_measurement_dlg", self.show_in_measurement_dlg()) if self.show_in_measurement_dlg() else ...


class AWGModuleSection(NodeSection):
    def __init__(self, node: dict):
        super().__init__(node)
        self.replaced = {
            "DIRECTORY": {"datatype": "PATH"},
            'COMPILER/SOURCEFILE': {'datatype': 'PATH'}}

    def label(self):
        return f"AWG - {super().label()}"

    def section(self):
        return "AWG"


class DataServerSection(NodeSection):
    def __init__(self, node: dict):
        super().__init__(node)

    def label(self):
        return labber_delimiter("DataServer", super().label())

    def section(self):
        return "DataServer"


class ScopeSection(NodeSection):
    def __init__(self, node: dict):
        super().__init__(node)
        self.replaced = {"SAVE/DIRECTORY": {"datatype": "PATH"}}

    def label(self):
        return labber_delimiter("Scope", super().label())

    def section(self):
        return "Scope"

In [7]:
def dict_to_config(dict_, config):
    section = 'func - ' + dict_['section'] + ' - ' + dict_['group'] + ' - ' + dict_['label']
    config.add_section(section)
    for k, v in dict_.items():
        config.set(section, k, v)

def _delete_root_node(path: str) -> str:
    return re.sub(r"/DEV(\d+)", "", path.upper())[1:]
    
def to_labber_label(path):
    node_path = _delete_root_node(path)
    path = node_path.split("/")
    return " - ".join(path)

methods = [
    'configure_channel',
    'subscribe',
    'unsubscribe'
]

def class_methods(class_, include: list = None):
    methods = []
    for attribute in dir(class_.__class__):
        if callable(class_.attribute):
            if not attribute.startswith('_'):
                methods.append(attribute)
    return methods

def qa_channels(device, config):
    for k in device.qachannels:
        for method in methods:
            items = function_to_group(getattr(k, method), to_labber_label(k.node_info.path))
            for item in items:
                dict_to_config(item, config)

In [227]:
from zhinst.toolkit.nodetree.node import NodeList
import inspect
from dataclasses import dataclass
from copy import deepcopy

funcs = []
paths = []

itemss = []

@dataclass
class NodePath:
    path: str = ''

    def extend(self, x):
        self.path = self.path + ' - ' + x if self.path else x

class Foo():
    def __init__(self):
        self.path = NodePath()
        self.root_path = NodePath()
        self.section_path = NodePath()
        self.property_path = NodePath()

    def functions(self, obj):
        for name, attr in vars(obj).items():
            if name.startswith('_'):
                continue
            if isinstance(attr, property):
                th = t.get_type_hints(attr.fget)
                if "typing.Union" in str(th["return"]) or "typing.Sequence" in str(th["return"]):
                    for k in getattr(device, name):
                        self.path.extend(name)
                        self.path.extend(k.raw_tree[-1])
                        self.section_path = deepcopy(self.path)
                        self.functions(th['return'].__args__[0])
                    #self.path.extend(name)
                    #self.functions(th['return'].__args__[0])

                elif inspect.isclass(th["return"]):
                    if issubclass(th["return"], Node):
                        #print(self.section_path.path + ' - ' + name)
                        # self.path = deepcopy(self.section_path.extend(name))
                        # print(self.section_path)
                        self.property_path = self.section_path.path + ' - ' + name
                        #self.section_path.path = self.section_path.path + ' - ' + name
                        # self.path.extend(name)
                        self.functions(th['return'])
            else:
                self.root_path = deepcopy(self.path)
                print(self.property_path, name)
                # print(self.path)
                # print(self.root_path)
                #funcs.append(attr)
                d = {
                    'section': self.path.path,
                    'name': name,
                    'obj': attr
                }
                paths.append(self.path)
                itemss.append(d)
            self.path = self.root_path
            self.property_path = self.section_path

        self.path = NodePath()

o = Foo()
o.functions(device.__class__)
# display(funcs)
# itemss

NodePath(path='') factory_reset
NodePath(path='') start_continuous_sw_trigger
NodePath(path='') configure_channel
qachannels - 0 - generator enable_sequencer
NodePath(path='qachannels - 0') wait_done
NodePath(path='qachannels - 0') load_sequencer_program
NodePath(path='qachannels - 0') write_to_waveform_memory
NodePath(path='qachannels - 0') read_from_waveform_memory
NodePath(path='qachannels - 0') configure_sequencer_triggering
qachannels - 0 - readout configure_result_logger
NodePath(path='qachannels - 0') run
NodePath(path='qachannels - 0') stop
NodePath(path='qachannels - 0') wait_done
NodePath(path='qachannels - 0') read
NodePath(path='qachannels - 0') write_integration_weights
NodePath(path='qachannels - 0') read_integration_weights
qachannels - 0 - spectroscopy configure_result_logger
NodePath(path='qachannels - 0') run
NodePath(path='qachannels - 0') stop
NodePath(path='qachannels - 0') wait_done
NodePath(path='qachannels - 0') read
NodePath(path='qachannels - 0') configure_cha

In [168]:
from zhinst.toolkit.nodetree.node import NodeList
import inspect
from dataclasses import dataclass
from copy import deepcopy

class Foo():
    def __init__(self, ignores: list):
        self.ignores = ignores
        self.path = ''
        self.root_path = ''
        self.section_path = ''
        self.property_path = ''
        self._functions = []

    def get_functions(self):
        return self._functions

    def functions(self, obj):
        for name, attr in vars(obj).items():
            if name in self.ignores:
                continue
            if name.startswith('_'):
                continue
            if isinstance(attr, property):
                th = t.get_type_hints(attr.fget)
                if "typing.Union" in str(th["return"]) or "typing.Sequence" in str(th["return"]):
                    for k in getattr(device, name):
                        self.path = ''
                        self.path = labber_delimiter(self.path, name) if self.path else name
                        self.path = labber_delimiter(self.path, k.raw_tree[-1])
                        self.root_path = self.path
                        self.functions(th['return'].__args__[0])
                elif inspect.isclass(th["return"]):
                    if issubclass(th["return"], Node):
                        self.root_path = self.path
                        self.path = labber_delimiter(self.path, name) if self.path else name
                        self.functions(th['return'])
                        self.path = self.root_path
                    else:
                        continue
            else:
                d = {
                    'section': self.root_path if self.root_path else 'DEVICE',
                    'section_name': self.path if self.path else 'DEVICE',
                    'name': name,
                    'obj': attr
                }
                self._functions.append(d)

In [170]:
from collections import defaultdict

config = configparser.ConfigParser()

# All modules separate INI file.
# DataServer separate instrument
# Test individual function
# Two entries for both waves. CSV / reading

general_settings(config, device)


IGNORED_FUNCTIONS = {
    'MFLI': [
        'check_compatibility',
        'get_streamingnodes',
        'set_transaction'
    ]
}
IGNORED_FUNCTIONS = defaultdict(list, **IGNORED_FUNCTIONS)

SECTIONS_ = [
    {'root': device, 'handler': NodeSection},
]

for item in SECTIONS_:
    handler = item['handler']
    for node, leaf in item['root']:
        sec = handler(leaf)
        sec.to_config(config)

o = Foo(ignores=IGNORED_FUNCTIONS[device._device_type])
o.functions(device.__class__)

for item in o.get_functions():
    for func in function_to_group(item['obj'], item['section_name']):
        section = labber_delimiter(item['section_name'], item['name'], func['label'])
        config.add_section(section)
        for k, v in func.items():
            config.set(section, k, v)

for k, v in config.items():
    if ('LOAD_SEQUENCER_PROGRAM' and 'SEQUENCER_PROGRAM') in k:
        v['DATATYPE'] = 'PATH'
    elif ('WRITE_TO_WAVEFORM_MEMORY' and 'WAVEFORMS') in k:
        v['DATATYPE'] = 'PATH'
    elif ('WRITE_TO_WAVEFORM_MEMORY' and 'INDEXES') in k:
        v['DATATYPE'] = 'STRING'
        v['TOOLTIP'] = tooltip('Indexes separated by comma.')

with open('tester_uhfli.ini', "w", encoding='utf-8') as config_file:
    config.write(config_file)

deep: bool = True True <class 'bool'>
timeout: float = 10 10 <class 'int'>
sleep_time: float = 0.005 0.005 <class 'float'>
timeout: float = 10 10 <class 'int'>
clear_existing: bool = True True <class 'bool'>
slots: List[int] = None None <class 'NoneType'>
play_pulse_delay: float = 0.0 0.0 <class 'float'>
num_averages: int = 1 1 <class 'int'>
averaging_mode: zhinst.toolkit.interface.AveragingMode = <AveragingMode.CYCLIC: 0> AveragingMode.CYCLIC <enum 'AveragingMode'>
timeout: float = 10 10 <class 'int'>
sleep_time: float = 0.05 0.05 <class 'float'>
timeout: float = 10 10 <class 'int'>
sleep_time: float = 0.05 0.05 <class 'float'>
timeout: float = 10 10 <class 'int'>
integration_delay: float = 0.0 0.0 <class 'float'>
clear_existing: bool = True True <class 'bool'>
slots: List[int] = None None <class 'NoneType'>
num_averages: int = 1 1 <class 'int'>
averaging_mode: zhinst.toolkit.interface.AveragingMode = <AveragingMode.CYCLIC: 0> AveragingMode.CYCLIC <enum 'AveragingMode'>
timeout: float

### AWG module

In [195]:
def general_settings(config):
    section = 'General settings'
    config.add_section(section)
    config.set(section, "name", f'Zurich Instruments AWG Module')
    config.set(section, "version", '0.1')
    config.set(section, "driver_path", f'Zurich_Instruments_AWG_Module')
    config.set(section, "interface", f'Other')
    config.set(section, "startup", f'Do nothing')

config = configparser.ConfigParser()

# All modules separate INI file.
# DataServer separate instrument
# Test individual function

general_settings(config)

SECTIONS_ = [
    {'root': session.modules.awg, 'handler': AWGModuleSection},
]

for item in SECTIONS_:
    handler = item['handler']
    for node, leaf in item['root']:
        sec = handler(leaf)
        sec.to_config(config)

funcs = [
    session.module.awg.subscribe,
    session.module.awg.unsubscribe,
]
for func in funcs:
    items = function_to_group(func, 'Func')
    for item in items:
        section = item['group'] + '-' + item['label']
        config.add_section(section)
        for k, v in item.items():
            config.set(section, k, v)

with open('tester_awg.ini', "w") as config_file:
    config.write(config_file)