Skip to content

Commit

Permalink
Merge pull request #4621 from pwojcikdev/confirm-message-extensions
Browse files Browse the repository at this point in the history
Add 'rebroadcasted' flag to `confirm_ack` message
  • Loading branch information
pwojcikdev committed May 17, 2024
2 parents f7aa48c + f21adde commit 7662e45
Show file tree
Hide file tree
Showing 18 changed files with 169 additions and 38 deletions.
12 changes: 6 additions & 6 deletions nano/core_test/active_elections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ TEST (inactive_votes_cache, basic)
ASSERT_TIMELY_EQ (5s, node.vote_cache.size (), 1);
node.process_active (send);
ASSERT_TIMELY (5s, node.ledger.confirmed.block_exists_or_pruned (node.ledger.tx_begin_read (), send->hash ()));
ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::vote_cached));
ASSERT_EQ (1, node.stats.count (nano::stat::type::election_vote, nano::stat::detail::cache));
}

/**
Expand All @@ -276,7 +276,7 @@ TEST (inactive_votes_cache, non_final)
node.process_active (send);
std::shared_ptr<nano::election> election;
ASSERT_TIMELY (5s, election = node.active.election (send->qualified_root ()));
ASSERT_TIMELY_EQ (5s, node.stats.count (nano::stat::type::election, nano::stat::detail::vote_cached), 1);
ASSERT_TIMELY_EQ (5s, node.stats.count (nano::stat::type::election_vote, nano::stat::detail::cache), 1);
ASSERT_TIMELY_EQ (5s, nano::dev::constants.genesis_amount - 100, election->tally ().begin ()->first);
ASSERT_FALSE (election->confirmed ());
}
Expand Down Expand Up @@ -318,7 +318,7 @@ TEST (inactive_votes_cache, fork)
node.process_active (send1);
ASSERT_TIMELY_EQ (5s, election->blocks ().size (), 2);
ASSERT_TIMELY (5s, node.block_confirmed (send1->hash ()));
ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::vote_cached));
ASSERT_EQ (1, node.stats.count (nano::stat::type::election_vote, nano::stat::detail::cache));
}

TEST (inactive_votes_cache, existing_vote)
Expand Down Expand Up @@ -354,7 +354,7 @@ TEST (inactive_votes_cache, existing_vote)
auto vote1 = nano::test::make_vote (key, { send }, nano::vote::timestamp_min * 1, 0);
node.vote_processor.vote (vote1, std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TIMELY_EQ (5s, election->votes ().size (), 2);
ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::vote_new));
ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::vote));
auto last_vote1 (election->votes ()[key.pub]);
ASSERT_EQ (send->hash (), last_vote1.hash);
ASSERT_EQ (nano::vote::timestamp_min * 1, last_vote1.timestamp);
Expand All @@ -372,7 +372,7 @@ TEST (inactive_votes_cache, existing_vote)
ASSERT_EQ (last_vote1.hash, last_vote2.hash);
ASSERT_EQ (last_vote1.timestamp, last_vote2.timestamp);
ASSERT_EQ (last_vote1.time, last_vote2.time);
ASSERT_EQ (0, node.stats.count (nano::stat::type::election, nano::stat::detail::vote_cached));
ASSERT_EQ (0, node.stats.count (nano::stat::type::election_vote, nano::stat::detail::cache));
}

TEST (inactive_votes_cache, multiple_votes)
Expand Down Expand Up @@ -425,7 +425,7 @@ TEST (inactive_votes_cache, multiple_votes)
ASSERT_EQ (1, node.vote_cache.size ());
auto election = nano::test::start_election (system, node, send1->hash ());
ASSERT_TIMELY_EQ (5s, 3, election->votes ().size ()); // 2 votes and 1 default not_an_acount
ASSERT_EQ (2, node.stats.count (nano::stat::type::election, nano::stat::detail::vote_cached));
ASSERT_EQ (2, node.stats.count (nano::stat::type::election_vote, nano::stat::detail::cache));
}

TEST (inactive_votes_cache, election_start)
Expand Down
23 changes: 23 additions & 0 deletions nano/core_test/message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ TEST (message, confirm_ack_hash_serialization)
ASSERT_EQ (hashes, con2.vote->hashes);
ASSERT_FALSE (header.confirm_is_v2 ());
ASSERT_EQ (header.count_get (), hashes.size ());
ASSERT_FALSE (con2.is_rebroadcasted ());
}

TEST (message, confirm_ack_hash_serialization_v2)
Expand Down Expand Up @@ -223,6 +224,28 @@ TEST (message, confirm_ack_hash_serialization_v2)
ASSERT_EQ (hashes, con2.vote->hashes);
ASSERT_TRUE (header.confirm_is_v2 ());
ASSERT_EQ (header.count_v2_get (), hashes.size ());
ASSERT_FALSE (con2.is_rebroadcasted ());
}

TEST (message, confirm_ack_rebroadcasted_flag)
{
nano::keypair representative1;
auto vote = nano::test::make_vote (representative1, std::vector<nano::block_hash> (), 0, 0);
nano::confirm_ack con1{ nano::dev::network_params.network, vote, /* rebroadcasted */ true };
ASSERT_TRUE (con1.is_rebroadcasted ());
std::vector<uint8_t> bytes;
{
nano::vectorstream stream1 (bytes);
con1.serialize (stream1);
}
nano::bufferstream stream2 (bytes.data (), bytes.size ());
bool error (false);
nano::message_header header (error, stream2);
nano::confirm_ack con2 (error, stream2, header);
ASSERT_FALSE (error);
ASSERT_EQ (con1, con2);
ASSERT_TRUE (con2.vote->hashes.empty ());
ASSERT_TRUE (con2.is_rebroadcasted ());
}

