Skip to content

Commit

Permalink
Implemented rate-control-topic and rate-control-max-delay. (#947)
Browse files Browse the repository at this point in the history
  • Loading branch information
mikepurvis committed Feb 10, 2017
1 parent 7368ef8 commit d4e607f
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 5 deletions.
10 changes: 10 additions & 0 deletions tools/rosbag/include/rosbag/player.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@

#include "rosbag/bag.h"

#include <topic_tools/shape_shifter.h>

#include "rosbag/time_translator.h"
#include "rosbag/macros.h"

Expand Down Expand Up @@ -90,6 +92,8 @@ struct ROSBAG_DECL PlayerOptions
float duration;
bool keep_alive;
bool wait_for_subscribers;
std::string rate_control_topic;
float rate_control_max_delay;
ros::Duration skip_empty;

std::vector<std::string> bags;
Expand Down Expand Up @@ -173,6 +177,8 @@ class ROSBAG_DECL Player
void setupTerminal();
void restoreTerminal();

void updateRateTopicTime(const ros::MessageEvent<topic_tools::ShapeShifter const>& msg_event);

void doPublish(rosbag::MessageInstance const& m);

void doKeepAlive();
Expand All @@ -195,13 +201,17 @@ class ROSBAG_DECL Player
ros::ServiceServer pause_service_;

bool paused_;
bool delayed_;

bool pause_for_topics_;

bool pause_change_requested_;

bool requested_pause_state_;

ros::Subscriber rate_control_sub_;
ros::Time last_rate_control_;

ros::WallTime paused_time_;

std::vector<boost::shared_ptr<Bag> > bags_;
Expand Down
13 changes: 11 additions & 2 deletions tools/rosbag/src/play.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,11 @@ rosbag::PlayerOptions parseOptions(int argc, char** argv) {
("topics", po::value< std::vector<std::string> >()->multitoken(), "topics to play back")
("pause-topics", po::value< std::vector<std::string> >()->multitoken(), "topics to pause playback on")
("bags", po::value< std::vector<std::string> >(), "bag files to play back from")
("wait-for-subscribers", "wait for at least one subscriber on each topic before publishing");

("wait-for-subscribers", "wait for at least one subscriber on each topic before publishing")
("rate-control-topic", po::value<std::string>(), "watch the given topic, and if the last publish was more than <rate-control-max-delay> ago, wait until the topic publishes again to continue playback")
("rate-control-max-delay", po::value<float>()->default_value(1.0f), "maximum time difference from <rate-control-topic> before pausing")
;

po::positional_options_description p;
p.add("bags", -1);

Expand Down Expand Up @@ -140,6 +143,12 @@ rosbag::PlayerOptions parseOptions(int argc, char** argv) {
opts.pause_topics.push_back(*i);
}

if (vm.count("rate-control-topic"))
opts.rate_control_topic = vm["rate-control-topic"].as<std::string>();

if (vm.count("rate-control-max-delay"))
opts.rate_control_max_delay = vm["rate-control-max-delay"].as<float>();

if (vm.count("bags"))
{
std::vector<std::string> bags = vm["bags"].as< std::vector<std::string> >();
Expand Down
112 changes: 109 additions & 3 deletions tools/rosbag/src/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ PlayerOptions::PlayerOptions() :
duration(0.0f),
keep_alive(false),
wait_for_subscribers(false),
rate_control_topic(""),
rate_control_max_delay(1.0f),
skip_empty(ros::DURATION_MAX)
{
}
Expand Down Expand Up @@ -208,6 +210,26 @@ void Player::publish() {
}
}

if (options_.rate_control_topic != "")
{
std::cout << "Creating rate control topic subscriber..." << std::flush;

boost::shared_ptr<ros::Subscriber> sub(boost::make_shared<ros::Subscriber>());
ros::SubscribeOptions ops;
ops.topic = options_.rate_control_topic;
ops.queue_size = 10;
ops.md5sum = ros::message_traits::md5sum<topic_tools::ShapeShifter>();
ops.datatype = ros::message_traits::datatype<topic_tools::ShapeShifter>();
ops.helper = boost::make_shared<ros::SubscriptionCallbackHelperT<
const ros::MessageEvent<topic_tools::ShapeShifter const> &> >(
boost::bind(&Player::updateRateTopicTime, this, _1));

rate_control_sub_ = node_handle_.subscribe(ops);

std::cout << " done." << std::endl;
}


std::cout << "Waiting " << options_.advertise_sleep.toSec() << " seconds after advertising topics..." << std::flush;
options_.advertise_sleep.sleep();
std::cout << " done." << std::endl;
Expand All @@ -230,6 +252,9 @@ void Player::publish() {
time_translator_.setRealStartTime(start_time_);
bag_length_ = view.getEndTime() - view.getBeginTime();

// Set the last rate control to now, so the program doesn't start delayed.
last_rate_control_ = start_time_;

time_publisher_.setTime(start_time_);

ros::WallTime now_wt = ros::WallTime::now();
Expand Down Expand Up @@ -269,20 +294,62 @@ void Player::publish() {
ros::shutdown();
}

void Player::updateRateTopicTime(const ros::MessageEvent<topic_tools::ShapeShifter const>& msg_event)
{
boost::shared_ptr<topic_tools::ShapeShifter const> const &ssmsg = msg_event.getConstMessage();
std::string def = ssmsg->getMessageDefinition();
size_t length = ros::serialization::serializationLength(*ssmsg);

// Check the message definition.
std::istringstream f(def);
std::string s;
bool flag = false;
while(std::getline(f, s, '\n')) {
if (s.find("#") != 0) {
// Does not start with #, is not a comment.
if(s == "Header header") {
flag = true;
}
}
}
// If the header is not the first element in the message according to the definition, throw an error.
if (!flag) {
std::cout << std::endl << "WARNING: Rate control topic is bad, header is not first. MSG may be malformed." << std::endl;
return;
}

std::vector<uint8_t> buffer(length);
ros::serialization::OStream ostream(&buffer[0], length);
ros::serialization::Serializer<topic_tools::ShapeShifter>::write(ostream, *ssmsg);

// Assuming that the header is the first several bytes of the message.
//uint32_t header_sequence_id = buffer[0] | (uint32_t)buffer[1] << 8 | (uint32_t)buffer[2] << 16 | (uint32_t)buffer[3] << 24;
int32_t header_timestamp_sec = buffer[4] | (uint32_t)buffer[5] << 8 | (uint32_t)buffer[6] << 16 | (uint32_t)buffer[7] << 24;
int32_t header_timestamp_nsec = buffer[8] | (uint32_t)buffer[9] << 8 | (uint32_t)buffer[10] << 16 | (uint32_t)buffer[11] << 24;

last_rate_control_ = ros::Time(header_timestamp_sec, header_timestamp_nsec);
}

void Player::printTime()
{
if (!options_.quiet) {

ros::Time current_time = time_publisher_.getTime();
ros::Duration d = current_time - start_time_;


if (paused_)
{
printf("\r [PAUSED] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
printf("\r [PAUSED ] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
}
else if (delayed_)
{
ros::Duration time_since_rate = std::max(ros::Time::now() - last_rate_control_, ros::Duration(0));
printf("\r [DELAYED] Bag Time: %13.6f Duration: %.6f / %.6f Delay: %.2f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec(), time_since_rate.toSec());
}
else
{
printf("\r [RUNNING] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
printf("\r [RUNNING] Bag Time: %13.6f Duration: %.6f / %.6f \r", time_publisher_.getTime().toSec(), d.toSec(), bag_length_.toSec());
}
fflush(stdout);
}
Expand Down Expand Up @@ -317,6 +384,7 @@ void Player::processPause(const bool paused, ros::WallTime &horizon)
}
else
{
// Make sure time doesn't shift after leaving pause.
ros::WallDuration shift = ros::WallTime::now() - paused_time_;
paused_time_ = ros::WallTime::now();

Expand Down Expand Up @@ -357,6 +425,9 @@ void Player::doPublish(MessageInstance const& m) {
map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
ROS_ASSERT(pub_iter != publishers_.end());

// Update subscribers.
ros::spinOnce();

// If immediate specified, play immediately
if (options_.at_once) {
time_publisher_.stepClock();
Expand Down Expand Up @@ -393,7 +464,16 @@ void Player::doPublish(MessageInstance const& m) {
}
}

while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
// Check if the rate control topic has posted recently enough to continue, or if a delay is needed.
// Delayed is separated from paused to allow more verbose printing.
if (rate_control_sub_ != NULL) {
if ((time_publisher_.getTime() - last_rate_control_).toSec() > options_.rate_control_max_delay) {
delayed_ = true;
paused_time_ = ros::WallTime::now();
}
}

while ((paused_ || delayed_ || !time_publisher_.horizonReached()) && node_handle_.ok())
{
bool charsleftorpaused = true;
while (charsleftorpaused && node_handle_.ok())
Expand Down Expand Up @@ -436,6 +516,25 @@ void Player::doPublish(MessageInstance const& m) {
{
printTime();
time_publisher_.runStalledClock(ros::WallDuration(.1));
ros::spinOnce();
}
else if (delayed_)
{
printTime();
time_publisher_.runStalledClock(ros::WallDuration(.1));
ros::spinOnce();
// You need to check the rate here too.
if(rate_control_sub_ == NULL || (time_publisher_.getTime() - last_rate_control_).toSec() <= options_.rate_control_max_delay) {
delayed_ = false;
// Make sure time doesn't shift after leaving delay.
ros::WallDuration shift = ros::WallTime::now() - paused_time_;
paused_time_ = ros::WallTime::now();

time_translator_.shift(ros::Duration(shift.sec, shift.nsec));

horizon += shift;
time_publisher_.setWCHorizon(horizon);
}
}
else
charsleftorpaused = false;
Expand All @@ -444,6 +543,7 @@ void Player::doPublish(MessageInstance const& m) {

printTime();
time_publisher_.runClock(ros::WallDuration(.1));
ros::spinOnce();
}

pub_iter->second.publish(m);
Expand All @@ -464,6 +564,9 @@ void Player::doKeepAlive() {
return;
}

// If we're done and just staying alive, don't watch the rate control topic. We aren't publishing anyway.
delayed_ = false;

while ((paused_ || !time_publisher_.horizonReached()) && node_handle_.ok())
{
bool charsleftorpaused = true;
Expand All @@ -477,6 +580,7 @@ void Player::doKeepAlive() {
}
else
{
// Make sure time doesn't shift after leaving pause.
ros::WallDuration shift = ros::WallTime::now() - paused_time_;
paused_time_ = ros::WallTime::now();

Expand All @@ -491,6 +595,7 @@ void Player::doKeepAlive() {
{
printTime();
time_publisher_.runStalledClock(ros::WallDuration(.1));
ros::spinOnce();
}
else
charsleftorpaused = false;
Expand All @@ -499,6 +604,7 @@ void Player::doKeepAlive() {

printTime();
time_publisher_.runClock(ros::WallDuration(.1));
ros::spinOnce();
}
}

Expand Down

0 comments on commit d4e607f

Please sign in to comment.