diff --git a/CMakeLists.txt b/CMakeLists.txt index a17bca76..ead6d0a0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,16 +4,26 @@ project(urg_stamped) set(CATKIN_DEPENDS roscpp sensor_msgs + std_msgs ) ## Find catkin and any catkin packages -find_package(catkin REQUIRED COMPONENTS ${CATKIN_DEPENDS}) -find_package(Boost 1.53 REQUIRED system chrono thread) +find_package(catkin REQUIRED COMPONENTS + ${CATKIN_DEPENDS} + message_generation +) + +add_message_files(FILES Status.msg) -catkin_package( - CATKIN_DEPENDS ${CATKIN_DEPENDS} +generate_messages(DEPENDENCIES std_msgs) + +catkin_package(CATKIN_DEPENDS + ${CATKIN_DEPENDS} + message_runtime ) +find_package(Boost 1.53 REQUIRED system chrono thread) + add_compile_options(-std=c++11) include_directories(${catkin_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS} include) @@ -24,6 +34,7 @@ add_executable(urg_stamped src/ros_logger.cpp ) target_link_libraries(urg_stamped ${catkin_LIBRARIES} ${Boost_LIBRARIES}) +add_dependencies(urg_stamped ${catkin_EXPORTED_TARGETS}) add_executable(urg_sim src/urg_sim/encode.cpp diff --git a/include/urg_sim/urg_sim.h b/include/urg_sim/urg_sim.h index 36c62795..7e5f7fef 100644 --- a/include/urg_sim/urg_sim.h +++ b/include/urg_sim/urg_sim.h @@ -17,6 +17,7 @@ #ifndef URG_SIM_URG_SIM_H #define URG_SIM_URG_SIM_H +#include #include #include #include @@ -96,11 +97,8 @@ class URGSimulator const URGSimulator::Params& params, const RawScanDataCallback raw_scan_data_cb = nopRawScanDataCallback) : params_(params) - , io_service_() , acceptor_(io_service_, endpoint) , socket_(io_service_) - , input_process_timer_(io_service_) - , output_process_timer_(io_service_) , boot_timer_(io_service_) , scan_timer_(io_service_) , raw_scan_data_cb_(raw_scan_data_cb) @@ -154,11 +152,11 @@ class URGSimulator std::string model_name_; boost::asio::io_service io_service_; + boost::asio::io_service input_fifo_; + boost::asio::io_service output_fifo_; boost::asio::ip::tcp::acceptor acceptor_; boost::asio::ip::tcp::socket socket_; boost::asio::streambuf input_buf_; - boost::asio::deadline_timer input_process_timer_; - boost::asio::deadline_timer output_process_timer_; boost::asio::deadline_timer boot_timer_; boost::asio::deadline_timer scan_timer_; std::mutex mu_; // Mutex for sensor_state_ and boot_cnt_ for access from CI thread @@ -168,7 +166,7 @@ class URGSimulator std::default_random_engine rand_engine_; std::normal_distribution comm_delay_distribution_; - bool killed_; + std::atomic killed_; boost::posix_time::ptime timestamp_epoch_; std::map> handlers_; SensorState sensor_state_; @@ -190,7 +188,7 @@ class URGSimulator void onRead(const boost::system::error_code& ec); void processInput( const std::string cmd, - const boost::system::error_code& ec); + const boost::posix_time::ptime& when); void asyncRead(); void reset(); void reboot(); @@ -203,13 +201,16 @@ class URGSimulator const std::string echo, const std::string status, const KeyValues kv); - void send(const std::string data); + void send( + const std::string data, + const boost::posix_time::ptime& when); void accept(); void accepted( const boost::system::error_code& ec); void nextScan(); void scan(); void sendScan(); + void fifo(boost::asio::io_service& fifo); void handleII(const std::string cmd); void handleVV(const std::string cmd); diff --git a/include/urg_stamped/urg_stamped.h b/include/urg_stamped/urg_stamped.h index 602a450c..0f584c76 100644 --- a/include/urg_stamped/urg_stamped.h +++ b/include/urg_stamped/urg_stamped.h @@ -66,6 +66,7 @@ class UrgStampedNode ros::NodeHandle nh_; ros::NodeHandle pnh_; ros::Publisher pub_scan_; + ros::Publisher pub_status_; ros::Timer timer_sync_; ros::Timer timer_delay_estim_; ros::Timer timer_retry_tm_; @@ -189,6 +190,7 @@ class UrgStampedNode void delayEstimation(const ros::TimerEvent& event = ros::TimerEvent()); void retryTM(const ros::TimerEvent& event = ros::TimerEvent()); void updateOrigin(const ros::Time& now, const ros::Time& origin, const ros::Time& time_at_device_timestamp); + void publishStatus(); void errorCountIncrement(const std::string& status = ""); diff --git a/msg/Status.msg b/msg/Status.msg new file mode 100644 index 00000000..33d32e65 --- /dev/null +++ b/msg/Status.msg @@ -0,0 +1,6 @@ +std_msgs/Header header + +time sensor_clock_origin +float64 sensor_clock_gain + +duration communication_delay diff --git a/package.xml b/package.xml index 61a99b85..2879ea9e 100644 --- a/package.xml +++ b/package.xml @@ -11,9 +11,14 @@ catkin roscpp + sensor_msgs + std_msgs + + message_generation + message_runtime + message_runtime + rostest roslint rosunit - - sensor_msgs diff --git a/src/urg_sim/urg_sim.cpp b/src/urg_sim/urg_sim.cpp index 8e50b7b2..6ca396f9 100644 --- a/src/urg_sim/urg_sim.cpp +++ b/src/urg_sim/urg_sim.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -75,13 +76,7 @@ void URGSimulator::onRead(const boost::system::error_code& ec) std::string line; while (std::getline(stream, line)) { - input_process_timer_.expires_at(when); - input_process_timer_.async_wait( - boost::bind( - &URGSimulator::processInput, - this, - line, - boost::asio::placeholders::error)); + input_fifo_.post(boost::bind(&URGSimulator::processInput, this, line, when)); } asyncRead(); @@ -89,7 +84,7 @@ void URGSimulator::onRead(const boost::system::error_code& ec) void URGSimulator::processInput( const std::string cmd, - const boost::system::error_code& ec) + const boost::posix_time::ptime& when) { // Find handler from command string const std::string op = cmd.substr(0, 2); @@ -99,6 +94,10 @@ void URGSimulator::processInput( it_h->second : std::bind(&URGSimulator::handleUnknown, this, std::placeholders::_1); + boost::asio::deadline_timer wait(io_service_); + wait.expires_at(when); + wait.wait(); + h(cmd); } @@ -465,8 +464,8 @@ void URGSimulator::reboot() sensor_state_ = SensorState::BOOTING; } - input_process_timer_.cancel(); - output_process_timer_.cancel(); + input_fifo_.stop(); + output_fifo_.stop(); boot_timer_.cancel(); scan_timer_.cancel(); @@ -510,21 +509,18 @@ void URGSimulator::response( const std::string status, const std::string data) { + const auto now = boost::posix_time::microsec_clock::universal_time(); const double delay_sec = comm_delay_distribution_(rand_engine_); const auto delay = boost::posix_time::microseconds( static_cast(delay_sec * 1e6)); + const auto when = now + delay; const std::string text = echo + "\n" + encode::withChecksum(status) + "\n" + data + "\n"; - output_process_timer_.expires_from_now(delay); - output_process_timer_.async_wait( - boost::bind( - &URGSimulator::send, - this, - text)); + output_fifo_.post(boost::bind(&URGSimulator::send, this, text, when)); } void URGSimulator::responseKeyValues( @@ -541,8 +537,14 @@ void URGSimulator::responseKeyValues( response(echo, status, ss.str()); } -void URGSimulator::send(const std::string data) +void URGSimulator::send( + const std::string data, + const boost::posix_time::ptime& when) { + boost::asio::deadline_timer wait(io_service_); + wait.expires_at(when); + wait.wait(); + boost::system::error_code ec; boost::asio::write(socket_, boost::asio::buffer(data), ec); if (ec) @@ -740,16 +742,43 @@ void URGSimulator::spin() { reboot(); + std::thread th_input_queue( + std::bind(&URGSimulator::fifo, this, std::ref(input_fifo_))); + std::thread th_output_queue( + std::bind(&URGSimulator::fifo, this, std::ref(output_fifo_))); + while (!killed_) { io_service_.run(); } + + th_input_queue.join(); + th_output_queue.join(); +} + +void URGSimulator::fifo(boost::asio::io_service& fifo) +{ + std::function keepalive; + keepalive = [&fifo, &keepalive](const boost::system::error_code& ec) + { + boost::asio::deadline_timer keepalive_timer(fifo); + keepalive_timer.expires_from_now(boost::posix_time::hours(1)); + keepalive_timer.async_wait(keepalive); + }; + while (!killed_) + { + keepalive(boost::system::error_code()); + fifo.run(); + fifo.reset(); + } } void URGSimulator::kill() { killed_ = true; io_service_.stop(); + input_fifo_.stop(); + output_fifo_.stop(); } void URGSimulator::setState(const SensorState s) diff --git a/src/urg_stamped.cpp b/src/urg_stamped.cpp index 864f0135..5fdea61a 100644 --- a/src/urg_stamped.cpp +++ b/src/urg_stamped.cpp @@ -16,7 +16,9 @@ #include #include + #include +#include #include #include @@ -306,6 +308,7 @@ void UrgStampedNode::cbTM( << ")" << std::endl; tm_success_ = true; + publishStatus(); break; } } @@ -493,6 +496,7 @@ void UrgStampedNode::cbII( const auto now = ros::Time::fromBoost(time_read); updateOrigin(now, origin, time_at_device_timestamp); + publishStatus(); scip2::logger::debug() << "on scan delay: " << std::setprecision(6) << std::fixed << delay @@ -788,6 +792,16 @@ bool UrgStampedNode::detectDeviceTimeJump( return jumped; } +void UrgStampedNode::publishStatus() +{ + urg_stamped::Status msg; + msg.header.stamp = ros::Time::now(); + msg.sensor_clock_origin = device_time_origin_.origin_; + msg.sensor_clock_gain = device_time_origin_.gain_; + msg.communication_delay = estimated_communication_delay_; + pub_status_.publish(msg); +} + UrgStampedNode::UrgStampedNode() : nh_("") , pnh_("~") @@ -847,6 +861,7 @@ UrgStampedNode::UrgStampedNode() } pub_scan_ = nh_.advertise("scan", 10); + pub_status_ = pnh_.advertise("status", 1, true); device_.reset(new scip2::ConnectionTcp(ip, port)); device_->registerCloseCallback( diff --git a/test/src/e2e.cpp b/test/src/e2e.cpp index d4de0a26..abdab826 100644 --- a/test/src/e2e.cpp +++ b/test/src/e2e.cpp @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -84,10 +85,12 @@ class E2E : public ::testing::Test ros::NodeHandle nh_; ros::NodeHandle pnh_; ros::Subscriber sub_scan_; + ros::Subscriber sub_status_; urg_sim::URGSimulator* sim_; std::thread th_sim_; bool sim_killed_; + urg_stamped::Status::ConstPtr status_msg_; int cnt_; @@ -100,6 +103,11 @@ class E2E : public ::testing::Test scans_.push_back(msg); } + void cbStatus(const urg_stamped::Status ::ConstPtr& msg) + { + status_msg_ = msg; + } + void cbRawScanData(const urg_sim::RawScanData::Ptr data) { data->ranges[0] = cnt_; @@ -147,6 +155,7 @@ class E2E : public ::testing::Test ros::Duration(1).sleep(); // Wait node respawn sub_scan_ = nh_.subscribe("scan", 100, &E2E::cbScan, this); + sub_status_ = nh_.subscribe("/urg_stamped/status", 10, &E2E::cbStatus, this); } }; @@ -191,7 +200,8 @@ INSTANTIATE_TEST_CASE_P( TEST_P(E2EWithParam, Simple) { - ASSERT_NO_FATAL_FAILURE(startSimulator(GetParam())); + const auto param = GetParam(); + ASSERT_NO_FATAL_FAILURE(startSimulator(param)); // Make time sync happens frequently pnh_.setParam("/urg_stamped/sync_interval_min", 0.1); @@ -208,6 +218,11 @@ TEST_P(E2EWithParam, Simple) ASSERT_NE(true_stamps_.find(index), true_stamps_.end()) << "Can not find corresponding ground truth timestamp"; EXPECT_LT(std::abs((true_stamps_[index] - scans_[i]->header.stamp).toSec()), 0.0015) << "scan " << i; } + + ASSERT_TRUE(static_cast(status_msg_)); + ASSERT_NEAR(status_msg_->sensor_clock_gain, 1.0, 1e-3); + ASSERT_GT(status_msg_->communication_delay.toSec(), param.comm_delay_base * 2 - 1e-4); + ASSERT_LT(status_msg_->communication_delay.toSec(), param.comm_delay_base * 2 + 5e-4); } TEST_F(E2E, RebootOnError)