Skip to content

Commit

Permalink
Implement time-based blacklist aging
Browse files Browse the repository at this point in the history
  • Loading branch information
Neverlord committed Apr 28, 2020
1 parent 9640781 commit 94c4c4b
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 20 deletions.
70 changes: 66 additions & 4 deletions include/broker/alm/peer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
#include <map>
#include <vector>

#include <caf/actor_system.hpp>
#include <caf/actor_system_config.hpp>
#include <caf/behavior.hpp>
#include <caf/local_actor.hpp>

#include "broker/alm/multipath.hh"
#include "broker/alm/routing_table.hh"
#include "broker/atoms.hh"
#include "broker/defaults.hh"
#include "broker/detail/assert.hh"
#include "broker/detail/lift.hh"
#include "broker/detail/prefix_matcher.hh"
Expand Down Expand Up @@ -78,7 +82,36 @@ public:

using multipath_type = multipath<peer_id_type>;

using blacklist_type = blacklist<peer_id_type>;
// -- nested types -----------------------------------------------------------

struct blacklist_type {
blacklist<peer_id_type> entries;

caf::timespan aging_interval;

caf::timespan max_age;

caf::actor_clock::time_point next_aging_cycle;
};

// -- constructors, destructors, and assignment operators --------------------

peer() {
blacklist_.aging_interval = defaults::path_blacklist_aging_interval;
blacklist_.max_age = defaults::path_blacklist_max_age;
blacklist_.next_aging_cycle = caf::actor_clock::time_point{};
}

explicit peer(caf::local_actor* selfptr) {
using caf::get_or;
auto& cfg = selfptr->system().config();
blacklist_.aging_interval = get_or(cfg, "path-blacklist-aging-interval",
defaults::path_blacklist_aging_interval);
blacklist_.max_age = get_or(cfg, "path-blacklist-aging-interval",
defaults::path_blacklist_max_age);
blacklist_.next_aging_cycle = selfptr->clock().now()
+ blacklist_.aging_interval;
}

// -- properties -------------------------------------------------------------

Expand Down Expand Up @@ -116,6 +149,10 @@ public:
return result;
}

auto& blacklist() const noexcept {
return blacklist_;
}

// -- convenience functions for subscription information ---------------------

bool has_remote_subscriber(const topic& x) const noexcept {
Expand Down Expand Up @@ -229,13 +266,33 @@ public:
return false;
}
// Drop all messages that arrive after blacklisting a path.
if (blacklisted(path, path_ts, blacklist_)) {
if (blacklisted(path, path_ts, blacklist_.entries)) {
BROKER_DEBUG("drop message from a blacklisted path");
return false;
}
return true;
}

/// Removes all entries from the blacklist that exceeded their maximum age.
/// The purpose of this function is to clean up state periodically to avoid
/// unbound growth of the blacklist. Calling it at fixed intervals is not
/// required. Triggering aging on peer messages suffices, since only peer
/// messages can grow the blacklist in the first place.
void age_blacklist() {
if (blacklist_.entries.empty())
return;
auto now = dref().self()->clock().now();
if (now < blacklist_.next_aging_cycle)
return;
auto predicate = [this, now](const auto& entry) {
return entry.first_seen + blacklist_.max_age <= now;
};
auto& entries = blacklist_.entries;
entries.erase(std::remove_if(entries.begin(), entries.end(), predicate),
entries.end());
blacklist_.next_aging_cycle = now + blacklist_.aging_interval;
}

/// Adds the reverse `path` to the routing table and stores the subscription
/// if it is new.
/// @returns A pair containing a list of new peers learned through the update
Expand Down Expand Up @@ -294,6 +351,8 @@ public:
// though.
flood_subscriptions();
}
// Clean up some state if possible.
age_blacklist();
}

void handle_path_revocation(peer_id_list& path, vector_timestamp path_ts,
Expand All @@ -308,7 +367,8 @@ public:
// Handle the subscription part of the message.
auto&& [new_peers, increased_time] = handle_update(path, path_ts, filter);
// Handle the recovation part of the message.
auto [i, added] = emplace(blacklist_, path[0], path_ts[0], revoked_hop);
auto [i, added] = emplace(blacklist_.entries, dref().self(), path[0],
path_ts[0], revoked_hop);
if (added) {
if (!increased_time)
++timestamp_;
Expand All @@ -333,6 +393,8 @@ public:
dref().peer_discovered(id);
flood_subscriptions();
}
// Clean up some state if possible.
age_blacklist();
}

