diff --git a/include/bitcoin/network/channels/channel_http.hpp b/include/bitcoin/network/channels/channel_http.hpp index 65a93ac75..97553f770 100644 --- a/include/bitcoin/network/channels/channel_http.hpp +++ b/include/bitcoin/network/channels/channel_http.hpp @@ -67,6 +67,10 @@ class BCT_API channel_http void send(http::response&& response, result_handler&& handler) NOEXCEPT; + /// Send without restarting the listener. + void notify(http::response&& notification, + result_handler&& handler) NOEXCEPT; + /// Resume reading from the socket (requires strand). void resume() NOEXCEPT override; @@ -95,7 +99,7 @@ class BCT_API channel_http /// Handlers. virtual void handle_receive(const code& ec, size_t bytes, const http::request_cptr& request) NOEXCEPT; - virtual void handle_send(const code& ec, size_t bytes, + virtual void handle_send(const code& ec, size_t bytes, bool notification, const std::string& message, const result_handler& handler) NOEXCEPT; private: diff --git a/include/bitcoin/network/impl/messages/json_body.ipp b/include/bitcoin/network/impl/messages/json_body.ipp index e33b0bfe9..cf68d6c80 100644 --- a/include/bitcoin/network/impl/messages/json_body.ipp +++ b/include/bitcoin/network/impl/messages/json_body.ipp @@ -99,25 +99,29 @@ size_t CLASS::reader::put(const buffer_type& buffer, boost_code& ec) NOEXCEPT return {}; } +// Finishing can be very confusing. The derived rpc body calls this base +// method, and then the virtual done() is tested to determine whether the +// logical object is fully read (including optionally required terminator). +// Any error code here signals the beast reader (and any reader that terminates +// based on the end of the framed data, such as websockets) that the parse has +// failed (terminal error). However for custom stream readers that may not be +// aware of byte termination, the `need_more` implies that the parse has not +// failed and that more bytes may be parsed. In either case, when this is +// called and the parse is complete, then the parsed json object is moved to +// the model and the parser is released. In the case of the derived json-rpc +// reader, the json is also then converted to the rpc model if valid, otherwise +// returning a failure code. In no case is the underlying parser_.finish(ec) +// ever called, as that would preclude use in the unbounded scenario. TEMPLATE void CLASS::reader::finish(boost_code& ec) NOEXCEPT { - // Finishing can be very confusing. The derived rpc body calls this base - // method, and then the virtual done() is tested to determine whether the - // logical object is fully read (including optionally required terminator). - // Any error code here signals the beast reader (and any reader that - // terminates based on the end of the framed data, such as websockets) that - // the parse has failed (terminal error). However for custom stream readers - // that may not be aware of byte termination, the `need_more` implies that - // the parse has not failed and that more bytes may be parsed. In either - // case, when this is called and the parse is complete, then the parsed - // json object is moved to the model and the parser is released. In the - // case of the derived json-rpc reader, the json is also then converted to - // the rpc model if valid, otherwise returning a failure code. In no case - // is the underlying parser_.finish(ec) ever called, as that would preclude - // use in the unbounded scenario. - using namespace network::error; + + // The internal boost::json parser will always return !done() when parsing + // a top-level primitive as a whole document, specifically a number number + // value, because it has no way to know if more digits are coming. So this + // will return need_more even when a fixed-size buffer is being read. For + // this reason this body does not support *reading* for top-level objects. if (!done()) { ec = to_http_code(http_error_t::need_more); @@ -203,9 +207,9 @@ CLASS::writer::get(boost_code& ec) NOEXCEPT try { // Always prepares the configured max_size. - const auto buffer = value_.buffer->prepare(size); - const auto data = system::pointer_cast(buffer.data()); - const auto view = serializer_.read(data, buffer.size()); + const auto scratch = value_.buffer->prepare(size); + const auto data = system::pointer_cast(scratch.data()); + const auto view = serializer_.read(data, scratch.size()); // No progress (edge case). if (view.empty() && !serializer_.done()) @@ -215,8 +219,7 @@ CLASS::writer::get(boost_code& ec) NOEXCEPT } ec.clear(); - value_.buffer->commit(view.size()); - value_.buffer->consume(view.size()); + value_.buffer->consume(scratch.size()); const auto more = !serializer_.done(); return out_buffer{ std::make_pair(boost::asio::buffer(view), more) }; } diff --git a/include/bitcoin/network/protocols/protocol.hpp b/include/bitcoin/network/protocols/protocol.hpp index b3a6277a6..fc50bfd0c 100644 --- a/include/bitcoin/network/protocols/protocol.hpp +++ b/include/bitcoin/network/protocols/protocol.hpp @@ -213,6 +213,11 @@ class BCT_API protocol inline void send(Message&& message, Method&& method, Args&&... args) NOEXCEPT \ { channel_->send(std::forward(message), BIND_SHARED(method, args)); } +#define DECLARE_NOTIFY() \ + template \ + inline void notify(Message&& message, Method&& method, Args&&... args) NOEXCEPT \ + { channel_->notify(std::forward(message), BIND_SHARED(method, args)); } + #define DECLARE_SUBSCRIBE_CHANNEL() \ template \ inline void subscribe_channel(Method&& method, Args&&... args) NOEXCEPT \ @@ -220,6 +225,8 @@ class BCT_API protocol #define SEND(message, method, ...) \ send(message, &CLASS::method, __VA_ARGS__) +#define NOTIFY(message, method, ...) \ + notify(message, &CLASS::method, __VA_ARGS__) #define SUBSCRIBE_CHANNEL(message, method, ...) \ subscribe_channel(&CLASS::method, __VA_ARGS__) #define SUBSCRIBE_BROADCAST(message, method, ...) \ diff --git a/include/bitcoin/network/protocols/protocol_http.hpp b/include/bitcoin/network/protocols/protocol_http.hpp index cd37231e5..0fedd6141 100644 --- a/include/bitcoin/network/protocols/protocol_http.hpp +++ b/include/bitcoin/network/protocols/protocol_http.hpp @@ -52,7 +52,9 @@ class BCT_API protocol_http protocol_http(const session::ptr& session, const channel::ptr& channel, const options_t& options) NOEXCEPT; + /// Forwards to channel::send. DECLARE_SEND() + DECLARE_NOTIFY() DECLARE_SUBSCRIBE_CHANNEL() /// Message handlers by http method. diff --git a/include/bitcoin/network/protocols/protocol_peer.hpp b/include/bitcoin/network/protocols/protocol_peer.hpp index f52494dd4..422d16728 100644 --- a/include/bitcoin/network/protocols/protocol_peer.hpp +++ b/include/bitcoin/network/protocols/protocol_peer.hpp @@ -48,6 +48,7 @@ class BCT_API protocol_peer protocol_peer(const session::ptr& session, const channel::ptr& channel) NOEXCEPT; + /// Forwards to channel::send. DECLARE_SEND() DECLARE_SUBSCRIBE_CHANNEL() diff --git a/src/channels/channel_http.cpp b/src/channels/channel_http.cpp index c1a043f65..6a939286f 100644 --- a/src/channels/channel_http.cpp +++ b/src/channels/channel_http.cpp @@ -150,11 +150,14 @@ void channel_http::dispatch(const request_cptr& request) NOEXCEPT flat_buffer& channel_http::request_buffer() NOEXCEPT { + BC_ASSERT(stranded()); return request_buffer_; } request_ptr channel_http::create_request() const NOEXCEPT { + BC_ASSERT(stranded()); + const auto out = to_shared(); if (websocket()) { @@ -167,7 +170,7 @@ request_ptr channel_http::create_request() const NOEXCEPT return out; } -// Send. +// Send/notify. // ---------------------------------------------------------------------------- void channel_http::send(response&& response, result_handler&& handler) NOEXCEPT @@ -179,25 +182,43 @@ void channel_http::send(response&& response, result_handler&& handler) NOEXCEPT write(std::move(response), std::bind(&channel_http::handle_send, - shared_from_base(), _1, _2, std::move(message), + shared_from_base(), _1, _2, false, std::move(message), + std::move(handler))); +} + +void channel_http::notify(response&& notification, + result_handler&& handler) NOEXCEPT +{ + BC_ASSERT(stranded()); + BC_ASSERT(websocket()); + + auto message = log_message(notification); + + write(std::move(notification), + std::bind(&channel_http::handle_send, + shared_from_base(), _1, _2, true, std::move(message), std::move(handler))); } -void channel_http::handle_send(const code& ec, size_t bytes, +void channel_http::handle_send(const code& ec, size_t bytes, bool notification, const std::string& message, const result_handler& handler) NOEXCEPT { + BC_ASSERT(stranded()); + if (ec) stop(ec); LOGA(boost::format(message) % bytes); handler(ec); // Restart the listener (only in response to requests). - // TODO: use new ::send(rresponse, handler) method to differentiate. - receive(); + if (!notification) + receive(); } // private void channel_http::assign_json_buffer(response& response) NOEXCEPT { + BC_ASSERT(stranded()); + // websocket is full duplex, so cannot use shared json repsonse buffer. if (!websocket()) { diff --git a/src/net/proxy_actions.cpp b/src/net/proxy_actions.cpp index b17c42113..f9f2ac0c3 100644 --- a/src/net/proxy_actions.cpp +++ b/src/net/proxy_actions.cpp @@ -50,6 +50,7 @@ void proxy::cancel(result_handler&& handler) NOEXCEPT // WS (generic, framed). // ---------------------------------------------------------------------------- +// flat_buffer must have configured max_size, which will be allocated. void proxy::read(http::flat_buffer& out, count_handler&& handler) NOEXCEPT { do_reading(); @@ -109,6 +110,7 @@ void proxy::do_tcp_write(const asio::const_buffer& payload, // RPC (TCP: electrum/stratum_v1, WS: btcd). // ---------------------------------------------------------------------------- +// flat_buffer must have configured max_size, which will be allocated. void proxy::read(http::flat_buffer& buffer, rpc::request& request, count_handler&& handler) NOEXCEPT { @@ -161,7 +163,7 @@ void proxy::do_notification_write(const rpc::request_ptr& notification, // HTTP/WS (generic/rpc). // ---------------------------------------------------------------------------- -// Method reading() is invoked directly if read() is called from strand(). +// flat_buffer must have configured max_size, which will be allocated. void proxy::read(http::flat_buffer& buffer, http::request& request, count_handler&& handler) NOEXCEPT { diff --git a/src/net/socket.cpp b/src/net/socket.cpp index 27e9c939a..17a6af12d 100644 --- a/src/net/socket.cpp +++ b/src/net/socket.cpp @@ -298,6 +298,8 @@ void socket::async_write(const asio::const_buffer& buffer, bool binary, void socket::async_read_some(const asio::mutable_buffer& buffer, const count_handler& handler) NOEXCEPT { + BC_ASSERT(stranded()); + try { if (websocket()) @@ -325,6 +327,8 @@ void socket::async_read_some(const asio::mutable_buffer& buffer, void socket::async_read(http::flat_buffer& buffer, const count_handler& handler) NOEXCEPT { + BC_ASSERT(stranded()); + try { if (websocket()) @@ -336,8 +340,15 @@ void socket::async_read(http::flat_buffer& buffer, } else { + auto remain = floored_subtract(buffer.max_size(), buffer.size()); + if (is_zero(remain)) + { + handler(error::buffer_overflow, {}); + return; + } + VARIANT_DISPATCH_METHOD(get_tcp(), - async_read_some(buffer.prepare(buffer.max_size()), + async_read_some(buffer.prepare(remain), std::bind(&socket::handle_async, shared_from_this(), _1, _2, handler, "async_read_some"))); } @@ -353,6 +364,8 @@ void socket::async_read(http::flat_buffer& buffer, void socket::async_read(const asio::mutable_buffer& buffer, const count_handler& handler) NOEXCEPT { + BC_ASSERT(stranded()); + try { if (websocket())