Permalink
Browse files

[test] add ROS test

depth/video cameras and sick laserscan
sensor_msgs.msg PointCloud2, Image, LaserScan

You must patch ROS msg with:

    path -Np0 < patches/ros_python3.diff

see: ros/genpy#9
  • Loading branch information...
pierriko committed Dec 18, 2012
1 parent 535e9cf commit 916f093cd2d7c3db8a746399d40623b431476b74
View
@@ -0,0 +1,20 @@
+--- _Image.py 2012-12-19 10:35:23.539305258 +0100
++++ /opt/ros/fuerte/lib/python2.7/dist-packages/sensor_msgs/msg/_Image.py 2012-12-19 10:36:23.247006282 +0100
+@@ -188,6 +188,3 @@
+ end += length
+- if python3:
+- self.data = str[start:end].decode('utf-8')
+- else:
+- self.data = str[start:end]
++ self.data = str[start:end]
+ return self
+--- _PointCloud2.py 2012-12-19 10:35:48.686834915 +0100
++++ /opt/ros/fuerte/lib/python2.7/dist-packages/sensor_msgs/msg/_PointCloud2.py 2012-12-19 10:36:33.331056290 +0100
+@@ -233,6 +233,3 @@
+ end += length
+- if python3:
+- self.data = str[start:end].decode('utf-8')
+- else:
+- self.data = str[start:end]
++ self.data = str[start:end]
+ start = end
@@ -0,0 +1,138 @@
+#! /usr/bin/env python
+"""
+This script tests the Depth camera with ROS in MORSE.
+"""
+
+import sys
+import time
+import math
+from morse.testing.ros import RosTestCase
+from morse.testing.testing import testlogger
+
+import roslib
+roslib.load_manifest('rospy')
+roslib.load_manifest('nav_msgs')
+roslib.load_manifest('sensor_msgs')
+roslib.load_manifest('geometry_msgs')
+import rospy
+from nav_msgs.msg import Odometry
+from geometry_msgs.msg import Twist
+from sensor_msgs.msg import PointCloud2
+
+# Include this import to be able to use your test file as a regular
+# builder script, ie, usable with: 'morse [run|exec] base_testing.py
+try:
+ from morse.builder import *
+except ImportError:
+ pass
+
+class DepthCameraTest(RosTestCase):
+
+ def setUpEnv(self):
+ """ Defines the test scenario, using the Builder API.
+ """
+
+ robot = Robot('atrv')
+
+ motion = Actuator('v_omega')
+ robot.append(motion)
+ motion.configure_mw('ros')
+
+ odometry = Sensor('odometry')
+ odometry.name = 'Odometry'
+ odometry.translate(z=0.73)
+ robot.append(odometry)
+ odometry.configure_mw('ros')
+
+ camera = Sensor('depth_camera')
+ camera.name = 'CameraDepth'
+ camera.properties(cam_near=1.0, cam_far=20.0, Depth=True,\
+ cam_width=128, cam_height=128, capturing=True)
+ camera.translate(x=0.3, z=0.76)
+ camera.frequency(3)
+ robot.append(camera)
+ camera.configure_mw('ros')
+
+ env = Environment('indoors-1/boxes')
+ # No fastmode here, no MaterialIndex in WIREFRAME mode: AttributeError:
+ # 'KX_PolygonMaterial' object has no attribute 'getMaterialIndex'
+ env.configure_service('socket')
+
+ def send_speed(self, v, w, t):
+ msg = Twist()
+ msg.linear.x = v
+ msg.angular.z = w
+ self.cmd_stream.publish(msg)
+ time.sleep(t)
+ msg.linear.x = 0.0
+ msg.angular.z = 0.0
+ self.cmd_stream.publish(msg)
+
+ def sensor_callback(self, message):
+ self.sensor_message = message
+
+ def subscribe_and_wait_for_message(self, topic, topic_type, timeout=30):
+ if hasattr(self, 'sensor_sub'):
+ self.sensor_sub.unregister()
+ self.sensor_sub = rospy.Subscriber(topic, topic_type, self.sensor_callback)
+ return self.wait_for_message(timeout)
+
+ def wait_for_message(self, timeout=10):
+ self.sensor_message = None
+ timeout_t = time.time() + timeout
+ while not self.sensor_message and timeout_t > time.time():
+ time.sleep(.1)
+ return self.sensor_message
+
+ def init_sensor_test(self, topic, topic_type):
+ rospy.init_node('morse_testing', log_level=rospy.DEBUG)
+
+ testlogger.debug("subscribe and wait %s (%s)"%(topic, topic_type.__name__))
+
+ tmp = self.subscribe_and_wait_for_message(topic, topic_type)
+ if not tmp: # remove once https://github.com/ros/genpy/pull/9 is merged
+ testlogger.error("please patch ROS, see patches/ros_python3.diff")
+ self.assertTrue(tmp != None)
+
+ self.cmd_stream = rospy.Publisher('/ATRV/Motion_Controller', Twist)
+
+ testlogger.debug("init sensor OK %s"%topic)
+
+ return self.sensor_message
+
+ def cleanup_sensor_test(self):
+ self.sensor_sub.unregister()
+ testlogger.debug("cleanup sensor")
+
+ def test_depth_camera(self):
+ msg = self.init_sensor_test('/ATRV/Odometry', Odometry)
+ precision = 0.15 # we start at the origine
+ self.assertAlmostEqual(msg.pose.pose.position.x, 0.0, delta=precision)
+ self.assertAlmostEqual(msg.pose.pose.position.y, 0.0, delta=precision)
+ self.assertAlmostEqual(msg.pose.pose.position.z, 0.0, delta=precision)
+ # see http://ros.org/doc/api/nav_msgs/html/msg/Odometry.html
+
+ msg = self.init_sensor_test('/ATRV/CameraDepth', PointCloud2)
+
+ self.assertEqual(len(msg.data), 128*128*3*4) # H*W*XYZ*sizeof(float)
+
+ import struct
+ # assert that : near <= z <= far
+ for i in range(0, len(msg.data) - 12, 12):
+ xyz = struct.unpack('fff', msg.data[i:i+12])
+ self.assertTrue(xyz[2] >= 1 and xyz[2] <= 20)
+
+ self.send_speed(1.0, math.pi / 2.0, 2.0)
+ msg = self.wait_for_message()
+
+ # TODO: more test
+
+ self.cleanup_sensor_test()
+
+########################## Run these tests ##########################
+if __name__ == "__main__":
+ import unittest
+ from morse.testing.testing import MorseTestRunner
+ suite = unittest.TestLoader().loadTestsFromTestCase(DepthCameraTest)
+ sys.exit(not MorseTestRunner().run(suite).wasSuccessful())
+
@@ -0,0 +1,129 @@
+#! /usr/bin/env python
+"""
+This script tests the Sick laser scanner with ROS in MORSE.
+"""
+
+import sys
+import time
+import math
+from morse.testing.ros import RosTestCase
+from morse.testing.testing import testlogger
+
+import roslib
+roslib.load_manifest('rospy')
+roslib.load_manifest('nav_msgs')
+roslib.load_manifest('sensor_msgs')
+roslib.load_manifest('geometry_msgs')
+import rospy
+from nav_msgs.msg import Odometry
+from geometry_msgs.msg import Twist
+from sensor_msgs.msg import LaserScan
+
+# Include this import to be able to use your test file as a regular
+# builder script, ie, usable with: 'morse [run|exec] base_testing.py
+try:
+ from morse.builder import *
+except ImportError:
+ pass
+
+class SickLaserTest(RosTestCase):
+
+ def setUpEnv(self):
+ """ Defines the test scenario, using the Builder API.
+ """
+
+ robot = Robot('atrv')
+
+ motion = Actuator('v_omega')
+ robot.append(motion)
+ motion.configure_mw('ros')
+
+ odometry = Sensor('odometry')
+ odometry.name = 'Odometry'
+ odometry.translate(z=0.73)
+ robot.append(odometry)
+ odometry.configure_mw('ros')
+
+ sick = Sensor('sick')
+ sick.name = 'Sick'
+ sick.translate(x = 0.18, z = 0.94)
+ robot.append(sick)
+ # sick.properties(scan_window = 270, resolution = .25)
+ sick.properties(scan_window = 180, resolution = 1)
+ sick.configure_mw('ros')
+
+ env = Environment('indoors-1/boxes', fastmode=True)
+ env.configure_service('socket')
+
+ def send_speed(self, v, w, t):
+ msg = Twist()
+ msg.linear.x = v
+ msg.angular.z = w
+ self.cmd_stream.publish(msg)
+ time.sleep(t)
+ msg.linear.x = 0.0
+ msg.angular.z = 0.0
+ self.cmd_stream.publish(msg)
+
+ def sensor_callback(self, message):
+ self.sensor_message = message
+
+ def subscribe_and_wait_for_message(self, topic, topic_type, timeout=30):
+ if hasattr(self, 'sensor_sub'):
+ self.sensor_sub.unregister()
+ self.sensor_sub = rospy.Subscriber(topic, topic_type, self.sensor_callback)
+ return self.wait_for_message(timeout)
+
+ def wait_for_message(self, timeout=10):
+ self.sensor_message = None
+ timeout_t = time.time() + timeout
+ while not self.sensor_message and timeout_t > time.time():
+ time.sleep(.1)
+ return self.sensor_message
+
+ def init_sensor_test(self, topic, topic_type):
+ rospy.init_node('morse_testing', log_level=rospy.DEBUG)
+
+ testlogger.debug("subscribe and wait %s (%s)"%(topic, topic_type.__name__))
+ self.assertTrue(self.subscribe_and_wait_for_message(topic, topic_type) != None)
+
+ self.cmd_stream = rospy.Publisher('/ATRV/Motion_Controller', Twist)
+
+ testlogger.debug("init sensor OK %s"%topic)
+
+ return self.sensor_message
+
+ def cleanup_sensor_test(self):
+ self.sensor_sub.unregister()
+ testlogger.debug("cleanup sensor")
+
+ def test_sick_laser(self):
+ msg = self.init_sensor_test('/ATRV/Odometry', Odometry)
+ precision = 0.15 # we start at the origine
+ self.assertAlmostEqual(msg.pose.pose.position.x, 0.0, delta=precision)
+ self.assertAlmostEqual(msg.pose.pose.position.y, 0.0, delta=precision)
+ self.assertAlmostEqual(msg.pose.pose.position.z, 0.0, delta=precision)
+ # see http://ros.org/doc/api/nav_msgs/html/msg/Odometry.html
+
+ msg = self.init_sensor_test('/ATRV/Sick', LaserScan)
+
+ self.assertEqual(len(msg.ranges), 181)
+
+ # assert that : near <= distance <= far
+ for distance in msg.ranges:
+ self.assertTrue(distance >= 0.1 and distance <= 30)
+
+ self.send_speed(1.0, math.pi / 2.0, 2.0)
+ msg = self.wait_for_message()
+
+ # TODO: more test
+
+ self.cleanup_sensor_test()
+
+########################## Run these tests ##########################
+if __name__ == "__main__":
+ import unittest
+ from morse.testing.testing import MorseTestRunner
+ suite = unittest.TestLoader().loadTestsFromTestCase(SickLaserTest)
+ sys.exit(not MorseTestRunner().run(suite).wasSuccessful())
+
Oops, something went wrong.

0 comments on commit 916f093

Please sign in to comment.