Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes subtle memory leak, adds comments #1617

Merged
merged 1 commit into from Apr 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion llarp/path/path_context.cpp
Expand Up @@ -302,7 +302,7 @@ namespace llarp
{
if (itr->second->Expired(now))
{
m_Router->outboundMessageHandler().QueueRemoveEmptyPath(itr->first);
m_Router->outboundMessageHandler().RemovePath(itr->first);
itr = map.erase(itr);
}
else
Expand Down
4 changes: 2 additions & 2 deletions llarp/path/pathset.cpp
Expand Up @@ -85,9 +85,9 @@ namespace llarp
if (itr->second->Expired(now))
{
PathID_t txid = itr->second->TXID();
router->outboundMessageHandler().QueueRemoveEmptyPath(std::move(txid));
router->outboundMessageHandler().RemovePath(std::move(txid));
PathID_t rxid = itr->second->RXID();
router->outboundMessageHandler().QueueRemoveEmptyPath(std::move(rxid));
router->outboundMessageHandler().RemovePath(std::move(rxid));
itr = m_Paths.erase(itr);
}
else
Expand Down
2 changes: 1 addition & 1 deletion llarp/router/i_outbound_message_handler.hpp
Expand Up @@ -38,7 +38,7 @@ namespace llarp
Tick() = 0;

virtual void
QueueRemoveEmptyPath(const PathID_t& pathid) = 0;
RemovePath(const PathID_t& pathid) = 0;

virtual util::StatusObject
ExtractStatus() const = 0;
Expand Down
95 changes: 53 additions & 42 deletions llarp/router/outbound_message_handler.cpp
Expand Up @@ -15,14 +15,17 @@ namespace llarp
{
const PathID_t OutboundMessageHandler::zeroID;

using namespace std::chrono_literals;

OutboundMessageHandler::OutboundMessageHandler(size_t maxQueueSize)
: outboundQueue(maxQueueSize), removedPaths(20), removedSomePaths(false)
: outboundQueue(maxQueueSize), recentlyRemovedPaths(5s), removedSomePaths(false)
{}

bool
OutboundMessageHandler::QueueMessage(
const RouterID& remote, const ILinkMessage& msg, SendStatusHandler callback)
{
// if the destination is invalid, callback with failure and return
if (not _linkManager->SessionIsClient(remote) and not _lookupHandler->RemoteIsAllowed(remote))
{
DoCallback(callback, SendStatus::InvalidRouter);
Expand All @@ -44,26 +47,31 @@ namespace llarp

std::copy_n(buf.base, buf.sz, message.first.data());

// if we have a session to the destination, queue the message and return
if (_linkManager->HasSessionTo(remote))
{
QueueOutboundMessage(remote, std::move(message), msg.pathid, priority);
return true;
}

// if we don't have a session to the destination, queue the message onto
// a special pending session queue for that destination, and then create
// that pending session if there is not already a session establish attempt
// in progress.
bool shouldCreateSession = false;
{
util::Lock l(_mutex);

// create queue for <remote> if it doesn't exist, and get iterator
auto itr_pair = pendingSessionMessageQueues.emplace(remote, MessageQueue());
auto [queue_itr, is_new] = pendingSessionMessageQueues.emplace(remote, MessageQueue());

MessageQueueEntry entry;
entry.priority = priority;
entry.message = message;
entry.router = remote;
itr_pair.first->second.push(std::move(entry));
queue_itr->second.push(std::move(entry));

shouldCreateSession = itr_pair.second;
shouldCreateSession = is_new;
}

if (shouldCreateSession)
Expand All @@ -77,26 +85,33 @@ namespace llarp
void
OutboundMessageHandler::Tick()
{
m_Killer.TryAccess([self = this]() {
self->ProcessOutboundQueue();
self->RemoveEmptyPathQueues();
self->SendRoundRobin();
m_Killer.TryAccess([this]() {
recentlyRemovedPaths.Decay();
ProcessOutboundQueue();
SendRoundRobin();
});
}

void
OutboundMessageHandler::QueueRemoveEmptyPath(const PathID_t& pathid)
OutboundMessageHandler::RemovePath(const PathID_t& pathid)
{
m_Killer.TryAccess([self = this, pathid]() {
if (self->removedPaths.full())
m_Killer.TryAccess([this, pathid]() {
/* add the path id to a list of recently removed paths to act as a filter
* for messages that are queued but haven't been sorted into path queues yet.
*
* otherwise these messages would re-create the path queue we just removed, and
* those path queues would be leaked / never removed.
*/
recentlyRemovedPaths.Insert(pathid);
auto itr = outboundMessageQueues.find(pathid);
if (itr != outboundMessageQueues.end())
{
self->RemoveEmptyPathQueues();
outboundMessageQueues.erase(itr);
}
self->removedPaths.pushBack(pathid);
removedSomePaths = true;
});
}

// TODO: this
util::StatusObject
OutboundMessageHandler::ExtractStatus() const
{
Expand Down Expand Up @@ -241,6 +256,8 @@ namespace llarp
{
MessageQueueEntry entry;
entry.message = std::move(msg);

// copy callback in case we need to call it, so we can std::move(entry)
auto callback_copy = entry.message.second;
entry.router = remote;
entry.pathid = pathid;
Expand All @@ -266,17 +283,23 @@ namespace llarp
{
while (not outboundQueue.empty())
{
// TODO: can we add util::thread::Queue::front() for move semantics here?
MessageQueueEntry entry = outboundQueue.popFront();

auto itr_pair = outboundMessageQueues.emplace(entry.pathid, MessageQueue());
// messages may still be queued for processing when a pathid is removed,
// so check here if the pathid was recently removed.
if (recentlyRemovedPaths.Contains(entry.pathid))
{
return;
}

auto [queue_itr, is_new] = outboundMessageQueues.emplace(entry.pathid, MessageQueue());

if (itr_pair.second && !entry.pathid.IsZero())
if (is_new && !entry.pathid.IsZero())
{
roundRobinOrder.push(entry.pathid);
}

MessageQueue& path_queue = itr_pair.first->second;
MessageQueue& path_queue = queue_itr->second;

if (path_queue.size() < MAX_PATH_QUEUE_SIZE || entry.pathid.IsZero())
{
Expand All @@ -290,41 +313,25 @@ namespace llarp
}
}

void
OutboundMessageHandler::RemoveEmptyPathQueues()
{
removedSomePaths = false;
if (removedPaths.empty())
return;

while (not removedPaths.empty())
{
auto itr = outboundMessageQueues.find(removedPaths.popFront());
if (itr != outboundMessageQueues.end())
{
outboundMessageQueues.erase(itr);
}
}
removedSomePaths = true;
}

void
OutboundMessageHandler::SendRoundRobin()
{
m_queueStats.numTicks++;

// send non-routing messages first priority
auto& non_routing_mq = outboundMessageQueues[zeroID];
while (not non_routing_mq.empty())
// send routing messages first priority
auto& routing_mq = outboundMessageQueues[zeroID];
while (not routing_mq.empty())
{
const MessageQueueEntry& entry = non_routing_mq.top();
const MessageQueueEntry& entry = routing_mq.top();
Send(entry.router, entry.message);
non_routing_mq.pop();
routing_mq.pop();
}

size_t empty_count = 0;
size_t num_queues = roundRobinOrder.size();

// if any paths have been removed since last tick, remove any stale
// entries from the round-robin ordering
if (removedSomePaths)
{
for (size_t i = 0; i < num_queues; i++)
Expand All @@ -338,6 +345,7 @@ namespace llarp
}
}
}
removedSomePaths = false;

num_queues = roundRobinOrder.size();
size_t sent_count = 0;
Expand All @@ -346,7 +354,10 @@ namespace llarp
return;
}

while (sent_count < MAX_OUTBOUND_MESSAGES_PER_TICK) // TODO: better stop condition
// send messages for each pathid in roundRobinOrder, stopping when
// either every path's queue is empty or a set maximum amount of
// messages have been sent.
while (sent_count < MAX_OUTBOUND_MESSAGES_PER_TICK)
{
PathID_t pathid = std::move(roundRobinOrder.front());
roundRobinOrder.pop();
Expand Down
85 changes: 80 additions & 5 deletions llarp/router/outbound_message_handler.hpp
Expand Up @@ -4,6 +4,7 @@

#include <llarp/ev/ev.hpp>
#include <llarp/util/thread/queue.hpp>
#include <llarp/util/decaying_hashset.hpp>
#include <llarp/path/path_types.hpp>
#include <llarp/router_id.hpp>

Expand All @@ -27,15 +28,45 @@ namespace llarp

OutboundMessageHandler(size_t maxQueueSize = MAX_OUTBOUND_QUEUE_SIZE);

/* Called to queue a message to be sent to a router.
*
* If there is no session with the destination router, the message is added to a
* pending session queue for that router. If there is no pending session to that
* router, one is created.
*
* If there is a session to the destination router, the message is placed on the shared
* outbound message queue to be processed on Tick().
*
* When this class' Tick() is called, that queue is emptied and the messages there
* are placed in their paths' respective individual queues.
*
* Returns false if encoding the message into a buffer fails, true otherwise.
* A return value of true merely means we successfully processed the queue request,
* so for example an invalid destination still yields a true return.
*/
bool
QueueMessage(const RouterID& remote, const ILinkMessage& msg, SendStatusHandler callback)
override EXCLUDES(_mutex);

/* Called once per event loop tick.
*
* Processes messages on the shared message queue into their paths' respective
* individual queues.
*
* Removes the individual queues for paths which have died / expired, as informed by
* QueueRemoveEmptyPath.
*
* Sends all routing messages that have been queued, indicated by pathid 0 when queued.
* Sends messages from path queues until all are empty or a set cap has been reached.
*/
void
Tick() override;

/* Called from outside this class to inform it that a path has died / expired
* and its queue should be discarded.
*/
void
QueueRemoveEmptyPath(const PathID_t& pathid) override;
RemovePath(const PathID_t& pathid) override;

util::StatusObject
ExtractStatus() const override;
Expand All @@ -46,6 +77,9 @@ namespace llarp
private:
using Message = std::pair<std::vector<byte_t>, SendStatusHandler>;

/* A message that has been queued for sending, but not yet
* processed into an individual path's message queue.
*/
struct MessageQueueEntry
{
uint16_t priority;
Expand Down Expand Up @@ -73,6 +107,14 @@ namespace llarp

using MessageQueue = std::priority_queue<MessageQueueEntry>;

/* If a session is not yet created with the destination router for a message,
* a special queue is created for that router and an attempt is made to
* establish a session. When this establish attempt concludes, either
* the messages are then sent to that router immediately, on success, or
* the messages are dropped and their send status callbacks are invoked with
* the appropriate send status.
*/

void
OnSessionEstablished(const RouterID& router);

Expand All @@ -91,6 +133,7 @@ namespace llarp
void
OnSessionResult(const RouterID& router, const SessionResult result);

/* queues a message's send result callback onto the event loop */
void
DoCallback(SendStatusHandler callback, SendStatus status);

Expand All @@ -100,30 +143,62 @@ namespace llarp
bool
EncodeBuffer(const ILinkMessage& msg, llarp_buffer_t& buf);

/* sends the message along to the link layer, and hopefully out to the network
*
* returns the result of the call to LinkManager::SendTo()
*/
bool
Send(const RouterID& remote, const Message& msg);

/* Sends the message along to the link layer if we have a session to the remote
*
* returns the result of the Send() call, or false if no session.
*/
bool
SendIfSession(const RouterID& remote, const Message& msg);

/* queues a message to the shared outbound message queue.
*
* If the queue is full, the message is dropped and the message's status
* callback is invoked with a congestion status.
*
* When this class' Tick() is called, that queue is emptied and the messages there
* are placed in their paths' respective individual queues.
*/
bool
QueueOutboundMessage(
const RouterID& remote, Message&& msg, const PathID_t& pathid, uint16_t priority = 0);

/* Processes messages on the shared message queue into their paths' respective
* individual queues.
*/
void
ProcessOutboundQueue();

void
RemoveEmptyPathQueues();

/*
* Sends all routing messages that have been queued, indicated by pathid 0 when queued.
*
* Sends messages from path queues until all are empty or a set cap has been reached.
* This will send one message from each queue in a round-robin fashion such that they
* all have roughly equal access to bandwidth. A notion of priority may be introduced
* at a later time, but for now only routing messages get priority.
*/
void
SendRoundRobin();

/* Invoked when an outbound session establish attempt has concluded.
*
* If the outbound session was successfully created, sends any messages queued
* for that destination along to it.
*
* If the session was unsuccessful, invokes the send status callbacks of those
* queued messages and drops them.
*/
void
FinalizeSessionRequest(const RouterID& router, SendStatus status) EXCLUDES(_mutex);

llarp::thread::Queue<MessageQueueEntry> outboundQueue;
llarp::thread::Queue<PathID_t> removedPaths;
llarp::util::DecayingHashSet<PathID_t> recentlyRemovedPaths;
bool removedSomePaths;

mutable util::Mutex _mutex; // protects pendingSessionMessageQueues
Expand Down