# keysight_33500b.py
# Forked from PyFixate/Fixate.
import time
from fixate.core.common import mode_builder, unit_scale
from fixate.core.exceptions import ParameterError, InstrumentError
from fixate.drivers.funcgen.helper import FuncGen
from functools import update_wrapper
import inspect
def _offset_level():
    """Return a fresh innermost template level: the optional offset field."""
    return {",[{offset}]": {}}


def _amplitude_level():
    """Return a fresh level holding the optional amplitude field, then offset."""
    return {",[{amplitude}]": _offset_level()}


def _frequency_level():
    """Return a fresh level holding optional frequency, amplitude, then offset."""
    return {" [{frequency}]": _amplitude_level()}


# Nested command templates keyed by waveform keyword. Each nesting level adds
# one more optional, comma-separated parameter to the level above it.
# Every entry gets its own dict objects (nothing is shared between waveforms).
# NOTE(review): MODES is not referenced by the code visible in this file;
# presumably it is consumed via fixate.core.common.mode_builder -- confirm.
MODES = {
    ":SINusoid": _frequency_level(),
    ":SQUare": _frequency_level(),
    ":RAMP": _frequency_level(),
    ":PULSE": _frequency_level(),
    # Noise has no frequency field: the template hard-codes DEFault for it.
    ":NOISe DEFault": _amplitude_level(),
    # DC hard-codes DEFault for both frequency and amplitude; only offset is left.
    ":DC DEFault,DEFault": _offset_level(),
    ":PRBS": _frequency_level(),
    ":ARB": _frequency_level(),
}
# Single-element set holding the square-wave duty-cycle command path.
# NOTE(review): not referenced by the code visible in this file -- confirm
# whether a one-element set is what callers expect.
ADV_MODES = {":SQUare:DCYCle"}
class Keysight33500B(FuncGen):
    """
    FuncGen driver that maps the dotted FuncGen API onto SCPI command strings.

    ``self.api`` is a table of ``(dotted_name, handler, payload)`` entries.
    ``init_api`` walks that table and replaces each existing attribute reached
    through ``dotted_name`` with a wrapper that calls ``handler(payload, **kwargs)``.
    Command strings are ``str.format`` templates; they may reference call
    arguments (``{value}``) and stored state (``{self._store[key]}``).

    Entries whose command string is empty end up in ``_write`` with no data,
    so calling them raises ParameterError.
    """

    REGEX_ID = "Agilent Technologies,335..B"
    INSTR_TYPE = "VISA"

    def __init__(self, instrument):
        """
        The self._ values indicate the user values as entered if valid.
        The self.__ values are the sanitised values used internally in the system to parse between functions

        Limitations:
        The function generator switches internal relays at certain thresholds.
        Try to avoid these ranges in design if the function generator is loaded with a relatively low impedance
        Table of ranges on the same relay arrangement
        Min mVpp    Max mVpp
        4           60
        60.1        199.9
        200         599.9
        600         2000
        2001        6000
        6001        20000

        :param instrument:
            Instrument resource handed to FuncGen.__init__. This class sets its
            ``query_delay`` and ``timeout`` attributes and calls its ``write``
            and ``query`` methods.
        :return:
        """
        super().__init__(instrument)
        self.instrument.query_delay = 0.2
        self.instrument.timeout = 1000
        # Instrument restrictions (stored only; not enforced by the code in this class)
        self.__restr_bandwidth = {"min": unit_scale("4uHz"), "max": unit_scale("20MHz")}
        self.__restr_phase = {"min": -180, "max": 180}
        self.__restr_amplitude = {"min": unit_scale("4mVpp"), "max": unit_scale("20Vpp")}
        self._amplitude = None
        # State referenced from command templates via {self._store[...]}.
        # "ch1_waveform_handler" and the modulate keys are only added once the
        # corresponding API call has been made.
        self._store = {"ch1_duty": "50", "ch2_duty": "50", "ch1_modulate_source": "INT"}
        self.api = [
            # waveform selection
            # Channel 1:
            ("channel1.waveform.sin", self.store_and_write,
             ("SOUR1:FUNC SIN", {"ch1_waveform_handler": None})),
            ("channel1.waveform.square", self.store_and_write,
             ("SOUR1:FUNC SQU\r\nSOUR1:FUNC:SQU:DCYC {self._store[ch1_duty]}",
              {"ch1_waveform_handler": "channel1.waveform.square"})),
            ("channel1.waveform.ramp", self.store_and_write,
             ("SOUR1:FUNC RAMP", {"ch1_waveform_handler": None})),
            ("channel1.waveform.pulse", self.store_and_write,
             ("SOUR1:FUNC PULS\r\nPULS:DCYC {self._store[ch1_duty]}",
              {"ch1_waveform_handler": "channel1.waveform.pulse"})),
            ("channel1.waveform.arb", self.store_and_write,
             ("SOUR1:FUNC ARB", {"ch1_waveform_handler": None})),
            ("channel1.waveform.triangle", self.store_and_write,
             ("SOUR1:FUNC TRI", {"ch1_waveform_handler": None})),
            ("channel1.waveform.noise", self.store_and_write,
             ("SOUR1:FUNC NOIS", {"ch1_waveform_handler": None})),
            ("channel1.waveform.dc", self.store_and_write,
             ("SOUR1:FUNC DC", {"ch1_waveform_handler": None})),
            ("channel1.waveform.prbs", self.store_and_write,
             ("SOUR1:FUNC PRBS", {"ch1_waveform_handler": None})),
            # Channel Configuration
            # Channel 1:
            ("channel1.vrms", self.write, "SOUR1:VOLT:UNIT VRMS\r\nVOLT {value}"),
            ("channel1.vpp", self.write, "SOUR1:VOLT:UNIT VPP\r\nVOLT {value}"),
            ("channel1.dbm", self.write, "SOUR1:VOLT:UNIT DBM\r\nVOLT {value}"),
            ("channel1.offset", self.write, "SOUR1:VOLT:OFFS {value}"),
            ("channel1.phase", self.write, "SOUR1:PHAS {value}"),
            # Stores the duty, then re-runs the active waveform handler (if any)
            ("channel1.duty", self.store_and_execute,
             ({"ch1_duty": "{value}"}, "ch1_waveform_handler")),
            ("channel1.frequency", self.write, "SOUR1:FREQ {value}"),
            # Channel Activation
            ("channel1._call", self.write, "OUTP {value}"),
            # Sync Configuration
            ("sync.polarity.normal", self.write, ""),
            ("sync.mode.normal", self.write, ""),
            # Sync Mode source only works on one. Need to manually override so that only channel 1 being passed works
            ("sync.mode.source", self.write, ""),
            ("sync._call", self.write, "OUTP {value}"),
            # Trigger Configuration
            ("trigger.immediate", self.write, "TRIG1:SOUR IMM"),
            ("trigger.external._call", self.write, "TRIG1:SOUR EXT"),
            ("trigger.external.rising", self.write, "TRIG1:SOUR EXT\r\n TRIG1:SLOP POS"),
            ("trigger.external.falling", self.write, "TRIG1:SOUR EXT\r\n TRIG1:SLOP NEG"),
            ("trigger.manual._call", self.write, "TRIG1:SOUR BUS"),
            ("trigger.manual.initiate", self.write, "TRIG1:SOUR BUS"),
            ("trigger.timer", self.write, "TRIG:SOUR TIM\r\n TRIG1:TIM {seconds}"),
            ("trigger.delay", self.write, "TRIG1:DEL {seconds}"),
            ("trigger.out._call", self.write, "OUTP:TRIG"),
            ("trigger.out.off", self.write, "OUTP:TRIG OFF"),
            ("trigger.out.rising", self.write, "OUTP:TRIG ON\r\n OUTP:TRIG:SLOP POS"),
            ("trigger.out.falling", self.write, "OUTP:TRIG ON\r\n OUTP:TRIG:SLOP NEG"),
            # Modulate
            # Channel 1:
            ("channel1.modulate.am._call", self.store,
             {"ch1_modulate_state": "AM", "ch1_modulate_setting": "FREQ"}),
            ("channel1.modulate.fm._call", self.store,
             {"ch1_modulate_state": "FM", "ch1_modulate_setting": "FREQ"}),
            ("channel1.modulate.pm._call", self.store,
             {"ch1_modulate_state": "PM", "ch1_modulate_setting": "FREQ"}),
            ("channel1.modulate.fsk._call", self.store,
             {"ch1_modulate_state": "FSK", "ch1_modulate_setting": "RATE"}),
            ("channel1.modulate.bpsk._call", self.store,
             {"ch1_modulate_state": "BPSK", "ch1_modulate_setting": "RATE"}),
            ("channel1.modulate.sum._call", self.store,
             {"ch1_modulate_state": "SUM", "ch1_modulate_setting": "FREQ"}),
            # MODULATE SOURCES:
            ("channel1.modulate.source.internal._call", self.write,
             "SOUR1:{self._store[ch1_modulate_state]}:SOUR INT"),
            ("channel1.modulate.source.external", self.write,
             "SOUR1:{self._store[ch1_modulate_state]}:SOUR EXT"),
            # MODULATE ACTIVATION:
            # Channel 1:
            ("channel1.modulate._call", self.write,
             "{self._store[ch1_modulate_state]}:SOUR {self._store[ch1_modulate_source]}\r\n"
             "{self._store[ch1_modulate_state]}:STAT {value}"),
            # MODULATE OPTIONS:
            # Channel 1:
            ("channel1.modulate.am.depth", self.write, "SOUR1:AM:DEPT {value}"),
            ("channel1.modulate.am.dssc", self.write, "SOUR1:AM:DSSC ON"),
            ("channel1.modulate.fm.freq_dev", self.write, "SOUR1:FM:DEV {value}"),
            ("channel1.modulate.pm.phase_dev", self.write, "SOUR1:PM:DEV {value}"),
            ("channel1.modulate.fsk.hop_freq", self.write, "SOUR1:FSK:FREQ {value}"),
            ("channel1.modulate.fsk.rate", self.write, "SOUR1:FSK:INT:RATE {value}"),
            ("channel1.modulate.sum.modulate_percent", self.write, "SOUR1:SUM:AMPL {percent}"),
            # Internal
            ("channel1.modulate.source.internal.shape.sin", self.write,
             "{self._store[ch1_modulate_state]}:INT:FUNC SIN"),
            ("channel1.modulate.source.internal.shape.square", self.write,
             "{self._store[ch1_modulate_state]}:INT:FUNC SQU"),
            ("channel1.modulate.source.internal.shape.triangle", self.write,
             "{self._store[ch1_modulate_state]}:INT:FUNC TRI"),
            ("channel1.modulate.source.internal.shape.up_ramp", self.write,
             "{self._store[ch1_modulate_state]}:INT:FUNC RAMP"),
            ("channel1.modulate.source.internal.shape.down_ramp", self.write,
             "{self._store[ch1_modulate_state]}:INT:FUNC NRAMP"),
            ("channel1.modulate.source.internal.shape.noise", self.write,
             "{self._store[ch1_modulate_state]}:INT:FUNC NOIS"),
            # Modulate Frequency
            ("channel1.modulate.source.internal.frequency", self.write,
             "SOUR1:{self._store[ch1_modulate_state]}:INT:{self._store[ch1_modulate_setting]} {value}"),
            # LOAD:
            # channel1:
            ("channel1.load._call", self.write, "OUTP1:LOAD {ohms}"),
            ("channel1.load.infinite", self.write, "OUTP1:LOAD INF"),
            # BURST
            # Channel 1:
            ("channel1.burst.gated._call", self.write, "SOUR1:BURS:MODE GAT"),
            ("channel1.burst._call", self.write, "SOUR1:BURS:STAT {value}"),
            ("channel1.burst.ncycle._call", self.write, "SOUR1:BURS:MODE TRIG"),
            ("channel1.burst.ncycle.cycles._call", self.write, "SOUR1:BURS:NCYC {cycles}"),
            ("channel1.burst.ncycle.cycles.infinite", self.write, "SOUR1:BURS:NCYC INF"),
            ("channel1.burst.ncycle.burst_period", self.write, "SOUR1:BURS:INT:PER {seconds}"),
            ("channel1.burst.gated.positive", self.write, "SOUR1:BURS:GATE:POL NORM"),
            ("channel1.burst.gated.negative", self.write, "SOUR1:BURS:GATE:POL INV"),
            ("channel1.burst.phase", self.write, "SOUR1:BURS:PHAS {degrees}"),
        ]
        # ------------------------------------------------------------------------------------------------------------------
        self.init_api()

    def self_test(self):
        """
        Run the instrument's ``*TST?`` query with a temporary 20000 timeout.

        The previous timeout is always restored, even if the query fails.

        :raises InstrumentError:
            If the response does not contain the character "0".
        """
        timeout = self.instrument.timeout
        try:
            self.instrument.timeout = 20000
            resp = self.instrument.query("*TST?")
            if "0" not in resp:
                raise InstrumentError("Failed Self Test")
        finally:
            self.instrument.timeout = timeout

    def local(self):
        """
        Gives local control back to the instrument
        Remote control is activated on any other commands set to the device
        :return:
        """
        self._write("SYSTem:LOCal")

    def reset(self):
        """
        Be aware that the funcgen can have a short period where it sets to 5Vpp 1kHz with the output on for a short
        period. This could cause issues. Ensure that setup is in a safe state to receive such a signal.
        :return:
        """
        self._write("*RST;*CLS")
        self._write("OUTP1:LOAD INF")

    def _write(self, data):
        """
        Send one command string, or each item of an iterable of command strings,
        to the instrument, pausing after each write, then check the error queue.

        Timing note (the original wording of this note refers to the DG1022, so
        it appears to have been tuned on that instrument -- confirm for this one):
        The 6000 number for the sleep is derived from trial and error. The write calls don't seem to block at the rate
        they write. By allowing 166uS delay for each byte of data then the Funcgen doesn't choke on the next call. A
        flat 100ms is added to allow processing time.
        This is especially important for commands that write large amounts of data such as user arbitrary forms.

        :param data:
            A command string, or an iterable of command strings.
        :raises ParameterError:
            If data is empty / falsy.
        :raises InstrumentError:
            If the instrument reports errors after the write (see _is_error).
        """
        if not data:
            raise ParameterError("Missing data in instrument write")
        # A bare string is a single command; anything else is treated as a sequence of commands.
        commands = [data] if isinstance(data, str) else data
        for command in commands:
            self.instrument.write(command)
            time.sleep(0.1 + len(command) / 6000)
        self._is_error()

    def _check_errors(self):
        """
        Query ``SYST:ERR?`` once and parse the reply.

        The reply is split on the first comma only, so a message that itself
        contains commas is returned intact rather than breaking the parse.

        :return:
            Tuple of (code, msg): code as int, msg with surrounding whitespace
            and double quotes removed.
        """
        resp = self.instrument.query("SYST:ERR?")
        code, _, msg = resp.strip().partition(",")
        return int(code), msg.strip().strip('"')

    def _is_error(self, silent=False):
        """
        Drain the instrument error queue by polling until error code 0 is returned.

        :param silent:
            When True, collected errors are returned instead of raised.
        :return:
            List of (code, msg) tuples when silent is True and errors were found;
            otherwise None.
        :raises InstrumentError:
            When errors were found and silent is False.
        """
        errors = []
        while True:
            code, msg = self._check_errors()
            if code == 0:
                break
            errors.append((code, msg))
        if not errors:
            return None
        if silent:
            return errors
        raise InstrumentError(
            "Error(s) Returned from FuncGen\n" +
            "\n".join(["Code: {}\nMessage:{}".format(code, msg) for code, msg in errors]))

    def write(self, base_str, *args, **kwargs):
        """
        Format a command template and send it to the instrument.

        :param base_str:
            str.format template; see _format_string.
        :param args:
            Ignored.
        :param kwargs:
            Values available to the template.
        """
        formatted_string = self._format_string(base_str, **kwargs)
        self._write(formatted_string)

    def _format_string(self, base_str, **kwargs):
        """
        Repeatedly apply str.format to base_str until the result stops changing.

        ``self`` is injected into the format arguments so templates can use
        ``{self._store[key]}``. Formatting is repeated because a substituted
        value may itself contain further fields.

        :return:
            The fully formatted string.
        """
        kwargs['self'] = self
        prev_string = base_str
        while True:
            cur_string = prev_string.format(**kwargs)
            if cur_string == prev_string:
                return cur_string
            prev_string = cur_string

    def store(self, store_dict, *args, **kwargs):
        """
        Merge store_dict into self._store, formatting string values with kwargs.

        Non-string values (e.g. None) are stored as they are. A string value
        that references a field missing from kwargs is stored unformatted.

        :param store_dict:
            Dictionary containing the parameters to store. It is not modified.
        :param args:
            Ignored.
        :param kwargs:
            Values available to the string templates in store_dict.
        :return:
        """
        new_dict = dict(store_dict)
        for key, template in store_dict.items():
            if not isinstance(template, str):
                continue
            try:
                new_dict[key] = template.format(**kwargs)
            except (KeyError, IndexError):
                # Best effort: keep the raw template when a field cannot be filled.
                pass
        self._store.update(new_dict)

    def store_and_execute(self, params, *args, **kwargs):
        """
        Store values, then call the handler named by a stored dotted path.

        :param params:
            Tuple of (store_dict, handler_id). store_dict is passed to store();
            handler_id is the self._store key holding the dotted attribute path
            of the handler, relative to self.
        :param args:
            Passed positionally to the handler.
        :param kwargs:
            Used for formatting in store(); not passed to the handler.

        Nothing is executed when no handler has been stored yet or the stored
        handler is None.
        """
        store_dict, handler_id = params
        self.store(store_dict, *args, **kwargs)
        # .get(): the handler key is absent until a waveform has been selected.
        handler_string = self._store.get(handler_id)
        if handler_string is None:
            return
        handler = self
        for attr_name in handler_string.split("."):
            handler = getattr(handler, attr_name)
        handler(*args)

    def store_and_write(self, params, *args, **kwargs):
        """
        Store values, then format and send a command.

        :param params:
            Tuple of (base_str, store_dict). store_dict is stored without
            formatting arguments; base_str is then passed to write().
        """
        base_str, store_dict = params
        self.store(store_dict)
        self.write(base_str, *args, **kwargs)

    def init_api(self):
        """
        Bind every entry of self.api onto the object tree rooted at self.

        For each (dotted_name, handler, payload) the existing attribute at
        dotted_name is replaced by the wrapper returned from prepare_string.
        """
        for func_str, handler, base_str in self.api:
            *parents, func = func_str.split(".")
            parent_obj = self
            for parent in parents:
                parent_obj = getattr(parent_obj, parent)
            func_obc = getattr(parent_obj, func)
            setattr(parent_obj, func, self.prepare_string(func_obc, handler, base_str))

    def prepare_string(self, func, handler, base_str, *args, **kwargs):
        """
        Build the callable that replaces an API attribute.

        :param func:
            The attribute being replaced; its signature supplies parameter
            names and annotations, and its metadata is copied onto the wrapper.
        :param handler:
            Called as handler(base_str, **keyword_arguments).
        :param base_str:
            Payload passed through to handler as its first argument.
        :param args:
            Ignored.
        :param kwargs:
            Ignored.
        :return:
            The wrapper function.
        """
        def temp_func(*nargs, **nkwargs):
            # Only keyword arguments reach the handler, so positional arguments
            # are mapped onto func's parameter names by position first.
            sig = inspect.signature(func)
            keys = list(sig.parameters)
            for index, param in enumerate(nargs):
                nkwargs[keys[index]] = param
            # Parameters annotated as bool are sent as "ON" / "OFF".
            for name, val in list(nkwargs.items()):
                if sig.parameters[name].annotation == bool:
                    nkwargs[name] = "ON" if val else "OFF"
            return handler(base_str, **nkwargs)

        return update_wrapper(temp_func, func)