Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websockets - check for subscriptions before proceeding #1906

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions nano/core_test/websocket.cpp
Expand Up @@ -70,6 +70,7 @@ TEST (websocket, confirmation)

// Start websocket test-client in a separate thread
std::atomic<bool> confirmation_event_received{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation));
std::thread client_thread ([&system, &confirmation_event_received]() {
// This will expect two results: the acknowledgement of the subscription
// and then the block confirmation message
Expand All @@ -91,6 +92,7 @@ TEST (websocket, confirmation)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TRUE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation));

// Quick-confirm a block
nano::keypair key;
Expand Down
39 changes: 21 additions & 18 deletions nano/node/node.cpp
Expand Up @@ -1124,31 +1124,34 @@ startup_time (std::chrono::steady_clock::now ())
if (websocket_server)
{
observers.blocks.add ([this](std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) {
if (this->block_arrival.recent (block_a->hash ()))
if (this->websocket_server->any_subscribers (nano::websocket::topic::confirmation))
{
std::string subtype;
if (is_state_send_a)
{
subtype = "send";
}
else if (block_a->type () == nano::block_type::state)
if (this->block_arrival.recent (block_a->hash ()))
{
if (block_a->link ().is_zero ())
{
subtype = "change";
}
else if (amount_a == 0 && !this->ledger.epoch_link.is_zero () && this->ledger.is_epoch_link (block_a->link ()))
std::string subtype;
if (is_state_send_a)
{
subtype = "epoch";
subtype = "send";
}
else
else if (block_a->type () == nano::block_type::state)
{
subtype = "receive";
if (block_a->link ().is_zero ())
{
subtype = "change";
}
else if (amount_a == 0 && !this->ledger.epoch_link.is_zero () && this->ledger.is_epoch_link (block_a->link ()))
{
subtype = "epoch";
}
else
{
subtype = "receive";
}
}
nano::websocket::message_builder builder;
auto msg (builder.block_confirmed (block_a, account_a, amount_a, subtype));
this->websocket_server->broadcast (msg);
}
nano::websocket::message_builder builder;
auto msg (builder.block_confirmed (block_a, account_a, amount_a, subtype));
this->websocket_server->broadcast (msg);
}
});
}
Expand Down
51 changes: 46 additions & 5 deletions nano/node/websocket.cpp
Expand Up @@ -13,6 +13,14 @@ ws_listener (listener_a), ws (std::move (socket_a)), write_strand (ws.get_execut

nano::websocket::session::~session ()
{
{
std::unique_lock<std::mutex> lk (subscriptions_mutex);
for (auto & subscription : subscriptions)
{
ws_listener.decrease_subscription_count (subscription);
}
}

ws_listener.get_node ().logger.try_log ("websocket session ended");
}

Expand Down Expand Up @@ -168,20 +176,24 @@ void nano::websocket::session::handle_message (boost::property_tree::ptree const
auto topic_l (to_topic (message_a.get<std::string> ("topic", "")));
auto ack_l (message_a.get<bool> ("ack", false));
auto id_l (message_a.get<std::string> ("id", ""));
auto subscribe_succeeded (false);
auto action_succeeded (false);
if (action == "subscribe" && topic_l != nano::websocket::topic::invalid)
{
std::lock_guard<std::mutex> lk (subscriptions_mutex);
subscriptions.insert (topic_l);
subscribe_succeeded = true;
ws_listener.increase_subscription_count (topic_l);
action_succeeded = true;
}
else if (action == "unsubscribe" && topic_l != nano::websocket::topic::invalid)
{
std::lock_guard<std::mutex> lk (subscriptions_mutex);
subscriptions.erase (topic_l);
subscribe_succeeded = true;
if (subscriptions.erase (topic_l))
{
ws_listener.decrease_subscription_count (topic_l);
}
action_succeeded = true;
}
if (ack_l && subscribe_succeeded)
if (ack_l && action_succeeded)
{
send_ack (action, id_l);
}
Expand Down Expand Up @@ -274,6 +286,35 @@ void nano::websocket::listener::broadcast (nano::websocket::message message_a)
sessions.erase (std::remove_if (sessions.begin (), sessions.end (), [](auto & elem) { return elem.expired (); }), sessions.end ());
}

bool nano::websocket::listener::any_subscribers (nano::websocket::topic const & topic_a)
{
std::lock_guard<std::mutex> lk (counts_mutex);
auto existing (topic_subscription_count.find (topic_a));
return (existing != topic_subscription_count.end ()) ? existing->second > 0 : false;
}

void nano::websocket::listener::increase_subscription_count (nano::websocket::topic const & topic_a)
{
std::lock_guard<std::mutex> lk (counts_mutex);
auto existing (topic_subscription_count.find (topic_a));
if (existing == topic_subscription_count.end ())
{
topic_subscription_count.insert ({ topic_a, 1 });
}
else
{
existing->second += 1;
}
}

void nano::websocket::listener::decrease_subscription_count (nano::websocket::topic const & topic_a)
{
std::lock_guard<std::mutex> lk (counts_mutex);
auto existing (topic_subscription_count.find (topic_a));
release_assert (existing != topic_subscription_count.end () && existing->second > 0);
existing->second -= 1;
}

nano::websocket::message nano::websocket::message_builder::block_confirmed (std::shared_ptr<nano::block> block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype)
{
nano::websocket::message msg (nano::websocket::topic::confirmation);
Expand Down
10 changes: 9 additions & 1 deletion nano/node/websocket.hpp
Expand Up @@ -143,12 +143,20 @@ namespace websocket
return node;
}

/** Per topic subscription check */
bool any_subscribers (nano::websocket::topic const & topic_a);
/** Adds to subscription count of a specific topic*/
void increase_subscription_count (nano::websocket::topic const & topic_a);
/** Removes from subscription count of a specific topic*/
void decrease_subscription_count (nano::websocket::topic const & topic_a);

private:
nano::node & node;
boost::asio::ip::tcp::acceptor acceptor;
boost::asio::ip::tcp::socket socket;
std::mutex sessions_mutex;
std::mutex sessions_mutex, counts_mutex;
std::vector<std::weak_ptr<session>> sessions;
std::unordered_map<topic, std::size_t> topic_subscription_count;
Copy link
Contributor

@clemahieu clemahieu Apr 14, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the number of topics is small, this could be an array indexed by the enum value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing so would also fix the CI build error (missing hash specialization for the enum class I think, required on some compilers)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, also could change to atomic size_t and thus remove the mutex. Getting the length of the enum is cumbersome.

std::atomic<bool> stopped{ false };
};
}
Expand Down