Skip to content

Commit

Permalink
Merge pull request #873 from tewinget/path-queues
Browse files Browse the repository at this point in the history
Add per-path queues, prioritize control messages over traffic
  • Loading branch information
majestrate committed Nov 14, 2019
2 parents c05d7b6 + 5ce6ed5 commit 17efd37
Show file tree
Hide file tree
Showing 16 changed files with 280 additions and 39 deletions.
3 changes: 1 addition & 2 deletions llarp/ev/ev.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -671,8 +671,7 @@ namespace llarp
#ifdef _WIN32
struct llarp_fd_promise
{
void
Set(std::pair< int, int >)
void Set(std::pair< int, int >)
{
}

Expand Down
3 changes: 3 additions & 0 deletions llarp/messages/link_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <link/session.hpp>
#include <router_id.hpp>
#include <util/bencode.hpp>
#include <path/path_types.hpp>

#include <vector>

Expand All @@ -19,6 +20,8 @@ namespace llarp
ILinkSession* session = nullptr;
uint64_t version = LLARP_PROTO_VERSION;

PathID_t pathid;

ILinkMessage() = default;

virtual ~ILinkMessage() = default;
Expand Down
2 changes: 0 additions & 2 deletions llarp/messages/relay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ namespace llarp
{
struct RelayUpstreamMessage : public ILinkMessage
{
PathID_t pathid;
Encrypted< MAX_LINK_MSG_SIZE - 128 > X;
TunnelNonce Y;

Expand All @@ -37,7 +36,6 @@ namespace llarp

struct RelayDownstreamMessage : public ILinkMessage
{
PathID_t pathid;
Encrypted< MAX_LINK_MSG_SIZE - 128 > X;
TunnelNonce Y;

Expand Down
15 changes: 13 additions & 2 deletions llarp/messages/relay_commit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,9 @@ namespace llarp
self->hop = nullptr;
}

// TODO: If decryption has succeeded here but we otherwise don't
// want to or can't accept the path build request, send
// a status message saying as much.
static void
HandleDecrypted(llarp_buffer_t* buf,
std::shared_ptr< LRCMFrameDecrypt > self)
Expand All @@ -344,8 +347,16 @@ namespace llarp
return;
}

info.txID = self->record.txid;
info.rxID = self->record.rxid;
info.txID = self->record.txid;
info.rxID = self->record.rxid;

if(info.txID.IsZero() || info.rxID.IsZero())
{
llarp::LogError("LRCM refusing zero pathid");
self->decrypter = nullptr;
return;
}

info.upstream = self->record.nextHop;

// generate path key as we are in a worker thread
Expand Down
11 changes: 9 additions & 2 deletions llarp/path/path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,15 @@ namespace llarp
for(size_t idx = 0; idx < hsz; ++idx)
{
hops[idx].rc = h[idx];
hops[idx].txID.Randomize();
hops[idx].rxID.Randomize();
do
{
hops[idx].txID.Randomize();
} while(hops[idx].txID.IsZero());

do
{
hops[idx].rxID.Randomize();
} while(hops[idx].rxID.IsZero());
}

for(size_t idx = 0; idx < hsz - 1; ++idx)
Expand Down
1 change: 1 addition & 0 deletions llarp/path/path_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ namespace llarp
{
if(itr->second->Expired(now))
{
m_Router->outboundMessageHandler().QueueRemoveEmptyPath(itr->first);
itr = map.erase(itr);
}
else
Expand Down
1 change: 1 addition & 0 deletions llarp/path/path_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace llarp
{
struct PathID_t final : public AlignedBuffer< PATHIDSIZE >
{
using Hash = AlignedBuffer< PATHIDSIZE >::Hash;
};

} // namespace llarp
Expand Down
2 changes: 1 addition & 1 deletion llarp/path/pathbuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ namespace llarp
void Builder::Tick(llarp_time_t)
{
const auto now = llarp::time_now_ms();
ExpirePaths(now);
ExpirePaths(now, m_router);
if(ShouldBuildMore(now))
BuildOne();
TickPaths(m_router);
Expand Down
4 changes: 3 additions & 1 deletion llarp/path/pathset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ namespace llarp
}

void
PathSet::ExpirePaths(llarp_time_t now)
PathSet::ExpirePaths(llarp_time_t now, AbstractRouter* router)
{
Lock_t l(&m_PathsMutex);
if(m_Paths.size() == 0)
Expand All @@ -83,6 +83,8 @@ namespace llarp
{
if(itr->second->Expired(now))
{
router->outboundMessageHandler().QueueRemoveEmptyPath(
itr->second->TXID());
itr = m_Paths.erase(itr);
}
else
Expand Down
10 changes: 5 additions & 5 deletions llarp/path/pathset.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ namespace llarp
GetByUpstream(RouterID remote, PathID_t rxid) const;

void
ExpirePaths(llarp_time_t now);
ExpirePaths(llarp_time_t now, AbstractRouter* router);

/// get the number of paths in this status
size_t
Expand Down Expand Up @@ -199,15 +199,15 @@ namespace llarp
}

/// override me in subtype
virtual bool
HandleGotIntroMessage(std::shared_ptr< const dht::GotIntroMessage >)
virtual bool HandleGotIntroMessage(
std::shared_ptr< const dht::GotIntroMessage >)
{
return false;
}

/// override me in subtype
virtual bool
HandleGotRouterMessage(std::shared_ptr< const dht::GotRouterMessage >)
virtual bool HandleGotRouterMessage(
std::shared_ptr< const dht::GotRouterMessage >)
{
return false;
}
Expand Down
11 changes: 11 additions & 0 deletions llarp/router/i_outbound_message_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ namespace llarp

struct ILinkMessage;
struct RouterID;
struct PathID_t;

using SendStatusHandler = std::function< void(SendStatus) >;

static const size_t MAX_PATH_QUEUE_SIZE = 40;
static const size_t MAX_OUTBOUND_QUEUE_SIZE = 200;
static const size_t MAX_OUTBOUND_MESSAGES_PER_TICK = 20;

struct IOutboundMessageHandler
{
virtual ~IOutboundMessageHandler() = default;
Expand All @@ -31,6 +36,12 @@ namespace llarp
QueueMessage(const RouterID &remote, const ILinkMessage *msg,
SendStatusHandler callback) = 0;

virtual void
Tick() = 0;

virtual void
QueueRemoveEmptyPath(const PathID_t &pathid) = 0;

virtual util::StatusObject
ExtractStatus() const = 0;
};
Expand Down
Loading

0 comments on commit 17efd37

Please sign in to comment.