-
Notifications
You must be signed in to change notification settings - Fork 31
/
tabor.py
1385 lines (1104 loc) · 58.6 KB
/
tabor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import functools
import logging
import numbers
import sys
import weakref
from typing import List, Tuple, Set, Callable, Optional, Any, cast, Union, Dict, Mapping, NamedTuple, Iterable,\
Collection
from collections import OrderedDict
import numpy as np
from qupulse import ChannelID
from qupulse._program._loop import Loop, make_compatible
from qupulse.hardware.feature_awg.channel_tuple_wrapper import ChannelTupleAdapter
from qupulse.hardware.feature_awg.features import ChannelSynchronization, AmplitudeOffsetHandling, VoltageRange, \
ProgramManagement, ActivatableChannels, DeviceControl, StatusTable, SCPI, VolatileParameters, \
ReadProgram, RepetitionMode
from qupulse.hardware.util import voltage_to_uint16, find_positions
from qupulse.utils.types import TimeType
from qupulse.hardware.feature_awg.base import AWGChannelTuple, AWGChannel, AWGDevice, AWGMarkerChannel
from typing import Sequence
from qupulse._program.tabor import TaborSegment, TaborException, TaborProgram, PlottableProgram, TaborSequencing, \
make_combined_wave
import pyvisa
import warnings
# Provided by Tabor electronics for python 2.7
# a python 3 version is in a private repository on https://git.rwth-aachen.de/qutech
# Beware of the string encoding change!
import teawg
assert (sys.byteorder == "little")
__all__ = ["TaborDevice", "TaborChannelTuple", "TaborChannel"]
TaborProgramMemory = NamedTuple("TaborProgramMemory", [("waveform_to_segment", np.ndarray),
("program", TaborProgram)])
def with_configuration_guard(function_object: Callable[["TaborChannelTuple", Any], Any]) -> Callable[
["TaborChannelTuple"], Any]:
"""This decorator assures that the AWG is in configuration mode while the decorated method runs."""
@functools.wraps(function_object)
def guarding_method(channel_pair: "TaborChannelTuple", *args, **kwargs) -> Any:
if channel_pair._configuration_guard_count == 0:
channel_pair._enter_config_mode()
channel_pair._configuration_guard_count += 1
try:
return function_object(channel_pair, *args, **kwargs)
finally:
channel_pair._configuration_guard_count -= 1
if channel_pair._configuration_guard_count == 0:
channel_pair._exit_config_mode()
return guarding_method
def with_select(function_object: Callable[["TaborChannelTuple", Any], Any]) -> Callable[["TaborChannelTuple"], Any]:
"""Asserts the channel pair is selcted when the wrapped function is called"""
@functools.wraps(function_object)
def selector(channel_tuple: "TaborChannelTuple", *args, **kwargs) -> Any:
channel_tuple._select()
return function_object(channel_tuple, *args, **kwargs)
return selector
########################################################################################################################
# Device
########################################################################################################################
# Features
class TaborSCPI(SCPI):
def __init__(self, device: "TaborDevice", visa: pyvisa.resources.MessageBasedResource):
super().__init__(visa)
self._parent = weakref.ref(device)
def send_cmd(self, cmd_str, paranoia_level=None):
for instr in self._parent().all_devices:
instr.send_cmd(cmd_str=cmd_str, paranoia_level=paranoia_level)
def send_query(self, query_str, query_mirrors=False) -> Any:
if query_mirrors:
return tuple(instr.send_query(query_str) for instr in self._parent().all_devices)
else:
return self._parent().main_instrument.send_query(query_str)
def _send_cmd(self, cmd_str, paranoia_level=None) -> Any:
"""Overwrite send_cmd for paranoia_level > 3"""
if paranoia_level is None:
paranoia_level = self._parent().paranoia_level
if paranoia_level < 3:
self._parent().super().send_cmd(cmd_str=cmd_str, paranoia_level=paranoia_level) # pragma: no cover
else:
cmd_str = cmd_str.rstrip()
if len(cmd_str) > 0:
ask_str = cmd_str + "; *OPC?; :SYST:ERR?"
else:
ask_str = "*OPC?; :SYST:ERR?"
*answers, opc, error_code_msg = self._parent()._visa_inst.ask(ask_str).split(";")
error_code, error_msg = error_code_msg.split(",")
error_code = int(error_code)
if error_code != 0:
_ = self._parent()._visa_inst.ask("*CLS; *OPC?")
if error_code == -450:
# query queue overflow
self.send_cmd(cmd_str)
else:
raise RuntimeError("Cannot execute command: {}\n{}: {}".format(cmd_str, error_code, error_msg))
assert len(answers) == 0
class TaborChannelSynchronization(ChannelSynchronization):
"""This Feature is used to synchronise a certain ammount of channels"""
def __init__(self, device: "TaborDevice"):
super().__init__()
self._parent = weakref.ref(device)
def synchronize_channels(self, group_size: int) -> None:
"""
Synchronize in groups of `group_size` channels. Groups of synchronized channels will be provided as
AWGChannelTuples. The channel_size must be evenly dividable by the number of channels
Args:
group_size: Number of channels per channel tuple
"""
if group_size == 2:
self._parent()._channel_tuples = []
for i in range((int)(len(self._parent().channels) / group_size)):
self._parent()._channel_tuples.append(
TaborChannelTuple((i + 1),
self._parent(),
self._parent().channels[(i * group_size):((i * group_size) + group_size)],
self._parent().marker_channels[(i * group_size):((i * group_size) + group_size)])
)
self._parent()[SCPI].send_cmd(":INST:COUP:STAT OFF")
elif group_size == 4:
self._parent()._channel_tuples = [TaborChannelTuple(1,
self._parent(),
self._parent().channels,
self._parent().marker_channels)]
self._parent()[SCPI].send_cmd(":INST:COUP:STAT ON")
else:
raise TaborException("Invalid group size")
class TaborDeviceControl(DeviceControl):
"""This feature is used for basic communication with a AWG"""
def __init__(self, device: "TaborDevice"):
super().__init__()
self._parent = weakref.ref(device)
def reset(self) -> None:
"""
Resetting the whole device. A command for resetting is send to the Device, the device is initialized again and
all channel tuples are cleared.
"""
self._parent()[SCPI].send_cmd(":RES")
self._parent()._coupled = None
self._parent()._initialize()
for channel_tuple in self._parent().channel_tuples:
channel_tuple[TaborProgramManagement].clear()
def trigger(self) -> None:
"""
This method triggers a device remotely.
"""
self._parent()[SCPI].send_cmd(":TRIG")
class TaborStatusTable(StatusTable):
def __init__(self, device: "TaborDevice"):
super().__init__()
self._parent = device
def get_status_table(self) -> Dict[str, Union[str, float, int]]:
"""
Send a lot of queries to the AWG about its settings. A good way to visualize is using pandas.DataFrame
Returns:
An ordered dictionary with the results
"""
name_query_type_list = [("channel", ":INST:SEL?", int),
("coupling", ":OUTP:COUP?", str),
("volt_dc", ":SOUR:VOLT:LEV:AMPL:DC?", float),
("volt_hv", ":VOLT:HV?", float),
("offset", ":VOLT:OFFS?", float),
("outp", ":OUTP?", str),
("mode", ":SOUR:FUNC:MODE?", str),
("shape", ":SOUR:FUNC:SHAPE?", str),
("dc_offset", ":SOUR:DC?", float),
("freq_rast", ":FREQ:RAST?", float),
("gated", ":INIT:GATE?", str),
("continuous", ":INIT:CONT?", str),
("continuous_enable", ":INIT:CONT:ENAB?", str),
("continuous_source", ":INIT:CONT:ENAB:SOUR?", str),
("marker_source", ":SOUR:MARK:SOUR?", str),
("seq_jump_event", ":SOUR:SEQ:JUMP:EVEN?", str),
("seq_adv_mode", ":SOUR:SEQ:ADV?", str),
("aseq_adv_mode", ":SOUR:ASEQ:ADV?", str),
("marker", ":SOUR:MARK:SEL?", int),
("marker_high", ":MARK:VOLT:HIGH?", str),
("marker_low", ":MARK:VOLT:LOW?", str),
("marker_width", ":MARK:WIDT?", int),
("marker_state", ":MARK:STAT?", str)]
data = OrderedDict((name, []) for name, *_ in name_query_type_list)
for ch in (1, 2, 3, 4):
self._parent.channels[ch - 1]._select()
self._parent.marker_channels[(ch - 1) % 2]._select()
for name, query, dtype in name_query_type_list:
data[name].append(dtype(self._parent[SCPI].send_query(query)))
return data
# Implementation
class TaborDevice(AWGDevice):
def __init__(self, device_name: str, instr_addr=None, paranoia_level=1, external_trigger=False, reset=False,
mirror_addresses=()):
"""
Constructor for a Tabor device
Args:
device_name (str): Name of the device
instr_addr: Instrument address that is forwarded to teawag
paranoia_level (int): Paranoia level that is forwarded to teawg
external_trigger (bool): Not supported yet
reset (bool):
mirror_addresses: list of devices on which the same things as on the main device are done.
For example you can a simulator and a real Device at once
"""
super().__init__(device_name)
self._instr = teawg.TEWXAwg(instr_addr, paranoia_level)
self._mirrors = tuple(teawg.TEWXAwg(address, paranoia_level) for address in mirror_addresses)
self._coupled = None
self._clock_marker = [0, 0, 0, 0]
self.add_feature(TaborSCPI(self, self.main_instrument._visa_inst))
self.add_feature(TaborDeviceControl(self))
self.add_feature(TaborStatusTable(self))
if reset:
self[SCPI].send_cmd(":RES")
# Channel
self._channels = [TaborChannel(i + 1, self) for i in range(4)]
# MarkerChannels
self._marker_channels = [TaborMarkerChannel(i + 1, self) for i in range(4)]
self._initialize()
# ChannelTuple
self._channel_tuples = []
self.add_feature(TaborChannelSynchronization(self))
self[TaborChannelSynchronization].synchronize_channels(2)
if external_trigger:
raise NotImplementedError() # pragma: no cover
def enable(self) -> None:
"""
This method immediately generates the selected output waveform, if the device is in continuous and armed
repetition mode.
"""
self[SCPI].send_cmd(":ENAB")
def abort(self) -> None:
"""
With abort you can terminate the current generation of the output waveform. When the output waveform is
terminated the output starts generating an idle waveform.
"""
self[SCPI].send_cmd(":ABOR")
def set_coupled(self, coupled: bool) -> None:
"""
Thats the coupling of the device to 'coupled'
"""
if coupled:
self[SCPI].send_cmd("INST:COUP:STAT ON")
else:
self[SCPI].send_cmd("INST:COUP:STAT OFF")
def _is_coupled(self) -> bool:
"""
Returns true if the coupling of the device is 'coupled' otherwise false
"""
if self._coupled is None:
return self[SCPI].send_query(":INST:COUP:STAT?") == "ON"
else:
return self._coupled
def cleanup(self) -> None:
for channel_tuple in self.channel_tuples:
channel_tuple.cleanup()
@property
def channels(self) -> Collection["TaborChannel"]:
"""Returns a list of all channels of a Device"""
return self._channels
@property
def marker_channels(self) -> Collection["TaborMarkerChannel"]:
"""Returns a list of all marker channels of a device. The collection may be empty"""
return self._marker_channels
@property
def channel_tuples(self) -> Collection["TaborChannelTuple"]:
"""Returns a list of all channel tuples of a list"""
return self._channel_tuples
@property
def main_instrument(self) -> teawg.TEWXAwg:
return self._instr
@property
def mirrored_instruments(self) -> Sequence[teawg.TEWXAwg]:
return self._mirrors
@property
def all_devices(self) -> Sequence[teawg.TEWXAwg]:
return (self._instr,) + self._mirrors
@property
def _paranoia_level(self) -> int:
return self._instr.paranoia_level
@_paranoia_level.setter
def _paranoia_level(self, val):
for instr in self.all_devices:
instr.paranoia_level = val
@property
def dev_properties(self) -> dict:
return self._instr.dev_properties
def _send_binary_data(self, pref, bin_dat, paranoia_level=None):
for instr in self.all_devices:
instr.send_binary_data(pref, bin_dat=bin_dat, paranoia_level=paranoia_level)
def _download_segment_lengths(self, seg_len_list, pref=":SEGM:DATA", paranoia_level=None):
for instr in self.all_devices:
instr.download_segment_lengths(seg_len_list, pref=pref, paranoia_level=paranoia_level)
def _download_sequencer_table(self, seq_table, pref=":SEQ:DATA", paranoia_level=None):
for instr in self.all_devices:
instr.download_sequencer_table(seq_table, pref=pref, paranoia_level=paranoia_level)
def _download_adv_seq_table(self, seq_table, pref=":ASEQ:DATA", paranoia_level=None):
for instr in self.all_devices:
instr.download_adv_seq_table(seq_table, pref=pref, paranoia_level=paranoia_level)
make_combined_wave = staticmethod(teawg.TEWXAwg.make_combined_wave)
def _initialize(self) -> None:
# 1. Select channel
# 2. Turn off gated mode
# 3. Turn on continous mode
# 4. Armed mode (only generate waveforms after enab command)
# 5. Expect enable signal from (USB / LAN / GPIB)
# 6. Use arbitrary waveforms as marker source
# 7. Expect jump command for sequencing from (USB / LAN / GPIB)
setup_command = (
":INIT:GATE OFF; :INIT:CONT ON; "
":INIT:CONT:ENAB ARM; :INIT:CONT:ENAB:SOUR BUS;"
":SOUR:MARK:SOUR USER; :SOUR:SEQ:JUMP:EVEN BUS ")
self[SCPI].send_cmd(":INST:SEL 1")
self[SCPI].send_cmd(setup_command)
self[SCPI].send_cmd(":INST:SEL 3")
self[SCPI].send_cmd(setup_command)
def _get_readable_device(self, simulator=True) -> teawg.TEWXAwg:
"""
A method to get the first readable device out of all devices.
A readable device is a device which you can read data from like a simulator.
Returns:
The first readable device out of all devices
Throws:
TaborException: this exception is thrown if there is no readable device in the list of all devices
"""
for device in self.all_devices:
if device.fw_ver >= 3.0:
if simulator:
if device.is_simulator:
return device
else:
return device
raise TaborException("No device capable of device data read")
########################################################################################################################
# Channel
########################################################################################################################
# Features
class TaborVoltageRange(VoltageRange):
def __init__(self, channel: "TaborChannel"):
super().__init__()
self._parent = weakref.ref(channel)
@property
@with_select
def offset(self) -> float:
"""Get offset of AWG channel"""
return float(
self._parent().device[SCPI].send_query(":VOLT:OFFS?".format(channel=self._parent().idn)))
@property
@with_select
def amplitude(self) -> float:
"""Get amplitude of AWG channel"""
coupling = self._parent().device[SCPI].send_query(":OUTP:COUP?")
if coupling == "DC":
return float(self._parent().device[SCPI].send_query(":VOLT?"))
elif coupling == "HV":
return float(self._parent().device[SCPI].send_query(":VOLT:HV?"))
else:
raise TaborException("Unknown coupling: {}".format(coupling))
@property
def amplitude_offset_handling(self) -> AmplitudeOffsetHandling:
"""
Gets the amplitude and offset handling of this channel. The amplitude-offset controls if the amplitude and
offset settings are constant or if these should be optimized by the driver
"""
return self._parent()._amplitude_offset_handling
@amplitude_offset_handling.setter
def amplitude_offset_handling(self, amp_offs_handling: Union[AmplitudeOffsetHandling, str]) -> None:
"""
amp_offs_handling: See possible values at `AWGAmplitudeOffsetHandling`
"""
amp_offs_handling = AmplitudeOffsetHandling(AmplitudeOffsetHandling)
self._parent()._amplitude_offset_handling = amp_offs_handling
def _select(self) -> None:
self._parent()._select()
class TaborActivatableChannels(ActivatableChannels):
def __init__(self, channel: "TaborChannel"):
super().__init__()
self._parent = weakref.ref(channel)
@property
def enabled(self) -> bool:
"""
Returns the the state a channel has at the moment. A channel is either activated or deactivated
True stands for activated and false for deactivated
"""
return self._parent().device[SCPI].send_query(":OUTP ?") == "ON"
@with_select
def enable(self):
"""Enables the output of a certain channel"""
command_string = ":OUTP ON".format(ch_id=self._parent().idn)
self._parent().device[SCPI].send_cmd(command_string)
@with_select
def disable(self):
"""Disables the output of a certain channel"""
command_string = ":OUTP OFF".format(ch_id=self._parent().idn)
self._parent().device[SCPI].send_cmd(command_string)
def _select(self) -> None:
self._parent()._select()
# Implementation
class TaborChannel(AWGChannel):
def __init__(self, idn: int, device: TaborDevice):
super().__init__(idn)
self._device = weakref.ref(device)
self._amplitude_offset_handling = AmplitudeOffsetHandling.IGNORE_OFFSET
# adding Features
self.add_feature(TaborVoltageRange(self))
self.add_feature(TaborActivatableChannels(self))
@property
def device(self) -> TaborDevice:
"""Returns the device that the channel belongs to"""
return self._device()
@property
def channel_tuple(self) -> "TaborChannelTuple":
"""Returns the channel tuple that this channel belongs to"""
return self._channel_tuple()
def _set_channel_tuple(self, channel_tuple: "TaborChannelTuple") -> None:
"""
The channel tuple "channel_tuple" is assigned to this channel
Args:
channel_tuple (TaborChannelTuple): the channel tuple that this channel belongs to
"""
self._channel_tuple = weakref.ref(channel_tuple)
def _select(self) -> None:
self.device[SCPI].send_cmd(":INST:SEL {channel}".format(channel=self.idn))
########################################################################################################################
# ChannelTuple
########################################################################################################################
# Features
class TaborProgramManagement(ProgramManagement):
def __init__(self, channel_tuple: "TaborChannelTuple"):
super().__init__(channel_tuple)
self._programs = {}
self._armed_program = None
self._idle_sequence_table = [(1, 1, 0), (1, 1, 0), (1, 1, 0)]
self._trigger_source = 'BUS'
def get_repetition_mode(self, program_name: str) -> str:
"""
Returns the default repetition mode of a certain program
Args:
program_name (str): name of the program whose repetition mode should be returned
"""
return self._channel_tuple._known_programs[program_name].program._repetition_mode
def set_repetition_mode(self, program_name: str, repetition_mode: str) -> None:
"""
Changes the default repetition mode of a certain program
Args:
program_name (str): name of the program whose repetition mode should be changed
Throws:
ValueError: this Exception is thrown when an invalid repetition mode is given
"""
if repetition_mode in ("infinite", "once"):
self._channel_tuple._known_programs[program_name].program._repetition_mode = repetition_mode
else:
raise ValueError("{} is no vaild repetition mode".format(repetition_mode))
@property
def supported_repetition_modes(self) -> Set[RepetitionMode]:
return {RepetitionMode.INFINITE}
@with_configuration_guard
@with_select
def upload(self, name: str,
program: Loop,
channels: Tuple[Optional[ChannelID], Optional[ChannelID]],
marker_channels: Tuple[Optional[ChannelID], Optional[ChannelID]],
voltage_transformation: Tuple[Callable, Callable],
repetition_mode: str = None,
force: bool = False) -> None:
"""
Upload a program to the AWG.
The policy is to prefer amending the unknown waveforms to overwriting old ones.
"""
if repetition_mode is None:
repetition_mode = self._default_repetition_mode
else:
repetition_mode = RepetitionMode(repetition_mode)
if repetition_mode not in self.supported_repetition_modes:
raise ValueError(f"{repetition_mode} is not supported on {self._channel_tuple}")
if len(channels) != len(self._channel_tuple.channels):
raise ValueError("Wrong number of channels")
if len(marker_channels) != len(self._channel_tuple.marker_channels):
raise ValueError("Wrong number of marker")
if len(voltage_transformation) != len(self._channel_tuple.channels):
raise ValueError("Wrong number of voltage transformations")
# adjust program to fit criteria
sample_rate = self._channel_tuple.device.channel_tuples[0].sample_rate
make_compatible(program,
minimal_waveform_length=192,
waveform_quantum=16,
sample_rate=sample_rate / 10 ** 9)
if name in self._channel_tuple._known_programs:
if force:
self._channel_tuple.free_program(name)
else:
raise ValueError('{} is already known on {}'.format(name, self._channel_tuple.idn))
# They call the peak to peak range amplitude
ranges = tuple(ch[VoltageRange].amplitude for ch in self._channel_tuple.channels)
voltage_amplitudes = tuple(range / 2 for range in ranges)
voltage_offsets = []
for channel in self._channel_tuple.channels:
if channel._amplitude_offset_handling == AmplitudeOffsetHandling.IGNORE_OFFSET:
voltage_offsets.append(0)
elif channel._amplitude_offset_handling == AmplitudeOffsetHandling.CONSIDER_OFFSET:
voltage_offsets.append(channel[VoltageRange].offset)
else:
raise NotImplementedError(
'{} is invalid as AWGAmplitudeOffsetHandling'.format(channel._amplitude_offset_handling))
voltage_offsets = tuple(voltage_offsets)
# parse to tabor program
tabor_program = TaborProgram(program,
channels=tuple(channels),
markers=marker_channels,
device_properties=self._channel_tuple.device.dev_properties,
sample_rate=sample_rate / 10 ** 9,
amplitudes=voltage_amplitudes,
offsets=voltage_offsets,
voltage_transformations=voltage_transformation)
segments, segment_lengths = tabor_program.get_sampled_segments()
waveform_to_segment, to_amend, to_insert = self._channel_tuple._find_place_for_segments_in_memory(segments,
segment_lengths)
self._channel_tuple._segment_references[waveform_to_segment[waveform_to_segment >= 0]] += 1
for wf_index in np.flatnonzero(to_insert > 0):
segment_index = to_insert[wf_index]
self._channel_tuple._upload_segment(to_insert[wf_index], segments[wf_index])
waveform_to_segment[wf_index] = segment_index
if np.any(to_amend):
segments_to_amend = [segments[idx] for idx in np.flatnonzero(to_amend)]
waveform_to_segment[to_amend] = self._channel_tuple._amend_segments(segments_to_amend)
self._channel_tuple._known_programs[name] = TaborProgramMemory(waveform_to_segment=waveform_to_segment,
program=tabor_program)
# set the default repetionmode for a programm
self.set_repetition_mode(program_name=name, repetition_mode=repetition_mode)
def remove(self, name: str) -> None:
"""
Remove a program from the AWG.
Also discards all waveforms referenced only by the program identified by name.
Args:
name (str): The name of the program to remove.
"""
self._channel_tuple.free_program(name)
self._channel_tuple.cleanup()
def clear(self) -> None:
"""
Removes all programs and waveforms from the AWG.
Caution: This affects all programs and waveforms on the AWG, not only those uploaded using qupulse!
"""
self._channel_tuple.device.channels[0]._select()
self._channel_tuple.device[SCPI].send_cmd(":TRAC:DEL:ALL")
self._channel_tuple.device[SCPI].send_cmd(":SOUR:SEQ:DEL:ALL")
self._channel_tuple.device[SCPI].send_cmd(":ASEQ:DEL")
self._channel_tuple.device[SCPI].send_cmd(":TRAC:DEF 1, 192")
self._channel_tuple.device[SCPI].send_cmd(":TRAC:SEL 1")
self._channel_tuple.device[SCPI].send_cmd(":TRAC:MODE COMB")
self._channel_tuple.device._send_binary_data(pref=":TRAC:DATA", bin_dat=self._channel_tuple._idle_segment.get_as_binary())
self._channel_tuple._segment_lengths = 192 * np.ones(1, dtype=np.uint32)
self._channel_tuple._segment_capacity = 192 * np.ones(1, dtype=np.uint32)
self._channel_tuple._segment_hashes = np.ones(1, dtype=np.int64) * hash(self._channel_tuple._idle_segment)
self._channel_tuple._segment_references = np.ones(1, dtype=np.uint32)
self._channel_tuple._advanced_sequence_table = []
self._channel_tuple._sequencer_tables = []
self._channel_tuple._known_programs = dict()
self._change_armed_program(None)
@with_select
def arm(self, name: Optional[str]) -> None:
"""
Load the program 'name' and arm the device for running it.
Args:
name (str): the program the device should change to
"""
if self._channel_tuple._current_program == name:
self._channel_tuple.device[SCPI].send_cmd("SEQ:SEL 1")
else:
self._change_armed_program(name)
@property
def programs(self) -> Set[str]:
"""The set of program names that can currently be executed on the hardware AWG."""
return set(program.name for program in self._channel_tuple._known_programs.keys())
@with_select
def run_current_program(self) -> None:
"""
This method starts running the active program
Throws:
RuntimeError: This exception is thrown if there is no active program for this device
"""
if (self._channel_tuple.device._is_coupled()):
# channel tuple is the first channel tuple
if (self._channel_tuple.device._channel_tuples[0] == self):
if self._channel_tuple._current_program:
repetition_mode = self._channel_tuple._known_programs[
self._channel_tuple._current_program].program._repetition_mode
if repetition_mode == "infinite":
self._cont_repetition_mode()
self._channel_tuple.device[SCPI].send_cmd(':TRIG',
paranoia_level=self._channel_tuple.internal_paranoia_level)
else:
raise ValueError("{} is no vaild repetition mode".format(repetition_mode))
else:
raise RuntimeError("No program active")
else:
warnings.warn(
"TaborWarning - run_current_program() - the device is coupled - runthe program via the first channel tuple")
else:
if self._channel_tuple._current_program:
repetition_mode = self._channel_tuple._known_programs[
self._channel_tuple._current_program].program._repetition_mode
if repetition_mode == "infinite":
self._cont_repetition_mode()
self._channel_tuple.device[SCPI].send_cmd(':TRIG', paranoia_level=self._channel_tuple.internal_paranoia_level)
else:
raise ValueError("{} is no vaild repetition mode".format(repetition_mode))
else:
raise RuntimeError("No program active")
@with_select
@with_configuration_guard
def _change_armed_program(self, name: Optional[str]) -> None:
"""The armed program of the channel tuple is changed to the program with the name 'name'"""
if name is None:
sequencer_tables = [self._idle_sequence_table]
advanced_sequencer_table = [(1, 1, 0)]
else:
waveform_to_segment_index, program = self._channel_tuple._known_programs[name]
waveform_to_segment_number = waveform_to_segment_index + 1
# translate waveform number to actual segment
sequencer_tables = [[(rep_count, waveform_to_segment_number[wf_index], jump_flag)
for ((rep_count, wf_index, jump_flag), _) in sequencer_table]
for sequencer_table in program.get_sequencer_tables()]
# insert idle sequence
sequencer_tables = [self._idle_sequence_table] + sequencer_tables
# adjust advanced sequence table entries by idle sequence table offset
advanced_sequencer_table = [(rep_count, seq_no + 1, jump_flag)
for rep_count, seq_no, jump_flag in program.get_advanced_sequencer_table()]
if program.waveform_mode == TaborSequencing.SINGLE:
assert len(advanced_sequencer_table) == 1
assert len(sequencer_tables) == 2
while len(sequencer_tables[1]) < self._channel_tuple.device.dev_properties["min_seq_len"]:
assert advanced_sequencer_table[0][0] == 1
sequencer_tables[1].append((1, 1, 0))
# insert idle sequence in advanced sequence table
advanced_sequencer_table = [(1, 1, 0)] + advanced_sequencer_table
while len(advanced_sequencer_table) < self._channel_tuple.device.dev_properties["min_aseq_len"]:
advanced_sequencer_table.append((1, 1, 0))
self._channel_tuple.device[SCPI].send_cmd("SEQ:DEL:ALL", paranoia_level=self._channel_tuple.internal_paranoia_level)
self._channel_tuple._sequencer_tables = []
self._channel_tuple.device[SCPI].send_cmd("ASEQ:DEL", paranoia_level=self._channel_tuple.internal_paranoia_level)
self._channel_tuple._advanced_sequence_table = []
# download all sequence tables
for i, sequencer_table in enumerate(sequencer_tables):
self._channel_tuple.device[SCPI].send_cmd("SEQ:SEL {}".format(i + 1),
paranoia_level=self._channel_tuple.internal_paranoia_level)
self._channel_tuple.device._download_sequencer_table(sequencer_table)
self._channel_tuple._sequencer_tables = sequencer_tables
self._channel_tuple.device[SCPI].send_cmd("SEQ:SEL 1", paranoia_level=self._channel_tuple.internal_paranoia_level)
self._channel_tuple.device._download_adv_seq_table(advanced_sequencer_table)
self._channel_tuple._advanced_sequence_table = advanced_sequencer_table
self._channel_tuple._current_program = name
def _select(self):
self._channel_tuple.channels[0]._select()
@property
def _configuration_guard_count(self):
return self._channel_tuple._configuration_guard_count
@_configuration_guard_count.setter
def _configuration_guard_count(self, configuration_guard_count):
self._channel_tuple._configuration_guard_count = configuration_guard_count
def _enter_config_mode(self):
self._channel_tuple._enter_config_mode()
def _exit_config_mode(self):
self._channel_tuple._exit_config_mode()
@with_select
def _cont_repetition_mode(self):
"""Changes the run mode of this channel tuple to continous mode"""
self._channel_tuple.device[SCPI].send_cmd(f":TRIG:SOUR:ADV EXT")
self._channel_tuple.device[SCPI].send_cmd(
f":INIT:GATE OFF; :INIT:CONT ON; :INIT:CONT:ENAB ARM; :INIT:CONT:ENAB:SOUR {self._trigger_source}")
class TaborVolatileParameters(VolatileParameters):
def __init__(self, channel_tuple: "TaborChannelTuple", ):
super().__init__(channel_tuple=channel_tuple)
def set_volatile_parameters(self, program_name: str, parameters: Mapping[str, numbers.Number]) -> None:
""" Set the values of parameters which were marked as volatile on program creation. Sets volatile parameters
in program memory and device's (adv.) sequence tables if program is current program.
If set_volatile_parameters needs to run faster, set CONFIG_MODE_PARANOIA_LEVEL to 0 which causes the device to
enter the configuration mode with paranoia level 0 (Note: paranoia level 0 does not work for the simulator)
and set device._is_coupled.
Args:
program_name: Name of program which should be changed.
parameters: Names of volatile parameters and respective values to which they should be set.
"""
waveform_to_segment_index, program = self._channel_tuple._known_programs[program_name]
modifications = program.update_volatile_parameters(parameters)
self._channel_tuple.logger.debug("parameter modifications: %r" % modifications)
if not modifications:
self._channel_tuple.logger.info(
"There are no volatile parameters to update. Either there are no volatile parameters with "
"these names,\nthe respective repetition counts already have the given values or the "
"volatile parameters were dropped during upload.")
return
if program_name == self._channel_tuple._current_program:
commands = []
for position, entry in modifications.items():
if not entry.repetition_count > 0:
raise ValueError("Repetition must be > 0")
if isinstance(position, int):
commands.append(":ASEQ:DEF {},{},{},{}".format(position + 1, entry.element_number + 1,
entry.repetition_count, entry.jump_flag))
else:
table_num, step_num = position
commands.append(":SEQ:SEL {}".format(table_num + 2))
commands.append(":SEQ:DEF {},{},{},{}".format(step_num,
waveform_to_segment_index[entry.element_id] + 1,
entry.repetition_count, entry.jump_flag))
self._channel_tuple._execute_multiple_commands_with_config_guard(commands)
# Wait until AWG is finished
_ = self._channel_tuple.device.main_instrument._visa_inst.query("*OPC?")
class TaborReadProgram(ReadProgram):
def __init__(self, channel_tuple: "TaborChannelTuple", ):
super().__init__(channel_tuple=channel_tuple)
def read_complete_program(self):
return PlottableProgram.from_read_data(self._channel_tuple.read_waveforms(),
self._channel_tuple.read_sequence_tables(),
self._channel_tuple.read_advanced_sequencer_table())
# Implementation
class TaborChannelTuple(AWGChannelTuple):
CONFIG_MODE_PARANOIA_LEVEL = None
def __init__(self, idn: int, device: TaborDevice, channels: Iterable["TaborChannel"],
marker_channels: Iterable["TaborMarkerChannel"]):
super().__init__(idn)
self._device = weakref.ref(device)
self._configuration_guard_count = 0
self._is_in_config_mode = False
self._channels = tuple(channels)
self._marker_channels = tuple(marker_channels)
# the channel and channel marker are assigned to this channel tuple
for channel in self.channels:
channel._set_channel_tuple(self)
for marker_ch in self.marker_channels:
marker_ch._set_channel_tuple(self)
# adding Features
self.add_feature(TaborProgramManagement(self))
self.add_feature(TaborVolatileParameters(self))
self._idle_segment = TaborSegment.from_sampled(voltage_to_uint16(voltage=np.zeros(192),
output_amplitude=0.5,
output_offset=0., resolution=14),
voltage_to_uint16(voltage=np.zeros(192),
output_amplitude=0.5,
output_offset=0., resolution=14),
None, None)
self._known_programs = dict() # type: Dict[str, TaborProgramMemory]
self._current_program = None
self._segment_lengths = None
self._segment_capacity = None
self._segment_hashes = None
self._segment_references = None
self._sequencer_tables = None
self._advanced_sequence_table = None
self._internal_paranoia_level = 0
self[TaborProgramManagement].clear()
self._channel_tuple_adapter: ChannelTupleAdapter
@property
def internal_paranoia_level(self) -> Optional[int]:
return self._internal_paranoia_level
@property
def logger(self):
return logging.getLogger("qupulse.tabor")
@property
def channel_tuple_adapter(self) -> ChannelTupleAdapter:
if self._channel_tuple_adapter is None:
self._channel_tuple_adapter = ChannelTupleAdapter(self)
return self._channel_tuple_adapter
def _select(self) -> None:
"""The channel tuple is selected, which means that the first channel of the channel tuple is selected"""
self.channels[0]._select()
@property
def device(self) -> TaborDevice:
"""Returns the device that the channel tuple belongs to"""
return self._device()
@property
def channels(self) -> Collection["TaborChannel"]:
"""Returns all channels of the channel tuple"""
return self._channels
@property
def marker_channels(self) -> Collection["TaborMarkerChannel"]:
"""Returns all marker channels of the channel tuple"""
return self._marker_channels
@property
@with_select
def sample_rate(self) -> TimeType:
"""Returns the sample rate that the channels of a channel tuple have"""
return TimeType.from_float(
float(self.device[SCPI].send_query(":FREQ:RAST?".format(channel=self.channels[0].idn))))
@property
def total_capacity(self) -> int:
return int(self.device.dev_properties["max_arb_mem"]) // 2
def free_program(self, name: str) -> TaborProgramMemory:
if name is None:
raise TaborException("Removing 'None' program is forbidden.")
program = self._known_programs.pop(name)
self._segment_references[program.waveform_to_segment] -= 1
if self._current_program == name:
self[TaborProgramManagement]._change_armed_program(None)
return program
@property
def _segment_reserved(self) -> np.ndarray:
return self._segment_references > 0
@property
def _free_points_in_total(self) -> int:
return self.total_capacity - np.sum(self._segment_capacity[self._segment_reserved])
@property
def _free_points_at_end(self) -> int: