Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented rate-control-topic and rate-control-max-delay. #947

Merged
merged 11 commits into from
Feb 14, 2017
10 changes: 10 additions & 0 deletions tools/rosbag/include/rosbag/player.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@

#include "rosbag/bag.h"

#include <topic_tools/shape_shifter.h>

#include "rosbag/time_translator.h"
#include "rosbag/macros.h"

Expand Down Expand Up @@ -90,6 +92,8 @@ struct ROSBAG_DECL PlayerOptions
float duration;
bool keep_alive;
bool wait_for_subscribers;
std::string rate_control_topic;
float rate_control_max_delay;
ros::Duration skip_empty;

std::vector<std::string> bags;
Expand Down Expand Up @@ -173,6 +177,8 @@ class ROSBAG_DECL Player
void setupTerminal();
void restoreTerminal();

void updateRateTopicTime(const ros::MessageEvent<topic_tools::ShapeShifter const>& msg_event);

void doPublish(rosbag::MessageInstance const& m);

void doKeepAlive();
Expand All @@ -195,13 +201,17 @@ class ROSBAG_DECL Player
ros::ServiceServer pause_service_;

bool paused_;
bool delayed_;

bool pause_for_topics_;

bool pause_change_requested_;

bool requested_pause_state_;

ros::Subscriber rate_control_sub_;
ros::Time last_rate_control_;

ros::WallTime paused_time_;

