Skip to content

Commit

Permalink
Do not recreate publisher/subscriber/timers if not needed on reconfig…
Browse files Browse the repository at this point in the history
…uration. As an extra, create another timer for cmd_vel messages from any source, so we can dislodge last active source if it gets stuck without further messages
  • Loading branch information
Jorge Santos authored and Alexander Reimann committed Jan 26, 2017
1 parent c5c87a4 commit bf003ad
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 63 deletions.
2 changes: 2 additions & 0 deletions yocs_cmd_vel_mux/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ find_package(catkin REQUIRED COMPONENTS roscpp pluginlib nodelet dynamic_reconfi
find_package(PkgConfig)
pkg_search_module(yaml-cpp REQUIRED yaml-cpp)

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")

if(NOT ${yaml-cpp_VERSION} VERSION_LESS "0.5")
add_definitions(-DHAVE_NEW_YAMLCPP)
endif()
Expand Down
13 changes: 10 additions & 3 deletions yocs_cmd_vel_mux/include/yocs_cmd_vel_mux/cmd_vel_mux_nodelet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class CmdVelMuxNodelet : public nodelet::Nodelet

CmdVelMuxNodelet()
{
cmd_vel_subs.allowed = VACANT;
dynamic_reconfigure_server = NULL;
}

Expand All @@ -51,9 +52,15 @@ class CmdVelMuxNodelet : public nodelet::Nodelet
}

private:
CmdVelSubscribers cmd_vel_sub; /**< Pool of cmd_vel topics subscribers */
ros::Publisher mux_cmd_vel_pub; /**< Multiplexed command velocity topic */
ros::Publisher active_subscriber; /**< Currently allowed cmd_vel subscriber */
static const unsigned int VACANT = 666666; /**< ID for "nobody" active input; anything big is ok */
static const unsigned int GLOBAL_TIMER = 888888; /**< ID for the global timer functor; anything big is ok */

CmdVelSubscribers cmd_vel_subs; /**< Pool of cmd_vel topics subscribers */
ros::Publisher output_topic_pub; /**< Multiplexed command velocity topic */
std::string output_topic_name; /**< Multiplexed command velocity topic name */
ros::Publisher active_subscriber; /**< Currently allowed cmd_vel subscriber */
ros::Timer common_timer; /**< No messages from any subscriber timeout */
double common_timer_period; /**< No messages from any subscriber timeout period */

void timerCallback(const ros::TimerEvent& event, unsigned int idx);
void cmdVelCallback(const geometry_msgs::Twist::ConstPtr& msg, unsigned int idx);
Expand Down
25 changes: 12 additions & 13 deletions yocs_cmd_vel_mux/include/yocs_cmd_vel_mux/cmd_vel_subscribers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <ros/ros.h>
#include <geometry_msgs/Twist.h>

#include <yaml-cpp/yaml.h>

#ifdef HAVE_NEW_YAMLCPP
Expand All @@ -31,19 +32,14 @@ void operator >> (const YAML::Node& node, T& i)
}
#endif

/*****************************************************************************
** Preprocessing
*****************************************************************************/

// move to a static const?
#define VACANT std::numeric_limits<unsigned int>::max()

/*****************************************************************************
** Namespaces
*****************************************************************************/

