-
Notifications
You must be signed in to change notification settings - Fork 0
/
camera.py
251 lines (198 loc) · 7.61 KB
/
camera.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# -*- coding: utf-8 -*-
#
# Author: Lars B. Rollik <L.B.Rollik@protonmail.com>
# License: BSD 3-Clause
import logging
import time
try:
import RPi.GPIO as GPIO
import picamera
from picamera import mmal
except ImportError:
raise ImportError(
"Can only run camera module on Raspberry Pi with RPi.GPIO and picamera packages installed."
)
from rpi_camera_colony.files import close_file_safe
from rpi_camera_colony.files import DummyFileObject
from rpi_camera_colony.acquisition.streaming import StreamingOutput
# Module-level GPIO initialisation: these calls run once, as a side effect of
# importing this module, before any encoder or camera object is created.
GPIO.setwarnings(False)  # turn off RPi.GPIO warning output
GPIO.cleanup()  # presumably to start from a clean pin state -- TODO confirm intent
GPIO.setmode(GPIO.BOARD)  # pin numbers used in this module follow the BOARD numbering mode
def _get_realtime():
    """Return the current ``CLOCK_REALTIME`` reading of the system clock.

    The value is whatever ``time.clock_gettime`` reports for that clock and is
    written as the ``sys_time`` column of the TTL timestamp files.
    """
    clock_id = time.CLOCK_REALTIME
    realtime_now = time.clock_gettime(clock_id)
    return realtime_now
class VideoEncoder(picamera.PiVideoEncoder):
    """Video encoder that pulses a TTL output and logs a timestamp per frame.

    Follows the picamera custom-encoder recipe:
    https://picamera.readthedocs.io/en/release-1.13/recipes2.html#custom-encoders

    For every buffer flagged as end-of-frame (and not flagged as a config
    header) the encoder:

    * raises ``ttl_out_pin`` for ``ttl_out_duration`` seconds, if a pin is set;
    * asks the parent camera to record the buffer PTS together with the
      camera timestamp;
    * increments ``frame_count`` (and ``ttl_count`` when a pulse was sent).
    """

    # Number of end-of-frame buffers seen by ``_callback_write``.
    frame_count = 0
    # Number of TTL pulses emitted by ``_callback_write``.
    ttl_count = 0

    _ttl_out_pin = 8  # TODO: set to final value
    _ttl_out_duration = 0.001  # 1ms

    def __init__(self, *args, **kwargs):
        """Create the encoder and apply keyword overrides to known attributes.

        All positional and keyword arguments are forwarded unchanged to
        ``picamera.PiVideoEncoder``. Afterwards, every keyword whose name
        matches an existing attribute of this instance is also assigned to
        that attribute.
        """
        super(VideoEncoder, self).__init__(*args, **kwargs)

        for arg, val in kwargs.items():
            if hasattr(self, arg):
                setattr(self, arg, val)

    @property
    def ttl_out_pin(self):
        """GPIO pin used for the per-frame TTL pulse, or ``None`` to disable."""
        return self._ttl_out_pin

    @ttl_out_pin.setter
    def ttl_out_pin(self, value):
        logging.debug(f"Setting TTL out pin to {value}")
        self._ttl_out_pin = value
        if self.ttl_out_pin is not None:
            # Configure the pin as an output and make sure it starts low.
            GPIO.setup(self.ttl_out_pin, GPIO.OUT, initial=GPIO.LOW)
            GPIO.output(self.ttl_out_pin, False)

    @property
    def ttl_out_duration(self):
        """Time in seconds the TTL output is held high for each frame."""
        return self._ttl_out_duration

    @ttl_out_duration.setter
    def ttl_out_duration(self, value):
        logging.debug(f"Setting TTL out duration to {value}")
        self._ttl_out_duration = value

    def start(self, output, motion_output=None):
        """Start encoding; delegates directly to the parent implementation."""
        super(VideoEncoder, self).start(output, motion_output)

    def close(self):
        """Close the encoder and warn if frames and TTL pulses do not match.

        The comparison is only meaningful while TTL output is enabled: with
        ``ttl_out_pin`` set to ``None`` no pulse is ever counted, so the check
        is skipped instead of emitting a spurious warning on every close.
        """
        super(VideoEncoder, self).close()

        if self.ttl_out_pin is not None and self.ttl_count != self.frame_count:
            logging.warning(
                f"Frame count ({self.frame_count}) and TTL count ({self.ttl_count}) do not match."
            )

    def _callback_write(self, buf, key=None):
        """Emit the TTL pulse / timestamp for completed frames, then write.

        Args:
            buf: MMAL buffer handed over by the encoder callback.
            key: Optional output key. It is forwarded to the parent only when
                the caller supplied one, so the parent's own default applies
                otherwise.

        Returns:
            The result of the parent ``_callback_write`` (must be returned for
            the encoder to work, see the recipe linked in the class docstring).
        """
        is_frame_end = buf.flags & mmal.MMAL_BUFFER_HEADER_FLAG_FRAME_END
        is_config = buf.flags & mmal.MMAL_BUFFER_HEADER_FLAG_CONFIG

        if is_frame_end and not is_config:
            if self.ttl_out_pin is not None:
                # NOTE: the sleep blocks the encoder callback for the pulse
                # duration on every frame.
                GPIO.output(self.ttl_out_pin, True)
                time.sleep(self.ttl_out_duration)
                GPIO.output(self.ttl_out_pin, False)
                self.ttl_count += 1

            self.parent._write_timestamps_frame_ttl_out(
                buf.pts, self.parent.timestamp
            )
            self.frame_count += 1

        if key is None:
            return super(VideoEncoder, self)._callback_write(buf)
        return super(VideoEncoder, self)._callback_write(buf, key)
