-
Notifications
You must be signed in to change notification settings - Fork 231
/
depthai.py
305 lines (248 loc) · 12.8 KB
/
depthai.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
import sys
from time import time
from time import sleep
import argparse
from argparse import ArgumentParser
import json
import numpy as np
import cv2
import os
import subprocess
import depthai
import consts.resource_paths
from depthai_helpers import utils
class bcolors:
    """ANSI escape sequences used to colour and style console output.

    Wrap a message as ``bcolors.<STYLE> + text + bcolors.ENDC`` so the
    terminal attributes are reset after the message.
    """

    # Foreground colours (SGR 9x codes).
    HEADER = '\033[95m'
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    RED = '\033[91m'
    FAIL = '\033[91m'  # same sequence as RED

    # Combined attributes 1;5;31 in a single sequence.
    WARNING = '\033[1;5;31m'

    # Text attributes.
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # Resets every attribute set by the sequences above.
    ENDC = '\033[0m'
def parse_args(argv=None):
    """Parse the command-line options of this script.

    Args:
        argv: Optional list of argument strings to parse instead of
            ``sys.argv[1:]``. The default (``None``) keeps the original
            behaviour of reading the process command line.

    Returns:
        The ``argparse.Namespace`` holding the parsed options.

    Raises:
        SystemExit: raised by argparse on ``-h/--help`` or on invalid
            arguments.
    """
    epilog_text = '''
Displays video streams captured by DepthAI.
Example usage:
# Pass thru pipeline config options
## USB3 w/onboard cameras board config:
python3 test.py -co '{"board_config": {"left_to_right_distance_cm": 7.5}}'
## Show the depth stream:
python3 test.py -co '{"streams": [{"name": "depth_sipp", "max_fps": 12.0}]}'
'''
    # RawDescriptionHelpFormatter keeps the line breaks of the epilog as written.
    parser = ArgumentParser(epilog=epilog_text,
                            formatter_class=argparse.RawDescriptionHelpFormatter)

    # Pipeline configuration override (JSON text; decoded by the caller).
    parser.add_argument("-co", "--config_overwrite", default=None,
                        type=str, required=False,
                        help="JSON-formatted pipeline config object. This will be override defaults used in this script.")

    # Board geometry.
    parser.add_argument("-fv", "--field-of-view", default=71.86, type=float,
                        help="Horizontal field of view (HFOV) for the stereo cameras in [deg]")
    parser.add_argument("-b", "--baseline", default=9.0, type=float,
                        help="Left/Right camera baseline in [cm]")
    parser.add_argument("-r", "--rgb-baseline", default=2.0, type=float,
                        help="Distance the RGB camera is from the Left camera")
    # Stored under ``swap_lr``: swapping is on unless this flag is given.
    parser.add_argument("-w", "--no-swap-lr", dest="swap_lr", default=True, action="store_false",
                        help="Do not swap the Left and Right cameras")

    # EEPROM / calibration handling.
    parser.add_argument("-e", "--store-eeprom", default=False, action='store_true',
                        help="Store the calibration and board_config (fov, baselines, swap-lr) in the EEPROM onboard")
    parser.add_argument("--clear-eeprom", default=False, action='store_true',
                        help="Invalidate the calib and board_config from EEPROM")
    parser.add_argument("-o", "--override-calib", default=False, action='store_true',
                        help="Use the calib and board_config from host, ignoring the EEPROM data if programmed")

    # Device selection and developer switches.
    parser.add_argument("-dev", "--device-id", default='', type=str,
                        help="USB port number for the device to connect to. Use the word 'list' to show all devices and exit.")
    # These two flags default to None (not False) when absent.
    parser.add_argument("-debug", "--dev_debug", default=None, action='store_true',
                        help="Used by board developers for debugging.")
    parser.add_argument("-fusb2", "--force_usb2", default=None, action='store_true',
                        help="Force usb2 connection")

    return parser.parse_args(argv)
