Skip to content

Commit

Permalink
Fix tests breaking with TSAN by creating a local io_context and avoid…
Browse files Browse the repository at this point in the history
…ing multi threading inside the test websocket client (#1913)
  • Loading branch information
Guilherme Lawless authored and cryptocode committed Apr 24, 2019
1 parent 0b9007f commit c63b673
Showing 1 changed file with 19 additions and 38 deletions.
57 changes: 19 additions & 38 deletions nano/core_test/websocket.cpp
Expand Up @@ -26,13 +26,15 @@ namespace
std::atomic<bool> ack_ready{ false };

/** An optionally blocking websocket client for testing */
boost::optional<std::string> websocket_test_call (boost::asio::io_context & ioc, std::string host, std::string port, std::string message_a, bool await_ack, bool await_response, seconds response_deadline = 5s)
boost::optional<std::string> websocket_test_call (std::string host, std::string port, std::string message_a, bool await_ack, bool await_response, const seconds response_deadline = 5s)
{
if (await_ack)
{
ack_ready = false;
}

boost::optional<std::string> ret;
boost::asio::io_context ioc;
boost::asio::ip::tcp::resolver resolver{ ioc };
boost::beast::websocket::stream<boost::asio::ip::tcp::socket> ws{ ioc };

Expand All @@ -50,44 +52,23 @@ boost::optional<std::string> websocket_test_call (boost::asio::io_context & ioc,
ack_ready = true;
}

boost::optional<std::string> ret;

if (await_response)
{
boost::asio::deadline_timer timer (ioc);
std::atomic<bool> timed_out{ false }, got_response{ false };
std::mutex cond_mutex;
std::condition_variable cond_var;
timer.expires_from_now (boost::posix_time::seconds (response_deadline.count ()));
timer.async_wait ([&ws, &cond_mutex, &cond_var, &timed_out](boost::system::error_code const & ec) {
if (!ec)
{
std::unique_lock<std::mutex> lock (cond_mutex);
ws.next_layer ().cancel ();
timed_out = true;
cond_var.notify_one ();
}
});

assert (response_deadline > 0s);
boost::beast::flat_buffer buffer;
ws.async_read (buffer, [&ret, &buffer, &cond_mutex, &cond_var, &got_response](boost::beast::error_code const & ec, std::size_t const n) {
ws.async_read (buffer, [&ret, &buffer](boost::beast::error_code const & ec, std::size_t const n) {
if (!ec)
{
std::unique_lock<std::mutex> lock (cond_mutex);
std::ostringstream res;
res << beast_buffers (buffer.data ());
ret = res.str ();
got_response = true;
cond_var.notify_one ();
}
});
std::unique_lock<std::mutex> lock (cond_mutex);
cond_var.wait (lock, [&] { return timed_out || got_response; });
if (got_response)
{
timer.cancel ();
ws.close (boost::beast::websocket::close_code::normal);
}
ioc.run_one_for (response_deadline);
}
if (ws.is_open ())
{
ws.close (boost::beast::websocket::close_code::normal);
}
return ret;
}
Expand Down Expand Up @@ -117,7 +98,7 @@ TEST (websocket, confirmation)
std::thread client_thread ([&system, &confirmation_event_received]() {
// This will expect two results: the acknowledgement of the subscription
// and then the block confirmation message
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
auto response = websocket_test_call ("::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true, true);
ASSERT_TRUE (response);
boost::property_tree::ptree event;
Expand Down Expand Up @@ -161,7 +142,7 @@ TEST (websocket, confirmation)

std::atomic<bool> unsubscribe_ack_received{ false };
std::thread client_thread_2 ([&system, &unsubscribe_ack_received]() {
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
auto response = websocket_test_call ("::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": true})json", true, true);
ASSERT_TRUE (response);
boost::property_tree::ptree event;
Expand All @@ -171,7 +152,7 @@ TEST (websocket, confirmation)
ASSERT_EQ (event.get<std::string> ("topic"), "confirmation");

// Unsubscribe action, expects an acknowledge but no response follows
websocket_test_call (system.io_ctx, "::1", "24078",
websocket_test_call ("::1", "24078",
R"json({"action": "unsubscribe", "topic": "confirmation", "ack": true})json", true, false);
unsubscribe_ack_received = true;
});
Expand Down Expand Up @@ -227,7 +208,7 @@ TEST (websocket, confirmation_options)
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::confirmation));
std::thread client_thread ([&system, &client_thread_finished]() {
// Subscribe initially with a specific invalid account
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
auto response = websocket_test_call ("::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"accounts": ["xrb_invalid"]}})json", true, true, 1s);

ASSERT_FALSE (response);
Expand Down Expand Up @@ -266,7 +247,7 @@ TEST (websocket, confirmation_options)
std::atomic<bool> client_thread_2_finished{ false };
std::thread client_thread_2 ([&system, &client_thread_2_finished]() {
// Re-subscribe with options for all local wallet accounts
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
auto response = websocket_test_call ("::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"all_local_accounts": "true"}})json", true, true);

ASSERT_TRUE (response);
Expand Down Expand Up @@ -308,7 +289,7 @@ TEST (websocket, confirmation_options)

std::atomic<bool> client_thread_3_finished{ false };
std::thread client_thread_3 ([&system, &client_thread_3_finished]() {
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
auto response = websocket_test_call ("::1", "24078",
R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"all_local_accounts": "true"}})json", true, true, 1s);

ASSERT_FALSE (response);
Expand Down Expand Up @@ -360,7 +341,7 @@ TEST (websocket, vote)
std::thread client_thread ([&system, &client_thread_finished]() {
// This will expect two results: the acknowledgement of the subscription
// and then the vote message
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
auto response = websocket_test_call ("::1", "24078",
R"json({"action": "subscribe", "topic": "vote", "ack": true})json", true, true);

ASSERT_TRUE (response);
Expand Down Expand Up @@ -426,7 +407,7 @@ TEST (websocket, vote_options)
data << R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": [")json"
<< nano::test_genesis_key.pub.to_account ()
<< R"json("]}})json";
auto response = websocket_test_call (system.io_ctx, "::1", "24078", data.str (), true, true);
auto response = websocket_test_call ("::1", "24078", data.str (), true, true);

ASSERT_TRUE (response);
boost::property_tree::ptree event;
Expand Down Expand Up @@ -471,7 +452,7 @@ TEST (websocket, vote_options)
std::atomic<bool> client_thread_2_finished{ false };
ASSERT_FALSE (node1->websocket_server->any_subscribers (nano::websocket::topic::vote));
std::thread client_thread_2 ([&system, &client_thread_2_finished]() {
auto response = websocket_test_call (system.io_ctx, "::1", "24078",
auto response = websocket_test_call ("::1", "24078",
R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": ["xrb_invalid"]}})json", true, true, 1s);

// No response expected given the filter
Expand Down

0 comments on commit c63b673

Please sign in to comment.