Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket new_unconfirmed_block #2729

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions nano/core_test/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ inline void wait_peer_connections (nano::system & system_a)
auto wait_peer_count = [&system_a](bool in_memory) {
auto num_nodes = system_a.nodes.size ();
system_a.deadline_set (20s);
auto peer_count = 0;
size_t peer_count = 0;
while (peer_count != num_nodes * (num_nodes - 1))
{
ASSERT_NO_ERROR (system_a.poll ());
peer_count = std::accumulate (system_a.nodes.cbegin (), system_a.nodes.cend (), 0, [in_memory](auto total, auto const & node) {
peer_count = std::accumulate (system_a.nodes.cbegin (), system_a.nodes.cend (), std::size_t{ 0 }, [in_memory](auto total, auto const & node) {
if (in_memory)
{
return total += node->network.size ();
Expand Down
6 changes: 3 additions & 3 deletions nano/core_test/testutil.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,13 @@ inline uint16_t get_available_port ()
static uint16_t current = 0;
// Read the TEST_BASE_PORT environment and override the default base port if it exists
auto base_str = std::getenv ("TEST_BASE_PORT");
auto base_port = 24000;
uint16_t base_port = 24000;
if (base_str)
{
base_port = boost::lexical_cast<int> (base_str);
base_port = boost::lexical_cast<uint16_t> (base_str);
}

auto available_port = base_port + current;
uint16_t const available_port = base_port + current;
++current;
// Reset port number once we have reached the maximum
if (current == max)
Expand Down
42 changes: 42 additions & 0 deletions nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -914,3 +914,45 @@ TEST (websocket, telemetry)
// Other node should have no subscribers
EXPECT_EQ (0, node2->websocket_server->subscriber_count (nano::websocket::topic::telemetry));
}

TEST (websocket, new_unconfirmed_block)
{
nano::system system;
nano::node_config config (nano::get_available_port (), system.logging);
config.websocket_config.enabled = true;
config.websocket_config.port = nano::get_available_port ();
auto node1 (system.add_node (config));

std::atomic<bool> ack_ready{ false };
auto task = ([&ack_ready, config, node1]() {
fake_websocket_client client (config.websocket_config.port);
client.send_message (R"json({"action": "subscribe", "topic": "new_unconfirmed_block", "ack": "true"})json");
client.await_ack ();
ack_ready = true;
EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::new_unconfirmed_block));
return client.get_response ();
});
auto future = std::async (std::launch::async, task);

ASSERT_TIMELY (5s, ack_ready);

// Process a new block
nano::genesis genesis;
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - 1, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ())));
ASSERT_EQ (nano::process_result::progress, node1->process_local (send1).code);

ASSERT_TIMELY (5s, future.wait_for (0s) == std::future_status::ready);

// Check the response
boost::optional<std::string> response = future.get ();
ASSERT_TRUE (response);
std::stringstream stream;
stream << response;
boost::property_tree::ptree event;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "new_unconfirmed_block");

auto message_contents = event.get_child ("message");
ASSERT_EQ ("state", message_contents.get<std::string> ("type"));
ASSERT_EQ ("send", message_contents.get<std::string> ("subtype"));
}
21 changes: 21 additions & 0 deletions nano/lib/blocks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1707,6 +1707,27 @@ bool nano::block_details::deserialize (nano::stream & stream_a)
return result;
}

std::string nano::state_subtype (nano::block_details const details_a)
{
debug_assert (details_a.is_epoch + details_a.is_receive + details_a.is_send <= 1);
if (details_a.is_send)
{
return "send";
}
else if (details_a.is_receive)
{
return "receive";
}
else if (details_a.is_epoch)
{
return "epoch";
}
else
{
return "change";
}
}

nano::block_sideband::block_sideband (nano::account const & account_a, nano::block_hash const & successor_a, nano::amount const & balance_a, uint64_t height_a, uint64_t timestamp_a, nano::block_details const & details_a) :
successor (successor_a),
account (account_a),
Expand Down
2 changes: 2 additions & 0 deletions nano/lib/blocks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class block_details
void unpack (uint8_t);
};

std::string state_subtype (nano::block_details const);

class block_sideband final
{
public:
Expand Down
6 changes: 6 additions & 0 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <nano/node/blockprocessor.hpp>
#include <nano/node/election.hpp>
#include <nano/node/node.hpp>
#include <nano/node/websocket.hpp>
#include <nano/secure/blockstore.hpp>

#include <boost/format.hpp>
Expand Down Expand Up @@ -288,6 +289,11 @@ void nano::block_processor::process_live (nano::block_hash const & hash_a, std::
{
node.network.flood_block (block_a, nano::buffer_drop_policy::no_limiter_drop);
}

if (node.websocket_server && node.websocket_server->any_subscriber (nano::websocket::topic::new_unconfirmed_block))
{
node.websocket_server->broadcast (nano::websocket::message_builder ().new_block_arrived (*block_a));
}
}

nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, nano::unchecked_info info_a, const bool watch_work_a, const bool first_publish_a)
Expand Down
31 changes: 4 additions & 27 deletions nano/node/json_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -954,31 +954,6 @@ void nano::json_handler::available_supply ()
response_errors ();
}