# Parse the command line into a plain dict. argparse reports both --help
# (status 0) and usage errors (status 2) by raising SystemExit; the process is
# still terminated through os._exit as before, but with argparse's own status.
# NOTE(review): os._exit is presumably used to avoid hanging on interpreter
# shutdown with the native depthai module loaded - confirm.
try:
    args = vars(parse_args())
except SystemExit as exc:
    # os._exit does not flush stdio buffers, so push out argparse's help or
    # error text first, otherwise it is lost when output is piped.
    sys.stdout.flush()
    sys.stderr.flush()
    if exc.code is None:
        os._exit(0)
    os._exit(exc.code if isinstance(exc.code, int) else 2)
except Exception as exc:
    # Any other failure keeps the original exit status, but is now reported.
    print("Failed to parse arguments:", exc, file=sys.stderr, flush=True)
    os._exit(2)

# The override arrives as JSON text; decode it once so later code sees a dict.
if args['config_overwrite']:
    args['config_overwrite'] = json.loads(args['config_overwrite'])

print("Using Arguments=",args)

# Select the command file passed to depthai.init_device().
if args['force_usb2']:
    print(bcolors.WARNING + "FORCE USB2 MODE" + bcolors.ENDC)
    cmd_file = consts.resource_paths.device_usb2_cmd_fpath
else:
    cmd_file = consts.resource_paths.device_cmd_fpath

# Developer debug mode overrides the selection above with an empty path.
if args['dev_debug']:
    cmd_file = ''
    print('depthai will not load cmd file into device.')
# Load the class labels, one per line, with surrounding whitespace removed.
with open(consts.resource_paths.blob_labels_fpath) as fp:
    labels = [line.strip() for line in fp]

print('depthai.__version__ == %s' % depthai.__version__)
print('depthai.__dev_version__ == %s' % depthai.__dev_version__)

# Look for a udev rule mentioning USB vendor id 03e7 under /etc/udev/rules.d;
# grep exits non-zero when nothing matches.
ret = subprocess.call(
    ['grep', '-irn', 'ATTRS{idVendor}=="03e7"', '/etc/udev/rules.d'],
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL,
)
if ret != 0:
    print(bcolors.WARNING + "\nWARNING: Usb rules not found" + bcolors.ENDC)
    # flush=True: os._exit below skips stdio flushing, which would drop these
    # instructions when stdout is not a terminal.
    print(
        bcolors.RED + "\nSet rules: \n"
        """echo 'SUBSYSTEM=="usb", ATTRS{idVendor}=="03e7", MODE="0666"' | sudo tee /etc/udev/rules.d/80-movidius.rules \n"""
        "sudo udevadm control --reload-rules && udevadm trigger \n"
        "Disconnect/connect usb cable on host! \n" + bcolors.ENDC,
        flush=True,
    )
    os._exit(1)

if not depthai.init_device(cmd_file, args['device_id']):
    print("Error initializing device. Try to reset it.")
    # sys.exit instead of the site-provided exit(), which is meant for
    # interactive sessions and is missing when Python runs without `site`.
    sys.exit(1)

# NOTE: `get_available_steams` (sic) is the name this script calls on depthai.
print('Available streams: ' + str(depthai.get_available_steams()))
# Do not modify the default values in the config Dict below directly. Instead, use the `-co` argument when running this script.
config = {
    # Possible streams:
    # ['left', 'right','previewout', 'metaout', 'depth_sipp', 'disparity', 'depth_color_h']
    # If "left" is used, it must be in the first position.
    # To test depth use:
    # 'streams': [{'name': 'depth_sipp', "max_fps": 12.0}, {'name': 'previewout', "max_fps": 12.0}, ],
    'streams': ['metaout', 'previewout'],
    'depth':
    {
        'calibration_file': consts.resource_paths.calib_fpath,
        # 'type': 'median',
        'padding_factor': 0.3
    },
    'ai':
    {
        'blob_file': consts.resource_paths.blob_fpath,
        'blob_file_config': consts.resource_paths.blob_config_fpath,
        'calc_dist_to_bb': True
    },
    'board_config':
    {
        'swap_left_and_right_cameras': args['swap_lr'], # True for 1097 (RPi Compute) and 1098OBC (USB w/onboard cameras)
        'left_fov_deg': args['field_of_view'], # Same on 1097 and 1098OBC
        'left_to_right_distance_cm': args['baseline'], # Distance between stereo cameras
        'left_to_rgb_distance_cm': args['rgb_baseline'], # Currently unused
        'store_to_eeprom': args['store_eeprom'],
        'clear_eeprom': args['clear_eeprom'],
        'override_eeprom_calib': args['override_calib'],
    }
}

