-
Notifications
You must be signed in to change notification settings - Fork 126
/
ps5000aStreamingExample.py
212 lines (178 loc) · 7.84 KB
/
ps5000aStreamingExample.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
#
# Copyright (C) 2018-2019 Pico Technology Ltd. See LICENSE file for terms.
#
# PS5000 Series (A API) STREAMING MODE EXAMPLE
# This example demonstrates how to call the ps5000a driver API functions in order to open a device, set up 2 channels and collect streamed data (1 buffer).
# This data is then plotted as mV against time in ns.
import ctypes
import numpy as np
from picosdk.ps5000a import ps5000a as ps
import matplotlib.pyplot as plt
from picosdk.functions import adc2mV, assert_pico_ok
import time
# Create chandle and status ready for use.
# chandle receives the device handle; status collects every driver return code by name.
chandle = ctypes.c_int16()
status = {}

# Open PicoScope 5000 Series device
# Resolution set to 12 Bit
resolution = ps.PS5000A_DEVICE_RESOLUTION["PS5000A_DR_12BIT"]
# Returns handle to chandle for use in future API functions
status["openunit"] = ps.ps5000aOpenUnit(ctypes.byref(chandle), None, resolution)

# Inspect the status code directly instead of wrapping assert_pico_ok in a bare
# `except:`, which would also intercept KeyboardInterrupt/SystemExit.
# 282 and 286 are the two open results this example recovers from by passing
# the same code back to ps5000aChangePowerSource.
# NOTE(review): presumably PICO_POWER_SUPPLY_NOT_CONNECTED (282) and
# PICO_USB3_0_DEVICE_NON_USB3_0_PORT (286) -- confirm against PicoStatus.h.
powerStatus = status["openunit"]
if powerStatus in (282, 286):
    status["changePowerSource"] = ps.ps5000aChangePowerSource(chandle, powerStatus)
    assert_pico_ok(status["changePowerSource"])
else:
    # Raises for any other non-OK status; a successful open passes straight through.
    assert_pico_ok(powerStatus)
enabled = 1
disabled = 0
analogue_offset = 0.0
channel_range = ps.PS5000A_RANGE['PS5000A_2V']

# Set up channels A and B with identical settings:
# handle = chandle
# channel = PS5000A_CHANNEL_A = 0, then PS5000A_CHANNEL_B = 1
# enabled = 1
# coupling type = PS5000A_DC = 1
# range = PS5000A_2V = 7
# analogue offset = 0 V
# Each return code is stored under its own status key and checked before moving on.
for setChannelKey, channelName in (("setChA", 'PS5000A_CHANNEL_A'),
                                   ("setChB", 'PS5000A_CHANNEL_B')):
    status[setChannelKey] = ps.ps5000aSetChannel(
        chandle,
        ps.PS5000A_CHANNEL[channelName],
        enabled,
        ps.PS5000A_COUPLING['PS5000A_DC'],
        channel_range,
        analogue_offset)
    assert_pico_ok(status[setChannelKey])
# Size of capture: the driver fills buffers of sizeOfOneBuffer samples, and the
# whole capture is numBuffersToCapture of those.
sizeOfOneBuffer = 500
numBuffersToCapture = 10
totalSamples = sizeOfOneBuffer * numBuffersToCapture

# Create buffers ready for assigning pointers for data collection
# (one int16 buffer per channel, registered with the driver below).
bufferAMax = np.zeros(shape=sizeOfOneBuffer, dtype=np.int16)
bufferBMax = np.zeros(shape=sizeOfOneBuffer, dtype=np.int16)

memory_segment = 0

# Set data buffer location for data collection from channels A and B
# handle = chandle
# source = PS5000A_CHANNEL_A = 0, then PS5000A_CHANNEL_B = 1
# pointer to buffer max = int16 pointer into bufferAMax / bufferBMax
# pointer to buffer min = None (no min buffer is supplied)
# buffer length = sizeOfOneBuffer
# segment index = 0
# ratio mode = PS5000A_RATIO_MODE_NONE = 0
for dataBufferKey, channelName, driverBuffer in (
        ("setDataBuffersA", 'PS5000A_CHANNEL_A', bufferAMax),
        ("setDataBuffersB", 'PS5000A_CHANNEL_B', bufferBMax)):
    status[dataBufferKey] = ps.ps5000aSetDataBuffers(
        chandle,
        ps.PS5000A_CHANNEL[channelName],
        driverBuffer.ctypes.data_as(ctypes.POINTER(ctypes.c_int16)),
        None,
        sizeOfOneBuffer,
        memory_segment,
        ps.PS5000A_RATIO_MODE['PS5000A_RATIO_MODE_NONE'])
    assert_pico_ok(status[dataBufferKey])
# Begin streaming mode:
# The interval is requested as 250 in microsecond units. It is handed to the
# driver by reference and read back afterwards as the actual interval.
sampleUnits = ps.PS5000A_TIME_UNITS['PS5000A_US']
sampleInterval = ctypes.c_int32(250)
# We are not triggering:
maxPreTriggerSamples = 0
autoStopOn = 1
# No downsampling:
downsampleRatio = 1

