From f5c907dd2df67e808ad75f3a658053b7a9e852b3 Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Tue, 12 Sep 2023 10:54:03 -0700 Subject: [PATCH 01/13] add async_read_some to foxy --- .../foxy/impl/session/async_read.impl.hpp | 75 ++++++--- vendor/foxy/include/foxy/session.hpp | 151 ++++++++++-------- 2 files changed, 132 insertions(+), 94 deletions(-) diff --git a/vendor/foxy/include/foxy/impl/session/async_read.impl.hpp b/vendor/foxy/include/foxy/impl/session/async_read.impl.hpp index e187bef50..dd64c9b35 100644 --- a/vendor/foxy/include/foxy/impl/session/async_read.impl.hpp +++ b/vendor/foxy/include/foxy/impl/session/async_read.impl.hpp @@ -1,8 +1,9 @@ // -// Copyright (c) 2018-2019 Christian Mazakas (christian dot mazakas at gmail dot com) +// Copyright (c) 2018-2019 Christian Mazakas (christian dot mazakas at gmail dot +// com) // -// Distributed under the Boost Software License, Version 1.0. (See accompanying file LICENSE_1_0.txt -// or copy at http://www.boost.org/LICENSE_1_0.txt) +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // // Official repository: https://github.com/LeonineKing1199/foxy // @@ -10,32 +11,56 @@ #ifndef FOXY_IMPL_SESSION_ASYNC_READ_IMPL_HPP_ #define FOXY_IMPL_SESSION_ASYNC_READ_IMPL_HPP_ -#include #include +#include + +namespace launchdarkly::foxy { +template +template +auto basic_session::async_read( + Parser& parser, + ReadHandler&& handler) & -> + typename boost::asio::async_result, + void(boost::system::error_code, + std::size_t)>::return_type { + return ::launchdarkly::foxy::detail::async_timer( + [&parser, self = this, coro = boost::asio::coroutine()]( + auto& cb, boost::system::error_code ec = {}, + std::size_t bytes_transferrred = 0) mutable { + BOOST_ASIO_CORO_REENTER(coro) { + BOOST_ASIO_CORO_YIELD boost::beast::http::async_read( + self->stream, self->buffer, parser, std::move(cb)); + + cb.complete(ec, bytes_transferrred); + } + }, + *this, std::forward(handler)); +} -namespace launchdarkly::foxy -{ template template -auto -basic_session::async_read(Parser& parser, ReadHandler&& handler) & -> - typename boost::asio::async_result, - void(boost::system::error_code, std::size_t)>::return_type -{ - return ::launchdarkly::foxy::detail::async_timer( - [&parser, self = this, coro = boost::asio::coroutine()]( - auto& cb, boost::system::error_code ec = {}, std::size_t bytes_transferrred = 0) mutable { - BOOST_ASIO_CORO_REENTER(coro) - { - BOOST_ASIO_CORO_YIELD boost::beast::http::async_read(self->stream, self->buffer, parser, - std::move(cb)); - - cb.complete(ec, bytes_transferrred); - } - }, - *this, std::forward(handler)); +auto basic_session::async_read_some( + Parser& parser, + ReadHandler&& handler) & -> + typename boost::asio::async_result, + void(boost::system::error_code, + std::size_t)>::return_type { + return ::launchdarkly::foxy::detail::async_timer( + [&parser, self = this, coro = boost::asio::coroutine()]( + auto& cb, boost::system::error_code ec = {}, + std::size_t bytes_transferrred = 0) mutable { + BOOST_ASIO_CORO_REENTER(coro) { + BOOST_ASIO_CORO_YIELD boost::beast::http::async_read_some( + self->stream, self->buffer, parser, std::move(cb)); + + cb.complete(ec, bytes_transferrred); + } + }, + *this, std::forward(handler)); } -} // namespace launchdarkly::foxy +} // namespace launchdarkly::foxy -#endif // FOXY_IMPL_SESSION_ASYNC_READ_IMPL_HPP_ +#endif // FOXY_IMPL_SESSION_ASYNC_READ_IMPL_HPP_ diff --git a/vendor/foxy/include/foxy/session.hpp b/vendor/foxy/include/foxy/session.hpp index 17fb94b95..c2f6f0dac 100644 --- a/vendor/foxy/include/foxy/session.hpp +++ b/vendor/foxy/include/foxy/session.hpp @@ -1,8 +1,9 @@ // -// Copyright (c) 2018-2019 Christian Mazakas (christian dot mazakas at gmail dot com) +// Copyright (c) 2018-2019 Christian Mazakas (christian dot mazakas at gmail dot +// com) // -// Distributed under the Boost Software License, Version 1.0. (See accompanying file LICENSE_1_0.txt -// or copy at http://www.boost.org/LICENSE_1_0.txt) +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // // Official repository: https://github.com/LeonineKing1199/foxy // @@ -10,8 +11,8 @@ #ifndef FOXY_SESSION_HPP_ #define FOXY_SESSION_HPP_ -#include #include +#include #include #include @@ -22,78 +23,90 @@ #include #include +#include #include #include -#include -namespace launchdarkly::foxy -{ +namespace launchdarkly::foxy { template -struct basic_session -{ -public: - static_assert(boost::beast::is_async_stream::value, - "Requirements on the Stream type were not met. Stream must be a Beast.AsyncStream"); - - static_assert(boost::asio::is_dynamic_buffer::value, - "Requirements on the DynamicBuffer type were not met. DynamicBuffer must be an " - "Asio.DynamicBuffer"); - - using stream_type = ::launchdarkly::foxy::basic_multi_stream; - using buffer_type = DynamicBuffer; - using timer_type = boost::asio::steady_timer; - using executor_type = typename stream_type::executor_type; - - session_opts opts; - stream_type stream; - buffer_type buffer; - timer_type timer; - - basic_session() = delete; - basic_session(basic_session const&) = delete; - basic_session(basic_session&&) = default; - - template - basic_session(boost::asio::any_io_executor executor, session_opts opts_, BufferArgs&&... bargs); - - template - basic_session(boost::asio::io_context& io, session_opts opts_, BufferArgs&&... bargs); - - template - basic_session(stream_type stream_, session_opts opts_, BufferArgs&&... bargs); - - auto - get_executor() -> executor_type; - - template - auto - async_read_header(Parser& parser, ReadHandler&& handler) & -> - typename boost::asio::async_result, - void(boost::system::error_code, std::size_t)>::return_type; - - template - auto - async_read(Parser& parser, ReadHandler&& handler) & -> - typename boost::asio::async_result, - void(boost::system::error_code, std::size_t)>::return_type; - - template - auto - async_write_header(Serializer& serializer, WriteHandler&& handler) & -> - typename boost::asio::async_result, - void(boost::system::error_code, std::size_t)>::return_type; - - template - auto - async_write(Serializer& serializer, WriteHandler&& handler) & -> - typename boost::asio::async_result, - void(boost::system::error_code, std::size_t)>::return_type; +struct basic_session { + public: + static_assert(boost::beast::is_async_stream::value, + "Requirements on the Stream type were not met. Stream must " + "be a Beast.AsyncStream"); + + static_assert(boost::asio::is_dynamic_buffer::value, + "Requirements on the DynamicBuffer type were not met. " + "DynamicBuffer must be an " + "Asio.DynamicBuffer"); + + using stream_type = ::launchdarkly::foxy::basic_multi_stream; + using buffer_type = DynamicBuffer; + using timer_type = boost::asio::steady_timer; + using executor_type = typename stream_type::executor_type; + + session_opts opts; + stream_type stream; + buffer_type buffer; + timer_type timer; + + basic_session() = delete; + basic_session(basic_session const&) = delete; + basic_session(basic_session&&) = default; + + template + basic_session(boost::asio::any_io_executor executor, + session_opts opts_, + BufferArgs&&... bargs); + + template + basic_session(boost::asio::io_context& io, + session_opts opts_, + BufferArgs&&... bargs); + + template + basic_session(stream_type stream_, + session_opts opts_, + BufferArgs&&... bargs); + + auto get_executor() -> executor_type; + + template + auto async_read_header(Parser& parser, ReadHandler&& handler) & -> + typename boost::asio::async_result, + void(boost::system::error_code, + std::size_t)>::return_type; + + template + auto async_read(Parser& parser, ReadHandler&& handler) & -> + typename boost::asio::async_result, + void(boost::system::error_code, + std::size_t)>::return_type; + + template + auto async_read_some(Parser& parser, ReadHandler&& handler) & -> + typename boost::asio::async_result, + void(boost::system::error_code, + std::size_t)>::return_type; + + template + auto async_write_header(Serializer& serializer, WriteHandler&& handler) & -> + typename boost::asio::async_result, + void(boost::system::error_code, + std::size_t)>::return_type; + + template + auto async_write(Serializer& serializer, WriteHandler&& handler) & -> + typename boost::asio::async_result, + void(boost::system::error_code, + std::size_t)>::return_type; }; -using session = basic_session; +using session = + basic_session; -} // namespace launchdarkly::foxy +} // namespace launchdarkly::foxy #include -#endif // FOXY_SESSION_HPP_ +#endif // FOXY_SESSION_HPP_ From d94c4c235d21bd259177dd08aab1d3b2b34ff707 Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Tue, 12 Sep 2023 10:55:30 -0700 Subject: [PATCH 02/13] use async_read_some in sse-client --- .../data_sources/streaming_data_source.cpp | 2 +- libs/common/src/config/logging_builder.cpp | 4 +- libs/server-sent-events/src/client.cpp | 42 ++++++++++++++++++- 3 files changed, 44 insertions(+), 4 deletions(-) diff --git a/libs/client-sdk/src/data_sources/streaming_data_source.cpp b/libs/client-sdk/src/data_sources/streaming_data_source.cpp index 9d7c504cc..d645134c3 100644 --- a/libs/client-sdk/src/data_sources/streaming_data_source.cpp +++ b/libs/client-sdk/src/data_sources/streaming_data_source.cpp @@ -138,7 +138,7 @@ void StreamingDataSource::Start() { client_builder.logger([weak_self](auto msg) { if (auto self = weak_self.lock()) { - LD_LOG(self->logger_, LogLevel::kDebug) << msg; + LD_LOG(self->logger_, LogLevel::kDebug) << "sse-client: " << msg; } }); diff --git a/libs/common/src/config/logging_builder.cpp b/libs/common/src/config/logging_builder.cpp index 378494535..eb8c5b1c6 100644 --- a/libs/common/src/config/logging_builder.cpp +++ b/libs/common/src/config/logging_builder.cpp @@ -60,7 +60,9 @@ LoggingBuilder::BasicLogging& LoggingBuilder::BasicLogging::Tag( return *this; } LoggingBuilder::BasicLogging::BasicLogging() - : level_(Defaults::LogLevel()), tag_(Defaults::LogTag()) {} + : level_(GetLogLevelEnum(std::getenv("LD_LOG_LEVEL"), + Defaults::LogLevel())), + tag_(Defaults::LogTag()) {} LoggingBuilder::CustomLogging& LoggingBuilder::CustomLogging::Backend( std::shared_ptr backend) { diff --git a/libs/server-sent-events/src/client.cpp b/libs/server-sent-events/src/client.cpp index 8d8432382..20269ea1f 100644 --- a/libs/server-sent-events/src/client.cpp +++ b/libs/server-sent-events/src/client.cpp @@ -90,10 +90,36 @@ class FoxyClient : public Client, backoff_timer_(session_.get_executor()), event_receiver_(std::move(receiver)), logger_(std::move(logger)), - errors_(std::move(errors)) { + errors_(std::move(errors)), + last_read_(std::nullopt) { create_parser(); } + /** Logs a message indicating that an async_read_some operation + * on the response's body has completed, and how long that operation took + * (using a monotonic clock.) + * + * This is useful for debugging timeout-related issues, like receiving a + * heartbeat message. + * + * The timestamp of the last read is initialized after receiving successful + * headers, but before initiating the first body read. Future reads then + * update the timestamp. + */ + void log_and_update_last_read(std::size_t amount) { + if (last_read_) { + auto sec_since_last_read = + std::chrono::duration_cast( + std::chrono::steady_clock::now() - *last_read_) + .count(); + logger_("read (" + std::to_string(amount) + ") bytes in (" + + std::to_string(sec_since_last_read) + ") sec"); + } else { + logger_("read (" + std::to_string(amount) + ") bytes"); + } + last_read_ = std::chrono::steady_clock::now(); + } + /** The body parser is recreated each time a connection is made because * its internal state cannot be explicitly reset. * @@ -216,7 +242,9 @@ class FoxyClient : public Client, } backoff_.succeed(); - return session_.async_read( + + last_read_ = std::chrono::steady_clock::now(); + return session_.async_read_some( *body_parser_, beast::bind_front_handler(&FoxyClient::on_read_body, shared_from_this())); @@ -263,8 +291,16 @@ class FoxyClient : public Client, void on_read_body(boost::system::error_code ec, std::size_t amount) { boost::ignore_unused(amount); if (ec == boost::asio::error::operation_aborted) { + logger_("read HTTP response body aborted"); return; } + if (!ec) { + log_and_update_last_read(amount); + return session_.async_read_some( + *body_parser_, + beast::bind_front_handler(&FoxyClient::on_read_body, + shared_from_this())); + } do_backoff(ec.what()); } @@ -349,6 +385,8 @@ class FoxyClient : public Client, std::optional last_event_id_; Backoff backoff_; boost::asio::steady_timer backoff_timer_; + + std::optional last_read_; }; Builder::Builder(net::any_io_executor ctx, std::string url) From 1e780ce91cdf3e0e80301f45ad93d3cd9e254b7f Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Tue, 12 Sep 2023 15:06:03 -0700 Subject: [PATCH 03/13] fix sse behavior when reconnecting to streams --- libs/server-sent-events/src/client.cpp | 55 ++++++++++++++++---------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/libs/server-sent-events/src/client.cpp b/libs/server-sent-events/src/client.cpp index 20269ea1f..a4ce49e97 100644 --- a/libs/server-sent-events/src/client.cpp +++ b/libs/server-sent-events/src/client.cpp @@ -79,20 +79,20 @@ class FoxyClient : public Client, read_timeout_(read_timeout), write_timeout_(write_timeout), req_(std::move(req)), - session_(std::move(executor), - launchdarkly::foxy::session_opts{ - ToOptRef(ssl_context_), - connect_timeout.value_or(kNoTimeout)}), backoff_( initial_reconnect_delay.value_or(kDefaultInitialReconnectDelay), kDefaultMaxBackoffDelay), last_event_id_(std::nullopt), - backoff_timer_(session_.get_executor()), + backoff_timer_(std::move(executor)), event_receiver_(std::move(receiver)), logger_(std::move(logger)), errors_(std::move(errors)), - last_read_(std::nullopt) { + last_read_(std::nullopt), + shutting_down_(false), + body_parser_(std::nullopt), + session_(std::nullopt) { create_parser(); + create_session(); } /** Logs a message indicating that an async_read_some operation @@ -132,6 +132,13 @@ class FoxyClient : public Client, body_parser_->get().body().on_event(event_receiver_); } + void create_session() { + session_.emplace( + backoff_timer_.get_executor(), + launchdarkly::foxy::session_opts{ + ToOptRef(ssl_context_), connect_timeout_.value_or(kNoTimeout)}); + } + /** * Called whenever the connection needs to be reattempted, triggering * a timed wait for the current backoff duration. @@ -158,6 +165,8 @@ class FoxyClient : public Client, logger_(msg.str()); create_parser(); + create_session(); + backoff_timer_.expires_from_now(backoff_.delay()); backoff_timer_.async_wait(beast::bind_front_handler( &FoxyClient::on_backoff, shared_from_this())); @@ -171,7 +180,7 @@ class FoxyClient : public Client, } void run() override { - session_.async_connect( + session_->async_connect( host_, port_, beast::bind_front_handler(&FoxyClient::on_connect, shared_from_this())); @@ -190,10 +199,10 @@ class FoxyClient : public Client, } else { req_.erase("last-event-id"); } - session_.opts.timeout = write_timeout_.value_or(kNoTimeout); - session_.async_write(req_, - beast::bind_front_handler(&FoxyClient::on_write, - shared_from_this())); + session_->opts.timeout = write_timeout_.value_or(kNoTimeout); + session_->async_write(req_, + beast::bind_front_handler(&FoxyClient::on_write, + shared_from_this())); } void on_write(boost::system::error_code ec, std::size_t amount) { @@ -205,8 +214,8 @@ class FoxyClient : public Client, return do_backoff(ec.what()); } - session_.opts.timeout = read_timeout_.value_or(kNoTimeout); - session_.async_read_header( + session_->opts.timeout = read_timeout_.value_or(kNoTimeout); + session_->async_read_header( *body_parser_, beast::bind_front_handler(&FoxyClient::on_headers, shared_from_this())); } @@ -222,7 +231,7 @@ class FoxyClient : public Client, if (!body_parser_->is_header_done()) { /* keep reading headers */ - return session_.async_read_header( + return session_->async_read_header( *body_parser_, beast::bind_front_handler(&FoxyClient::on_headers, shared_from_this())); @@ -244,7 +253,7 @@ class FoxyClient : public Client, backoff_.succeed(); last_read_ = std::chrono::steady_clock::now(); - return session_.async_read_some( + return session_->async_read_some( *body_parser_, beast::bind_front_handler(&FoxyClient::on_read_body, shared_from_this())); @@ -290,13 +299,13 @@ class FoxyClient : public Client, void on_read_body(boost::system::error_code ec, std::size_t amount) { boost::ignore_unused(amount); - if (ec == boost::asio::error::operation_aborted) { - logger_("read HTTP response body aborted"); + if (ec == boost::asio::error::operation_aborted && shutting_down_) { + logger_("read HTTP response body aborted (shutting down)"); return; } if (!ec) { log_and_update_last_read(amount); - return session_.async_read_some( + return session_->async_read_some( *body_parser_, beast::bind_front_handler(&FoxyClient::on_read_body, shared_from_this())); @@ -305,14 +314,16 @@ class FoxyClient : public Client, } void async_shutdown(std::function completion) override { - boost::asio::post(session_.get_executor(), + boost::asio::post(session_->get_executor(), beast::bind_front_handler(&FoxyClient::do_shutdown, shared_from_this(), std::move(completion))); } void do_shutdown(std::function completion) { - session_.async_shutdown(beast::bind_front_handler( + shutting_down_ = true; + backoff_timer_.cancel(); + session_->async_shutdown(beast::bind_front_handler( &FoxyClient::on_shutdown, std::move(completion))); } @@ -381,12 +392,14 @@ class FoxyClient : public Client, Builder::ErrorCallback errors_; std::optional> body_parser_; - launchdarkly::foxy::client_session session_; + std::optional session_; std::optional last_event_id_; Backoff backoff_; boost::asio::steady_timer backoff_timer_; std::optional last_read_; + + bool shutting_down_; }; Builder::Builder(net::any_io_executor ctx, std::string url) From 4f7dc9e8fb2133b367885b7b7a8cfd7e10d24c5d Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Tue, 12 Sep 2023 17:12:15 -0700 Subject: [PATCH 04/13] client should backoff when receiving final chunk --- contract-tests/sse-contract-tests/src/event_outbox.cpp | 2 +- libs/server-sent-events/src/client.cpp | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/contract-tests/sse-contract-tests/src/event_outbox.cpp b/contract-tests/sse-contract-tests/src/event_outbox.cpp index e1c62d25b..010e1ecdc 100644 --- a/contract-tests/sse-contract-tests/src/event_outbox.cpp +++ b/contract-tests/sse-contract-tests/src/event_outbox.cpp @@ -67,7 +67,7 @@ EventOutbox::RequestType EventOutbox::build_request( RequestType req; req.set(http::field::host, callback_host_); - req.method(http::verb::get); + req.method(http::verb::post); req.target(callback_url_ + "/" + std::to_string(counter)); nlohmann::json json; diff --git a/libs/server-sent-events/src/client.cpp b/libs/server-sent-events/src/client.cpp index a4ce49e97..73bec87df 100644 --- a/libs/server-sent-events/src/client.cpp +++ b/libs/server-sent-events/src/client.cpp @@ -303,6 +303,12 @@ class FoxyClient : public Client, logger_("read HTTP response body aborted (shutting down)"); return; } + if (body_parser_->is_done()) { + // The server can indicate that the chunk encoded response is done + // by sending a final chunk + CRLF. The body parser will have + // detected this. The correct response is to attempt to reconnect. + return do_backoff("receiving final chunk"); + } if (!ec) { log_and_update_last_read(amount); return session_->async_read_some( From 300acdc6d64c8186d519feeb2b251c2fd3cfd926 Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Tue, 12 Sep 2023 17:28:16 -0700 Subject: [PATCH 05/13] revert session construction changes --- libs/server-sent-events/src/client.cpp | 54 ++++++++++++-------------- 1 file changed, 24 insertions(+), 30 deletions(-) diff --git a/libs/server-sent-events/src/client.cpp b/libs/server-sent-events/src/client.cpp index 73bec87df..5b79d410f 100644 --- a/libs/server-sent-events/src/client.cpp +++ b/libs/server-sent-events/src/client.cpp @@ -79,20 +79,22 @@ class FoxyClient : public Client, read_timeout_(read_timeout), write_timeout_(write_timeout), req_(std::move(req)), - backoff_( - initial_reconnect_delay.value_or(kDefaultInitialReconnectDelay), - kDefaultMaxBackoffDelay), - last_event_id_(std::nullopt), - backoff_timer_(std::move(executor)), event_receiver_(std::move(receiver)), logger_(std::move(logger)), errors_(std::move(errors)), - last_read_(std::nullopt), - shutting_down_(false), body_parser_(std::nullopt), - session_(std::nullopt) { + session_(std::move(executor), + launchdarkly::foxy::session_opts{ + ToOptRef(ssl_context_), + connect_timeout_.value_or(kNoTimeout)}), + last_event_id_(std::nullopt), + backoff_( + initial_reconnect_delay.value_or(kDefaultInitialReconnectDelay), + kDefaultMaxBackoffDelay), + backoff_timer_(session_.get_executor()), + last_read_(std::nullopt), + shutting_down_(false) { create_parser(); - create_session(); } /** Logs a message indicating that an async_read_some operation @@ -132,13 +134,6 @@ class FoxyClient : public Client, body_parser_->get().body().on_event(event_receiver_); } - void create_session() { - session_.emplace( - backoff_timer_.get_executor(), - launchdarkly::foxy::session_opts{ - ToOptRef(ssl_context_), connect_timeout_.value_or(kNoTimeout)}); - } - /** * Called whenever the connection needs to be reattempted, triggering * a timed wait for the current backoff duration. @@ -165,7 +160,6 @@ class FoxyClient : public Client, logger_(msg.str()); create_parser(); - create_session(); backoff_timer_.expires_from_now(backoff_.delay()); backoff_timer_.async_wait(beast::bind_front_handler( @@ -180,7 +174,7 @@ class FoxyClient : public Client, } void run() override { - session_->async_connect( + session_.async_connect( host_, port_, beast::bind_front_handler(&FoxyClient::on_connect, shared_from_this())); @@ -199,10 +193,10 @@ class FoxyClient : public Client, } else { req_.erase("last-event-id"); } - session_->opts.timeout = write_timeout_.value_or(kNoTimeout); - session_->async_write(req_, - beast::bind_front_handler(&FoxyClient::on_write, - shared_from_this())); + session_.opts.timeout = write_timeout_.value_or(kNoTimeout); + session_.async_write(req_, + beast::bind_front_handler(&FoxyClient::on_write, + shared_from_this())); } void on_write(boost::system::error_code ec, std::size_t amount) { @@ -214,8 +208,8 @@ class FoxyClient : public Client, return do_backoff(ec.what()); } - session_->opts.timeout = read_timeout_.value_or(kNoTimeout); - session_->async_read_header( + session_.opts.timeout = read_timeout_.value_or(kNoTimeout); + session_.async_read_header( *body_parser_, beast::bind_front_handler(&FoxyClient::on_headers, shared_from_this())); } @@ -231,7 +225,7 @@ class FoxyClient : public Client, if (!body_parser_->is_header_done()) { /* keep reading headers */ - return session_->async_read_header( + return session_.async_read_header( *body_parser_, beast::bind_front_handler(&FoxyClient::on_headers, shared_from_this())); @@ -253,7 +247,7 @@ class FoxyClient : public Client, backoff_.succeed(); last_read_ = std::chrono::steady_clock::now(); - return session_->async_read_some( + return session_.async_read_some( *body_parser_, beast::bind_front_handler(&FoxyClient::on_read_body, shared_from_this())); @@ -311,7 +305,7 @@ class FoxyClient : public Client, } if (!ec) { log_and_update_last_read(amount); - return session_->async_read_some( + return session_.async_read_some( *body_parser_, beast::bind_front_handler(&FoxyClient::on_read_body, shared_from_this())); @@ -320,7 +314,7 @@ class FoxyClient : public Client, } void async_shutdown(std::function completion) override { - boost::asio::post(session_->get_executor(), + boost::asio::post(session_.get_executor(), beast::bind_front_handler(&FoxyClient::do_shutdown, shared_from_this(), std::move(completion))); @@ -329,7 +323,7 @@ class FoxyClient : public Client, void do_shutdown(std::function completion) { shutting_down_ = true; backoff_timer_.cancel(); - session_->async_shutdown(beast::bind_front_handler( + session_.async_shutdown(beast::bind_front_handler( &FoxyClient::on_shutdown, std::move(completion))); } @@ -398,7 +392,7 @@ class FoxyClient : public Client, Builder::ErrorCallback errors_; std::optional> body_parser_; - std::optional session_; + launchdarkly::foxy::client_session session_; std::optional last_event_id_; Backoff backoff_; boost::asio::steady_timer backoff_timer_; From ed5ff7f8e3485f51395dc65c4303bcc9b7fe1b29 Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Tue, 12 Sep 2023 17:31:38 -0700 Subject: [PATCH 06/13] remove shutdown bool --- libs/server-sent-events/src/client.cpp | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/libs/server-sent-events/src/client.cpp b/libs/server-sent-events/src/client.cpp index 5b79d410f..e2ca4c6da 100644 --- a/libs/server-sent-events/src/client.cpp +++ b/libs/server-sent-events/src/client.cpp @@ -92,8 +92,7 @@ class FoxyClient : public Client, initial_reconnect_delay.value_or(kDefaultInitialReconnectDelay), kDefaultMaxBackoffDelay), backoff_timer_(session_.get_executor()), - last_read_(std::nullopt), - shutting_down_(false) { + last_read_(std::nullopt) { create_parser(); } @@ -160,7 +159,6 @@ class FoxyClient : public Client, logger_(msg.str()); create_parser(); - backoff_timer_.expires_from_now(backoff_.delay()); backoff_timer_.async_wait(beast::bind_front_handler( &FoxyClient::on_backoff, shared_from_this())); @@ -293,8 +291,8 @@ class FoxyClient : public Client, void on_read_body(boost::system::error_code ec, std::size_t amount) { boost::ignore_unused(amount); - if (ec == boost::asio::error::operation_aborted && shutting_down_) { - logger_("read HTTP response body aborted (shutting down)"); + if (ec == boost::asio::error::operation_aborted) { + logger_("read HTTP response body aborted"); return; } if (body_parser_->is_done()) { @@ -321,7 +319,6 @@ class FoxyClient : public Client, } void do_shutdown(std::function completion) { - shutting_down_ = true; backoff_timer_.cancel(); session_.async_shutdown(beast::bind_front_handler( &FoxyClient::on_shutdown, std::move(completion))); @@ -398,8 +395,6 @@ class FoxyClient : public Client, boost::asio::steady_timer backoff_timer_; std::optional last_read_; - - bool shutting_down_; }; Builder::Builder(net::any_io_executor ctx, std::string url) From 5e2886067b4911e9af9efaa555c6a57575c9ac2e Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Wed, 13 Sep 2023 08:03:02 -0700 Subject: [PATCH 07/13] ssl session cannot be resused for new connections --- libs/server-sent-events/src/client.cpp | 47 +++++++++++++++----------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/libs/server-sent-events/src/client.cpp b/libs/server-sent-events/src/client.cpp index e2ca4c6da..8b803aa19 100644 --- a/libs/server-sent-events/src/client.cpp +++ b/libs/server-sent-events/src/client.cpp @@ -83,16 +83,14 @@ class FoxyClient : public Client, logger_(std::move(logger)), errors_(std::move(errors)), body_parser_(std::nullopt), - session_(std::move(executor), - launchdarkly::foxy::session_opts{ - ToOptRef(ssl_context_), - connect_timeout_.value_or(kNoTimeout)}), + session_(std::nullopt), last_event_id_(std::nullopt), backoff_( initial_reconnect_delay.value_or(kDefaultInitialReconnectDelay), kDefaultMaxBackoffDelay), - backoff_timer_(session_.get_executor()), + backoff_timer_(std::move(executor)), last_read_(std::nullopt) { + create_session(); create_parser(); } @@ -133,6 +131,13 @@ class FoxyClient : public Client, body_parser_->get().body().on_event(event_receiver_); } + void create_session() { + session_.emplace( + backoff_timer_.get_executor(), + launchdarkly::foxy::session_opts{ + ToOptRef(ssl_context_), connect_timeout_.value_or(kNoTimeout)}); + } + /** * Called whenever the connection needs to be reattempted, triggering * a timed wait for the current backoff duration. @@ -158,6 +163,7 @@ class FoxyClient : public Client, logger_(msg.str()); + create_session(); create_parser(); backoff_timer_.expires_from_now(backoff_.delay()); backoff_timer_.async_wait(beast::bind_front_handler( @@ -172,7 +178,7 @@ class FoxyClient : public Client, } void run() override { - session_.async_connect( + session_->async_connect( host_, port_, beast::bind_front_handler(&FoxyClient::on_connect, shared_from_this())); @@ -191,10 +197,10 @@ class FoxyClient : public Client, } else { req_.erase("last-event-id"); } - session_.opts.timeout = write_timeout_.value_or(kNoTimeout); - session_.async_write(req_, - beast::bind_front_handler(&FoxyClient::on_write, - shared_from_this())); + session_->opts.timeout = write_timeout_.value_or(kNoTimeout); + session_->async_write(req_, + beast::bind_front_handler(&FoxyClient::on_write, + shared_from_this())); } void on_write(boost::system::error_code ec, std::size_t amount) { @@ -206,8 +212,8 @@ class FoxyClient : public Client, return do_backoff(ec.what()); } - session_.opts.timeout = read_timeout_.value_or(kNoTimeout); - session_.async_read_header( + session_->opts.timeout = read_timeout_.value_or(kNoTimeout); + session_->async_read_header( *body_parser_, beast::bind_front_handler(&FoxyClient::on_headers, shared_from_this())); } @@ -223,7 +229,7 @@ class FoxyClient : public Client, if (!body_parser_->is_header_done()) { /* keep reading headers */ - return session_.async_read_header( + return session_->async_read_header( *body_parser_, beast::bind_front_handler(&FoxyClient::on_headers, shared_from_this())); @@ -242,10 +248,11 @@ class FoxyClient : public Client, return do_backoff("invalid Content-Type"); } + logger_("connected"); backoff_.succeed(); last_read_ = std::chrono::steady_clock::now(); - return session_.async_read_some( + return session_->async_read_some( *body_parser_, beast::bind_front_handler(&FoxyClient::on_read_body, shared_from_this())); @@ -292,8 +299,8 @@ class FoxyClient : public Client, void on_read_body(boost::system::error_code ec, std::size_t amount) { boost::ignore_unused(amount); if (ec == boost::asio::error::operation_aborted) { - logger_("read HTTP response body aborted"); - return; + return do_backoff( + "aborting read of response body (timeout/shutdown)"); } if (body_parser_->is_done()) { // The server can indicate that the chunk encoded response is done @@ -303,7 +310,7 @@ class FoxyClient : public Client, } if (!ec) { log_and_update_last_read(amount); - return session_.async_read_some( + return session_->async_read_some( *body_parser_, beast::bind_front_handler(&FoxyClient::on_read_body, shared_from_this())); @@ -312,7 +319,7 @@ class FoxyClient : public Client, } void async_shutdown(std::function completion) override { - boost::asio::post(session_.get_executor(), + boost::asio::post(session_->get_executor(), beast::bind_front_handler(&FoxyClient::do_shutdown, shared_from_this(), std::move(completion))); @@ -320,7 +327,7 @@ class FoxyClient : public Client, void do_shutdown(std::function completion) { backoff_timer_.cancel(); - session_.async_shutdown(beast::bind_front_handler( + session_->async_shutdown(beast::bind_front_handler( &FoxyClient::on_shutdown, std::move(completion))); } @@ -389,7 +396,7 @@ class FoxyClient : public Client, Builder::ErrorCallback errors_; std::optional> body_parser_; - launchdarkly::foxy::client_session session_; + std::optional session_; std::optional last_event_id_; Backoff backoff_; boost::asio::steady_timer backoff_timer_; From 3765fcb36688c5f9f215f402269440688fc5fcba Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Wed, 13 Sep 2023 08:16:28 -0700 Subject: [PATCH 08/13] add shutdown sentinel to ensure async_read_some completes when client is explicitely shut down --- libs/server-sent-events/src/client.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/libs/server-sent-events/src/client.cpp b/libs/server-sent-events/src/client.cpp index 8b803aa19..ab296c049 100644 --- a/libs/server-sent-events/src/client.cpp +++ b/libs/server-sent-events/src/client.cpp @@ -89,7 +89,8 @@ class FoxyClient : public Client, initial_reconnect_delay.value_or(kDefaultInitialReconnectDelay), kDefaultMaxBackoffDelay), backoff_timer_(std::move(executor)), - last_read_(std::nullopt) { + last_read_(std::nullopt), + shutting_down_(false) { create_session(); create_parser(); } @@ -299,6 +300,9 @@ class FoxyClient : public Client, void on_read_body(boost::system::error_code ec, std::size_t amount) { boost::ignore_unused(amount); if (ec == boost::asio::error::operation_aborted) { + if (shutting_down_) { + return; + } return do_backoff( "aborting read of response body (timeout/shutdown)"); } @@ -326,6 +330,7 @@ class FoxyClient : public Client, } void do_shutdown(std::function completion) { + shutting_down_ = true; backoff_timer_.cancel(); session_->async_shutdown(beast::bind_front_handler( &FoxyClient::on_shutdown, std::move(completion))); @@ -402,6 +407,8 @@ class FoxyClient : public Client, boost::asio::steady_timer backoff_timer_; std::optional last_read_; + + bool shutting_down_; }; Builder::Builder(net::any_io_executor ctx, std::string url) From 24fcdfc3b542ba776a7b7ad7abefb77c7b2c8555 Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Wed, 13 Sep 2023 08:30:21 -0700 Subject: [PATCH 09/13] make run() post to sessions' executor; rename to async_connect --- .../sse-contract-tests/src/entity_manager.cpp | 2 +- .../src/data_sources/streaming_data_source.cpp | 2 +- .../include/launchdarkly/sse/client.hpp | 2 +- libs/server-sent-events/src/client.cpp | 10 ++++++++-- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/contract-tests/sse-contract-tests/src/entity_manager.cpp b/contract-tests/sse-contract-tests/src/entity_manager.cpp index 8c9fd1d7f..34184f730 100644 --- a/contract-tests/sse-contract-tests/src/entity_manager.cpp +++ b/contract-tests/sse-contract-tests/src/entity_manager.cpp @@ -53,7 +53,7 @@ std::optional EntityManager::create(ConfigParams const& params) { return std::nullopt; } - client->run(); + client->async_connect(); entities_.emplace(id, std::make_pair(client, poster)); return id; diff --git a/libs/client-sdk/src/data_sources/streaming_data_source.cpp b/libs/client-sdk/src/data_sources/streaming_data_source.cpp index d645134c3..934be053b 100644 --- a/libs/client-sdk/src/data_sources/streaming_data_source.cpp +++ b/libs/client-sdk/src/data_sources/streaming_data_source.cpp @@ -163,7 +163,7 @@ void StreamingDataSource::Start() { kCouldNotParseEndpoint); return; } - client_->run(); + client_->async_connect(); } void StreamingDataSource::ShutdownAsync(std::function completion) { diff --git a/libs/server-sent-events/include/launchdarkly/sse/client.hpp b/libs/server-sent-events/include/launchdarkly/sse/client.hpp index 904904a23..9b9e191c6 100644 --- a/libs/server-sent-events/include/launchdarkly/sse/client.hpp +++ b/libs/server-sent-events/include/launchdarkly/sse/client.hpp @@ -160,7 +160,7 @@ class Builder { class Client { public: virtual ~Client() = default; - virtual void run() = 0; + virtual void async_connect() = 0; virtual void async_shutdown(std::function completion) = 0; }; diff --git a/libs/server-sent-events/src/client.cpp b/libs/server-sent-events/src/client.cpp index ab296c049..088b124c1 100644 --- a/libs/server-sent-events/src/client.cpp +++ b/libs/server-sent-events/src/client.cpp @@ -175,10 +175,16 @@ class FoxyClient : public Client, if (ec == boost::asio::error::operation_aborted) { return; } - run(); + do_run(); } - void run() override { + void async_connect() override { + boost::asio::post( + session_->get_executor(), + beast::bind_front_handler(&FoxyClient::do_run, shared_from_this())); + } + + void do_run() { session_->async_connect( host_, port_, beast::bind_front_handler(&FoxyClient::on_connect, From 249e1477829eeb2c9e80dca4ac065e4f0575fa47 Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Wed, 13 Sep 2023 09:01:24 -0700 Subject: [PATCH 10/13] implement read timeout capability in contract tests --- contract-tests/sse-contract-tests/src/entity_manager.cpp | 5 +++++ contract-tests/sse-contract-tests/src/event_outbox.cpp | 2 ++ contract-tests/sse-contract-tests/src/main.cpp | 3 ++- libs/server-sent-events/include/launchdarkly/sse/error.hpp | 1 + libs/server-sent-events/src/client.cpp | 1 + 5 files changed, 11 insertions(+), 1 deletion(-) diff --git a/contract-tests/sse-contract-tests/src/entity_manager.cpp b/contract-tests/sse-contract-tests/src/entity_manager.cpp index 34184f730..142ce8c6d 100644 --- a/contract-tests/sse-contract-tests/src/entity_manager.cpp +++ b/contract-tests/sse-contract-tests/src/entity_manager.cpp @@ -35,6 +35,11 @@ std::optional EntityManager::create(ConfigParams const& params) { std::chrono::milliseconds(*params.readTimeoutMs)); } + if (params.initialDelayMs) { + client_builder.initial_reconnect_delay( + std::chrono::milliseconds(*params.initialDelayMs)); + } + client_builder.logger([this](std::string msg) { LD_LOG(logger_, LogLevel::kDebug) << std::move(msg); }); diff --git a/contract-tests/sse-contract-tests/src/event_outbox.cpp b/contract-tests/sse-contract-tests/src/event_outbox.cpp index 010e1ecdc..ee69653a6 100644 --- a/contract-tests/sse-contract-tests/src/event_outbox.cpp +++ b/contract-tests/sse-contract-tests/src/event_outbox.cpp @@ -93,6 +93,8 @@ EventOutbox::RequestType EventOutbox::build_request( break; case Error::UnrecoverableClientError: msg.comment = "unrecoverable client error"; + case Error::ReadTimeout: + msg.comment = "read timeout"; default: msg.comment = "unspecified error"; } diff --git a/contract-tests/sse-contract-tests/src/main.cpp b/contract-tests/sse-contract-tests/src/main.cpp index 733f457ce..0a9881466 100644 --- a/contract-tests/sse-contract-tests/src/main.cpp +++ b/contract-tests/sse-contract-tests/src/main.cpp @@ -20,7 +20,7 @@ int main(int argc, char* argv[]) { launchdarkly::Logger logger{ std::make_unique("sse-contract-tests")}; - const std::string default_port = "8123"; + std::string const default_port = "8123"; std::string port = default_port; if (argc == 2) { port = @@ -38,6 +38,7 @@ int main(int argc, char* argv[]) { srv.add_capability("report"); srv.add_capability("post"); srv.add_capability("reconnection"); + srv.add_capability("read-timeout"); net::signal_set signals{ioc, SIGINT, SIGTERM}; diff --git a/libs/server-sent-events/include/launchdarkly/sse/error.hpp b/libs/server-sent-events/include/launchdarkly/sse/error.hpp index f9ac4b293..df6ffe0ea 100644 --- a/libs/server-sent-events/include/launchdarkly/sse/error.hpp +++ b/libs/server-sent-events/include/launchdarkly/sse/error.hpp @@ -6,5 +6,6 @@ enum class Error { NoContent = 1, InvalidRedirectLocation = 2, UnrecoverableClientError = 3, + ReadTimeout = 4, }; } diff --git a/libs/server-sent-events/src/client.cpp b/libs/server-sent-events/src/client.cpp index 088b124c1..95d1b02df 100644 --- a/libs/server-sent-events/src/client.cpp +++ b/libs/server-sent-events/src/client.cpp @@ -309,6 +309,7 @@ class FoxyClient : public Client, if (shutting_down_) { return; } + errors_(Error::ReadTimeout); return do_backoff( "aborting read of response body (timeout/shutdown)"); } From d8e7b1d2982505fafda8225b183125fab39897c8 Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Wed, 13 Sep 2023 09:38:42 -0700 Subject: [PATCH 11/13] fix some lints --- .../include/event_outbox.hpp | 4 ++-- .../sse-contract-tests/src/entity_manager.cpp | 11 +++++----- .../sse-contract-tests/src/event_outbox.cpp | 22 ++++++++----------- .../data_sources/streaming_data_source.cpp | 2 ++ 4 files changed, 19 insertions(+), 20 deletions(-) diff --git a/contract-tests/sse-contract-tests/include/event_outbox.hpp b/contract-tests/sse-contract-tests/include/event_outbox.hpp index 20e609051..6ed957bd9 100644 --- a/contract-tests/sse-contract-tests/include/event_outbox.hpp +++ b/contract-tests/sse-contract-tests/include/event_outbox.hpp @@ -72,11 +72,11 @@ class EventOutbox : public std::enable_shared_from_this { private: RequestType build_request( std::size_t counter, - std::variant ev); + std::variant event); void on_resolve(beast::error_code ec, tcp::resolver::results_type results); void on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type); void on_flush_timer(boost::system::error_code ec); void on_write(beast::error_code ec, std::size_t); - void do_shutdown(beast::error_code ec, std::string what); + void do_shutdown(beast::error_code ec); }; diff --git a/contract-tests/sse-contract-tests/src/entity_manager.cpp b/contract-tests/sse-contract-tests/src/entity_manager.cpp index 142ce8c6d..798d55ce5 100644 --- a/contract-tests/sse-contract-tests/src/entity_manager.cpp +++ b/contract-tests/sse-contract-tests/src/entity_manager.cpp @@ -27,7 +27,7 @@ std::optional EntityManager::create(ConfigParams const& params) { } if (params.body) { - client_builder.body(std::move(*params.body)); + client_builder.body(*params.body); } if (params.readTimeoutMs) { @@ -44,12 +44,13 @@ std::optional EntityManager::create(ConfigParams const& params) { LD_LOG(logger_, LogLevel::kDebug) << std::move(msg); }); - client_builder.receiver([copy = poster](launchdarkly::sse::Event e) { - copy->post_event(std::move(e)); + client_builder.receiver([copy = poster](launchdarkly::sse::Event event) { + copy->post_event(std::move(event)); }); - client_builder.errors( - [copy = poster](launchdarkly::sse::Error e) { copy->post_error(e); }); + client_builder.errors([copy = poster](launchdarkly::sse::Error event) { + copy->post_error(event); + }); auto client = client_builder.build(); if (!client) { diff --git a/contract-tests/sse-contract-tests/src/event_outbox.cpp b/contract-tests/sse-contract-tests/src/event_outbox.cpp index ee69653a6..2f09615c3 100644 --- a/contract-tests/sse-contract-tests/src/event_outbox.cpp +++ b/contract-tests/sse-contract-tests/src/event_outbox.cpp @@ -14,8 +14,6 @@ auto const kOutboxCapacity = 1023; EventOutbox::EventOutbox(net::any_io_executor executor, std::string callback_url) : callback_url_{std::move(callback_url)}, - callback_port_{}, - callback_host_{}, callback_counter_{0}, executor_{executor}, resolver_{executor}, @@ -29,7 +27,7 @@ EventOutbox::EventOutbox(net::any_io_executor executor, callback_port_ = uri_components->port(); } -void EventOutbox::do_shutdown(beast::error_code ec, std::string what) { +void EventOutbox::do_shutdown(beast::error_code ec) { event_stream_.socket().shutdown(tcp::socket::shutdown_both, ec); flush_timer_.cancel(); } @@ -54,16 +52,14 @@ void EventOutbox::run() { void EventOutbox::stop() { beast::error_code ec = net::error::basic_errors::operation_aborted; - std::string reason = "stop"; shutdown_ = true; - net::post(executor_, - beast::bind_front_handler(&EventOutbox::do_shutdown, - shared_from_this(), ec, reason)); + net::post(executor_, beast::bind_front_handler(&EventOutbox::do_shutdown, + shared_from_this(), ec)); } EventOutbox::RequestType EventOutbox::build_request( std::size_t counter, - std::variant ev) { + std::variant event) { RequestType req; req.set(http::field::host, callback_host_); @@ -101,7 +97,7 @@ EventOutbox::RequestType EventOutbox::build_request( json = msg; } }, - std::move(ev)); + std::move(event)); req.body() = json.dump(); req.prepare_payload(); @@ -111,7 +107,7 @@ EventOutbox::RequestType EventOutbox::build_request( void EventOutbox::on_resolve(beast::error_code ec, tcp::resolver::results_type results) { if (ec) { - return do_shutdown(ec, "resolve"); + return do_shutdown(ec); } beast::get_lowest_layer(event_stream_) @@ -123,7 +119,7 @@ void EventOutbox::on_resolve(beast::error_code ec, void EventOutbox::on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type) { if (ec) { - return do_shutdown(ec, "connect"); + return do_shutdown(ec); } boost::system::error_code dummy; @@ -133,7 +129,7 @@ void EventOutbox::on_connect(beast::error_code ec, void EventOutbox::on_flush_timer(boost::system::error_code ec) { if (ec && shutdown_) { - return do_shutdown(ec, "flush"); + return do_shutdown(ec); } if (!outbox_.empty()) { @@ -156,7 +152,7 @@ void EventOutbox::on_flush_timer(boost::system::error_code ec) { void EventOutbox::on_write(beast::error_code ec, std::size_t) { if (ec) { - return do_shutdown(ec, "write"); + return do_shutdown(ec); } outbox_.pop(); on_flush_timer(boost::system::error_code{}); diff --git a/libs/client-sdk/src/data_sources/streaming_data_source.cpp b/libs/client-sdk/src/data_sources/streaming_data_source.cpp index 934be053b..4d1caf6f5 100644 --- a/libs/client-sdk/src/data_sources/streaming_data_source.cpp +++ b/libs/client-sdk/src/data_sources/streaming_data_source.cpp @@ -27,6 +27,8 @@ static char const* DataSourceErrorToString(launchdarkly::sse::Error error) { return "server responded with an invalid redirection"; case sse::Error::UnrecoverableClientError: return "unrecoverable client-side error"; + case sse::Error::ReadTimeout: + return "read timeout reached"; } } From 53567e4de728941007db2560e1872cd934b7b22d Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Wed, 13 Sep 2023 10:53:16 -0700 Subject: [PATCH 12/13] add many comments --- libs/server-sent-events/src/client.cpp | 171 +++++++++++++++++-------- 1 file changed, 116 insertions(+), 55 deletions(-) diff --git a/libs/server-sent-events/src/client.cpp b/libs/server-sent-events/src/client.cpp index 95d1b02df..142768242 100644 --- a/libs/server-sent-events/src/client.cpp +++ b/libs/server-sent-events/src/client.cpp @@ -95,43 +95,17 @@ class FoxyClient : public Client, create_parser(); } - /** Logs a message indicating that an async_read_some operation - * on the response's body has completed, and how long that operation took - * (using a monotonic clock.) - * - * This is useful for debugging timeout-related issues, like receiving a - * heartbeat message. - * - * The timestamp of the last read is initialized after receiving successful - * headers, but before initiating the first body read. Future reads then - * update the timestamp. - */ - void log_and_update_last_read(std::size_t amount) { - if (last_read_) { - auto sec_since_last_read = - std::chrono::duration_cast( - std::chrono::steady_clock::now() - *last_read_) - .count(); - logger_("read (" + std::to_string(amount) + ") bytes in (" + - std::to_string(sec_since_last_read) + ") sec"); - } else { - logger_("read (" + std::to_string(amount) + ") bytes"); - } - last_read_ = std::chrono::steady_clock::now(); - } - - /** The body parser is recreated each time a connection is made because - * its internal state cannot be explicitly reset. - * - * Since SSE body will never end unless - * an error occurs, the body size limit must be removed. - */ + // The body parser is recreated each time a connection is made because + // its internal state cannot be explicitly reset. void create_parser() { body_parser_.emplace(); + // Remove body read limit because an SSE stream can be infinite. body_parser_->body_limit(boost::none); body_parser_->get().body().on_event(event_receiver_); } + // The session is recreated each time a connection is made because its + // internal state cannot be explicitly reset. void create_session() { session_.emplace( backoff_timer_.get_executor(), @@ -146,7 +120,7 @@ class FoxyClient : public Client, * The body parser's last SSE event ID must be cached so it can be added * as a header on the next request (since the parser is destroyed.) */ - void do_backoff(std::string const& reason) { + void async_backoff(std::string const& reason) { backoff_.fail(); if (auto id = body_parser_->get().body().last_event_id()) { @@ -196,7 +170,7 @@ class FoxyClient : public Client, return; } if (ec) { - return do_backoff(ec.what()); + return async_backoff(ec.what()); } if (last_event_id_) { @@ -216,7 +190,7 @@ class FoxyClient : public Client, return; } if (ec) { - return do_backoff(ec.what()); + return async_backoff(ec.what()); } session_->opts.timeout = read_timeout_.value_or(kNoTimeout); @@ -231,7 +205,7 @@ class FoxyClient : public Client, return; } if (ec) { - return do_backoff(ec.what()); + return async_backoff(ec.what()); } if (!body_parser_->is_header_done()) { @@ -252,7 +226,7 @@ class FoxyClient : public Client, return; } if (!correct_content_type(response)) { - return do_backoff("invalid Content-Type"); + return async_backoff("invalid Content-Type"); } logger_("connected"); @@ -287,14 +261,14 @@ class FoxyClient : public Client, if (status_class == beast::http::status_class::client_error) { if (recoverable_client_error(response.result())) { - return do_backoff(backoff_reason(response.result())); + return async_backoff(backoff_reason(response.result())); } errors_(Error::UnrecoverableClientError); return; } - do_backoff(backoff_reason(response.result())); + async_backoff(backoff_reason(response.result())); } static std::string backoff_reason(beast::http::status status) { @@ -305,31 +279,59 @@ class FoxyClient : public Client, void on_read_body(boost::system::error_code ec, std::size_t amount) { boost::ignore_unused(amount); - if (ec == boost::asio::error::operation_aborted) { - if (shutting_down_) { - return; + if (ec) { + if (ec == boost::asio::error::operation_aborted) { + // operation_aborted can occur if the read timeout is reached or + // if we're shutting down, so shutting_down_ is needed to + // disambiguate. + if (shutting_down_) { + // End the chain of async operations. + return; + } + errors_(Error::ReadTimeout); + return async_backoff( + "aborting read of response body (timeout)"); } - errors_(Error::ReadTimeout); - return do_backoff( - "aborting read of response body (timeout/shutdown)"); + return async_backoff(ec.what()); } + + // The server can indicate that the chunk encoded response is done + // by sending a final chunk + CRLF. The body parser will have + // detected this. if (body_parser_->is_done()) { - // The server can indicate that the chunk encoded response is done - // by sending a final chunk + CRLF. The body parser will have - // detected this. The correct response is to attempt to reconnect. - return do_backoff("receiving final chunk"); + return async_backoff("receiving final chunk"); } - if (!ec) { - log_and_update_last_read(amount); - return session_->async_read_some( - *body_parser_, - beast::bind_front_handler(&FoxyClient::on_read_body, - shared_from_this())); + + log_and_update_last_read(amount); + return session_->async_read_some( + *body_parser_, beast::bind_front_handler(&FoxyClient::on_read_body, + shared_from_this())); + } + + /** Logs a message indicating that an async_read_some operation + * on the response's body has completed, and how long that operation took + * (using a monotonic clock.) + * + * This is useful for debugging timeout-related issues, like receiving a + * heartbeat message. + */ + void log_and_update_last_read(std::size_t amount) { + if (last_read_) { + auto sec_since_last_read = + std::chrono::duration_cast( + std::chrono::steady_clock::now() - *last_read_) + .count(); + logger_("read (" + std::to_string(amount) + ") bytes in (" + + std::to_string(sec_since_last_read) + ") sec"); + } else { + logger_("read (" + std::to_string(amount) + ") bytes"); } - do_backoff(ec.what()); + last_read_ = std::chrono::steady_clock::now(); } void async_shutdown(std::function completion) override { + // Get on the session's executor, otherwise the code in the completion + // handler could race. boost::asio::post(session_->get_executor(), beast::bind_front_handler(&FoxyClient::do_shutdown, shared_from_this(), @@ -345,18 +347,25 @@ class FoxyClient : public Client, static void on_shutdown(std::function completion, boost::system::error_code ec) { + // Because do_shutdown doesn't use shared_from_this() when initiating + // the async_shutdown op, the client may already be destroyed - hence + // this static method. boost::ignore_unused(ec); if (completion) { completion(); } } + // Some client errors are considered recoverable, meaning they could be + // resolved by reconnecting. Returns true if the status is one of those. static bool recoverable_client_error(beast::http::status status) { return (status == beast::http::status::bad_request || status == beast::http::status::request_timeout || status == beast::http::status::too_many_requests); } + // A naive comparison of content-type to 'text/event-stream' is incorrect + // because multiple content types may be present. static bool correct_content_type(FoxyClient::response const& response) { if (auto content_type = response.find("content-type"); content_type != response.end()) { @@ -366,12 +375,16 @@ class FoxyClient : public Client, return false; } + // If the server redirects, ensure the location header is present. static bool can_redirect(FoxyClient::response const& response) { return (response.result() == beast::http::status::moved_permanently || response.result() == beast::http::status::temporary_redirect) && response.find("location") != response.end(); } + // Generates a redirect URL from a base and provided location. Since the + // location might not be absolute, it may be necessary to resolve it against + // the base. static std::optional redirect_url( std::string orig_base, std::string orig_location) { @@ -393,28 +406,76 @@ class FoxyClient : public Client, } private: + // Optional, but necessary for establishing TLS connections. If it is + // omitted and the scheme is https://, the connection will fail. Can be + // reused across connections. std::optional ssl_context_; + std::string host_; std::string port_; + // If present, the max amount of time that can be spent resolving DNS + // and setting up the initial connection. This doesn't include writing the + // request nor receiving the response. std::optional connect_timeout_; + + // If present, the max amount of time that can be spent after reading bytes + // before more bytes must be read. Applies after sending the request, but + // before receiving the response headers. Intended to terminate connections + // where the sender has hung up. LaunchDarkly sends a heartbeat every 180 + // seconds, so the read timeout must be greater than this. std::optional read_timeout_; + + // If present, the max amount of time that can be spent sending the request. std::optional write_timeout_; + // The request sent to the server, which persists across reconnection + // attempts. http::request req_; + // Callback which executed whenever an SSE event is received. This is + // passed into the body_parser whenever it is constructed, so it needs to be + // stored here for future use. Builder::EventReceiver event_receiver_; + + // Callback executed when log messages must be generated. The provider of + // the callback dictates the log level, which at the moment of writing is + // 'debug'. Builder::LogCallback logger_; + + // Callback executed to report errors. This is the primary mechanism by + // which the client communicates error conditions to the user. Builder::ErrorCallback errors_; + // Customized parser (see parser.hpp) which repeatedly receives chunks of + // data and parses them into SSE events. It cannot be reused across + // connections, hence the optional so it can be destroyed easily. std::optional> body_parser_; + + // The primary network primitive used to read/write to the server. It is our + // responsibility to ensure it is called from only one thread at a time. It + // cannot be reused across connections, similar to the body_parser. std::optional session_; + + // Stores the last known SSE event ID, which can be provided to the server + // upon reconnection. std::optional last_event_id_; + + // Computes the backoff delays used whenever the connection drops + // unnaturally. Backoff backoff_; + + // Used in concert with the backoff member to delay reconnection attempts. + // The timer must be cancelled when shutdown is requested, because an + // outstanding backoff delay might be in progress. boost::asio::steady_timer backoff_timer_; + // Stores the timestamp of the last read in order to augment debug logging. std::optional last_read_; + // Upon completing a read, a new async read is generally initiated. This + // determines whether to return and thus end the operation gracefully (true) + // or to perform backoff (false). bool shutting_down_; }; From 346ff3daf3157615fa936d1a4417f292ae55170d Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Wed, 13 Sep 2023 13:09:38 -0700 Subject: [PATCH 13/13] revert foxy formatting changes --- .../foxy/impl/session/async_read.impl.hpp | 89 +++++----- vendor/foxy/include/foxy/session.hpp | 157 +++++++++--------- 2 files changed, 118 insertions(+), 128 deletions(-) diff --git a/vendor/foxy/include/foxy/impl/session/async_read.impl.hpp b/vendor/foxy/include/foxy/impl/session/async_read.impl.hpp index dd64c9b35..bcbb2d05a 100644 --- a/vendor/foxy/include/foxy/impl/session/async_read.impl.hpp +++ b/vendor/foxy/include/foxy/impl/session/async_read.impl.hpp @@ -1,9 +1,8 @@ // -// Copyright (c) 2018-2019 Christian Mazakas (christian dot mazakas at gmail dot -// com) +// Copyright (c) 2018-2019 Christian Mazakas (christian dot mazakas at gmail dot com) // -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// Distributed under the Boost Software License, Version 1.0. (See accompanying file LICENSE_1_0.txt +// or copy at http://www.boost.org/LICENSE_1_0.txt) // // Official repository: https://github.com/LeonineKing1199/foxy // @@ -11,56 +10,54 @@ #ifndef FOXY_IMPL_SESSION_ASYNC_READ_IMPL_HPP_ #define FOXY_IMPL_SESSION_ASYNC_READ_IMPL_HPP_ -#include #include +#include -namespace launchdarkly::foxy { +namespace launchdarkly::foxy +{ template template -auto basic_session::async_read( - Parser& parser, - ReadHandler&& handler) & -> - typename boost::asio::async_result, - void(boost::system::error_code, - std::size_t)>::return_type { - return ::launchdarkly::foxy::detail::async_timer( - [&parser, self = this, coro = boost::asio::coroutine()]( - auto& cb, boost::system::error_code ec = {}, - std::size_t bytes_transferrred = 0) mutable { - BOOST_ASIO_CORO_REENTER(coro) { - BOOST_ASIO_CORO_YIELD boost::beast::http::async_read( - self->stream, self->buffer, parser, std::move(cb)); - - cb.complete(ec, bytes_transferrred); - } - }, - *this, std::forward(handler)); +auto +basic_session::async_read(Parser& parser, ReadHandler&& handler) & -> + typename boost::asio::async_result, + void(boost::system::error_code, std::size_t)>::return_type +{ + return ::launchdarkly::foxy::detail::async_timer( + [&parser, self = this, coro = boost::asio::coroutine()]( + auto& cb, boost::system::error_code ec = {}, std::size_t bytes_transferrred = 0) mutable { + BOOST_ASIO_CORO_REENTER(coro) + { + BOOST_ASIO_CORO_YIELD boost::beast::http::async_read(self->stream, self->buffer, parser, + std::move(cb)); + + cb.complete(ec, bytes_transferrred); + } + }, + *this, std::forward(handler)); } template template -auto basic_session::async_read_some( - Parser& parser, - ReadHandler&& handler) & -> - typename boost::asio::async_result, - void(boost::system::error_code, - std::size_t)>::return_type { - return ::launchdarkly::foxy::detail::async_timer( - [&parser, self = this, coro = boost::asio::coroutine()]( - auto& cb, boost::system::error_code ec = {}, - std::size_t bytes_transferrred = 0) mutable { - BOOST_ASIO_CORO_REENTER(coro) { - BOOST_ASIO_CORO_YIELD boost::beast::http::async_read_some( - self->stream, self->buffer, parser, std::move(cb)); - - cb.complete(ec, bytes_transferrred); - } - }, - *this, std::forward(handler)); +auto +basic_session::async_read_some(Parser& parser, ReadHandler&& handler) & -> + typename boost::asio::async_result, + void(boost::system::error_code, std::size_t)>::return_type +{ + return ::launchdarkly::foxy::detail::async_timer( + [&parser, self = this, coro = boost::asio::coroutine()]( + auto& cb, boost::system::error_code ec = {}, std::size_t bytes_transferrred = 0) mutable { + BOOST_ASIO_CORO_REENTER(coro) + { + BOOST_ASIO_CORO_YIELD boost::beast::http::async_read_some(self->stream, self->buffer, parser, + std::move(cb)); + + cb.complete(ec, bytes_transferrred); + } + }, + *this, std::forward(handler)); } -} // namespace launchdarkly::foxy -#endif // FOXY_IMPL_SESSION_ASYNC_READ_IMPL_HPP_ +} // namespace launchdarkly::foxy + +#endif // FOXY_IMPL_SESSION_ASYNC_READ_IMPL_HPP_ diff --git a/vendor/foxy/include/foxy/session.hpp b/vendor/foxy/include/foxy/session.hpp index c2f6f0dac..ed1d212af 100644 --- a/vendor/foxy/include/foxy/session.hpp +++ b/vendor/foxy/include/foxy/session.hpp @@ -1,9 +1,8 @@ // -// Copyright (c) 2018-2019 Christian Mazakas (christian dot mazakas at gmail dot -// com) +// Copyright (c) 2018-2019 Christian Mazakas (christian dot mazakas at gmail dot com) // -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// Distributed under the Boost Software License, Version 1.0. (See accompanying file LICENSE_1_0.txt +// or copy at http://www.boost.org/LICENSE_1_0.txt) // // Official repository: https://github.com/LeonineKing1199/foxy // @@ -11,8 +10,8 @@ #ifndef FOXY_SESSION_HPP_ #define FOXY_SESSION_HPP_ -#include #include +#include #include #include @@ -23,90 +22,84 @@ #include #include -#include #include #include +#include -namespace launchdarkly::foxy { +namespace launchdarkly::foxy +{ template -struct basic_session { - public: - static_assert(boost::beast::is_async_stream::value, - "Requirements on the Stream type were not met. Stream must " - "be a Beast.AsyncStream"); - - static_assert(boost::asio::is_dynamic_buffer::value, - "Requirements on the DynamicBuffer type were not met. " - "DynamicBuffer must be an " - "Asio.DynamicBuffer"); - - using stream_type = ::launchdarkly::foxy::basic_multi_stream; - using buffer_type = DynamicBuffer; - using timer_type = boost::asio::steady_timer; - using executor_type = typename stream_type::executor_type; - - session_opts opts; - stream_type stream; - buffer_type buffer; - timer_type timer; - - basic_session() = delete; - basic_session(basic_session const&) = delete; - basic_session(basic_session&&) = default; - - template - basic_session(boost::asio::any_io_executor executor, - session_opts opts_, - BufferArgs&&... bargs); - - template - basic_session(boost::asio::io_context& io, - session_opts opts_, - BufferArgs&&... bargs); - - template - basic_session(stream_type stream_, - session_opts opts_, - BufferArgs&&... bargs); - - auto get_executor() -> executor_type; - - template - auto async_read_header(Parser& parser, ReadHandler&& handler) & -> - typename boost::asio::async_result, - void(boost::system::error_code, - std::size_t)>::return_type; - - template - auto async_read(Parser& parser, ReadHandler&& handler) & -> - typename boost::asio::async_result, - void(boost::system::error_code, - std::size_t)>::return_type; - - template - auto async_read_some(Parser& parser, ReadHandler&& handler) & -> - typename boost::asio::async_result, - void(boost::system::error_code, - std::size_t)>::return_type; - - template - auto async_write_header(Serializer& serializer, WriteHandler&& handler) & -> - typename boost::asio::async_result, - void(boost::system::error_code, - std::size_t)>::return_type; - - template - auto async_write(Serializer& serializer, WriteHandler&& handler) & -> - typename boost::asio::async_result, - void(boost::system::error_code, - std::size_t)>::return_type; +struct basic_session +{ +public: + static_assert(boost::beast::is_async_stream::value, + "Requirements on the Stream type were not met. Stream must be a Beast.AsyncStream"); + + static_assert(boost::asio::is_dynamic_buffer::value, + "Requirements on the DynamicBuffer type were not met. DynamicBuffer must be an " + "Asio.DynamicBuffer"); + + using stream_type = ::launchdarkly::foxy::basic_multi_stream; + using buffer_type = DynamicBuffer; + using timer_type = boost::asio::steady_timer; + using executor_type = typename stream_type::executor_type; + + session_opts opts; + stream_type stream; + buffer_type buffer; + timer_type timer; + + basic_session() = delete; + basic_session(basic_session const&) = delete; + basic_session(basic_session&&) = default; + + template + basic_session(boost::asio::any_io_executor executor, session_opts opts_, BufferArgs&&... bargs); + + template + basic_session(boost::asio::io_context& io, session_opts opts_, BufferArgs&&... bargs); + + template + basic_session(stream_type stream_, session_opts opts_, BufferArgs&&... bargs); + + auto + get_executor() -> executor_type; + + template + auto + async_read_header(Parser& parser, ReadHandler&& handler) & -> + typename boost::asio::async_result, + void(boost::system::error_code, std::size_t)>::return_type; + + template + auto + async_read(Parser& parser, ReadHandler&& handler) & -> + typename boost::asio::async_result, + void(boost::system::error_code, std::size_t)>::return_type; + + template + auto + async_read_some(Parser& parser, ReadHandler&& handler) & -> + typename boost::asio::async_result, + void(boost::system::error_code, std::size_t)>::return_type; + + template + auto + async_write_header(Serializer& serializer, WriteHandler&& handler) & -> + typename boost::asio::async_result, + void(boost::system::error_code, std::size_t)>::return_type; + + template + auto + async_write(Serializer& serializer, WriteHandler&& handler) & -> + typename boost::asio::async_result, + void(boost::system::error_code, std::size_t)>::return_type; }; -using session = - basic_session; +using session = basic_session; -} // namespace launchdarkly::foxy +} // namespace launchdarkly::foxy #include -#endif // FOXY_SESSION_HPP_ +#endif // FOXY_SESSION_HPP_