# Apply the user-supplied override (-co) on top of the defaults.
if args['config_overwrite'] is not None:
    config = utils.merge(args['config_overwrite'],config)
    print("Merged Pipeline config with overwrite",config)

# A stream entry is either a bare name or a dict with a 'name' key; normalize
# to plain names before validating, so both forms are checked alike.
stream_names = [stream if isinstance(stream, str) else stream['name'] for stream in config['streams']]

# Checking the normalized names (rather than config['streams'] itself) makes
# the rule apply to dict-style entries such as {'name': 'depth_sipp', ...}.
if 'depth_sipp' in stream_names and ('depth_color_h' in stream_names or 'depth_mm_h' in stream_names):
    print('ERROR: depth_sipp is mutually exclusive with depth_color_h and depth_mm_h')
    sys.exit(2)

# create the pipeline, here is the first connection with the device
p = depthai.create_pipeline(config=config)
if p is None:
    print('Pipeline is not created.')
    sys.exit(3)

# Per-stream frame counters: `frame_count` accumulates during the current
# one-second window, `frame_count_prev` holds the previous window's total
# (displayed as fps by the main loop).
t_start = time()
frame_count = dict.fromkeys(stream_names, 0)
frame_count_prev = dict.fromkeys(stream_names, 0)

# Most recent neural-network detections, reused for frames that arrive later.
entries_prev = []
# Seconds the main loop may go without receiving any packet before the
# process watchdog terminates the program.
process_watchdog_timeout = 10


def reset_process_wd():
    """Re-arm the process watchdog.

    Sets the module-level ``wd_cutoff`` deadline to the current time plus
    ``process_watchdog_timeout`` seconds.
    """
    global wd_cutoff
    wd_cutoff = process_watchdog_timeout + time()