streamingArgs = (
    chandle,
    ctypes.byref(sampleInterval),
    sampleUnits,
    maxPreTriggerSamples,
    totalSamples,
    autoStopOn,
    downsampleRatio,
    ps.PS5000A_RATIO_MODE['PS5000A_RATIO_MODE_NONE'],
    sizeOfOneBuffer,
)
status["runStreaming"] = ps.ps5000aRunStreaming(*streamingArgs)
assert_pico_ok(status["runStreaming"])

# sampleUnits is microseconds, so multiply by 1000 to express the interval in ns.
actualSampleInterval = sampleInterval.value
actualSampleIntervalNs = actualSampleInterval * 1000

print("Capturing at sample interval %s ns" % actualSampleIntervalNs)
# Full-capture buffers, one per channel. These are NOT registered with the
# driver; the streaming callback copies each delivered chunk into them.
bufferCompleteA, bufferCompleteB = (
    np.zeros(shape=totalSamples, dtype=np.int16) for _ in range(2)
)
# Write position in the full-capture buffers, advanced by the callback.
nextSample = 0
# Flags set by the callback: auto-stop was signalled / callback ran during the last poll.
autoStopOuter = wasCalledBack = False
def streaming_callback(handle, noOfSamples, startIndex, overflow, triggerAt, triggered, autoStop, param):
    """Copy the newly delivered samples into the full-capture buffers.

    Passed to the driver as the streaming-ready callback. It reads
    ``noOfSamples`` values starting at ``startIndex`` from the registered
    buffers (``bufferAMax`` / ``bufferBMax``) and appends them to
    ``bufferCompleteA`` / ``bufferCompleteB`` at the module-level write
    position ``nextSample``.

    Parameters used here:
        noOfSamples: number of new samples available in the registered buffers.
        startIndex: index of the first new sample in the registered buffers.
        autoStop: truthy when the driver signals that streaming has auto-stopped.
    ``handle``, ``overflow``, ``triggerAt``, ``triggered`` and ``param`` are
    accepted to match the callback signature but are not used.

    Side effects: sets ``wasCalledBack``, advances ``nextSample`` and sets
    ``autoStopOuter`` when ``autoStop`` is truthy.
    """
    global nextSample, autoStopOuter, wasCalledBack
    wasCalledBack = True

    # Never copy more than the space left in the full-capture buffers. Without
    # this clamp an over-delivery makes the slice assignment raise; exceptions
    # raised inside a ctypes callback are not propagated to the caller, so
    # nextSample would stall and the autoStop flag below would be lost.
    remaining = len(bufferCompleteA) - nextSample
    samplesToCopy = max(0, min(noOfSamples, remaining))

    destEnd = nextSample + samplesToCopy
    sourceEnd = startIndex + samplesToCopy
    bufferCompleteA[nextSample:destEnd] = bufferAMax[startIndex:sourceEnd]
    bufferCompleteB[nextSample:destEnd] = bufferBMax[startIndex:sourceEnd]
    nextSample += samplesToCopy

    if autoStop:
        autoStopOuter = True
# Wrap the Python callback in the driver's streaming-ready C function pointer type.
cFuncPtr = ps.StreamingReadyType(streaming_callback)

# Poll the driver until the capture is full or auto-stop has been signalled.
# Each poll may invoke the callback, which copies data out of the registered
# buffers and into the complete ones.
while not (autoStopOuter or nextSample >= totalSamples):
    wasCalledBack = False
    status["getStreamingLastestValues"] = ps.ps5000aGetStreamingLatestValues(chandle, cFuncPtr, None)
    if wasCalledBack:
        continue
    # The driver did not call us back, so no data is ready yet. Wait briefly
    # before polling again.
    time.sleep(0.01)

print("Done grabbing values.")
# Find maximum ADC count value
# handle = chandle
# pointer to value = ctypes.byref(maxADC)
maxADC = ctypes.c_int16()
status["maximumValue"] = ps.ps5000aMaximumValue(chandle, ctypes.byref(maxADC))
assert_pico_ok(status["maximumValue"])

# Convert ADC counts data to mV
adc2mVChAMax = adc2mV(bufferCompleteA, channel_range, maxADC)
adc2mVChBMax = adc2mV(bufferCompleteB, channel_range, maxADC)

# Create time data: one point per sample, spaced by the actual sample interval in ns.
# Named timeNs so it does not shadow the imported `time` module.
timeNs = np.linspace(0, (totalSamples - 1) * actualSampleIntervalNs, totalSamples)

# Plot data from channel A and B
plt.plot(timeNs, adc2mVChAMax)
plt.plot(timeNs, adc2mVChBMax)
plt.xlabel('Time (ns)')
plt.ylabel('Voltage (mV)')
plt.show()
try:
    # Stop the scope
    # handle = chandle
    status["stop"] = ps.ps5000aStop(chandle)
    assert_pico_ok(status["stop"])
finally:
    # Disconnect the scope
    # handle = chandle
    # Runs even if stopping failed, so the device handle is always released.
    status["close"] = ps.ps5000aCloseUnit(chandle)
    assert_pico_ok(status["close"])

# Display status returns
print(status)