Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix culling and delay implementation #86

Merged
merged 5 commits into from Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 17 additions & 0 deletions rmf_traffic/include/rmf_traffic/schedule/Database.hpp
Expand Up @@ -209,6 +209,23 @@ class Database : public ItineraryViewer, public Writer, public Snappable
const Query& parameters,
Version after) const;

/// Excessive cumulative delays have a risk of overloading the schedule
/// database by taking up an excessive amount of memory to track the delay
/// history. Typically this would be a cumulative delay on the scale of
/// weeks, months, years, etc, and almost certainly indicates an error in
/// the schedule participant's reporting.
///
/// Still, to avoid harmful effects of participant errors, when the database
/// detects an excessive cumulative delay for a participant, a new delay(~)
/// command will be converted into a set(~) command. This function lets you
/// decide what the threshold should be for that. The default is equivalent to
/// 2 hours.
void set_maximum_cumulative_delay(rmf_traffic::Duration maximum_delay);

/// Get the current cumulative delay for the participant.
std::optional<rmf_traffic::Duration> get_cumulative_delay(
ParticipantId participant) const;

/// Throw away all itineraries up to the specified time.
///
/// \param[in] time
Expand Down
37 changes: 26 additions & 11 deletions rmf_traffic/include/rmf_traffic/schedule/Participant.hpp
Expand Up @@ -47,25 +47,40 @@ class Participant
/// The new itinerary that the participant should reflect in the schedule.
bool set(PlanId plan, std::vector<Route> itinerary);

/// Add more routes for the participant. All of the routes currently in the
/// itinerary will still be in it.
/// The cumulative delay that has built up since the last call to
/// set(plan, ~). If plan does not match the current Plan ID of the itinerary
/// then this returns a nullopt.
///
/// \param[in] additional_routes
/// The new routes to add to the itinerary.
void extend(std::vector<Route> additional_routes);
/// \note This value will not grow when there are no itineraries for this
/// participant.
std::optional<Duration> cumulative_delay(PlanId plan) const;

/// Set the cumulative delay of the specified plan. Returns false if plan does
/// not match the current Plan ID of the itinerary.
/// \param[in] plan
/// The unique plan ID that this cumulative delay is relevant for
///
/// \param[in] delay
/// The value for the cumulative delay
///
/// \param[in] tolerance
/// A tolerance threshold for reporting this delay. If the magnitude of
/// change in the cumulative delay is less than or equal to the magnitude of
/// this tolerance, then no change will be reported to the schedule. Use
/// Duration(0) to always report any non-zero change in cumulative delay.
bool cumulative_delay(
PlanId plan,
Duration delay,
Duration tolerance = Duration(0));

/// Delay the current itinerary.
///
/// \param[in] delay
/// The amount of time to push back the relevant waypoints.
[[deprecated("Use cumulative_delay instead")]]
void delay(Duration delay);

/// The cumulative delay that has built up since the last call to set().
///
/// \note This value will not grow when there are no itineraries for this
/// participant.
//
// TODO(MXG): Should extend() also reset this value? Currently it does not.
[[deprecated("Use cumulative_delay instead")]]
Duration delay() const;

