Skip to content
This repository has been archived by the owner on Jul 22, 2021. It is now read-only.

Commit

Permalink
ROS2 Negotiation update callbacks (#140)
Browse files Browse the repository at this point in the history
  • Loading branch information
ddengster committed Aug 20, 2020
1 parent b365c95 commit 873cc2e
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 2 deletions.
4 changes: 4 additions & 0 deletions rmf_traffic/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ Changelog for package rmf_traffic
Forthcoming
-----------

1.1.0 (2020-09-XX)
------------------
* Allow a Negotiation Table Viewer to see rejected and forfeited statuses, and to check for a submission: [#140](https://github.com/osrf/rmf_core/pull/140/)

1.0.2 (2020-07-27)
------------------
* Improved definition of "traffic conflict" for vechiles that start too close: [#136](https://github.com/osrf/rmf_core/pull/136)
Expand Down
9 changes: 9 additions & 0 deletions rmf_traffic/include/rmf_traffic/schedule/Negotiation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,15 @@ class Negotiation
/// remain in sync with the state of the source Table.
bool defunct() const;

/// Returns true if the proposal put on this Table has been rejected.
bool rejected() const;

/// Returns true if the proposer for this Table has forfeited.
bool forfeited() const;

/// Return the submission on this Negotiation Table if it has one.
const Itinerary* submission() const;

class Implementation;
private:
Viewer();
Expand Down
29 changes: 28 additions & 1 deletion rmf_traffic/src/rmf_traffic/schedule/Negotiation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ class Negotiation::Table::Viewer::Implementation
rmf_utils::optional<ParticipantId> parent_id;
VersionedKeySequence sequence;
std::shared_ptr<const bool> defunct;
bool rejected;
bool forfeited;
rmf_utils::optional<Itinerary> itinerary;

Viewer::View query(
const Query::Spacetime& spacetime,
Expand Down Expand Up @@ -1043,6 +1046,27 @@ bool Negotiation::Table::Viewer::defunct() const
return *_pimpl->defunct;
}

//==============================================================================
bool Negotiation::Table::Viewer::rejected() const
{
return _pimpl->rejected;
}

//==============================================================================
bool Negotiation::Table::Viewer::forfeited() const
{
return _pimpl->forfeited;
}

//==============================================================================
const Itinerary* Negotiation::Table::Viewer::submission() const
{
if (_pimpl->itinerary)
return &(*_pimpl->itinerary);

return nullptr;
}

//==============================================================================
Negotiation::Table::Viewer::Viewer()
{
Expand All @@ -1069,7 +1093,10 @@ auto Negotiation::Table::viewer() const -> ViewerPtr
_pimpl->schedule_viewer,
parent_id,
_pimpl->sequence,
_pimpl->defunct.get()));
_pimpl->defunct.get(),
_pimpl->rejected,
_pimpl->forfeited,
_pimpl->itinerary));