void state_subtype (nano::transaction const & transaction_a, nano::node & node_a, std::shared_ptr<nano::block> block_a, nano::uint128_t const & balance_a, boost::property_tree::ptree & tree_a)
{
// Subtype check
auto previous_balance (node_a.ledger.balance (transaction_a, block_a->previous ()));
if (balance_a < previous_balance)
{
tree_a.put ("subtype", "send");
}
else
{
if (block_a->link ().is_zero ())
{
tree_a.put ("subtype", "change");
}
else if (balance_a == previous_balance && node_a.ledger.is_epoch_link (block_a->link ()))
{
tree_a.put ("subtype", "epoch");
}
else
{
tree_a.put ("subtype", "receive");
}
}
}

void nano::json_handler::block_info ()
{
auto hash (hash_impl ());
Expand Down Expand Up @@ -1014,7 +989,8 @@ void nano::json_handler::block_info ()
}
if (block->type () == nano::block_type::state)
{
state_subtype (transaction, node, block, balance, response_l);
auto subtype (nano::state_subtype (block->sideband ().details));
response_l.put ("subtype", subtype);
}
}
else
Expand Down Expand Up @@ -1162,7 +1138,8 @@ void nano::json_handler::blocks_info ()
}
if (block->type () == nano::block_type::state)
{
state_subtype (transaction, node, block, balance, entry);
auto subtype (nano::state_subtype (block->sideband ().details));
entry.put ("subtype", subtype);
}
if (pending)
{
Expand Down
23 changes: 23 additions & 0 deletions nano/node/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,10 @@ nano::websocket::topic to_topic (std::string const & topic_a)
{
topic = nano::websocket::topic::telemetry;
}
else if (topic_a == "new_unconfirmed_block")
{
topic = nano::websocket::topic::new_unconfirmed_block;
}

return topic;
}
Expand Down Expand Up @@ -432,6 +436,11 @@ std::string from_topic (nano::websocket::topic topic_a)
{
topic = "telemetry";
}
else if (topic_a == nano::websocket::topic::new_unconfirmed_block)
{
topic = "new_unconfirmed_block";
}

return topic;
}
}
Expand Down Expand Up @@ -883,6 +892,20 @@ nano::websocket::message nano::websocket::message_builder::telemetry_received (n
return message_l;
}

nano::websocket::message nano::websocket::message_builder::new_block_arrived (nano::block const & block_a)
{
nano::websocket::message message_l (nano::websocket::topic::new_unconfirmed_block);
set_common_fields (message_l);

boost::property_tree::ptree block_l;
block_a.serialize_json (block_l);
auto subtype (nano::state_subtype (block_a.sideband ().details));
block_l.put ("subtype", subtype);

message_l.contents.add_child ("message", block_l);
return message_l;
}

void nano::websocket::message_builder::set_common_fields (nano::websocket::message & message_a)
{
using namespace std::chrono;
Expand Down
5 changes: 3 additions & 2 deletions nano/node/websocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ namespace websocket
bootstrap,
/** A telemetry message */
telemetry,
/** New block arrival message*/
new_unconfirmed_block,
/** Auxiliary length, not a valid topic, must be the last enum */
_length
};
Expand Down Expand Up @@ -93,13 +95,13 @@ namespace websocket
message stopped_election (nano::block_hash const & hash_a);
message vote_received (std::shared_ptr<nano::vote> vote_a, nano::vote_code code_a);
message difficulty_changed (uint64_t publish_threshold_a, uint64_t difficulty_active_a);

message work_generation (nano::work_version const version_a, nano::block_hash const & root_a, uint64_t const work_a, uint64_t const difficulty_a, uint64_t const publish_threshold_a, std::chrono::milliseconds const & duration_a, std::string const & peer_a, std::vector<std::string> const & bad_peers_a, bool const completed_a = true, bool const cancelled_a = false);
message work_cancelled (nano::work_version const version_a, nano::block_hash const & root_a, uint64_t const difficulty_a, uint64_t const publish_threshold_a, std::chrono::milliseconds const & duration_a, std::vector<std::string> const & bad_peers_a);
message work_failed (nano::work_version const version_a, nano::block_hash const & root_a, uint64_t const difficulty_a, uint64_t const publish_threshold_a, std::chrono::milliseconds const & duration_a, std::vector<std::string> const & bad_peers_a);
message bootstrap_started (std::string const & id_a, std::string const & mode_a);
message bootstrap_exited (std::string const & id_a, std::string const & mode_a, std::chrono::steady_clock::time_point const start_time_a, uint64_t const total_blocks_a);
message telemetry_received (nano::telemetry_data const &, nano::endpoint const &);
message new_block_arrived (nano::block const & block_a);

private:
/** Set the common fields for messages: timestamp and topic. */
Expand Down Expand Up @@ -205,7 +207,6 @@ namespace websocket
class vote_options final : public options
{
public:
vote_options ();
vote_options (boost::property_tree::ptree const & options_a, nano::logger_mt & logger_a);

/**
Expand Down