namespace yocs_cmd_vel_mux {


/*****************************************************************************
** CmdVelSubscribers
*****************************************************************************/
Expand All @@ -62,25 +58,27 @@ class CmdVelSubscribers
{
public:
unsigned int idx; /**< Index; assigned according to the order on YAML file */
std::string name; /**< Descriptive name */
ros::Subscriber subs; /**< The subscriber itself */
std::string name; /**< Descriptive name; must be unique to this subscriber */
std::string topic; /**< The name of the topic */
ros::Subscriber subs; /**< The subscriber itself */
ros::Timer timer; /**< No incoming messages timeout */
double timeout; /**< Timer's timeout, in seconds */
unsigned int priority; /**< UNIQUE integer from 0 (lowest priority) to MAX_INT */
std::string short_desc; /**< Short description (optional) */
bool active; /**< Whether this source is active */

CmdVelSubs(unsigned int idx) : idx(idx), active(false) {};
CmdVelSubs(unsigned int idx) : idx(idx), active(false) { };
~CmdVelSubs() { }

/** Fill attributes with a YAML node content */
void operator << (const YAML::Node& node);
};

CmdVelSubscribers() : allowed(VACANT) { }
CmdVelSubscribers() { }
~CmdVelSubscribers() { }

std::vector<CmdVelSubs>::size_type size() { return list.size(); };
CmdVelSubs& operator [] (unsigned int idx) { return list[idx]; };
std::vector<std::shared_ptr<CmdVelSubs>>::size_type size() { return list.size(); };
std::shared_ptr<CmdVelSubs>& operator [] (unsigned int idx) { return list[idx]; };

/**
* @brief Configures the subscribers from a yaml file.
Expand All @@ -95,9 +93,10 @@ class CmdVelSubscribers
unsigned int allowed;

private:
std::vector<CmdVelSubs> list;
std::vector<std::shared_ptr<CmdVelSubs>> list;
};

} // namespace yocs_cmd_vel_mux


#endif /* CMD_VEL_SUBSCRIBERS_HPP_ */
4 changes: 2 additions & 2 deletions yocs_cmd_vel_mux/include/yocs_cmd_vel_mux/exceptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ class FileNotFoundException: public std::runtime_error {

class EmptyCfgException: public std::runtime_error {
public:
EmptyCfgException()
: std::runtime_error("") {}
EmptyCfgException(const std::string& msg)
: std::runtime_error(msg) {}
virtual ~EmptyCfgException() throw() {}
};

Expand Down
115 changes: 83 additions & 32 deletions yocs_cmd_vel_mux/src/cmd_vel_mux_nodelet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,46 +29,60 @@ namespace yocs_cmd_vel_mux {

void CmdVelMuxNodelet::cmdVelCallback(const geometry_msgs::Twist::ConstPtr& msg, unsigned int idx)
{
// Reset general timer
common_timer.stop();
common_timer.start();

// Reset timer for this source
cmd_vel_sub[idx].timer.stop();
cmd_vel_sub[idx].timer.start();
cmd_vel_subs[idx]->timer.stop();
cmd_vel_subs[idx]->timer.start();

cmd_vel_sub[idx].active = true; // obviously his source is sending commands, so active
cmd_vel_subs[idx]->active = true; // obviously his source is sending commands, so active

// Give permit to publish to this source if it's the only active or is
// already allowed or has higher priority that the currently allowed
if ((cmd_vel_sub.allowed == VACANT) ||
(cmd_vel_sub.allowed == idx) ||
(cmd_vel_sub[idx].priority > cmd_vel_sub[cmd_vel_sub.allowed].priority))
if ((cmd_vel_subs.allowed == VACANT) ||
(cmd_vel_subs.allowed == idx) ||
(cmd_vel_subs[idx]->priority > cmd_vel_subs[cmd_vel_subs.allowed]->priority))
{
if (cmd_vel_sub.allowed != idx)
if (cmd_vel_subs.allowed != idx)
{
cmd_vel_sub.allowed = idx;
cmd_vel_subs.allowed = idx;

// Notify the world that a new cmd_vel source took the control
std_msgs::StringPtr acv_msg(new std_msgs::String);
acv_msg->data = cmd_vel_sub[idx].name;
acv_msg->data = cmd_vel_subs[idx]->name;
active_subscriber.publish(acv_msg);
}

mux_cmd_vel_pub.publish(msg);
output_topic_pub.publish(msg);
}
}

void CmdVelMuxNodelet::timerCallback(const ros::TimerEvent& event, unsigned int idx)
{
if (cmd_vel_sub.allowed == idx)
if (cmd_vel_subs.allowed == idx || (idx == GLOBAL_TIMER && cmd_vel_subs.allowed != VACANT))
{
if (idx == GLOBAL_TIMER)
{
// No cmd_vel messages timeout happened for ANYONE, so last active source got stuck without further
// messages; not a big problem, just dislodge it; but possibly reflect a problem in the controller
NODELET_WARN("CmdVelMux : No cmd_vel messages from ANY input received in the last %fs", common_timer_period);
NODELET_WARN("CmdVelMux : %s dislodged due to general timeout",
cmd_vel_subs[cmd_vel_subs.allowed]->name.c_str());
}

// No cmd_vel messages timeout happened to currently active source, so...
cmd_vel_sub.allowed = VACANT;
cmd_vel_subs.allowed = VACANT;

// ...notify the world that nobody is publishing on cmd_vel; its vacant
std_msgs::StringPtr acv_msg(new std_msgs::String);
acv_msg->data = "idle";
active_subscriber.publish(acv_msg);
}

cmd_vel_sub[idx].active = false;
if (idx != GLOBAL_TIMER)
cmd_vel_subs[idx]->active = false;
}

void CmdVelMuxNodelet::onInit()
Expand All @@ -90,15 +104,14 @@ void CmdVelMuxNodelet::onInit()
active_subscriber.publish(active_msg);

// could use a call to reloadConfiguration here, but it seems to automatically call it once with defaults anyway.
NODELET_DEBUG("CmdVelMux : successfully initialised");
NODELET_DEBUG("CmdVelMux : successfully initialized");
}

void CmdVelMuxNodelet::reloadConfiguration(yocs_cmd_vel_mux::reloadConfig &config, uint32_t unused_level)
{
ros::NodeHandle &nh = this->getNodeHandle();
ros::NodeHandle &nh_priv = this->getPrivateNodeHandle();
ros::NodeHandle &pnh = this->getPrivateNodeHandle();

boost::shared_ptr<std::istream> is;
std::unique_ptr<std::istream> is;

// Configuration can come directly as a yaml-formatted string or as a file path,
// but not both, so we give priority to the first option
Expand All @@ -112,7 +125,7 @@ void CmdVelMuxNodelet::reloadConfiguration(yocs_cmd_vel_mux::reloadConfig &confi
if (config.yaml_cfg_file == "")
{
// typically fired on startup, so look for a parameter to set a default
nh_priv.getParam("yaml_cfg_file", yaml_cfg_file);
pnh.getParam("yaml_cfg_file", yaml_cfg_file);
}
else
{
Expand Down Expand Up @@ -156,37 +169,75 @@ void CmdVelMuxNodelet::reloadConfiguration(yocs_cmd_vel_mux::reloadConfig &confi
*node >> output_name;
}
#endif
mux_cmd_vel_pub = nh_priv.advertise <geometry_msgs::Twist> (output_name, 10);

if (output_topic_name != output_name)
{
output_topic_name = output_name;
output_topic_pub = pnh.advertise<geometry_msgs::Twist>(output_topic_name, 10);
NODELET_DEBUG_STREAM("CmdVelMux : subscribe to output topic '" << output_name << "'");
}
else
{
NODELET_DEBUG_STREAM("CmdVelMux : no need to re-subscribe to output topic '" << output_name << "'");
}

/*********************
** Input Subscribers
**********************/
try
{
cmd_vel_sub.configure(doc["subscribers"]);
cmd_vel_subs.configure(doc["subscribers"]);
}
catch (EmptyCfgException& e)
{
NODELET_WARN("CmdVelMux : yaml configured zero subscribers, check yaml content.");
NODELET_WARN_STREAM("CmdVelMux : yaml configured zero subscribers, check yaml content");
}
catch (YamlException& e)
{
NODELET_ERROR_STREAM("CmdVelMux : yaml parsing problem [" << std::string(e.what()) + "]");
NODELET_ERROR_STREAM("CmdVelMux : yaml parsing problem [" << std::string(e.what()) << "]");
}

// Publishers and subscribers
for (unsigned int i = 0; i < cmd_vel_sub.size(); i++)
// (Re)create subscribers whose topic is invalid: new ones and those with changed names
double longest_timeout = 0.0;
for (unsigned int i = 0; i < cmd_vel_subs.size(); i++)
{
cmd_vel_sub[i].subs =
nh_priv.subscribe<geometry_msgs::Twist>(cmd_vel_sub[i].topic, 10, CmdVelFunctor(i, this));
if (!cmd_vel_subs[i]->subs)
{
cmd_vel_subs[i]->subs =
pnh.subscribe<geometry_msgs::Twist>(cmd_vel_subs[i]->topic, 10, CmdVelFunctor(i, this));
NODELET_DEBUG("CmdVelMux : subscribed to '%s' on topic '%s'. pr: %d, to: %.2f",
cmd_vel_subs[i]->name.c_str(), cmd_vel_subs[i]->topic.c_str(),
cmd_vel_subs[i]->priority, cmd_vel_subs[i]->timeout);
}
else
{
NODELET_DEBUG_STREAM("CmdVelMux : no need to re-subscribe to input topic '" << cmd_vel_subs[i]->topic << "'");
}

// Create (stopped by now) a one-shot timer for every subscriber
cmd_vel_sub[i].timer =
nh_priv.createTimer(ros::Duration(cmd_vel_sub[i].timeout), TimerFunctor(i, this), true, false);
if (!cmd_vel_subs[i]->timer)
{
// Create (stopped by now) a one-shot timer for every subscriber, if it doesn't exist yet
cmd_vel_subs[i]->timer =
pnh.createTimer(ros::Duration(cmd_vel_subs[i]->timeout), TimerFunctor(i, this), true, false);
}

NODELET_DEBUG("CmdVelMux : subscribed to '%s' on topic '%s'. pr: %d, to: %.2f",
cmd_vel_sub[i].name.c_str(), cmd_vel_sub[i].topic.c_str(),
cmd_vel_sub[i].priority, cmd_vel_sub[i].timeout);
if (cmd_vel_subs[i]->timeout > longest_timeout)
longest_timeout = cmd_vel_subs[i]->timeout;
}

if (!common_timer)
{
// Create another timer for cmd_vel messages from any source, so we can
// dislodge last active source if it gets stuck without further messages
common_timer_period = longest_timeout * 2.0;
common_timer =
pnh.createTimer(ros::Duration(common_timer_period), TimerFunctor(GLOBAL_TIMER, this), true, false);
}
else if (longest_timeout != (common_timer_period / 2.0))
{
// Longest timeout changed; just update existing timer period
common_timer_period = longest_timeout * 2.0;
common_timer.setPeriod(ros::Duration(common_timer_period));
}

NODELET_INFO_STREAM("CmdVelMux : (re)configured");
Expand Down
59 changes: 46 additions & 13 deletions yocs_cmd_vel_mux/src/cmd_vel_subscribers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,68 @@ namespace yocs_cmd_vel_mux {

void CmdVelSubscribers::CmdVelSubs::operator << (const YAML::Node& node)
{
node["name"] >> name;
node["topic"] >> topic;
node["timeout"] >> timeout;
node["priority"] >> priority;
// Fill attributes with a YAML node content
double new_timeout;
std::string new_topic;
node["name"] >> name;
node["topic"] >> new_topic;
node["timeout"] >> new_timeout;
node["priority"] >> priority;
#ifdef HAVE_NEW_YAMLCPP
if (node["short_desc"]) {
#else
if (node.FindValue("short_desc") != NULL) {
#endif
node["short_desc"] >> short_desc;
}
}

void CmdVelSubscribers::configure(const YAML::Node& node) {
if (new_topic != topic)
{
// Shutdown the topic if the name has changed so it gets recreated on configuration reload
// In the case of new subscribers, topic is empty and shutdown has just no effect
topic = new_topic;
subs.shutdown();
}

if (new_timeout != timeout)
{
// Change timer period if the timeout changed
timeout = new_timeout;
timer.setPeriod(ros::Duration(timeout));
}
}

list.clear();
void CmdVelSubscribers::configure(const YAML::Node& node)
{
try
{
if ( node.size() == 0 ) {
throw EmptyCfgException();
if (node.size() == 0)
{
throw EmptyCfgException("Configuration is empty");
}

std::vector<std::shared_ptr<CmdVelSubs>> new_list(node.size());
for (unsigned int i = 0; i < node.size(); i++)
{
// Parse every entries on YAML
CmdVelSubs subscriber(i);
subscriber << node[i];
list.push_back(subscriber);
// Parse entries on YAML
std::string new_subs_name = node[i]["name"].Scalar();
auto old_subs = std::find_if(list.begin(), list.end(),
[&new_subs_name](const std::shared_ptr<CmdVelSubs>& subs)
{return subs->name == new_subs_name;});
if (old_subs != list.end())
{
// For names already in the subscribers list, retain current object so we don't re-subscribe to the topic
new_list[i] = *old_subs;
}
else
{
new_list[i] = std::make_shared<CmdVelSubs>(i);
}
// update existing or new object with the new configuration
*new_list[i] << node[i];
}

list = new_list;
}
catch(EmptyCfgException& e) {
throw e;
Expand Down

0 comments on commit bf003ad

Please sign in to comment.