class Camera(picamera.PiCamera):
    """PiCamera subclass recording video with TTL-out pulses and TTL-in logging.

    While recording:

    * every encoded frame triggers a TTL pulse on ``ttl_out_pin`` (done by
      ``VideoEncoder``) and a row in the ``ttl.out`` timestamp file;
    * every rising edge on ``ttl_in_pin`` appends a row to the ``ttl.in``
      timestamp file.

    Setting either pin to ``None`` disables the corresponding feature.
    """

    _ttl_out_pin = 8
    _ttl_in_pin = 16
    _ttl_out_duration = 0.001

    # Open file handles for the timestamp logs; ``None`` while not recording.
    file_timestamps_ttl_out = None
    file_timestamps_ttl_in = None

    clock_mode = "raw"  # forwarded to ``picamera.PiCamera.__init__``
    stream_video = False
    streaming_output = None

    def __init__(self, framerate=30, resolution=(1600, 1200), **kwargs):
        """Create the camera.

        Args:
            framerate: Forwarded to ``picamera.PiCamera``.
            resolution: Forwarded to ``picamera.PiCamera``.
            **kwargs: Attribute overrides. Each keyword whose name matches an
                existing attribute (e.g. ``ttl_out_pin``, ``clock_mode``,
                ``stream_video``) is assigned to it; other keywords are
                ignored.
        """
        # Overrides are applied first so that ``clock_mode`` is already final
        # when it is handed to the parent constructor below.
        for attr, value in kwargs.items():
            if hasattr(self, attr):
                setattr(self, attr, value)
                logging.debug(f"Set {self}, attr {attr} to value {value}")

        super(Camera, self).__init__(
            framerate=framerate,
            resolution=resolution,
            clock_mode=self.clock_mode,
        )

        if self.stream_video:
            self.streaming_output = StreamingOutput()

    @property
    def ttl_out_pin(self):
        """GPIO pin pulsed once per frame, or ``None`` to disable TTL output."""
        return self._ttl_out_pin

    @ttl_out_pin.setter
    def ttl_out_pin(self, value):
        self._ttl_out_pin = value

    @property
    def ttl_out_duration(self):
        """Duration in seconds of each TTL-out pulse."""
        return self._ttl_out_duration

    @ttl_out_duration.setter
    def ttl_out_duration(self, value):
        self._ttl_out_duration = value

    @property
    def ttl_in_pin(self):
        """GPIO pin watched for rising edges, or ``None`` to disable TTL input."""
        return self._ttl_in_pin

    @ttl_in_pin.setter
    def ttl_in_pin(self, value):
        self._ttl_in_pin = value

    def _get_video_encoder(
        self, camera_port, output_port, format, resize, **options
    ):
        """Build a ``VideoEncoder`` configured with this camera's TTL settings.

        NOTE(review): this override appears to be used for every video
        recording started on this camera, including the MJPEG stream started
        on splitter port 2 when ``stream_video`` is set -- which would make
        the stream encoder emit TTL pulses and timestamp rows as well.
        Confirm whether that is intended.
        """
        video_encoder = VideoEncoder(
            self, camera_port, output_port, format, resize, **options
        )
        video_encoder.ttl_out_pin = self.ttl_out_pin
        video_encoder.ttl_out_duration = self.ttl_out_duration
        return video_encoder

    def __del__(self):
        """Release the GPIO channels when the camera object is collected."""
        GPIO.cleanup()

    def _write_timestamps_ttl_in(self, x=None):
        """GPIO edge callback: log one TTL-in event.

        Args:
            x: Argument passed by the GPIO event machinery; unused.
        """
        if self.file_timestamps_ttl_in is not None:
            self.file_timestamps_ttl_in.write(
                f"{self.timestamp},{_get_realtime()}\n"
            )
        logging.info(f"TTL-in detected at {self.timestamp}")

    def _write_timestamps_frame_ttl_out(self, cam_ts, frame_ts):
        """Append one frame row to the TTL-out timestamp file, if it is open.

        Args:
            cam_ts: First column value (the encoder passes the buffer PTS).
            frame_ts: Second column value (the encoder passes the camera
                timestamp read at pulse time).
        """
        if self.file_timestamps_ttl_out is not None:
            self.file_timestamps_ttl_out.write(
                f"{cam_ts},{frame_ts},{_get_realtime()}\n"
            )

    def _close_timestamp_files(self):
        """Close any open TTL timestamp files and forget their handles."""
        for attr in ("file_timestamps_ttl_out", "file_timestamps_ttl_in"):
            handle = getattr(self, attr)
            if handle is not None:
                close_file_safe(file_handle=handle)
                # Drop the reference so nothing writes to a closed file.
                setattr(self, attr, None)

    def start_recording(self, output_files=None, **kwargs):
        """Start recording video and, if enabled, the TTL timestamp logs.

        Args:
            output_files: Mapping with a ``"video"`` entry and, when the
                respective pin is enabled, ``"ttl.out"`` / ``"ttl.in"``
                entries naming the timestamp files. If the video entry is a
                ``DummyFileObject`` both TTL pins are disabled.
            **kwargs: Forwarded to ``picamera.PiCamera.start_recording``.

        Raises:
            TypeError: If ``output_files`` is ``None``.
        """
        if output_files is None:
            # Same exception type as subscripting ``None`` would raise, but
            # with a message that points at the actual problem.
            raise TypeError(
                "start_recording() requires 'output_files' with a 'video' entry"
            )

        if isinstance(output_files["video"], DummyFileObject):
            logging.debug("Not allowed to write output files.")
            self.ttl_out_pin = None
            self.ttl_in_pin = None

        if self.ttl_out_pin is not None:
            # Open TTL file and write header
            self.file_timestamps_ttl_out = open(output_files["ttl.out"], "w")
            self.file_timestamps_ttl_out.write(
                "timestamp_frame,timestamp_ttl,sys_time\n"
            )

        if self.ttl_in_pin is not None:
            logging.debug(f"Setting TTL in pin to {self.ttl_in_pin}")

            # Open TTL file and write header *before* enabling edge
            # detection, so an early edge cannot produce a row ahead of the
            # header.
            self.file_timestamps_ttl_in = open(output_files["ttl.in"], "w")
            self.file_timestamps_ttl_in.write("timestamp_frame,sys_time\n")

            # Add event detection callback
            GPIO.setup(self.ttl_in_pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
            GPIO.add_event_detect(
                self.ttl_in_pin, GPIO.RISING, self._write_timestamps_ttl_in
            )

        super(Camera, self).start_recording(
            output=str(output_files["video"]), **kwargs
        )

        if self.stream_video:
            # Second recording on splitter port 2 feeding the streaming output.
            stream_kwargs = kwargs.copy()
            stream_kwargs["format"] = "mjpeg"
            super(Camera, self).start_recording(
                output=self.streaming_output, splitter_port=2, **stream_kwargs
            )

    def stop_recording(self):
        """Stop all recordings and close the TTL timestamp files.

        Stopping is best effort: an error while stopping one port is logged
        (with traceback) and does not prevent the other port from being
        stopped. The timestamp files are closed in all cases.
        """
        if self.ttl_in_pin is not None:
            GPIO.remove_event_detect(self.ttl_in_pin)

        try:
            if self.stream_video:
                try:
                    super(Camera, self).stop_recording(splitter_port=2)
                except Exception:
                    logging.exception("CAMERA STOP EXCEPTION")

            try:
                super(Camera, self).stop_recording()
            except Exception:
                logging.exception("CAMERA STOP EXCEPTION")
        finally:
            # Close TTL files
            self._close_timestamp_files()

    def preview_static(self, warmup_delay=3, alpha=255):
        """Start a preview, then freeze white balance and exposure.

        The camera runs with automatic white balance and exposure for
        ``warmup_delay`` seconds; the values reached at that point are then
        fixed by switching both modes off.

        Args:
            warmup_delay: Seconds to wait in auto mode before freezing.
            alpha: Forwarded to ``start_preview``.
        """
        if self.preview is not None:
            self.stop_preview()

        self.awb_mode = "auto"
        self.exposure_mode = "auto"
        self.start_preview(alpha=alpha)
        time.sleep(warmup_delay)

        # Read the gains while still in auto mode, then pin them.
        awb_gains = self.awb_gains
        self.awb_mode = "off"
        self.awb_gains = awb_gains

        self.shutter_speed = self.exposure_speed
        self.exposure_mode = "off"

        logging.debug(
            f"Camera previewing & updated awb_gains={awb_gains}, shutter_speed={self.shutter_speed}"
        )