Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve simulated communication delay #149

Merged
merged 16 commits into from
Mar 26, 2024
19 changes: 15 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,26 @@ project(urg_stamped)
set(CATKIN_DEPENDS
roscpp
sensor_msgs
std_msgs
)

## Find catkin and any catkin packages
find_package(catkin REQUIRED COMPONENTS ${CATKIN_DEPENDS})
find_package(Boost 1.53 REQUIRED system chrono thread)
find_package(catkin REQUIRED COMPONENTS
${CATKIN_DEPENDS}
message_generation
)

add_message_files(FILES Status.msg)

catkin_package(
CATKIN_DEPENDS ${CATKIN_DEPENDS}
generate_messages(DEPENDENCIES std_msgs)

catkin_package(CATKIN_DEPENDS
${CATKIN_DEPENDS}
message_runtime
)

find_package(Boost 1.53 REQUIRED system chrono thread)

add_compile_options(-std=c++11)
include_directories(${catkin_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS} include)

Expand All @@ -24,6 +34,7 @@ add_executable(urg_stamped
src/ros_logger.cpp
)
target_link_libraries(urg_stamped ${catkin_LIBRARIES} ${Boost_LIBRARIES})
add_dependencies(urg_stamped ${catkin_EXPORTED_TARGETS})

add_executable(urg_sim
src/urg_sim/encode.cpp
Expand Down
17 changes: 9 additions & 8 deletions include/urg_sim/urg_sim.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#ifndef URG_SIM_URG_SIM_H
#define URG_SIM_URG_SIM_H

#include <atomic>
#include <functional>
#include <list>
#include <map>
Expand Down Expand Up @@ -96,11 +97,8 @@ class URGSimulator
const URGSimulator::Params& params,
const RawScanDataCallback raw_scan_data_cb = nopRawScanDataCallback)
: params_(params)
, io_service_()
, acceptor_(io_service_, endpoint)
, socket_(io_service_)
, input_process_timer_(io_service_)
, output_process_timer_(io_service_)
, boot_timer_(io_service_)
, scan_timer_(io_service_)
, raw_scan_data_cb_(raw_scan_data_cb)
Expand Down Expand Up @@ -154,11 +152,11 @@ class URGSimulator
std::string model_name_;

boost::asio::io_service io_service_;
boost::asio::io_service input_fifo_;
boost::asio::io_service output_fifo_;
boost::asio::ip::tcp::acceptor acceptor_;
boost::asio::ip::tcp::socket socket_;
boost::asio::streambuf input_buf_;
boost::asio::deadline_timer input_process_timer_;
boost::asio::deadline_timer output_process_timer_;
boost::asio::deadline_timer boot_timer_;
boost::asio::deadline_timer scan_timer_;
std::mutex mu_; // Mutex for sensor_state_ and boot_cnt_ for access from CI thread
Expand All @@ -168,7 +166,7 @@ class URGSimulator
std::default_random_engine rand_engine_;
std::normal_distribution<double> comm_delay_distribution_;

bool killed_;
std::atomic<bool> killed_;
boost::posix_time::ptime timestamp_epoch_;
std::map<std::string, std::function<void(const std::string)>> handlers_;
SensorState sensor_state_;
Expand All @@ -190,7 +188,7 @@ class URGSimulator
void onRead(const boost::system::error_code& ec);
void processInput(
const std::string cmd,
const boost::system::error_code& ec);
const boost::posix_time::ptime& when);
void asyncRead();
void reset();
void reboot();
Expand All @@ -203,13 +201,16 @@ class URGSimulator
const std::string echo,
const std::string status,
const KeyValues kv);
void send(const std::string data);
void send(
const std::string data,
const boost::posix_time::ptime& when);
void accept();
void accepted(
const boost::system::error_code& ec);
void nextScan();
void scan();
void sendScan();
void fifo(boost::asio::io_service& fifo);

void handleII(const std::string cmd);
void handleVV(const std::string cmd);
Expand Down
2 changes: 2 additions & 0 deletions include/urg_stamped/urg_stamped.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class UrgStampedNode
ros::NodeHandle nh_;
ros::NodeHandle pnh_;
ros::Publisher pub_scan_;
ros::Publisher pub_status_;
ros::Timer timer_sync_;
ros::Timer timer_delay_estim_;
ros::Timer timer_retry_tm_;
Expand Down Expand Up @@ -189,6 +190,7 @@ class UrgStampedNode
void delayEstimation(const ros::TimerEvent& event = ros::TimerEvent());
void retryTM(const ros::TimerEvent& event = ros::TimerEvent());
void updateOrigin(const ros::Time& now, const ros::Time& origin, const ros::Time& time_at_device_timestamp);
void publishStatus();

void errorCountIncrement(const std::string& status = "");

Expand Down
6 changes: 6 additions & 0 deletions msg/Status.msg
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
std_msgs/Header header

time sensor_clock_origin
float64 sensor_clock_gain

duration communication_delay
9 changes: 7 additions & 2 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@
<buildtool_depend>catkin</buildtool_depend>

<depend>roscpp</depend>
<depend>sensor_msgs</depend>
<depend>std_msgs</depend>

<build_depend>message_generation</build_depend>
<build_export_depend>message_runtime</build_export_depend>
<exec_depend>message_runtime</exec_depend>

<test_depend>rostest</test_depend>
<test_depend>roslint</test_depend>
<test_depend>rosunit</test_depend>

