diff --git a/examples/console/client.cpp b/examples/console/client.cpp index a664a9e..12e388f 100644 --- a/examples/console/client.cpp +++ b/examples/console/client.cpp @@ -134,7 +134,7 @@ int client::run() delay = connection_->proxy.refresh(); } - const auto id = poller.wait(delay); + const auto ids = poller.wait(delay); if (poller.terminated()) break; @@ -142,13 +142,13 @@ int client::run() if (poller.expired()) continue; - if (id == terminal_socket_id) + if (ids.contains(terminal_socket_id)) { command(); continue; } - if (id == connection_->stream.socket().id() && connection_) + if (ids.contains(connection_->stream.socket().id()) && connection_) connection_->stream.read(connection_->proxy); else std::cout << "connect before calling" << std::endl; diff --git a/examples/console/read_line.cpp b/examples/console/read_line.cpp index 45576b5..ecbdbb3 100644 --- a/examples/console/read_line.cpp +++ b/examples/console/read_line.cpp @@ -61,14 +61,15 @@ void read_line::show_prompt() std::string read_line::get_line() { - zmq::message message; zmq::poller poller; poller.add(socket_); - const auto id = poller.wait(1); - if (id == socket_.id() && !poller.expired() && !poller.terminated()) + if (poller.wait().contains(socket_.id())) + { + zmq::message message; if (message.receive(socket_)) return message.dequeue_text(); + } return{}; } diff --git a/examples/get_height/main.cpp b/examples/get_height/main.cpp index 1c786e8..aa1e90f 100644 --- a/examples/get_height/main.cpp +++ b/examples/get_height/main.cpp @@ -61,25 +61,20 @@ int main(int argc, char* argv[]) std::cout << "height: " << height << std::endl; }; - // Make the request. socket_stream stream(socket); + + // Wait 2 seconds for the connection, with no failure retries. proxy proxy(stream, unknown_handler, 2000, 0); + + // Make the request. proxy.blockchain_fetch_last_height(error_handler, completion_handler); - // Wait for the response. zmq::poller poller; poller.add(socket); - const auto socket_id = socket.id(); - auto delay = proxy.refresh(); - while (!proxy.empty() && !poller.terminated() && !poller.expired() && - poller.wait(delay) == socket_id) - { + // Wait 1 second for the response. + if (poller.wait(1000).contains(socket.id())) stream.read(proxy); - // Update the time remaining. - delay = proxy.refresh(); - } - return 0; } diff --git a/include/bitcoin/client/dealer.hpp b/include/bitcoin/client/dealer.hpp index c212d1b..b3b74f4 100644 --- a/include/bitcoin/client/dealer.hpp +++ b/include/bitcoin/client/dealer.hpp @@ -56,7 +56,7 @@ class BCC_API dealer /// True if there are no outstanding requests. bool empty() const; - /// CLear all handlers with the specified error code. + /// Clear all handlers with the specified error code. void clear(const code& code); /// Accessors. diff --git a/src/dealer.cpp b/src/dealer.cpp index 5109eac..538ac06 100644 --- a/src/dealer.cpp +++ b/src/dealer.cpp @@ -176,6 +176,13 @@ void dealer::send_request(const std::string& command, void dealer::send(const obelisk_message& message) { data_stack data; + + // BUGBUG: + // A delimiter frame is required for a DEALER socket (currently in use). + // A REQ socket adds this automatically (to the same effect). + // Previous server versions cannot accomodate this (ROUTER improper). + data.push_back({}); + data.push_back(to_chunk(message.command)); data.push_back(to_chunk(to_little_endian(message.id))); data.push_back(message.payload); diff --git a/src/obelisk_client.cpp b/src/obelisk_client.cpp index 8f36758..20358b8 100644 --- a/src/obelisk_client.cpp +++ b/src/obelisk_client.cpp @@ -101,8 +101,7 @@ void obelisk_client::wait() poller.add(socket_); auto delay = refresh(); - while (!empty() && !poller.terminated() && !poller.expired() && - poller.wait(delay) == socket_.id()) + while (!empty() && poller.wait(delay).contains(socket_.id())) { stream_.read(*this); delay = refresh(); @@ -121,8 +120,7 @@ void obelisk_client::monitor(uint32_t timeout_seconds) poller.add(socket_); auto delay = remaining(deadline); - while (!poller.terminated() && !poller.expired() && - poller.wait(delay) == socket_.id()) + while (poller.wait(delay).contains(socket_.id())) { stream_.read(*this); delay = remaining(deadline); diff --git a/src/proxy.cpp b/src/proxy.cpp index fd71908..58bc6e7 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -221,8 +221,8 @@ void proxy::address_fetch_unspent_outputs(error_handler on_error, unspent.push_back({row.output, row.value}); chain::points_info selected_utxos; - wallet::select_outputs::select( - selected_utxos, unspent, satoshi, algorithm); + wallet::select_outputs::select(selected_utxos, unspent, satoshi, + algorithm); on_reply(selected_utxos); }; diff --git a/test/proxy.cpp b/test/proxy.cpp index cb30be3..fc62e4e 100644 --- a/test/proxy.cpp +++ b/test/proxy.cpp @@ -51,11 +51,17 @@ class stream_fixture }; // Shoves data into a std::string object. -std::string to_string(data_slice data) +static std::string to_string(data_slice data) { return std::string(data.begin(), data.end()); } +static void remove_optional_delimiter(data_stack& stack) +{ + if (!stack.empty() && stack.front().empty()) + stack.erase(stack.begin()); +} + // Arbitrary value for test cases. static const uint32_t test_height = 0x12345678; @@ -76,6 +82,10 @@ static const char address_satoshi[] = "1PeChFbhxDD9NLbU21DfD55aQBC4ZTR3tE"; stream_fixture capture; \ proxy proxy(capture, on_unknown, timeout_ms, retries) +// Allow REQ or unadressed DEALER client. +#define HANDLE_ROUTING_FRAMES(stack) \ + remove_optional_delimiter(stack); + BOOST_AUTO_TEST_SUITE(proxy_tests) BOOST_AUTO_TEST_CASE(proxy__fetch_history__test) @@ -85,6 +95,7 @@ BOOST_AUTO_TEST_CASE(proxy__fetch_history__test) const auto on_reply = [](const chain::history::list&) {}; proxy.blockchain_fetch_history(on_error, on_reply, payment_address(address_satoshi), test_height); + HANDLE_ROUTING_FRAMES(capture.out); BOOST_REQUIRE_EQUAL(capture.out.size(), 3u); BOOST_REQUIRE_EQUAL(to_string(capture.out[0]), "blockchain.fetch_history"); BOOST_REQUIRE_EQUAL(encode_base16(capture.out[2]), "0035a131e99f240a2314bb0ddb3d81d05663eb5bf878563412"); @@ -97,6 +108,7 @@ BOOST_AUTO_TEST_CASE(proxy__fetch_transaction__test) const auto on_reply = [](const chain::transaction&) {}; proxy.blockchain_fetch_transaction(on_error, on_reply, hash_literal(hash_satoshi)); + HANDLE_ROUTING_FRAMES(capture.out); BOOST_REQUIRE_EQUAL(capture.out.size(), 3u); BOOST_REQUIRE_EQUAL(to_string(capture.out[0]), "blockchain.fetch_transaction"); BOOST_REQUIRE_EQUAL(encode_base16(capture.out[2]), raw_satoshi); @@ -109,6 +121,7 @@ BOOST_AUTO_TEST_CASE(proxy__fetch_last__height_test) const auto on_reply = [](size_t) {}; proxy.blockchain_fetch_last_height(on_error, on_reply); + HANDLE_ROUTING_FRAMES(capture.out); BOOST_REQUIRE_EQUAL(capture.out.size(), 3u); BOOST_REQUIRE_EQUAL(to_string(capture.out[0]), "blockchain.fetch_last_height"); BOOST_REQUIRE_EQUAL(encode_base16(capture.out[2]), ""); @@ -121,6 +134,7 @@ BOOST_AUTO_TEST_CASE(proxy__fetch_block_header__height_test) const auto on_reply = [](const chain::header&) {}; proxy.blockchain_fetch_block_header(on_error, on_reply, test_height); + HANDLE_ROUTING_FRAMES(capture.out); BOOST_REQUIRE_EQUAL(capture.out.size(), 3u); BOOST_REQUIRE_EQUAL(to_string(capture.out[0]), "blockchain.fetch_block_header"); BOOST_REQUIRE_EQUAL(encode_base16(capture.out[2]), "78563412"); @@ -133,6 +147,7 @@ BOOST_AUTO_TEST_CASE(proxy__fetch_block_header__hash_test) auto on_reply = [](const chain::header&) {}; proxy.blockchain_fetch_block_header(on_error, on_reply, hash_literal(hash_satoshi)); + HANDLE_ROUTING_FRAMES(capture.out); BOOST_REQUIRE_EQUAL(capture.out.size(), 3u); BOOST_REQUIRE_EQUAL(to_string(capture.out[0]), "blockchain.fetch_block_header"); BOOST_REQUIRE_EQUAL(encode_base16(capture.out[2]), raw_satoshi); @@ -145,6 +160,7 @@ BOOST_AUTO_TEST_CASE(proxy__fetch_transaction__index_test) const auto on_reply = [](size_t, size_t) {}; proxy.blockchain_fetch_transaction_index(on_error, on_reply, hash_literal(hash_satoshi)); + HANDLE_ROUTING_FRAMES(capture.out); BOOST_REQUIRE_EQUAL(capture.out.size(), 3u); BOOST_REQUIRE_EQUAL(to_string(capture.out[0]), "blockchain.fetch_transaction_index"); BOOST_REQUIRE_EQUAL(encode_base16(capture.out[2]), raw_satoshi); @@ -159,6 +175,7 @@ BOOST_AUTO_TEST_CASE(proxy__fetch_stealth__test) const binary prefix(16, raw_prefix); proxy.blockchain_fetch_stealth(on_error, on_reply, prefix, test_height); + HANDLE_ROUTING_FRAMES(capture.out); BOOST_REQUIRE_EQUAL(capture.out.size(), 3u); BOOST_REQUIRE_EQUAL(to_string(capture.out[0]), "blockchain.fetch_stealth"); BOOST_REQUIRE_EQUAL(encode_base16(capture.out[2]), "10ffff78563412"); @@ -171,6 +188,7 @@ BOOST_AUTO_TEST_CASE(proxy__fetch_unconfirmed_transaction__test) const auto on_reply = [](const chain::transaction&) {}; proxy.transaction_pool_fetch_transaction(on_error, on_reply, hash_literal(hash_satoshi)); + HANDLE_ROUTING_FRAMES(capture.out); BOOST_REQUIRE_EQUAL(capture.out.size(), 3u); BOOST_REQUIRE_EQUAL(to_string(capture.out[0]), "transaction_pool.fetch_transaction"); BOOST_REQUIRE_EQUAL(encode_base16(capture.out[2]), raw_satoshi); @@ -183,6 +201,7 @@ BOOST_AUTO_TEST_CASE(proxy__address_fetch_history__test) const auto on_reply = [](const chain::history::list&) {}; proxy.address_fetch_history(on_error, on_reply, payment_address(address_satoshi), test_height); + HANDLE_ROUTING_FRAMES(capture.out); BOOST_REQUIRE_EQUAL(capture.out.size(), 3u); BOOST_REQUIRE_EQUAL(to_string(capture.out[0]), "address.fetch_history"); BOOST_REQUIRE_EQUAL(encode_base16(capture.out[2]), "0035a131e99f240a2314bb0ddb3d81d05663eb5bf878563412"); @@ -195,6 +214,7 @@ BOOST_AUTO_TEST_CASE(proxy__subscribe__test) const auto on_reply = []() {}; proxy.address_subscribe(on_error, on_reply, payment_address(address_satoshi)); + HANDLE_ROUTING_FRAMES(capture.out); BOOST_REQUIRE_EQUAL(capture.out.size(), 3u); BOOST_REQUIRE_EQUAL(to_string(capture.out[0]), "address.subscribe"); BOOST_REQUIRE_EQUAL(encode_base16(capture.out[2]), "00a0f85beb6356d0813ddb0dbb14230a249fe931a135");