/// Notify the schedule that a checkpoint within a plan has been reached
Expand Down
Expand Up @@ -298,7 +298,7 @@ std::vector<Plan::Waypoint> find_dependencies(
std::unordered_map<CheckpointId, Dependencies> found_dependencies;
while (!no_conflicts)
{
if (++count > 10000)
if (++count > 100)
{
// This almost certainly means there's a bug causing an infinite loop.
// A normal value would be less than 10.
Expand Down Expand Up @@ -338,7 +338,7 @@ std::vector<Plan::Waypoint> find_dependencies(
ss << "-------------------------------------------------"
<< "\n[rmf_traffic::agv::Planner::plan] WARNING: "
<< "A rare anomaly has occurred in the planner. The Route "
<< "Validator has failed o recognize a specified route "
<< "Validator has failed to recognize a specified route "
<< "dependency: " << dependent << " on {"
<< dependency.on_participant << " " << dependency.on_plan
<< " " << dependency.on_route << " "
Expand Down
146 changes: 136 additions & 10 deletions rmf_traffic/src/rmf_traffic/schedule/Database.cpp
Expand Up @@ -32,9 +32,23 @@

#include <algorithm>
#include <list>
#include <deque>

namespace rmf_traffic {
namespace schedule {

//==============================================================================
/// Exception thrown when a participant's list of active route IDs refers to a
/// storage entry that is missing from that participant's route storage map.
/// This indicates internal corruption rather than a user-recoverable error.
class RouteStorageException : public std::exception
{
public:
  // The message is static: the known trigger is Database::cull being given a
  // bad (e.g. wall-clock vs simulation) time value, which can erase storage
  // entries that are still listed as active.
  const char* what() const noexcept override
  {
    return "Route storage is corrupted. This indicates that Database::cull was "
      "called with a bad time value. Check whether your schedule node should "
      "turn on simulation time.";
  }
};

//==============================================================================
class Database::Implementation
{
Expand Down Expand Up @@ -108,6 +122,7 @@ class Database::Implementation
struct ParticipantState
{
std::vector<StorageId> active_routes;
rmf_traffic::Duration cumulative_delay;
std::unique_ptr<InconsistencyTracker> tracker;
ParticipantStorage storage;
std::shared_ptr<const ParticipantDescription> description;
Expand Down Expand Up @@ -186,6 +201,7 @@ class Database::Implementation
/// The current time is used to know when participants can be culled after
/// getting unregistered
rmf_traffic::Time current_time = rmf_traffic::Time(rmf_traffic::Duration(0));
rmf_traffic::Duration maximum_cumulative_delay = std::chrono::hours(2);

mutable DependencyTracker dependencies;

Expand Down Expand Up @@ -231,10 +247,38 @@ class Database::Implementation
Duration delay)
{
ParticipantStorage& storage = state.storage;
state.cumulative_delay += delay;
if (state.cumulative_delay > maximum_cumulative_delay)
{
// We have reached the maximum supported cumulative delay, so we will
// refresh the whole itinerary to avoid overloading the database.
Itinerary refresh_itinerary;
for (const auto storage_id : state.active_routes)
{
const auto s_it = state.storage.find(storage_id);
if (s_it == storage.end())
throw RouteStorageException();

auto& entry_storage = s_it->second;
const auto& route_entry = entry_storage.entry;
auto new_route = *route_entry->route;
if (!new_route.trajectory().empty())
new_route.trajectory().front().adjust_times(delay);
refresh_itinerary.push_back(new_route);
}

clear(participant, state, false);
insert_items(participant, state, refresh_itinerary);
return;
}

for (const StorageId storage_id : state.active_routes)
{
assert(storage.find(storage_id) != storage.end());
auto& entry_storage = storage.at(storage_id);
const auto s_it = storage.find(storage_id);
if (s_it == storage.end())
throw RouteStorageException();

auto& entry_storage = s_it->second;
const auto& route_entry = entry_storage.entry;
const auto route_id = route_entry->route_id;

Expand Down Expand Up @@ -282,9 +326,11 @@ class Database::Implementation
ParticipantStorage& storage = state.storage;
for (const StorageId storage_id : state.active_routes)
{
assert(storage.find(storage_id) != storage.end());
auto& entry_storage = storage.at(storage_id);
const auto s_it = storage.find(storage_id);
if (s_it == storage.end())
throw RouteStorageException();

auto& entry_storage = s_it->second;
auto route = entry_storage.entry->route;
const auto route_id = entry_storage.entry->route_id;

Expand Down Expand Up @@ -316,13 +362,17 @@ class Database::Implementation

void clear(
ParticipantId participant,
ParticipantState& state)
ParticipantState& state,
bool clear_progress = true)
{
ParticipantStorage& storage = state.storage;
for (const StorageId storage_id : state.active_routes)
{
assert(storage.find(storage_id) != storage.end());
auto& entry_storage = storage.at(storage_id);
const auto s_it = storage.find(storage_id);
if (s_it == storage.end())
throw RouteStorageException();

auto& entry_storage = s_it->second;
const auto& entry = *entry_storage.entry;

auto transition = std::make_unique<Transition>(
Expand Down Expand Up @@ -350,8 +400,10 @@ class Database::Implementation
entry_storage.timeline_handle = timeline.insert(entry_storage.entry);
}

state.cumulative_delay = rmf_traffic::Duration(0);
state.active_routes.clear();
state.progress.reached_checkpoints.clear();
if (clear_progress)
state.progress.reached_checkpoints.clear();
}

ParticipantId get_next_participant_id()
Expand Down Expand Up @@ -437,7 +489,14 @@ std::optional<Itinerary> Database::Debug::get_itinerary(
Itinerary itinerary;
itinerary.reserve(state.active_routes.size());
for (const RouteId route : state.active_routes)
itinerary.push_back(*state.storage.at(route).entry->route);
{
const auto s_it = state.storage.find(route);
if (s_it == state.storage.end())
throw RouteStorageException();

const auto& entry_storage = s_it->second;
itinerary.push_back(*entry_storage.entry->route);
}

return itinerary;
}
Expand Down Expand Up @@ -674,6 +733,7 @@ Writer::Registration register_participant_impl(
id,
Database::Implementation::ParticipantState{
{},
rmf_traffic::Duration(0),
std::move(tracker),
{},
description_ptr,
Expand Down Expand Up @@ -1271,7 +1331,14 @@ std::optional<ItineraryView> Database::get_itinerary(
ItineraryView itinerary;
itinerary.reserve(state.active_routes.size());
for (const RouteId route : state.active_routes)
itinerary.push_back(state.storage.at(route).entry->route);
{
const auto s_it = state.storage.find(route);
if (s_it == state.storage.end())
throw RouteStorageException();

const auto& entry_storage = s_it->second;
itinerary.push_back(entry_storage.entry->route);
}

return itinerary;
}
Expand Down Expand Up @@ -1490,6 +1557,23 @@ Viewer::View Database::query(const Query& parameters, const Version after) const
return Viewer::View::Implementation::make_view(std::move(inspector.routes));
}

//==============================================================================
/// Set the threshold of cumulative delay beyond which a delay(~) command is
/// converted into a full itinerary refresh (see Implementation::delay), to
/// keep the delay history from growing without bound.
///
/// \param[in] maximum_delay
///   The new cumulative-delay threshold. The Implementation's default is two
///   hours.
void Database::set_maximum_cumulative_delay(rmf_traffic::Duration maximum_delay)
{
  _pimpl->maximum_cumulative_delay = maximum_delay;
}

//==============================================================================
/// Report the cumulative delay currently tracked for a participant.
///
/// \param[in] participant
///   The participant to look up.
///
/// \return The participant's tracked cumulative delay, or std::nullopt if the
///   participant is not registered in this database.
std::optional<rmf_traffic::Duration> Database::get_cumulative_delay(
  ParticipantId participant) const
{
  const auto& states = _pimpl->states;
  const auto state_it = states.find(participant);
  if (state_it != states.end())
    return state_it->second.cumulative_delay;

  return std::nullopt;
}

//==============================================================================
Version Database::cull(Time time)
{
Expand All @@ -1511,7 +1595,49 @@ Version Database::cull(Time time)
const auto r_it = storage.find(route.storage_id);
assert(r_it != storage.end());

const auto a_it = std::find(
p_it->second.active_routes.begin(),
p_it->second.active_routes.end(),
route.storage_id);
if (a_it != p_it->second.active_routes.end())
p_it->second.active_routes.erase(a_it);

// If the thread of RouteEntries gets too long, it is possible that erasing
// the entry from storage could cause a stack overflow as its predecessors
// are recursively destructed. Therefore we will first store the
// predecessors in a queue before erasing the entry, and then destruct the
// queue in an order that will avoid recursive destruction.
std::unordered_set<Implementation::RouteEntryPtr> visited;
std::deque<Implementation::RouteEntryPtr> entries;
entries.push_back(r_it->second.entry);
while (const auto* transition = entries.back()->transition.get())
{
if (transition->predecessor.entry.use_count() == 1)
{
if (!visited.insert(transition->predecessor.entry).second)
{
// A circular reference like this should never happen, but if it does
// then we should exit right away.
// TODO(MXG): Consider escalating this issue with an error printout
break;
}
entries.push_back(transition->predecessor.entry);
}
else
{
// If the use count of the route entry is already higher than one, then
// we do not have to worry about recursively destructing it.
break;
}
}

storage.erase(r_it);
visited.clear();

// Now we pop the queue from front to back to make sure the destruction
// happens one entry at a time.
while (!entries.empty())
entries.pop_front();
}

_pimpl->timeline.cull(time);
Expand Down