void handle_publication(message_type& msg) {
Expand Down Expand Up @@ -548,7 +610,7 @@ private:
std::unordered_map<peer_id_type, filter_type> peer_filters_;

/// Stores revoked paths.
blacklist<peer_id_type> blacklist_;
blacklist_type blacklist_;
};

} // namespace broker::alm
17 changes: 11 additions & 6 deletions include/broker/alm/routing_table.hh
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <tuple>
#include <utility>

#include <caf/actor_clock.hpp>

#include "broker/alm/lamport_timestamp.hh"
#include "broker/detail/algorithms.hh"
#include "broker/detail/assert.hh"
Expand Down Expand Up @@ -267,6 +269,9 @@ struct blacklist_entry {

/// The disconnected hop.
PeerId hop;

/// Time when this blacklist entry got created.
caf::actor_clock::time_point first_seen;
};

/// @relates blacklist_entry
Expand Down Expand Up @@ -317,14 +322,14 @@ using blacklist = std::vector<blacklist_entry<PeerId>>;

/// Inserts a new entry into the sorted blacklist constructed in-place with the
/// given args if this entry does not exist yet.
template <class PeerId, class Revoker, class Hop>
auto emplace(blacklist<PeerId>& lst, Revoker&& revoker, lamport_timestamp ts,
Hop&& hop) {
template <class PeerId, class Self, class Revoker, class Hop>
auto emplace(blacklist<PeerId>& lst, Self* self, Revoker&& revoker,
lamport_timestamp ts, Hop&& hop) {
auto i = std::lower_bound(lst.begin(), lst.end(), std::tie(revoker, ts, hop));
if (i == lst.end() || i->revoker != revoker || i->ts != ts || i->hop != hop) {
auto j = lst.emplace(i,
blacklist_entry<PeerId>{std::forward<Revoker>(revoker),
ts, std::forward<Hop>(hop)});
blacklist_entry<PeerId> entry{std::forward<Revoker>(revoker), ts,
std::forward<Hop>(hop), self->clock().now()};
auto j = lst.emplace(i, std::move(entry));
return std::make_pair(j, true);
}
return std::make_pair(i, false);
Expand Down
2 changes: 1 addition & 1 deletion include/broker/alm/stream_transport.hh
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public:
// -- constructors, destructors, and assignment operators --------------------

explicit stream_transport(caf::event_based_actor* self)
: caf::stream_manager(self), out_(this) {
: super(self), caf::stream_manager(self), out_(this) {
continuous(true);
}

Expand Down
5 changes: 5 additions & 0 deletions include/broker/defaults.hh
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "caf/string_view.hpp"
#include "caf/timespan.hpp"

// This header contains hard-coded default values for various Broker options.

Expand All @@ -11,5 +12,9 @@ extern const caf::string_view recording_directory;

extern const size_t output_generator_file_cap;

extern const caf::timespan path_blacklist_aging_interval;

extern const caf::timespan path_blacklist_max_age;

} // namespace defaults
} // namespace broker
7 changes: 6 additions & 1 deletion src/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <caf/atom.hpp>
#include <caf/io/middleman.hpp>
#include <caf/openssl/manager.hpp>
#include <caf/timespan.hpp>

#include "broker/address.hh"
#include "broker/alm/lamport_timestamp.hh"
Expand Down Expand Up @@ -94,7 +95,11 @@ configuration::configuration(skip_init_t) {
.add<std::string>("recording-directory",
"path for storing recorded meta information")
.add<size_t>("output-generator-file-cap",
"maximum number of entries when recording published messages");
"maximum number of entries when recording published messages")
.add<caf::timespan>("path-blacklist-aging-interval",
"sets the frequency of max-age checks")
.add<caf::timespan>("path-blacklist-max-age",
"maximum time for storing path revocations");
// Override CAF defaults.
using caf::atom;
set("logger.file-name", "broker_[PID]_[TIMESTAMP].log");
Expand Down
7 changes: 7 additions & 0 deletions src/defaults.cc
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
#include "broker/defaults.hh"

#include <chrono>
#include <limits>

using namespace std::chrono_literals;

namespace broker {
namespace defaults {

const caf::string_view recording_directory = "";

const size_t output_generator_file_cap = std::numeric_limits<size_t>::max();

const caf::timespan path_blacklist_aging_interval = 1s;

const caf::timespan path_blacklist_max_age = 5min;

} // namespace defaults
} // namespace broker
32 changes: 32 additions & 0 deletions tests/cpp/alm/peer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,16 @@ TEST(peers can revoke paths) {
CHECK_EQUAL(shortest_path(A, E), ls(J, I, E));
CHECK_EQUAL(shortest_path(B, E), ls(D, I, E));
CHECK_EQUAL(shortest_path(D, E), ls(I, E));
MESSAGE("B and E both revoked the path");
CHECK_EQUAL(get(A).blacklist().entries.size(), 2u);
CHECK_EQUAL(get(B).blacklist().entries.size(), 1u);
CHECK_EQUAL(get(C).blacklist().entries.size(), 2u);
CHECK_EQUAL(get(D).blacklist().entries.size(), 2u);
CHECK_EQUAL(get(E).blacklist().entries.size(), 1u);
CHECK_EQUAL(get(F).blacklist().entries.size(), 2u);
CHECK_EQUAL(get(H).blacklist().entries.size(), 2u);
CHECK_EQUAL(get(I).blacklist().entries.size(), 2u);
CHECK_EQUAL(get(J).blacklist().entries.size(), 2u);
MESSAGE("after I loses its connection to E, no paths to E remain");
anon_send(peers[I], atom::unpeer::value, peers[E]);
run();
Expand All @@ -312,6 +322,28 @@ TEST(peers can revoke paths) {
CHECK_UNREACHABLE(H, E);
CHECK_UNREACHABLE(I, E);
CHECK_UNREACHABLE(J, E);
MESSAGE("blacklists contain one additional entry after I <-> E revocation");
// Note: we skip E on purpose here.
CHECK_EQUAL(get(A).blacklist().entries.size(), 3u);
CHECK_EQUAL(get(B).blacklist().entries.size(), 2u);
CHECK_EQUAL(get(C).blacklist().entries.size(), 3u);
CHECK_EQUAL(get(D).blacklist().entries.size(), 3u);
CHECK_EQUAL(get(F).blacklist().entries.size(), 3u);
CHECK_EQUAL(get(H).blacklist().entries.size(), 3u);
CHECK_EQUAL(get(I).blacklist().entries.size(), 2u);
CHECK_EQUAL(get(J).blacklist().entries.size(), 3u);
MESSAGE("after max-age has expired, all peers clear their blacklist");
sched.clock().current_time += defaults::path_blacklist_max_age;
for (auto& id : {A, B, C, D, F, H, I, J})
get(id).age_blacklist();
CHECK_EQUAL(get(A).blacklist().entries.size(), 0u);
CHECK_EQUAL(get(B).blacklist().entries.size(), 0u);
CHECK_EQUAL(get(C).blacklist().entries.size(), 0u);
CHECK_EQUAL(get(D).blacklist().entries.size(), 0u);
CHECK_EQUAL(get(F).blacklist().entries.size(), 0u);
CHECK_EQUAL(get(H).blacklist().entries.size(), 0u);
CHECK_EQUAL(get(I).blacklist().entries.size(), 0u);
CHECK_EQUAL(get(J).blacklist().entries.size(), 0u);
}

TEST(only receivers forward messages locally) {
Expand Down
31 changes: 23 additions & 8 deletions tests/cpp/alm/routing_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,19 +186,34 @@ TEST(blacklisting does not affect newer paths) {
TEST(inseting into blacklists creates a sorted list) {
using blacklist = alm::blacklist<std::string>;
blacklist lst;
struct dummy_self {
auto clock() {
struct dummy_clock {
auto now() {
return caf::actor_clock::clock_type::now();
}
};
return dummy_clock{};
}
};
auto emplace = [&](std::string revoker, alm::lamport_timestamp rtime,
std::string hop) {
dummy_self self;
return alm::emplace(lst, &self, revoker, rtime, hop);
};
auto to_blacklist = [](auto range) {
return blacklist(range.first, range.second);
};
MESSAGE("filling the list with new entries inserts");
CHECK(emplace(lst, A, 1_lt, B).second);
CHECK(emplace(lst, C, 2_lt, A).second);
CHECK(emplace(lst, A, 3_lt, B).second);
CHECK(emplace(lst, C, 1_lt, A).second);
CHECK(emplace(lst, B, 7_lt, A).second);
CHECK(emplace(lst, A, 2_lt, C).second);
CHECK(emplace(A, 1_lt, B).second);
CHECK(emplace(C, 2_lt, A).second);
CHECK(emplace(A, 3_lt, B).second);
CHECK(emplace(C, 1_lt, A).second);
CHECK(emplace(B, 7_lt, A).second);
CHECK(emplace(A, 2_lt, C).second);
MESSAGE("inserting twice is a no-op");
CHECK(not emplace(lst, A, 1_lt, B).second);
CHECK(not emplace(lst, B, 7_lt, A).second);
CHECK(not emplace(A, 1_lt, B).second);
CHECK(not emplace(B, 7_lt, A).second);
MESSAGE("the final list is sorted on revoker, ts, hop");
CHECK_EQUAL(lst, blacklist({{A, 1_lt, B},
{A, 2_lt, C},
Expand Down

0 comments on commit 94c4c4b

Please sign in to comment.