From 21705b8d66772bb11332b257e73c4a51b5c31bff Mon Sep 17 00:00:00 2001 From: evoskuil Date: Fri, 3 Jun 2016 13:25:57 -0700 Subject: [PATCH 1/2] Comments and add missing include. --- src/obelisk_client.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/obelisk_client.cpp b/src/obelisk_client.cpp index 20358b8..03023eb 100644 --- a/src/obelisk_client.cpp +++ b/src/obelisk_client.cpp @@ -19,6 +19,7 @@ */ #include +#include #include #include #include @@ -42,6 +43,7 @@ static uint32_t to_milliseconds(uint16_t seconds) static const auto on_unknown = [](const std::string&){}; // Retries is overloaded as configuration for resends as well. +// Timeout is capped at ~ 25 days by signed/millsecond conversions. obelisk_client::obelisk_client(uint16_t timeout_seconds, uint8_t retries) : socket_(context_, zmq::socket::role::dealer), stream_(socket_), From e2b9b9495fdb6b2ad71ee503a6f0912f8500d72b Mon Sep 17 00:00:00 2001 From: evoskuil Date: Fri, 3 Jun 2016 17:29:18 -0700 Subject: [PATCH 2/2] Do not suppress read/write fails, adapt to protocol return code changes. --- examples/console/client.cpp | 2 +- examples/console/read_line.cpp | 28 +++++++++------ examples/get_height/main.cpp | 2 +- include/bitcoin/client/dealer.hpp | 8 ++--- include/bitcoin/client/socket_stream.hpp | 2 +- include/bitcoin/client/stream.hpp | 2 +- src/dealer.cpp | 25 +++++++------ src/obelisk_client.cpp | 2 +- src/socket_stream.cpp | 45 ++++++++++-------------- test/proxy.cpp | 3 +- 10 files changed, 58 insertions(+), 61 deletions(-) diff --git a/examples/console/client.cpp b/examples/console/client.cpp index 12e388f..3710846 100644 --- a/examples/console/client.cpp +++ b/examples/console/client.cpp @@ -65,7 +65,7 @@ void client::cmd_connect(std::stringstream& args) zmq::socket socket(context_, zmq::socket::role::dealer); - if (!socket.connect(server)) + if (socket.connect(server) != bc::error::success) std::cout << "error: failed to connect" << std::endl; else connection_ = std::make_shared(socket, 6000); diff --git a/examples/console/read_line.cpp b/examples/console/read_line.cpp index ecbdbb3..812a162 100644 --- a/examples/console/read_line.cpp +++ b/examples/console/read_line.cpp @@ -19,6 +19,7 @@ */ #include "read_line.hpp" +#include #include #include #include @@ -36,14 +37,16 @@ read_line::~read_line() { zmq::message message; message.enqueue_little_endian(signal_halt); - message.send(socket_); + const auto ec = message.send(socket_); + assert(!ec); thread_->join(); } read_line::read_line(zmq::context& context) : socket_(context, zmq::socket::role::requester) { - socket_.bind({ "inproc://terminal" }); + const auto ec = socket_.bind({ "inproc://terminal" }); + assert(!ec); // The thread must be constructed after the socket is already bound. thread_ = std::make_shared( @@ -56,7 +59,8 @@ void read_line::show_prompt() std::cout << "> " << std::flush; zmq::message message; message.enqueue_little_endian(signal_continue); - message.send(socket_); + const auto ec = message.send(socket_); + assert(!ec); } std::string read_line::get_line() @@ -67,8 +71,9 @@ std::string read_line::get_line() if (poller.wait().contains(socket_.id())) { zmq::message message; - if (message.receive(socket_)) - return message.dequeue_text(); + auto ec = message.receive(socket_); + assert(!ec); + return message.dequeue_text(); } return{}; @@ -77,18 +82,18 @@ std::string read_line::get_line() void read_line::run(zmq::context& context) { zmq::socket socket(context, zmq::socket::role::replier); - socket.connect({ "inproc://terminal" }); + auto ec = socket.connect({ "inproc://terminal" }); + assert(!ec); while (true) { uint32_t signal; zmq::message message; + ec = message.receive(socket); + assert(!ec); - if (!message.receive(socket) || !message.dequeue(signal) || - signal == signal_halt) - { + if (!message.dequeue(signal) || signal == signal_halt) break; - } // Read input: char line[1000]; @@ -97,7 +102,8 @@ void read_line::run(zmq::context& context) zmq::message response; response.enqueue(text); - response.send(socket); + ec = response.send(socket); + assert(!ec); } } diff --git a/examples/get_height/main.cpp b/examples/get_height/main.cpp index aa1e90f..5fc9183 100644 --- a/examples/get_height/main.cpp +++ b/examples/get_height/main.cpp @@ -40,7 +40,7 @@ int main(int argc, char* argv[]) zmq::context context; zmq::socket socket(context, zmq::socket::role::dealer); - if (!socket.connect({ argv[1] })) + if (socket.connect({ argv[1] }) != error::success) { std::cerr << "Cannot connect to " << argv[1] << std::endl; return 1; diff --git a/include/bitcoin/client/dealer.hpp b/include/bitcoin/client/dealer.hpp index b3b74f4..cac4964 100644 --- a/include/bitcoin/client/dealer.hpp +++ b/include/bitcoin/client/dealer.hpp @@ -73,7 +73,7 @@ class BCC_API dealer virtual bool read(stream& stream) override; /// Write the specified data to this stream. - virtual void write(const data_stack& data) override; + virtual bool write(const data_stack& data) override; protected: typedef std::chrono::system_clock clock; @@ -102,13 +102,13 @@ class BCC_API dealer static int32_t remaining(const time& deadline); // send_request->send - void send(const obelisk_message& message); + bool send(const obelisk_message& message); // write->receive->decode_reply - void receive(const obelisk_message& message); + bool receive(const obelisk_message& message); // Sends an outgoing request, and adds handlers to pending request table. - void send_request(const std::string& command, const data_chunk& payload, + bool send_request(const std::string& command, const data_chunk& payload, error_handler on_error, decoder on_reply); // Decodes an incoming message, invoking the error and/or reply handler. diff --git a/include/bitcoin/client/socket_stream.hpp b/include/bitcoin/client/socket_stream.hpp index 57e13fa..7647028 100644 --- a/include/bitcoin/client/socket_stream.hpp +++ b/include/bitcoin/client/socket_stream.hpp @@ -40,7 +40,7 @@ class BCC_API socket_stream // stream interface. virtual int32_t refresh(); virtual bool read(stream& stream); - virtual void write(const data_stack& data); + virtual bool write(const data_stack& data); private: protocol::zmq::socket& socket_; diff --git a/include/bitcoin/client/stream.hpp b/include/bitcoin/client/stream.hpp index 5cc84b0..5bb3de8 100644 --- a/include/bitcoin/client/stream.hpp +++ b/include/bitcoin/client/stream.hpp @@ -40,7 +40,7 @@ class BCC_API stream virtual bool read(stream& stream) = 0; /// Write the specified data to this stream. - virtual void write(const data_stack& data) = 0; + virtual bool write(const data_stack& data) = 0; }; } // namespace client diff --git a/src/dealer.cpp b/src/dealer.cpp index ebefb3d..ae867ee 100644 --- a/src/dealer.cpp +++ b/src/dealer.cpp @@ -165,7 +165,7 @@ int32_t dealer::remaining(const time& deadline) // Create a mssage with identity and send it via the message stream. // This is invoked by derived class message senders, such as the proxy. -void dealer::send_request(const std::string& command, +bool dealer::send_request(const std::string& command, const data_chunk& payload, error_handler on_error, decoder on_reply) { const auto now = steady_clock::now(); @@ -176,11 +176,11 @@ void dealer::send_request(const std::string& command, request.on_reply = std::move(on_reply); request.resends = 0; request.deadline = now + milliseconds(timeout_milliseconds_); - send(request.message); + return send(request.message); } // Send or resend an existing message by writing it to the message stream. -void dealer::send(const obelisk_message& message) +bool dealer::send(const obelisk_message& message) { data_stack data; @@ -192,9 +192,8 @@ void dealer::send(const obelisk_message& message) data.push_back(to_chunk(to_little_endian(message.id))); data.push_back(message.payload); - // BUGBUG: we are losing error state here, prevents fast fail. // This creates a message and sends it on the socket. - out_.write(data); + return out_.write(data); } // Stream interface, not utilized on this class. @@ -204,11 +203,10 @@ bool dealer::read(stream& stream) } // stream interface. -void dealer::write(const data_stack& data) +bool dealer::write(const data_stack& data) { - // BUGBUG: we are losing error state here, prevents fast fail. if (data.size() < 3 || data.size() > 4) - return; + return false; obelisk_message message; auto it = data.begin(); @@ -232,35 +230,36 @@ void dealer::write(const data_stack& data) message.payload = *(++it); } - receive(message); + return receive(message); } // Handle a message, call from write. -void dealer::receive(const obelisk_message& message) +bool dealer::receive(const obelisk_message& message) { // Subscription updates are not tracked in pending. if (message.command == "address.update") { decode_update(message); - return; + return true; } // Subscription updates are not tracked in pending. if (message.command == "address.stealth_update") { decode_stealth_update(message); - return; + return true; } const auto command = pending_.find(message.id); if (command == pending_.end()) { on_unknown_(message.command); - return; + return false; } decode_reply(message, command->second.on_error, command->second.on_reply); pending_.erase(command); + return true; } void dealer::decode_reply(const obelisk_message& message, diff --git a/src/obelisk_client.cpp b/src/obelisk_client.cpp index 03023eb..87a941d 100644 --- a/src/obelisk_client.cpp +++ b/src/obelisk_client.cpp @@ -69,7 +69,7 @@ bool obelisk_client::connect(const endpoint& address) for (auto attempt = 0; attempt < 1 + retries_; ++attempt) { - if (socket_.connect(host_address)) + if (socket_.connect(host_address) == error::success) return true; // Arbitrary delay between connection attempts. diff --git a/src/socket_stream.cpp b/src/socket_stream.cpp index bed6d3d..ec94378 100644 --- a/src/socket_stream.cpp +++ b/src/socket_stream.cpp @@ -50,42 +50,33 @@ bool socket_stream::read(stream& stream) data_stack data; zmq::message message; - if (!message.receive(socket_)) + if (message.receive(socket_) != error::success) return false; // Copy the message to a data stack. while (!message.empty()) data.push_back(message.dequeue_data()); - stream.write(data); - return true; + return stream.write(data); } ////bool socket_stream::read(std::shared_ptr stream) ////{ -//// if (stream) -//// { -//// response_message message; +//// if (!stream) +//// return false; //// -//// if (message.receive(socket_)) -//// { -//// std::shared_ptr response = -//// message.get_response(); +//// response_message message; //// -//// if (response) -//// { -//// stream->write(response); -//// return true; -//// } -//// } -//// } +//// if (message.receive(socket_) != error::success) +//// return false; //// -//// return false; +//// auto response = message.get_response(); +//// return response && (stream->write(response) == error::success); ////} // TODO: optimize by passing the internal type of the message object. // Send a message built from the stack parameter to this socket. -void socket_stream::write(const data_stack& data) +bool socket_stream::write(const data_stack& data) { zmq::message message; @@ -93,17 +84,17 @@ void socket_stream::write(const data_stack& data) for (const auto& chunk: data) message.enqueue(chunk); - message.send(socket_); + return message.send(socket_) == error::success; } -////void socket_stream::write(const std::shared_ptr& request) +////bool socket_stream::write(const std::shared_ptr& request) ////{ -//// if (request) -//// { -//// request_message message; -//// message.set_request(request); -//// message.send(socket_); -//// } +//// if (!request) +//// return false; +//// +//// request_message message; +//// message.set_request(request); +//// return message.send(socket_) == error::success; ////} } // namespace client diff --git a/test/proxy.cpp b/test/proxy.cpp index fc62e4e..dc02bdf 100644 --- a/test/proxy.cpp +++ b/test/proxy.cpp @@ -44,9 +44,10 @@ class stream_fixture return false; } - virtual void write(const data_stack& data) override + virtual bool write(const data_stack& data) override { out = data; + return true; } };