/
uca.py
212 lines (159 loc) · 6.09 KB
/
uca.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
"""
Cameras supported by the libuca library.
"""
import time
import numpy as np
from concert.coroutines import null
from concert.helpers import async, inject
from concert.quantities import q
from concert.base import Parameter
from concert.devices.cameras import base
def _new_setter_wrapper(camera, name, unit=None):
    """Build a setter callable for the camera property *name*.

    The returned function takes a single *value*. When *unit* is given, the
    value is first converted with ``value.to(unit)``. The bare magnitude
    (``value.magnitude`` if the value has one, otherwise the value itself) is
    then handed to ``camera.set_properties`` under the keyword *name*.
    """
    def _wrapper(value):
        # ``None`` is the "no unit" sentinel, so test for it explicitly
        # instead of relying on the truthiness of the unit object (a unit
        # may be falsy or may refuse to be evaluated as a boolean).
        if unit is not None:
            value = value.to(unit)

        # Values without a ``magnitude`` attribute are passed through as is.
        magnitude = getattr(value, 'magnitude', value)
        camera.set_properties(**{name: magnitude})

    return _wrapper
def _new_getter_wrapper(camera, name, unit=None):
    """Build a getter callable for the camera property *name*.

    The returned function takes no arguments. It reads the property with
    ``camera.get_property(name)`` and, when *unit* is given, returns the
    result multiplied by *unit*; otherwise the raw value is returned.
    """
    def _wrapper():
        value = camera.get_property(name)

        # ``None`` is the "no unit" sentinel, so test for it explicitly
        # instead of relying on the truthiness of the unit object.
        if unit is not None:
            return value * unit

        return value

    return _wrapper
def _create_data_array(camera):
    """Allocate a frame buffer sized for the camera's current settings.

    The buffer is an uninitialised numpy array of shape
    ``(roi_height, roi_width)`` as reported by ``camera.props``. Its element
    type is ``uint16`` when ``sensor_bitdepth`` exceeds 8 and ``uint8``
    otherwise.

    Returns a tuple ``(array, address)`` where *address* is the integer taken
    from the first entry of the array interface's ``data`` field.
    """
    props = camera.props

    if props.sensor_bitdepth > 8:
        pixel_type = np.uint16
    else:
        pixel_type = np.uint8

    frame = np.empty((props.roi_height, props.roi_width), dtype=pixel_type)
    address = frame.__array_interface__['data'][0]
    return (frame, address)