# Arm the watchdog once before the main loop starts.
reset_process_wd()
# Main loop: fetch packets, remember the latest detections, render each
# requested stream, and refresh the per-stream fps counters once per second.
while True:
    # retrieve data from the device
    # data is stored in packets, there are nnet (Neural NETwork) packets which have additional functions for NNet result interpretation
    nnet_packets, data_packets = p.get_available_nnet_and_data_packets()

    # Process watchdog: any received packet re-arms it; if nothing arrives
    # before the deadline the process is terminated.
    if len(nnet_packets) + len(data_packets) != 0:
        reset_process_wd()
    elif time() > wd_cutoff:
        # flush=True because os._exit does not flush stdio buffers.
        print("process watchdog timeout", flush=True)
        os._exit(10)

    for nnet_packet in nnet_packets:
        # the result of the MobileSSD has detection rectangles (here: entries), and we can iterate through them
        for i, e in enumerate(nnet_packet.entries()):
            # for MobileSSD entries are sorted by confidence
            # {id == -1} or {confidence == 0} is the stopper (special for OpenVINO models and MobileSSD architecture)
            if e[0]['id'] == -1.0 or e[0]['confidence'] == 0.0:
                break

            # The first valid entry of a packet replaces the previous detections.
            if i == 0:
                entries_prev.clear()

            # save entry for further usage (as image package may arrive not the same time as nnet package)
            entries_prev.append(e)

    for packet in data_packets:
        stream_name = packet.stream_name
        if stream_name not in stream_names:
            continue  # skip streams that were automatically added

        fps_text = "fps: " + str(frame_count_prev[stream_name])

        if stream_name == 'previewout':
            data = packet.getData()
            # the format of previewout image is CHW (Channel, Height, Width), but OpenCV needs HWC, so we
            # change shape (3, 300, 300) -> (300, 300, 3)
            frame = cv2.merge([data[0, :, :], data[1, :, :], data[2, :, :]])

            img_h = frame.shape[0]
            img_w = frame.shape[1]

            # iterate through pre-saved entries & draw rectangle & text on image:
            for e in entries_prev:
                det = e[0]
                # the lower confidence threshold - the more we get false positives
                if det['confidence'] <= 0.5:
                    continue

                # Box coordinates are scaled by the frame size to get pixels.
                x1 = int(det['left'] * img_w)
                y1 = int(det['top'] * img_h)
                pt1 = x1, y1
                pt2 = int(det['right'] * img_w), int(det['bottom'] * img_h)
                cv2.rectangle(frame, pt1, pt2, (0, 0, 255))

                # Handles case where TensorEntry object label = 7552.
                # Valid indices are 0 .. len(labels) - 1: label == len(labels)
                # would raise IndexError and a negative label would silently
                # pick a label from the end of the list.
                label_idx = int(det['label'])
                if not 0 <= label_idx < len(labels):
                    print("Label index=",det['label'], "is out of range. Not applying text to rectangle.")
                    continue

                cv2.putText(frame, labels[label_idx], (x1, y1 + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
                cv2.putText(frame, '{:.2f}'.format(100*det['confidence']) + ' %', (x1, y1 + 40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255))

                if config['ai']['calc_dist_to_bb']:
                    # One text line per axis, 20 px apart, below the confidence.
                    for y_offset, axis in ((60, 'x'), (80, 'y'), (100, 'z')):
                        text = '{}:{:7.3f} m'.format(axis, det['distance_' + axis])
                        cv2.putText(frame, text, (x1, y1 + y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255))

            cv2.putText(frame, fps_text, (25, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 0))
            cv2.imshow('previewout', frame)

        elif stream_name in ('left', 'right', 'disparity'):
            frame_bgr = packet.getData()
            cv2.putText(frame_bgr, stream_name, (25, 25), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 0))
            cv2.putText(frame_bgr, fps_text, (25, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 0))
            cv2.imshow(stream_name, frame_bgr)

        elif stream_name.startswith('depth'):
            frame = packet.getData()

            # The three depth variants differ only in preprocessing and in
            # the colours used for the overlay text.
            if len(frame.shape) == 2:
                if frame.dtype == np.uint8:  # grayscale
                    title_color = fps_color = (0, 0, 255)
                else:  # uint16
                    # NOTE(review): pixels equal to 0 divide by zero here
                    # (NumPy warns rather than raises) - confirm this is the
                    # intended handling of invalid depth.
                    frame = (65535 // frame).astype(np.uint8)
                    # colorize depth map, comment out code below to obtain grayscale
                    frame = cv2.applyColorMap(frame, cv2.COLORMAP_HOT)
                    # frame = cv2.applyColorMap(frame, cv2.COLORMAP_JET)
                    title_color = fps_color = 255
            else:  # bgr
                title_color = (255, 255, 255)
                fps_color = 255

            cv2.putText(frame, stream_name, (25, 25), cv2.FONT_HERSHEY_SIMPLEX, 1.0, title_color)
            cv2.putText(frame, fps_text, (25, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.0, fps_color)
            cv2.imshow(stream_name, frame)

        frame_count[stream_name] += 1

    # Once more than a second has elapsed, publish the counts as the displayed
    # fps values and start a new counting window.
    t_curr = time()
    if t_start + 1.0 < t_curr:
        t_start = t_curr

        for s in stream_names:
            frame_count_prev[s] = frame_count[s]
            frame_count[s] = 0

    # Pressing 'q' in an OpenCV window leaves the loop.
    if cv2.waitKey(1) == ord('q'):
        break

del p  # in order to stop the pipeline object should be deleted, otherwise device will continue working. This is required if you are going to add code after the main loop, otherwise you can omit it.

# flush=True because os._exit terminates without flushing stdio buffers.
print('py: DONE.', flush=True)
os._exit(0)