TEST (message, confirm_req_hash_serialization)
Expand Down
2 changes: 1 addition & 1 deletion nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2166,7 +2166,7 @@ TEST (node, vote_by_hash_bundle)
nano::keypair key1;
system.wallet (0)->insert_adhoc (key1.prv);

system.nodes[0]->observers.vote.add ([&max_hashes] (std::shared_ptr<nano::vote> const & vote_a, std::shared_ptr<nano::transport::channel> const &, nano::vote_code) {
system.nodes[0]->observers.vote.add ([&max_hashes] (std::shared_ptr<nano::vote> const & vote_a, std::shared_ptr<nano::transport::channel> const &, nano::vote_source, nano::vote_code) {
if (vote_a->hashes.size () > max_hashes)
{
max_hashes = vote_a->hashes.size ();
Expand Down
29 changes: 29 additions & 0 deletions nano/core_test/rep_crawler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,3 +299,32 @@ TEST (rep_crawler, two_reps_one_node)
ASSERT_TRUE (nano::dev::genesis_key.pub == reps[0].account || nano::dev::genesis_key.pub == reps[1].account);
ASSERT_TRUE (second_rep.pub == reps[0].account || second_rep.pub == reps[1].account);
}

TEST (rep_crawler, ignore_rebroadcasted)
{
nano::test::system system;
auto & node1 = *system.add_node ();
auto & node2 = *system.add_node ();

auto channel1to2 = node1.network.find_node_id (node2.node_id.pub);
ASSERT_NE (nullptr, channel1to2);

node1.rep_crawler.force_query (nano::dev::genesis->hash (), channel1to2);
ASSERT_ALWAYS_EQ (100ms, node1.rep_crawler.representative_count (), 0);

// Now we spam the vote for genesis, so it appears as a rebroadcasted vote
auto vote = nano::test::make_vote (nano::dev::genesis_key, { nano::dev::genesis->hash () }, 0);

auto channel2to1 = node2.network.find_node_id (node1.node_id.pub);
ASSERT_NE (nullptr, channel2to1);

node1.rep_crawler.force_query (nano::dev::genesis->hash (), channel1to2);

auto tick = [&] () {
nano::confirm_ack msg{ nano::dev::network_params.network, vote, /* rebroadcasted */ true };
channel2to1->send (msg, nullptr, nano::transport::buffer_drop_policy::no_socket_drop);
return false;
};

ASSERT_NEVER (1s, tick () || node1.rep_crawler.representative_count () > 0);
}
2 changes: 2 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ enum class type
vote_processor_tier,
vote_processor_overfill,
election,
election_vote,
http_callback,
ipc,
tcp,
Expand Down Expand Up @@ -109,6 +110,7 @@ enum class detail
success,
unknown,
cache,
rebroadcast,
queue_overflow,

// processing queue
Expand Down
8 changes: 5 additions & 3 deletions nano/node/election.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ nano::vote_code nano::election::vote (nano::account const & rep, uint64_t timest
auto max_vote = timestamp_a == std::numeric_limits<uint64_t>::max () && last_vote_l.timestamp < timestamp_a;

bool past_cooldown = true;
if (vote_source_a == vote_source::live) // Only cooldown live votes
if (vote_source_a != vote_source::cache) // Only cooldown live votes
{
const auto cooldown = cooldown_time (weight);
past_cooldown = last_vote_l.time <= std::chrono::steady_clock::now () - cooldown;
Expand All @@ -473,12 +473,14 @@ nano::vote_code nano::election::vote (nano::account const & rep, uint64_t timest
}

last_votes[rep] = { std::chrono::steady_clock::now (), timestamp_a, block_hash_a };
if (vote_source_a == vote_source::live)
if (vote_source_a != vote_source::cache)
{
live_vote_action (rep);
}

node.stats.inc (nano::stat::type::election, vote_source_a == vote_source::live ? nano::stat::detail::vote_new : nano::stat::detail::vote_cached);
node.stats.inc (nano::stat::type::election, nano::stat::detail::vote);
node.stats.inc (nano::stat::type::election_vote, to_stat_detail (vote_source_a));

node.logger.trace (nano::log::type::election, nano::log::detail::vote_processed,
nano::log::arg{ "id", id },
nano::log::arg{ "qualified_root", qualified_root },
Expand Down
2 changes: 1 addition & 1 deletion nano/node/message_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ class process_visitor : public nano::message_visitor
{
if (!message.vote->account.is_zero ())
{
node.vote_processor.vote (message.vote, channel);
node.vote_processor.vote (message.vote, channel, message.is_rebroadcasted () ? nano::vote_source::rebroadcast : nano::vote_source::live);
}
}

Expand Down
8 changes: 7 additions & 1 deletion nano/node/messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -616,13 +616,14 @@ nano::confirm_ack::confirm_ack (bool & error_a, nano::stream & stream_a, nano::m
}
}

nano::confirm_ack::confirm_ack (nano::network_constants const & constants, std::shared_ptr<nano::vote> const & vote_a) :
nano::confirm_ack::confirm_ack (nano::network_constants const & constants, std::shared_ptr<nano::vote> const & vote_a, bool rebroadcasted_a) :
message (constants, nano::message_type::confirm_ack),
vote (vote_a)
{
debug_assert (vote->hashes.size () < 256);

header.block_type_set (nano::block_type::not_a_block);
header.flag_set (rebroadcasted_flag, rebroadcasted_a);

if (vote->hashes.size () >= 16)
{
Expand Down Expand Up @@ -671,6 +672,11 @@ std::size_t nano::confirm_ack::size (const nano::message_header & header)
return nano::vote::size (count);
}

bool nano::confirm_ack::is_rebroadcasted () const
{
return header.flag_test (rebroadcasted_flag);
}

void nano::confirm_ack::operator() (nano::object_stream & obs) const
{
nano::message::operator() (obs); // Write common data
Expand Down
62 changes: 61 additions & 1 deletion nano/node/messages.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ enum class bulk_pull_account_flags : uint8_t

class message_visitor;

/*
* Common Header Binary Format:
* [2 bytes] Network (big endian)
* [1 byte] Maximum protocol version
* [1 byte] Protocol version currently in use
* [1 byte] Minimum protocol version
* [1 byte] Message type
* [2 bytes] Extensions (message-specific flags and properties)
*
* Notes:
* - The structure and bit usage of the `extensions` field vary by message type.
*/
class message_header final
{
public:
Expand Down Expand Up @@ -140,6 +152,14 @@ class message
virtual void operator() (nano::object_stream &) const;
};

/*
* Binary Format:
* [message_header] Common message header
* [8x (16 bytes (IP) + 2 bytes (port)] Array of 8 peers
*
* Header extensions:
* - No specific bits from the `extensions` field are used for `keepalive`.
*/
class keepalive final : public message
{
public:
Expand All @@ -156,6 +176,14 @@ class keepalive final : public message
void operator() (nano::object_stream &) const override;
};

/*
* Binary Format:
* [message_header] Common message header
* [variable] Block (serialized according to the block type specified in the header)
*
* Header extensions:
* - [0x0f00] Block type: Identifies the specific type of the block.
*/
class publish final : public message
{
public:
Expand All @@ -172,6 +200,20 @@ class publish final : public message
void operator() (nano::object_stream &) const override;
};

/*
* Binary Format:
* [message_header] Common message header
* [N x (32 bytes (block hash) + 32 bytes (root))] Pairs of (block_hash, root)
* - The count is determined by the header's count bits.
*
* Header extensions:
* - [0xf000] Count (for V1 protocol)
* - [0x0f00] Block type
* - Not used anymore (V25.1+), but still present and set to `not_a_block = 0x1` for backwards compatibility
* - [0xf000 (high), 0x00f0 (low)] Count V2 (for V2 protocol)
* - [0x0001] Confirm V2 flag
* - [0x0002] Reserved for V3+ versioning
*/
class confirm_req final : public message
{
public:
Expand All @@ -197,18 +239,36 @@ class confirm_req final : public message
void operator() (nano::object_stream &) const override;
};

/*
* Binary Format:
* [message_header] Common message header
* [variable] Vote
* - Serialized/deserialized by the `nano::vote` class.
*
* Header extensions:
* - [0xf000] Count (for V1 protocol)
* - [0x0f00] Block type
* - Not used anymore (V25.1+), but still present and set to `not_a_block = 0x1` for backwards compatibility
* - [0xf000 (high), 0x00f0 (low)] Count V2 masks (for V2 protocol)
* - [0x0001] Confirm V2 flag
* - [0x0002] Reserved for V3+ versioning
* - [0x0004] Rebroadcasted flag
*/
class confirm_ack final : public message
{
public:
confirm_ack (bool & error, nano::stream &, nano::message_header const &, nano::vote_uniquer * = nullptr);
confirm_ack (nano::network_constants const & constants, std::shared_ptr<nano::vote> const &);
confirm_ack (nano::network_constants const & constants, std::shared_ptr<nano::vote> const &, bool rebroadcasted = false);

void serialize (nano::stream &) const override;
void visit (nano::message_visitor &) const override;
bool operator== (nano::confirm_ack const &) const;

static std::size_t size (nano::message_header const &);

static uint8_t constexpr rebroadcasted_flag = 2; // 0x0004
bool is_rebroadcasted () const;

private:
static uint8_t hash_count (nano::message_header const &);

Expand Down
8 changes: 4 additions & 4 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,18 +266,18 @@ void nano::network::flood_block_initial (std::shared_ptr<nano::block> const & bl
}
}

void nano::network::flood_vote (std::shared_ptr<nano::vote> const & vote_a, float scale)
void nano::network::flood_vote (std::shared_ptr<nano::vote> const & vote, float scale, bool rebroadcasted)
{
nano::confirm_ack message{ node.network_params.network, vote_a };
nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted };
for (auto & i : list (fanout (scale)))
{
i->send (message, nullptr);
}
}

void nano::network::flood_vote_pr (std::shared_ptr<nano::vote> const & vote_a)
void nano::network::flood_vote_pr (std::shared_ptr<nano::vote> const & vote, bool rebroadcasted)
{
nano::confirm_ack message{ node.network_params.network, vote_a };
nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted };
for (auto const & i : node.rep_crawler.principal_representatives ())
{
i.channel->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop);
Expand Down
4 changes: 2 additions & 2 deletions nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ class network final
void flood_message (nano::message &, nano::transport::buffer_drop_policy const = nano::transport::buffer_drop_policy::limiter, float const = 1.0f);
void flood_keepalive (float const scale_a = 1.0f);
void flood_keepalive_self (float const scale_a = 0.5f);
void flood_vote (std::shared_ptr<nano::vote> const &, float scale);
void flood_vote_pr (std::shared_ptr<nano::vote> const &);
void flood_vote (std::shared_ptr<nano::vote> const &, float scale, bool rebroadcasted = false);
void flood_vote_pr (std::shared_ptr<nano::vote> const &, bool rebroadcasted = false);
// Flood block to all PRs and a random selection of non-PRs
void flood_block_initial (std::shared_ptr<nano::block> const &);
// Flood block to a random selection of peers
Expand Down
16 changes: 10 additions & 6 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
auto const reps = wallets.reps ();
if (!reps.have_half_rep () && !reps.exists (vote->account))
{
network.flood_vote (vote, 0.5f);
network.flood_vote (vote, 0.5f, /* rebroadcasted */ true);
}
}
});
Expand Down Expand Up @@ -358,18 +358,22 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
this->network.send_keepalive_self (channel_a);
});

observers.vote.add ([this] (std::shared_ptr<nano::vote> vote, std::shared_ptr<nano::transport::channel> const & channel, nano::vote_code code) {
observers.vote.add ([this] (std::shared_ptr<nano::vote> vote, std::shared_ptr<nano::transport::channel> const & channel, nano::vote_source source, nano::vote_code code) {
debug_assert (vote != nullptr);
debug_assert (code != nano::vote_code::invalid);
if (channel == nullptr)
{
return; // Channel expired when waiting for vote to be processed
}
bool active_in_rep_crawler = rep_crawler.process (vote, channel);
if (active_in_rep_crawler)
// Ignore republished votes
if (source == nano::vote_source::live)
{
// Representative is defined as online if replying to live votes or rep_crawler queries
online_reps.observe (vote->account);
bool active_in_rep_crawler = rep_crawler.process (vote, channel);
if (active_in_rep_crawler)
{
// Representative is defined as online if replying to live votes or rep_crawler queries
online_reps.observe (vote->account);
}
}
});

Expand Down

0 comments on commit 7662e45

Please sign in to comment.