class Camera(base.Camera):
    """libuca-based camera.

    All properties that are exported by the underlying camera are also visible
    in :class:`Camera`.

    :raises ValueError: In case camera *name* does not exist.
    """

    def __init__(self, name):
        """Open the libuca camera *name* and export its properties.

        Every property of the underlying camera object is turned into a
        :class:`Parameter` whose getter and/or setter is created according to
        the property's readable/writable flags. Properties whose libuca unit
        is one of the known units get that unit attached.
        """
        # Imported here rather than at module level, as in the original code.
        from gi.repository import GObject, Uca

        self._manager = Uca.PluginManager()

        try:
            self.uca = self._manager.get_camerav(name, [])
        except Exception:
            # Catch only regular errors so that KeyboardInterrupt and
            # SystemExit are not turned into a ValueError.
            raise ValueError("`{0}' is not a valid camera".format(name))

        # Mapping from libuca unit identifiers to Concert quantities.
        units = {
            Uca.Unit.METER: q.m,
            Uca.Unit.SECOND: q.s,
            Uca.Unit.DEGREE_CELSIUS: q.celsius,
            Uca.Unit.COUNT: q.count
        }

        parameters = []

        for prop in self.uca.props:
            getter, setter = None, None

            # Units not listed in the mapping leave the parameter unit-less.
            unit = units.get(self.uca.get_unit(prop.name))

            if prop.flags & GObject.ParamFlags.READABLE:
                getter = _new_getter_wrapper(self.uca, prop.name, unit)

            if prop.flags & GObject.ParamFlags.WRITABLE:
                setter = _new_setter_wrapper(self.uca, prop.name, unit)

            parameters.append(Parameter(prop.name, getter, setter, unit))

        super(Camera, self).__init__(parameters)

    def readout(self, condition=lambda: True):
        """
        Readout images from the camera buffer. *condition* is a callable,
        as long as it resolves to True the camera keeps grabbing.

        The generator also ends as soon as a grab returns None.
        """
        while condition():
            image = self.grab()

            if image is None:
                break

            yield image

    def take_frames(self, num_frames):
        """
        Take *num_frames* frames. The camera is triggered explicitly from
        Concert so the number of recorded frames is exact. The frames are
        yielded as they are acquired.

        Recording is stopped when the generator finishes, is closed, or an
        error occurs.
        """
        try:
            self.start_recording()

            for _ in xrange(num_frames):
                self.trigger()
                yield self.grab()
        finally:
            self.stop_recording()

    def _get_frame_rate(self):
        """Return ``frames_per_second`` divided by one second."""
        return self.frames_per_second / q.s

    def _set_frame_rate(self, frame_rate):
        """Set ``frames_per_second`` to *frame_rate* multiplied by one second."""
        self.frames_per_second = frame_rate * q.s

    # The ``_*_real`` methods below forward to the libuca camera object.
    # NOTE(review): presumably they are the hooks behind the inherited
    # start_recording/stop_recording/trigger/grab methods -- confirm against
    # concert.devices.cameras.base.

    def _record_real(self):
        """Start recording on the libuca camera."""
        self.uca.start_recording()

    def _stop_real(self):
        """Stop recording on the libuca camera."""
        self.uca.stop_recording()

    def _trigger_real(self):
        """Send a trigger to the libuca camera."""
        self.uca.trigger()

    def _grab_real(self):
        """Grab one frame into a new array.

        Returns the array if the libuca grab call reports success, otherwise
        None.
        """
        array, data = _create_data_array(self.uca)

        if self.uca.grab(data):
            return array

        return None
class Dimax(Camera):
"""A PCO.dimax camera implementation based on libuca :py:class:`Camera`."""
def __init__(self):
super(Dimax, self).__init__("pco")
def readout_blocking(self, condition=lambda: True):
"""
Readout the frames and don't allow recording in the meantime.
*condition* is the same as in :py:meth:`Camera.readout`.
"""
try:
self.uca.start_readout()
super(Dimax, self).readout(condition)
finally:
self.uca.stop_readout()
def acquire_auto(self, num_frames):
"""
Acquire approximately *num_frames* frames. Camera is set to a mode when
it is triggered automatically and live images streaming is enabled.
Live frames are yielded as they are grabbed.
**Note** the number of actually recorded images may differ.
"""
@async
def async_wait():
try:
sleep_time = (num_frames /
self.frame_rate).to_base_units().magnitude
time.sleep(sleep_time)
finally:
self.stop_recording()
self.uca.props.trigger_mode = self.uca.props.trigger_mode.AUTO
self.uca.props.storage_mode = 0
self.uca.props.record_mode = 1
# We need to make sure that the camera is in recording mode when we
# start grabbing live frames, thus we start it synchronously.
self.start_recording()
# The sleeping and camera stopping can be handled asynchronously
acq_future = async_wait()
# Yield live frames
for frame in self.readout(condition=\
lambda :self.uca.props.is_recording):
yield frame
# Wait for the acquisition to end
acq_future.result()
def take_frames_auto(self, num_frames, consumer=None):
"""
Acquire and readout *num_frames*. Frames are first recorded to the
internal camera memory and then read out. The camera is triggered
automatically. *consumer* is a coroutine which is fed with live
frames. After the recording is done the frames are yielded as they are
being grabbed from the camera.
"""
# We need to provide a consumer, otherwise the generator method
# wouldn't start
consumer = null() if consumer is None else consumer
inject(self.acquire_auto(num_frames), consumer)
return (frame for frame in self.readout())