From fa26274b0399dd26478eeac7683b0bd3379cf52e Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Sun, 14 Apr 2019 15:36:52 +0100 Subject: [PATCH 1/5] Check if any subscriptions exist before creating a message for websocket broadcasting --- nano/core_test/websocket.cpp | 2 ++ nano/node/node.cpp | 39 +++++++++++++++-------------- nano/node/websocket.cpp | 48 ++++++++++++++++++++++++++++++++---- nano/node/websocket.hpp | 10 +++++++- 4 files changed, 75 insertions(+), 24 deletions(-) diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index 7dc2c30f81..c42162b8ae 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -70,6 +70,7 @@ TEST (websocket, confirmation) // Start websocket test-client in a separate thread std::atomic confirmation_event_received{ false }; + ASSERT_FALSE (node1->websocket_server->any_subscription (nano::websocket::topic::confirmation)); std::thread client_thread ([&system, &confirmation_event_received]() { // This will expect two results: the acknowledgement of the subscription // and then the block confirmation message @@ -91,6 +92,7 @@ TEST (websocket, confirmation) { ASSERT_NO_ERROR (system.poll ()); } + ASSERT_TRUE (node1->websocket_server->any_subscription (nano::websocket::topic::confirmation)); // Quick-confirm a block nano::keypair key; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 62579a1c67..12a40dc66e 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1124,31 +1124,34 @@ startup_time (std::chrono::steady_clock::now ()) if (websocket_server) { observers.blocks.add ([this](std::shared_ptr block_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) { - if (this->block_arrival.recent (block_a->hash ())) + if (this->websocket_server->any_subscription (nano::websocket::topic::confirmation)) { - std::string subtype; - if (is_state_send_a) - { - subtype = "send"; - } - else if (block_a->type () == nano::block_type::state) + if (this->block_arrival.recent (block_a->hash ())) { - if (block_a->link ().is_zero ()) - { - subtype = "change"; - } - else if (amount_a == 0 && !this->ledger.epoch_link.is_zero () && this->ledger.is_epoch_link (block_a->link ())) + std::string subtype; + if (is_state_send_a) { - subtype = "epoch"; + subtype = "send"; } - else + else if (block_a->type () == nano::block_type::state) { - subtype = "receive"; + if (block_a->link ().is_zero ()) + { + subtype = "change"; + } + else if (amount_a == 0 && !this->ledger.epoch_link.is_zero () && this->ledger.is_epoch_link (block_a->link ())) + { + subtype = "epoch"; + } + else + { + subtype = "receive"; + } } + nano::websocket::message_builder builder; + auto msg (builder.block_confirmed (block_a, account_a, amount_a, subtype)); + this->websocket_server->broadcast (msg); } - nano::websocket::message_builder builder; - auto msg (builder.block_confirmed (block_a, account_a, amount_a, subtype)); - this->websocket_server->broadcast (msg); } }); } diff --git a/nano/node/websocket.cpp b/nano/node/websocket.cpp index 9c5c54148b..6833375ae5 100644 --- a/nano/node/websocket.cpp +++ b/nano/node/websocket.cpp @@ -13,6 +13,11 @@ ws_listener (listener_a), ws (std::move (socket_a)), write_strand (ws.get_execut nano::websocket::session::~session () { + for (auto & subscription : subscriptions) + { + ws_listener.decrease_subscription_count (subscription); + } + ws_listener.get_node ().logger.try_log ("websocket session ended"); } @@ -168,20 +173,24 @@ void nano::websocket::session::handle_message (boost::property_tree::ptree const auto topic_l (to_topic (message_a.get ("topic", ""))); auto ack_l (message_a.get ("ack", false)); auto id_l (message_a.get ("id", "")); - auto subscribe_succeeded (false); + auto action_succeeded (false); if (action == "subscribe" && topic_l != nano::websocket::topic::invalid) { std::lock_guard lk (subscriptions_mutex); subscriptions.insert (topic_l); - subscribe_succeeded = true; + ws_listener.increase_subscription_count (topic_l); + action_succeeded = true; } else if (action == "unsubscribe" && topic_l != nano::websocket::topic::invalid) { std::lock_guard lk (subscriptions_mutex); - subscriptions.erase (topic_l); - subscribe_succeeded = true; + if (subscriptions.erase (topic_l)) + { + ws_listener.decrease_subscription_count (topic_l); + } + action_succeeded = true; } - if (ack_l && subscribe_succeeded) + if (ack_l && action_succeeded) { send_ack (action, id_l); } @@ -274,6 +283,35 @@ void nano::websocket::listener::broadcast (nano::websocket::message message_a) sessions.erase (std::remove_if (sessions.begin (), sessions.end (), [](auto & elem) { return elem.expired (); }), sessions.end ()); } +bool nano::websocket::listener::any_subscription (nano::websocket::topic const & topic_a) +{ + std::lock_guard lk (counts_mutex); + auto existing (topic_subscription_count.find (topic_a)); + return (existing != topic_subscription_count.end ()) ? existing->second > 0 : false; +} + +void nano::websocket::listener::increase_subscription_count (nano::websocket::topic const & topic_a) +{ + std::lock_guard lk (counts_mutex); + auto existing (topic_subscription_count.find (topic_a)); + if (existing == topic_subscription_count.end ()) + { + topic_subscription_count.insert ({ topic_a, 1 }); + } + else + { + existing->second += 1; + } +} + +void nano::websocket::listener::decrease_subscription_count (nano::websocket::topic const & topic_a) +{ + std::lock_guard lk (counts_mutex); + auto existing (topic_subscription_count.find (topic_a)); + release_assert (existing != topic_subscription_count.end () && existing->second > 0); + existing->second -= 1; +} + nano::websocket::message nano::websocket::message_builder::block_confirmed (std::shared_ptr block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype) { nano::websocket::message msg (nano::websocket::topic::confirmation); diff --git a/nano/node/websocket.hpp b/nano/node/websocket.hpp index 842e672498..21ffee8889 100644 --- a/nano/node/websocket.hpp +++ b/nano/node/websocket.hpp @@ -143,12 +143,20 @@ namespace websocket return node; } + /** Per topic subscription check */ + bool any_subscription (nano::websocket::topic const & topic_a); + /** Adds to subscription count of a specific topic*/ + void increase_subscription_count (nano::websocket::topic const & topic_a); + /** Removes from subscription count of a specific topic*/ + void decrease_subscription_count (nano::websocket::topic const & topic_a); + private: nano::node & node; boost::asio::ip::tcp::acceptor acceptor; boost::asio::ip::tcp::socket socket; - std::mutex sessions_mutex; + std::mutex sessions_mutex, counts_mutex; std::vector> sessions; + std::unordered_map topic_subscription_count; std::atomic stopped{ false }; }; } From 81a92fb269c6fbdc5cd468be03ba1e45bd594068 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Sun, 14 Apr 2019 16:24:50 +0100 Subject: [PATCH 2/5] Mutex lock --- nano/node/websocket.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/nano/node/websocket.cpp b/nano/node/websocket.cpp index 6833375ae5..b821878e9c 100644 --- a/nano/node/websocket.cpp +++ b/nano/node/websocket.cpp @@ -13,9 +13,12 @@ ws_listener (listener_a), ws (std::move (socket_a)), write_strand (ws.get_execut nano::websocket::session::~session () { - for (auto & subscription : subscriptions) { - ws_listener.decrease_subscription_count (subscription); + std::unique_lock lk (subscriptions_mutex); + for (auto & subscription : subscriptions) + { + ws_listener.decrease_subscription_count (subscription); + } } ws_listener.get_node ().logger.try_log ("websocket session ended"); From d7b6d7a624a10d70788a04566e6f2eea1abb6be1 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Sun, 14 Apr 2019 20:35:24 +0100 Subject: [PATCH 3/5] Change to any_subscribers --- nano/node/node.cpp | 2 +- nano/node/websocket.cpp | 2 +- nano/node/websocket.hpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 12a40dc66e..585f734b82 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1124,7 +1124,7 @@ startup_time (std::chrono::steady_clock::now ()) if (websocket_server) { observers.blocks.add ([this](std::shared_ptr block_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) { - if (this->websocket_server->any_subscription (nano::websocket::topic::confirmation)) + if (this->websocket_server->any_subscribers (nano::websocket::topic::confirmation)) { if (this->block_arrival.recent (block_a->hash ())) { diff --git a/nano/node/websocket.cpp b/nano/node/websocket.cpp index b821878e9c..5d886864ad 100644 --- a/nano/node/websocket.cpp +++ b/nano/node/websocket.cpp @@ -286,7 +286,7 @@ void nano::websocket::listener::broadcast (nano::websocket::message message_a) sessions.erase (std::remove_if (sessions.begin (), sessions.end (), [](auto & elem) { return elem.expired (); }), sessions.end ()); } -bool nano::websocket::listener::any_subscription (nano::websocket::topic const & topic_a) +bool nano::websocket::listener::any_subscribers (nano::websocket::topic const & topic_a) { std::lock_guard lk (counts_mutex); auto existing (topic_subscription_count.find (topic_a)); diff --git a/nano/node/websocket.hpp b/nano/node/websocket.hpp index 21ffee8889..821a424e43 100644 --- a/nano/node/websocket.hpp +++ b/nano/node/websocket.hpp @@ -144,7 +144,7 @@ namespace websocket } /** Per topic subscription check */ - bool any_subscription (nano::websocket::topic const & topic_a); + bool any_subscribers (nano::websocket::topic const & topic_a); /** Adds to subscription count of a specific topic*/ void increase_subscription_count (nano::websocket::topic const & topic_a); /** Removes from subscription count of a specific topic*/ From 2111f666cbbaf9d4d836ba1bfeebd7b5ee3529b5 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Sun, 14 Apr 2019 20:47:04 +0100 Subject: [PATCH 4/5] Forgot test renaming --- nano/core_test/websocket.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index c42162b8ae..e6e2b96509 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -70,7 +70,7 @@ TEST (websocket, confirmation) // Start websocket test-client in a separate thread std::atomic confirmation_event_received{ false }; - ASSERT_FALSE (node1->websocket_server->any_subscription (nano::websocket::topic::confirmation)); + ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation)); std::thread client_thread ([&system, &confirmation_event_received]() { // This will expect two results: the acknowledgement of the subscription // and then the block confirmation message @@ -92,7 +92,7 @@ TEST (websocket, confirmation) { ASSERT_NO_ERROR (system.poll ()); } - ASSERT_TRUE (node1->websocket_server->any_subscription (nano::websocket::topic::confirmation)); + ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation)); // Quick-confirm a block nano::keypair key; From 9e8471fa844c462ca02df4c2c5514966f145f405 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Mon, 15 Apr 2019 00:31:06 +0100 Subject: [PATCH 5/5] Change map to a fixed size array, remove mutex by changing to atomic --- nano/node/websocket.cpp | 22 +++++----------------- nano/node/websocket.hpp | 14 ++++++++++---- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/nano/node/websocket.cpp b/nano/node/websocket.cpp index 5d886864ad..482b71d074 100644 --- a/nano/node/websocket.cpp +++ b/nano/node/websocket.cpp @@ -288,31 +288,19 @@ void nano::websocket::listener::broadcast (nano::websocket::message message_a) bool nano::websocket::listener::any_subscribers (nano::websocket::topic const & topic_a) { - std::lock_guard lk (counts_mutex); - auto existing (topic_subscription_count.find (topic_a)); - return (existing != topic_subscription_count.end ()) ? existing->second > 0 : false; + return topic_subscription_count[static_cast (topic_a)] > 0; } void nano::websocket::listener::increase_subscription_count (nano::websocket::topic const & topic_a) { - std::lock_guard lk (counts_mutex); - auto existing (topic_subscription_count.find (topic_a)); - if (existing == topic_subscription_count.end ()) - { - topic_subscription_count.insert ({ topic_a, 1 }); - } - else - { - existing->second += 1; - } + topic_subscription_count[static_cast (topic_a)] += 1; } void nano::websocket::listener::decrease_subscription_count (nano::websocket::topic const & topic_a) { - std::lock_guard lk (counts_mutex); - auto existing (topic_subscription_count.find (topic_a)); - release_assert (existing != topic_subscription_count.end () && existing->second > 0); - existing->second -= 1; + auto & count (topic_subscription_count[static_cast (topic_a)]); + release_assert (count > 0); + count -= 1; } nano::websocket::message nano::websocket::message_builder::block_confirmed (std::shared_ptr block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype) diff --git a/nano/node/websocket.hpp b/nano/node/websocket.hpp index 821a424e43..b523ccb776 100644 --- a/nano/node/websocket.hpp +++ b/nano/node/websocket.hpp @@ -35,8 +35,11 @@ namespace websocket /** Acknowledgement of prior incoming message */ ack, /** A confirmation message */ - confirmation + confirmation, + /** Auxiliary length, not a valid topic, must be the last enum */ + _length }; + constexpr size_t number_topics{ static_cast (topic::_length) - static_cast (topic::invalid) }; /** A message queued for broadcasting */ class message final @@ -143,7 +146,10 @@ namespace websocket return node; } - /** Per topic subscription check */ + /** + * Per-topic subscribers check. Relies on all sessions correctly increasing and + * decreasing the subscriber counts themselves. + */ bool any_subscribers (nano::websocket::topic const & topic_a); /** Adds to subscription count of a specific topic*/ void increase_subscription_count (nano::websocket::topic const & topic_a); @@ -154,9 +160,9 @@ namespace websocket nano::node & node; boost::asio::ip::tcp::acceptor acceptor; boost::asio::ip::tcp::socket socket; - std::mutex sessions_mutex, counts_mutex; + std::mutex sessions_mutex; std::vector> sessions; - std::unordered_map topic_subscription_count; + std::array, number_topics> topic_subscription_count{}; std::atomic stopped{ false }; }; }