return _pimpl->cached_table_viewer;
}
Expand Down
3 changes: 3 additions & 0 deletions rmf_traffic_ros2/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ Forthcoming
1.1.0 (2020-09-XX)
------------------
* Add a schedule node factory to the public API: [#147](https://github.com/osrf/rmf_core/pull/147)
* Allow the Negotiation class to accept callbacks for Table updates: [#140](https://github.com/osrf/rmf_core/pull/140/)
* Allow the Negotiation class to provide views for existing Tables: [#140](https://github.com/osrf/rmf_core/pull/140/)
* Allow the Negotiation class to store up to a certain number of completed negotiations: [#140](https://github.com/osrf/rmf_core/pull/140/)

1.0.2 (2020-07-27)
------------------
Expand Down
42 changes: 42 additions & 0 deletions rmf_traffic_ros2/include/rmf_traffic_ros2/schedule/Negotiation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,48 @@ class Negotiation
/// Get the current timeout duration setting.
rmf_traffic::Duration timeout_duration() const;

using TableViewPtr = rmf_traffic::schedule::Negotiation::Table::ViewerPtr;
using StatusUpdateCallback =
std::function<void (uint64_t conflict_version, TableViewPtr table_view)>;

/// Register a callback with this Negotiation manager that triggers
/// on negotiation status updates.
///
/// \param[in] cb
/// The callback function to be called upon status updates.
void on_status_update(StatusUpdateCallback cb);

using StatusConclusionCallback =
std::function<void (uint64_t conflict_version, bool success)>;

/// Register a callback with this Negotiation manager that triggers
/// on negotiation status conclusions.
///
/// \param[in] cb
/// The callback function to be called upon status conclusions.
void on_conclusion(StatusConclusionCallback cb);

/// Get a Negotiation::TableView that provides a view into what participants are
/// proposing.
///
/// This function does not care about table versioning.
/// \param[in] conflict_version
/// The conflict version of the negotiation
/// \param[in] sequence
/// The sequence of participant ids. Follows convention of other sequences
/// (ie. The last ParticipantId is the owner of the table)
///
/// \return A TableView into what participants are proposing.
TableViewPtr table_view(
uint64_t conflict_version,
const std::vector<rmf_traffic::schedule::ParticipantId>& sequence) const;

/// Set the number of negotiations to retain.
///
/// \param[in] count
/// The number of negotiations to retain
void set_retained_history_count(uint count);

/// Register a negotiator with this Negotiation manager.
///
/// \param[in] for_participant
Expand Down
122 changes: 121 additions & 1 deletion rmf_traffic_ros2/src/rmf_traffic_ros2/schedule/Negotiation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,19 @@ class Negotiation::Implementation
using Approvals = std::unordered_map<Version, ApprovalCallbackMap>;
Approvals approvals;

// Status update callbacks
using TableViewPtr = rmf_traffic::schedule::Negotiation::Table::ViewerPtr;
using StatusUpdateCallback =
std::function<void (uint64_t conflict_version, TableViewPtr& view)>;
StatusUpdateCallback status_callback;

using StatusConclusionCallback =
std::function<void (uint64_t conflict_version, bool success)>;
StatusConclusionCallback conclusion_callback;

uint retained_history_count = 0;
std::map<Version, rmf_traffic::schedule::Negotiation> history;

Implementation(
rclcpp::Node& node_,
std::shared_ptr<const rmf_traffic::schedule::Snappable> viewer_,
Expand Down Expand Up @@ -566,6 +579,12 @@ class Negotiation::Implementation
if (!updated)
return;

if (status_callback)
{
auto table_view = received_table->viewer();
status_callback(msg.conflict_version, table_view);
}

std::vector<TablePtr> queue = room.check_cache(*negotiators);

if (!participating)
Expand Down Expand Up @@ -602,7 +621,7 @@ class Negotiation::Implementation
if (!table)
{
std::string error =
"[rmf_traffic_ros2::schedule::Negotiation::receive_proposal] "
"[rmf_traffic_ros2::schedule::Negotiation::receive_rejection] "
"Receieved a rejection for negotiation ["
+ std::to_string(msg.conflict_version) + "] for an "
"unknown table: [";
Expand All @@ -622,6 +641,12 @@ class Negotiation::Implementation
if (!updated)
return;

if (status_callback)
{
auto table_view = table->viewer();
status_callback(msg.conflict_version, table_view);
}

std::vector<TablePtr> queue = room.check_cache(*negotiators);

if (!negotiate_it->second.participating)
Expand Down Expand Up @@ -657,6 +682,12 @@ class Negotiation::Implementation

table->forfeit(msg.table.back().version);

if (status_callback)
{
auto table_view = table->viewer();
status_callback(msg.conflict_version, table_view);
}

respond_to_queue(room.check_cache(*negotiators), msg.conflict_version);
}

Expand Down Expand Up @@ -858,6 +889,19 @@ class Negotiation::Implementation
approvals.erase(approval_callback_it);
}

if (conclusion_callback)
conclusion_callback(msg.conflict_version, msg.resolved);

// add to retained history
if (retained_history_count > 0)
{
while (history.size() >= retained_history_count)
history.erase(history.begin());

history.emplace(
msg.conflict_version, std::move(negotiate_it->second.room.negotiation));
}

// Erase these entries because the negotiation has concluded
negotiations.erase(negotiate_it);
}
Expand Down Expand Up @@ -949,6 +993,56 @@ class Negotiation::Implementation

return std::make_shared<Handle>(for_participant, negotiators);
}

void set_retained_history_count(uint count)
{
retained_history_count = count;
}

TableViewPtr table_view(
uint64_t conflict_version,
const std::vector<ParticipantId>& sequence) const
{
const auto negotiate_it = negotiations.find(conflict_version);
rmf_traffic::schedule::Negotiation::ConstTablePtr table;

if (negotiate_it == negotiations.end())
{
const auto history_it = history.find(conflict_version);
if (history_it == history.end())
{
RCLCPP_WARN(node.get_logger(), "Conflict version %llu does not exist."
"It may have been successful and wiped", conflict_version);
return nullptr;
}

table = history_it->second.table(sequence);
}
else
{
const auto& room = negotiate_it->second.room;
const Negotiation& negotiation = room.negotiation;
table = negotiation.table(sequence);
}

if (!table)
{
RCLCPP_WARN(node.get_logger(), "Table not found");
return nullptr;
}

return table->viewer();
}

void on_status_update(StatusUpdateCallback cb)
{
status_callback = cb;
}

void on_conclusion(StatusConclusionCallback cb)
{
conclusion_callback = cb;
}
};

//==============================================================================
Expand All @@ -962,6 +1056,18 @@ Negotiation::Negotiation(
// Do nothing
}

//==============================================================================
void Negotiation::on_status_update(StatusUpdateCallback cb)
{
_pimpl->on_status_update(cb);
}

//==============================================================================
void Negotiation::on_conclusion(StatusConclusionCallback cb)
{
_pimpl->on_conclusion(cb);
}

//==============================================================================
Negotiation& Negotiation::timeout_duration(rmf_traffic::Duration duration)
{
Expand All @@ -975,6 +1081,20 @@ rmf_traffic::Duration Negotiation::timeout_duration() const
return _pimpl->timeout;
}

//==============================================================================
Negotiation::TableViewPtr Negotiation::table_view(
uint64_t conflict_version,
const std::vector<ParticipantId>& sequence) const
{
return _pimpl->table_view(conflict_version, sequence);
}

//==============================================================================
void Negotiation::set_retained_history_count(uint count)
{
return _pimpl->set_retained_history_count(count);
}

//==============================================================================
std::shared_ptr<void> Negotiation::register_negotiator(
rmf_traffic::schedule::ParticipantId for_participant,
Expand Down

0 comments on commit 873cc2e

Please sign in to comment.