<depend>sensor_msgs</depend>
</package>
63 changes: 46 additions & 17 deletions src/urg_sim/urg_sim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -75,21 +76,15 @@
std::string line;
while (std::getline(stream, line))
{
input_process_timer_.expires_at(when);
input_process_timer_.async_wait(
boost::bind(
&URGSimulator::processInput,
this,
line,
boost::asio::placeholders::error));
input_fifo_.post(boost::bind(&URGSimulator::processInput, this, line, when));
}

asyncRead();
}

void URGSimulator::processInput(
const std::string cmd,
const boost::system::error_code& ec)
const boost::posix_time::ptime& when)
{
// Find handler from command string
const std::string op = cmd.substr(0, 2);
Expand All @@ -99,6 +94,10 @@
it_h->second :
std::bind(&URGSimulator::handleUnknown, this, std::placeholders::_1);

boost::asio::deadline_timer wait(io_service_);
wait.expires_at(when);
wait.wait();

h(cmd);
}

Expand Down Expand Up @@ -465,8 +464,8 @@
sensor_state_ = SensorState::BOOTING;
}

input_process_timer_.cancel();
output_process_timer_.cancel();
input_fifo_.stop();
output_fifo_.stop();
boot_timer_.cancel();
scan_timer_.cancel();

Expand Down Expand Up @@ -510,21 +509,18 @@
const std::string status,
const std::string data)
{
const auto now = boost::posix_time::microsec_clock::universal_time();
const double delay_sec = comm_delay_distribution_(rand_engine_);
const auto delay = boost::posix_time::microseconds(
static_cast<int64_t>(delay_sec * 1e6));
const auto when = now + delay;

const std::string text =
echo + "\n" +
encode::withChecksum(status) + "\n" +
data + "\n";

output_process_timer_.expires_from_now(delay);
output_process_timer_.async_wait(
boost::bind(
&URGSimulator::send,
this,
text));
output_fifo_.post(boost::bind(&URGSimulator::send, this, text, when));
}

void URGSimulator::responseKeyValues(
Expand All @@ -541,8 +537,14 @@
response(echo, status, ss.str());
}

void URGSimulator::send(const std::string data)
void URGSimulator::send(
const std::string data,
const boost::posix_time::ptime& when)
{
boost::asio::deadline_timer wait(io_service_);
wait.expires_at(when);
wait.wait();

boost::system::error_code ec;
boost::asio::write(socket_, boost::asio::buffer(data), ec);
if (ec)
Expand Down Expand Up @@ -740,16 +742,43 @@
{
reboot();

std::thread th_input_queue(
std::bind(&URGSimulator::fifo, this, std::ref(input_fifo_)));
std::thread th_output_queue(
std::bind(&URGSimulator::fifo, this, std::ref(output_fifo_)));

while (!killed_)
{
io_service_.run();
}

th_input_queue.join();
th_output_queue.join();
}

void URGSimulator::fifo(boost::asio::io_service& fifo)
{
std::function<void(const boost::system::error_code& ec)> keepalive;
keepalive = [&fifo, &keepalive](const boost::system::error_code& ec)
{
boost::asio::deadline_timer keepalive_timer(fifo);
keepalive_timer.expires_from_now(boost::posix_time::hours(1));
keepalive_timer.async_wait(keepalive);
};
while (!killed_)
{
keepalive(boost::system::error_code());

Check warning on line 770 in src/urg_sim/urg_sim.cpp

View check run for this annotation

Codecov / codecov/patch

src/urg_sim/urg_sim.cpp#L770

Added line #L770 was not covered by tests
fifo.run();
fifo.reset();
}
}

void URGSimulator::kill()
{
killed_ = true;
io_service_.stop();
input_fifo_.stop();
output_fifo_.stop();
}

void URGSimulator::setState(const SensorState s)
Expand Down
15 changes: 15 additions & 0 deletions src/urg_stamped.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

#include <ros/console.h>
#include <ros/ros.h>

#include <sensor_msgs/LaserScan.h>
#include <urg_stamped/Status.h>

#include <boost/bind/bind.hpp>
#include <boost/format.hpp>
Expand Down Expand Up @@ -306,6 +308,7 @@ void UrgStampedNode::cbTM(
<< ")"
<< std::endl;
tm_success_ = true;
publishStatus();
break;
}
}
Expand Down Expand Up @@ -493,6 +496,7 @@ void UrgStampedNode::cbII(

const auto now = ros::Time::fromBoost(time_read);
updateOrigin(now, origin, time_at_device_timestamp);
publishStatus();

scip2::logger::debug()
<< "on scan delay: " << std::setprecision(6) << std::fixed << delay
Expand Down Expand Up @@ -788,6 +792,16 @@ bool UrgStampedNode::detectDeviceTimeJump(
return jumped;
}

void UrgStampedNode::publishStatus()
{
urg_stamped::Status msg;
msg.header.stamp = ros::Time::now();
msg.sensor_clock_origin = device_time_origin_.origin_;
msg.sensor_clock_gain = device_time_origin_.gain_;
msg.communication_delay = estimated_communication_delay_;
pub_status_.publish(msg);
}

UrgStampedNode::UrgStampedNode()
: nh_("")
, pnh_("~")
Expand Down Expand Up @@ -847,6 +861,7 @@ UrgStampedNode::UrgStampedNode()
}

pub_scan_ = nh_.advertise<sensor_msgs::LaserScan>("scan", 10);
pub_status_ = pnh_.advertise<urg_stamped::Status>("status", 1, true);

device_.reset(new scip2::ConnectionTcp(ip, port));
device_->registerCloseCallback(
Expand Down