-
Notifications
You must be signed in to change notification settings - Fork 4
/
detect
129 lines (102 loc) · 4.83 KB
/
detect
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python3
import rospy
from hri_fullbody.fullbody_detector import FullbodyDetector
import random
from hri_msgs.msg import IdsList
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
from collections import OrderedDict
# body detection processing time in ms triggering a diagnostic warning
BODY_DETECTION_PROC_TIME_WARN = 1000.
def generate_id():
"""This function generates a 5 chars ID.
"""
return "".join(random.sample("abcdefghijklmnopqrstuvwxyz", 5))
class MultibodyManager:
def __init__(
self,
single_body,
use_depth,
stickman_debug):
self.single_body = single_body
self.use_depth = use_depth
self.stickman_debug = stickman_debug
# Dictionary for the detected people
self.detected_bodies = OrderedDict()
# id = unique body ID
# self.detected_bodies[id] = FullbodyDetector() for unique body ID
if single_body:
rospy.logwarn(
"hri_fullbody running in single body mode:"
+ " only one skeleton will be detected"
)
id = generate_id()
self.detected_bodies[id] = FullbodyDetector(use_depth,
stickman_debug,
id,
single_body)
rospy.loginfo("Generated single person detector for body_%s", id)
rospy.loginfo("Waiting for frames on topic %s", self.detected_bodies[id].get_image_topic())
else:
rospy.loginfo("Waiting for ids on /humans/bodies/tracked")
# Subscriber for the list of detected bodies
self.bodies_list_sub = rospy.Subscriber(
"/humans/bodies/tracked", IdsList, self.ids_list_cb, queue_size=1
)
diag_period = rospy.get_param("~diagnostic_period", 1)
self.diag_timer = rospy.Timer(rospy.Duration(diag_period), self.do_diagnostics)
self.diag_pub = rospy.Publisher("/diagnostics", DiagnosticArray, queue_size=1)
def ids_list_cb(self, msg):
for id in msg.ids:
if id not in self.detected_bodies:
self.detected_bodies[id] = FullbodyDetector(self.use_depth,
self.stickman_debug,
id)
rospy.loginfo("Generated single person detector for body_%s", id)
rospy.loginfo(
"Waiting for frames on topic %s",
self.detected_bodies[id].get_image_topic(),
)
for id in self.detected_bodies:
if not id in msg.ids:
self.detected_bodies[id].unregister()
self.detected_bodies.pop(id)
def do_diagnostics(self, event=None):
arr = DiagnosticArray()
arr.header.stamp = rospy.Time.now()
proc_time = sum(v.get_proc_time() for v in self.detected_bodies.values())
msg = DiagnosticStatus(name="Social perception: Body analysis: Skeleton extraction", hardware_id="none")
if any(v.check_timeout() for v in self.detected_bodies.values()):
msg.level = DiagnosticStatus.ERROR
msg.message = "Body detection process not responding"
elif proc_time > BODY_DETECTION_PROC_TIME_WARN:
msg.level = DiagnosticStatus.WARN
msg.message = "Body detection processing is slow"
else:
msg.level = DiagnosticStatus.OK
msg.values = [
KeyValue(key="Package name", value='hri_fullbody'),
KeyValue(key="Single body detector mode", value=str(self.single_body)),
KeyValue(key="Currently detected bodies",
value=str(sum([v.is_body_detected for v in self.detected_bodies.values()]))),
KeyValue(key="Last detected body ID", value=str(next(reversed(self.detected_bodies), ''))),
KeyValue(key="Detection processing time",
value="{:.2f}".format(proc_time * 1000) + "ms"),
]
arr.status = [msg]
self.diag_pub.publish(arr)
if __name__ == "__main__":
rospy.init_node("fullbody_manager", anonymous=True)
use_depth = rospy.get_param("~use_depth", False)
stickman_debug = rospy.get_param("~stickman_debug", False)
single_body = rospy.get_param("~single_body", True)
manager = None
rospy.loginfo("Using depth camera for body position estimation: %s", str(use_depth))
if single_body:
rospy.loginfo("Setting up for single body pose estimation")
else:
rospy.loginfo("Setting up for multibody pose estimation")
manager = MultibodyManager(
single_body,
use_depth,
stickman_debug)
rospy.spin()