std::vector<boost::shared_ptr<Bag> > bags_;
Expand Down
13 changes: 11 additions & 2 deletions tools/rosbag/src/play.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,11 @@ rosbag::PlayerOptions parseOptions(int argc, char** argv) {
("topics", po::value< std::vector<std::string> >()->multitoken(), "topics to play back")
("pause-topics", po::value< std::vector<std::string> >()->multitoken(), "topics to pause playback on")
("bags", po::value< std::vector<std::string> >(), "bag files to play back from")
("wait-for-subscribers", "wait for at least one subscriber on each topic before publishing");

("wait-for-subscribers", "wait for at least one subscriber on each topic before publishing")
("rate-control-topic", po::value<std::string>(), "watch the given topic, and if the last publish was more than <rate-control-max-delay> ago, wait until the topic publishes again to continue playback")
("rate-control-max-delay", po::value<float>()->default_value(1.0f), "maximum time difference from <rate-control-topic> before pausing")
;

po::positional_options_description p;
p.add("bags", -1);

Expand Down Expand Up @@ -140,6 +143,12 @@ rosbag::PlayerOptions parseOptions(int argc, char** argv) {
opts.pause_topics.push_back(*i);
}

if (vm.count("rate-control-topic"))
opts.rate_control_topic = vm["rate-control-topic"].as<std::string>();

if (vm.count("rate-control-max-delay"))
opts.rate_control_max_delay = vm["rate-control-max-delay"].as<float>();

if (vm.count("bags"))
{
std::vector<std::string> bags = vm["bags"].as< std::vector<std::string> >();
Expand Down
112 changes: 109 additions & 3 deletions tools/rosbag/src/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ PlayerOptions::PlayerOptions() :
duration(0.0f),
keep_alive(false),
wait_for_subscribers(false),
rate_control_topic(""),
rate_control_max_delay(1.0f),
skip_empty(ros::DURATION_MAX)
{
}
Expand Down Expand Up @@ -208,6 +210,26 @@ void Player::publish() {
}
}

if (options_.rate_control_topic != "")
{
std::cout << "Creating rate control topic subscriber..." << std::flush;

boost::shared_ptr<ros::Subscriber> sub(boost::make_shared<ros::Subscriber>());
ros::SubscribeOptions ops;
ops.topic = options_.rate_control_topic;
ops.queue_size = 10;
ops.md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
ops.datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
ops.helper = boost::make_shared<ros::SubscriptionCallbackHelperT<
const ros::MessageEvent<topic_tools::ShapeShifter const> &> >(
boost::bind(&Player::updateRateTopicTime, this, _1));

rate_control_sub_ = node_handle_.subscribe(ops);

std::cout << " done." << std::endl;
}


std::cout << "Waiting " << options_.advertise_sleep.toSec() << " seconds after advertising topics..." << std::flush;
options_.advertise_sleep.sleep();
std::cout << " done." << std::endl;
Expand All @@ -230,6 +252,9 @@ void Player::publish() {
time_translator_.setRealStartTime(start_time_);
bag_length_ = view.getEndTime() - view.getBeginTime();

// Set the last rate control to now, so the program doesn't start delayed.
last_rate_control_ = start_time_;

time_publisher_.setTime(start_time_);

ros::WallTime now_wt = ros::WallTime::now();
Expand Down Expand Up @@ -269,20 +294,62 @@ void Player::publish() {
ros::shutdown();
}

void Player::updateRateTopicTime(const ros::MessageEvent<topic_tools::ShapeShifter const>& msg_event)
{
boost::shared_ptr<topic_tools::ShapeShifter const> const &ssmsg = msg_event.getConstMessage();
std::string def = ssmsg->getMessageDefinition();
size_t length = ros::serialization::serializationLength(*ssmsg);

// Check the message definition.
std::istringstream f(def);
std::string s;
bool flag = false;
while(std::getline(f, s, '\n')) {
if (s.find("#") != 0) {
// Does not start with #, is not a comment.
if(s == "Header header") {
flag = true;
}
}
}
// If the header is not the first element in the message according to the definition, throw an error.
if (!flag) {
std::cout << std::endl << "WARNING: Rate control topic is bad, header is not first. MSG may be malformed." << std::endl;
return;
}

std::vector<uint8_t> buffer(length);
ros::serialization::OStream ostream(&buffer[0], length);
ros::serialization::Serializer<topic_tools::ShapeShifter>::write(ostream, *ssmsg);

// Assuming that the header is the first several bytes of the message.
//uint32_t header_sequence_id = buffer[0] | (uint32_t)buffer[1] << 8 | (uint32_t)buffer[2] << 16 | (uint32_t)buffer[3] << 24;
int32_t header_timestamp_sec = buffer[4] | (uint32_t)buffer[5] << 8 | (uint32_t)buffer[6] << 16 | (uint32_t)buffer[7] << 24;
int32_t header_timestamp_nsec = buffer[8] | (uint32_t)buffer[9] << 8 | (uint32_t)buffer[10] << 16 | (uint32_t)buffer[11] << 24;

last_rate_control_ = ros::Time(header_timestamp_sec, header_timestamp_nsec);
}

void Player::printTime()
{
if (!options_.quiet) {

ros::Time current_time = time_publisher_.getTime();
ros::Duration d = current_time - start_time_;


if (paused_)
{
printf("\r [PAUSED] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
printf("\r [PAUSED ] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
}
else if (delayed_)
{
ros::Duration time_since_rate = std::max(ros::Time::now() - last_rate_control_, ros::Duration(0));
printf("\r [DELAYED] Bag Time: %13.6f Duration: %.6f / %.6f Delay: %.2f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec(), time_since_rate.toSec());
}
else
{
printf("\r [RUNNING] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
printf("\r [RUNNING] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
}
fflush(stdout);
}
Expand Down Expand Up @@ -317,6 +384,7 @@ void Player::processPause(const bool paused, ros::WallTime &horizon)
}
else
{
// Make sure time doesn't shift after leaving pause.
ros::WallDuration shift = ros::WallTime::now() - paused_time_;
paused_time_ = ros::WallTime::now();

Expand Down Expand Up @@ -357,6 +425,9 @@ void Player::doPublish(MessageInstance const& m) {
map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
ROS_ASSERT(pub_iter != publishers_.end());

// Update subscribers.
ros::spinOnce();

// If immediate specified, play immediately
if (options_.at_once) {
time_publisher_.stepClock();
Expand Down Expand Up @@ -393,7 +464,16 @@ void Player::doPublish(MessageInstance const& m) {
}
}

while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
// Check if the rate control topic has posted recently enough to continue, or if a delay is needed.
// Delayed is separated from paused to allow more verbose printing.
if (rate_control_sub_ != NULL) {
if ((time_publisher_.getTime() - last_rate_control_).toSec() > options_.rate_control_max_delay) {
delayed_ = true;
paused_time_ = ros::WallTime::now();
}
}

while ((paused_ || delayed_ || !time_publisher_.horizonReached()) && node_handle_.ok())
{
bool charsleftorpaused = true;
while (charsleftorpaused && node_handle_.ok())
Expand Down Expand Up @@ -436,6 +516,25 @@ void Player::doPublish(MessageInstance const& m) {
{
printTime();
time_publisher_.runStalledClock(ros::WallDuration(.1));
ros::spinOnce();
}
else if (delayed_)
{
printTime();
time_publisher_.runStalledClock(ros::WallDuration(.1));
ros::spinOnce();
// You need to check the rate here too.
if(rate_control_sub_ == NULL || (time_publisher_.getTime() - last_rate_control_).toSec() <= options_.rate_control_max_delay) {
delayed_ = false;
// Make sure time doesn't shift after leaving delay.
ros::WallDuration shift = ros::WallTime::now() - paused_time_;
paused_time_ = ros::WallTime::now();

time_translator_.shift(ros::Duration(shift.sec, shift.nsec));

horizon += shift;
time_publisher_.setWCHorizon(horizon);
}
}
else
charsleftorpaused = false;
Expand All @@ -444,6 +543,7 @@ void Player::doPublish(MessageInstance const& m) {

printTime();
time_publisher_.runClock(ros::WallDuration(.1));
ros::spinOnce();
}

pub_iter->second.publish(m);
Expand All @@ -464,6 +564,9 @@ void Player::doKeepAlive() {
return;
}

// If we're done and just staying alive, don't watch the rate control topic. We aren't publishing anyway.
delayed_ = false;

while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
{
bool charsleftorpaused = true;
Expand All @@ -477,6 +580,7 @@ void Player::doKeepAlive() {
}
else
{
// Make sure time doesn't shift after leaving pause.
ros::WallDuration shift = ros::WallTime::now() - paused_time_;
paused_time_ = ros::WallTime::now();

Expand All @@ -491,6 +595,7 @@ void Player::doKeepAlive() {
{
printTime();
time_publisher_.runStalledClock(ros::WallDuration(.1));
ros::spinOnce();
}
else
charsleftorpaused = false;
Expand All @@ -499,6 +604,7 @@ void Player::doKeepAlive() {

printTime();
time_publisher_.runClock(ros::WallDuration(.1));
ros::spinOnce();
}
}

Expand Down