diff --git a/contract-tests/sse-contract-tests/include/event_outbox.hpp b/contract-tests/sse-contract-tests/include/event_outbox.hpp index 20e609051..6ed957bd9 100644 --- a/contract-tests/sse-contract-tests/include/event_outbox.hpp +++ b/contract-tests/sse-contract-tests/include/event_outbox.hpp @@ -72,11 +72,11 @@ class EventOutbox : public std::enable_shared_from_this { private: RequestType build_request( std::size_t counter, - std::variant ev); + std::variant event); void on_resolve(beast::error_code ec, tcp::resolver::results_type results); void on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type); void on_flush_timer(boost::system::error_code ec); void on_write(beast::error_code ec, std::size_t); - void do_shutdown(beast::error_code ec, std::string what); + void do_shutdown(beast::error_code ec); }; diff --git a/contract-tests/sse-contract-tests/src/entity_manager.cpp b/contract-tests/sse-contract-tests/src/entity_manager.cpp index 8c9fd1d7f..798d55ce5 100644 --- a/contract-tests/sse-contract-tests/src/entity_manager.cpp +++ b/contract-tests/sse-contract-tests/src/entity_manager.cpp @@ -27,7 +27,7 @@ std::optional EntityManager::create(ConfigParams const& params) { } if (params.body) { - client_builder.body(std::move(*params.body)); + client_builder.body(*params.body); } if (params.readTimeoutMs) { @@ -35,16 +35,22 @@ std::optional EntityManager::create(ConfigParams const& params) { std::chrono::milliseconds(*params.readTimeoutMs)); } + if (params.initialDelayMs) { + client_builder.initial_reconnect_delay( + std::chrono::milliseconds(*params.initialDelayMs)); + } + client_builder.logger([this](std::string msg) { LD_LOG(logger_, LogLevel::kDebug) << std::move(msg); }); - client_builder.receiver([copy = poster](launchdarkly::sse::Event e) { - copy->post_event(std::move(e)); + client_builder.receiver([copy = poster](launchdarkly::sse::Event event) { + copy->post_event(std::move(event)); }); - client_builder.errors( - [copy = poster](launchdarkly::sse::Error e) { copy->post_error(e); }); + client_builder.errors([copy = poster](launchdarkly::sse::Error event) { + copy->post_error(event); + }); auto client = client_builder.build(); if (!client) { @@ -53,7 +59,7 @@ std::optional EntityManager::create(ConfigParams const& params) { return std::nullopt; } - client->run(); + client->async_connect(); entities_.emplace(id, std::make_pair(client, poster)); return id; diff --git a/contract-tests/sse-contract-tests/src/event_outbox.cpp b/contract-tests/sse-contract-tests/src/event_outbox.cpp index e1c62d25b..2f09615c3 100644 --- a/contract-tests/sse-contract-tests/src/event_outbox.cpp +++ b/contract-tests/sse-contract-tests/src/event_outbox.cpp @@ -14,8 +14,6 @@ auto const kOutboxCapacity = 1023; EventOutbox::EventOutbox(net::any_io_executor executor, std::string callback_url) : callback_url_{std::move(callback_url)}, - callback_port_{}, - callback_host_{}, callback_counter_{0}, executor_{executor}, resolver_{executor}, @@ -29,7 +27,7 @@ EventOutbox::EventOutbox(net::any_io_executor executor, callback_port_ = uri_components->port(); } -void EventOutbox::do_shutdown(beast::error_code ec, std::string what) { +void EventOutbox::do_shutdown(beast::error_code ec) { event_stream_.socket().shutdown(tcp::socket::shutdown_both, ec); flush_timer_.cancel(); } @@ -54,20 +52,18 @@ void EventOutbox::run() { void EventOutbox::stop() { beast::error_code ec = net::error::basic_errors::operation_aborted; - std::string reason = "stop"; shutdown_ = true; - net::post(executor_, - beast::bind_front_handler(&EventOutbox::do_shutdown, - shared_from_this(), ec, reason)); + net::post(executor_, beast::bind_front_handler(&EventOutbox::do_shutdown, + shared_from_this(), ec)); } EventOutbox::RequestType EventOutbox::build_request( std::size_t counter, - std::variant ev) { + std::variant event) { RequestType req; req.set(http::field::host, callback_host_); - req.method(http::verb::get); + req.method(http::verb::post); req.target(callback_url_ + "/" + std::to_string(counter)); nlohmann::json json; @@ -93,13 +89,15 @@ EventOutbox::RequestType EventOutbox::build_request( break; case Error::UnrecoverableClientError: msg.comment = "unrecoverable client error"; + case Error::ReadTimeout: + msg.comment = "read timeout"; default: msg.comment = "unspecified error"; } json = msg; } }, - std::move(ev)); + std::move(event)); req.body() = json.dump(); req.prepare_payload(); @@ -109,7 +107,7 @@ EventOutbox::RequestType EventOutbox::build_request( void EventOutbox::on_resolve(beast::error_code ec, tcp::resolver::results_type results) { if (ec) { - return do_shutdown(ec, "resolve"); + return do_shutdown(ec); } beast::get_lowest_layer(event_stream_) @@ -121,7 +119,7 @@ void EventOutbox::on_resolve(beast::error_code ec, void EventOutbox::on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type) { if (ec) { - return do_shutdown(ec, "connect"); + return do_shutdown(ec); } boost::system::error_code dummy; @@ -131,7 +129,7 @@ void EventOutbox::on_connect(beast::error_code ec, void EventOutbox::on_flush_timer(boost::system::error_code ec) { if (ec && shutdown_) { - return do_shutdown(ec, "flush"); + return do_shutdown(ec); } if (!outbox_.empty()) { @@ -154,7 +152,7 @@ void EventOutbox::on_flush_timer(boost::system::error_code ec) { void EventOutbox::on_write(beast::error_code ec, std::size_t) { if (ec) { - return do_shutdown(ec, "write"); + return do_shutdown(ec); } outbox_.pop(); on_flush_timer(boost::system::error_code{}); diff --git a/contract-tests/sse-contract-tests/src/main.cpp b/contract-tests/sse-contract-tests/src/main.cpp index 733f457ce..0a9881466 100644 --- a/contract-tests/sse-contract-tests/src/main.cpp +++ b/contract-tests/sse-contract-tests/src/main.cpp @@ -20,7 +20,7 @@ int main(int argc, char* argv[]) { launchdarkly::Logger logger{ std::make_unique("sse-contract-tests")}; - const std::string default_port = "8123"; + std::string const default_port = "8123"; std::string port = default_port; if (argc == 2) { port = @@ -38,6 +38,7 @@ int main(int argc, char* argv[]) { srv.add_capability("report"); srv.add_capability("post"); srv.add_capability("reconnection"); + srv.add_capability("read-timeout"); net::signal_set signals{ioc, SIGINT, SIGTERM}; diff --git a/libs/client-sdk/src/data_sources/streaming_data_source.cpp b/libs/client-sdk/src/data_sources/streaming_data_source.cpp index 9d7c504cc..4d1caf6f5 100644 --- a/libs/client-sdk/src/data_sources/streaming_data_source.cpp +++ b/libs/client-sdk/src/data_sources/streaming_data_source.cpp @@ -27,6 +27,8 @@ static char const* DataSourceErrorToString(launchdarkly::sse::Error error) { return "server responded with an invalid redirection"; case sse::Error::UnrecoverableClientError: return "unrecoverable client-side error"; + case sse::Error::ReadTimeout: + return "read timeout reached"; } } @@ -138,7 +140,7 @@ void StreamingDataSource::Start() { client_builder.logger([weak_self](auto msg) { if (auto self = weak_self.lock()) { - LD_LOG(self->logger_, LogLevel::kDebug) << msg; + LD_LOG(self->logger_, LogLevel::kDebug) << "sse-client: " << msg; } }); @@ -163,7 +165,7 @@ void StreamingDataSource::Start() { kCouldNotParseEndpoint); return; } - client_->run(); + client_->async_connect(); } void StreamingDataSource::ShutdownAsync(std::function completion) { diff --git a/libs/common/src/config/logging_builder.cpp b/libs/common/src/config/logging_builder.cpp index 378494535..eb8c5b1c6 100644 --- a/libs/common/src/config/logging_builder.cpp +++ b/libs/common/src/config/logging_builder.cpp @@ -60,7 +60,9 @@ LoggingBuilder::BasicLogging& LoggingBuilder::BasicLogging::Tag( return *this; } LoggingBuilder::BasicLogging::BasicLogging() - : level_(Defaults::LogLevel()), tag_(Defaults::LogTag()) {} + : level_(GetLogLevelEnum(std::getenv("LD_LOG_LEVEL"), + Defaults::LogLevel())), + tag_(Defaults::LogTag()) {} LoggingBuilder::CustomLogging& LoggingBuilder::CustomLogging::Backend( std::shared_ptr backend) { diff --git a/libs/server-sent-events/include/launchdarkly/sse/client.hpp b/libs/server-sent-events/include/launchdarkly/sse/client.hpp index 904904a23..9b9e191c6 100644 --- a/libs/server-sent-events/include/launchdarkly/sse/client.hpp +++ b/libs/server-sent-events/include/launchdarkly/sse/client.hpp @@ -160,7 +160,7 @@ class Builder { class Client { public: virtual ~Client() = default; - virtual void run() = 0; + virtual void async_connect() = 0; virtual void async_shutdown(std::function completion) = 0; }; diff --git a/libs/server-sent-events/include/launchdarkly/sse/error.hpp b/libs/server-sent-events/include/launchdarkly/sse/error.hpp index f9ac4b293..df6ffe0ea 100644 --- a/libs/server-sent-events/include/launchdarkly/sse/error.hpp +++ b/libs/server-sent-events/include/launchdarkly/sse/error.hpp @@ -6,5 +6,6 @@ enum class Error { NoContent = 1, InvalidRedirectLocation = 2, UnrecoverableClientError = 3, + ReadTimeout = 4, }; } diff --git a/libs/server-sent-events/src/client.cpp b/libs/server-sent-events/src/client.cpp index 8d8432382..142768242 100644 --- a/libs/server-sent-events/src/client.cpp +++ b/libs/server-sent-events/src/client.cpp @@ -79,33 +79,40 @@ class FoxyClient : public Client, read_timeout_(read_timeout), write_timeout_(write_timeout), req_(std::move(req)), - session_(std::move(executor), - launchdarkly::foxy::session_opts{ - ToOptRef(ssl_context_), - connect_timeout.value_or(kNoTimeout)}), + event_receiver_(std::move(receiver)), + logger_(std::move(logger)), + errors_(std::move(errors)), + body_parser_(std::nullopt), + session_(std::nullopt), + last_event_id_(std::nullopt), backoff_( initial_reconnect_delay.value_or(kDefaultInitialReconnectDelay), kDefaultMaxBackoffDelay), - last_event_id_(std::nullopt), - backoff_timer_(session_.get_executor()), - event_receiver_(std::move(receiver)), - logger_(std::move(logger)), - errors_(std::move(errors)) { + backoff_timer_(std::move(executor)), + last_read_(std::nullopt), + shutting_down_(false) { + create_session(); create_parser(); } - /** The body parser is recreated each time a connection is made because - * its internal state cannot be explicitly reset. - * - * Since SSE body will never end unless - * an error occurs, the body size limit must be removed. - */ + // The body parser is recreated each time a connection is made because + // its internal state cannot be explicitly reset. void create_parser() { body_parser_.emplace(); + // Remove body read limit because an SSE stream can be infinite. body_parser_->body_limit(boost::none); body_parser_->get().body().on_event(event_receiver_); } + // The session is recreated each time a connection is made because its + // internal state cannot be explicitly reset. + void create_session() { + session_.emplace( + backoff_timer_.get_executor(), + launchdarkly::foxy::session_opts{ + ToOptRef(ssl_context_), connect_timeout_.value_or(kNoTimeout)}); + } + /** * Called whenever the connection needs to be reattempted, triggering * a timed wait for the current backoff duration. @@ -113,7 +120,7 @@ class FoxyClient : public Client, * The body parser's last SSE event ID must be cached so it can be added * as a header on the next request (since the parser is destroyed.) */ - void do_backoff(std::string const& reason) { + void async_backoff(std::string const& reason) { backoff_.fail(); if (auto id = body_parser_->get().body().last_event_id()) { @@ -131,6 +138,7 @@ class FoxyClient : public Client, logger_(msg.str()); + create_session(); create_parser(); backoff_timer_.expires_from_now(backoff_.delay()); backoff_timer_.async_wait(beast::bind_front_handler( @@ -141,11 +149,17 @@ class FoxyClient : public Client, if (ec == boost::asio::error::operation_aborted) { return; } - run(); + do_run(); } - void run() override { - session_.async_connect( + void async_connect() override { + boost::asio::post( + session_->get_executor(), + beast::bind_front_handler(&FoxyClient::do_run, shared_from_this())); + } + + void do_run() { + session_->async_connect( host_, port_, beast::bind_front_handler(&FoxyClient::on_connect, shared_from_this())); @@ -156,7 +170,7 @@ class FoxyClient : public Client, return; } if (ec) { - return do_backoff(ec.what()); + return async_backoff(ec.what()); } if (last_event_id_) { @@ -164,10 +178,10 @@ class FoxyClient : public Client, } else { req_.erase("last-event-id"); } - session_.opts.timeout = write_timeout_.value_or(kNoTimeout); - session_.async_write(req_, - beast::bind_front_handler(&FoxyClient::on_write, - shared_from_this())); + session_->opts.timeout = write_timeout_.value_or(kNoTimeout); + session_->async_write(req_, + beast::bind_front_handler(&FoxyClient::on_write, + shared_from_this())); } void on_write(boost::system::error_code ec, std::size_t amount) { @@ -176,11 +190,11 @@ class FoxyClient : public Client, return; } if (ec) { - return do_backoff(ec.what()); + return async_backoff(ec.what()); } - session_.opts.timeout = read_timeout_.value_or(kNoTimeout); - session_.async_read_header( + session_->opts.timeout = read_timeout_.value_or(kNoTimeout); + session_->async_read_header( *body_parser_, beast::bind_front_handler(&FoxyClient::on_headers, shared_from_this())); } @@ -191,12 +205,12 @@ class FoxyClient : public Client, return; } if (ec) { - return do_backoff(ec.what()); + return async_backoff(ec.what()); } if (!body_parser_->is_header_done()) { /* keep reading headers */ - return session_.async_read_header( + return session_->async_read_header( *body_parser_, beast::bind_front_handler(&FoxyClient::on_headers, shared_from_this())); @@ -212,11 +226,14 @@ class FoxyClient : public Client, return; } if (!correct_content_type(response)) { - return do_backoff("invalid Content-Type"); + return async_backoff("invalid Content-Type"); } + logger_("connected"); backoff_.succeed(); - return session_.async_read( + + last_read_ = std::chrono::steady_clock::now(); + return session_->async_read_some( *body_parser_, beast::bind_front_handler(&FoxyClient::on_read_body, shared_from_this())); @@ -244,14 +261,14 @@ class FoxyClient : public Client, if (status_class == beast::http::status_class::client_error) { if (recoverable_client_error(response.result())) { - return do_backoff(backoff_reason(response.result())); + return async_backoff(backoff_reason(response.result())); } errors_(Error::UnrecoverableClientError); return; } - do_backoff(backoff_reason(response.result())); + async_backoff(backoff_reason(response.result())); } static std::string backoff_reason(beast::http::status status) { @@ -262,38 +279,93 @@ class FoxyClient : public Client, void on_read_body(boost::system::error_code ec, std::size_t amount) { boost::ignore_unused(amount); - if (ec == boost::asio::error::operation_aborted) { - return; + if (ec) { + if (ec == boost::asio::error::operation_aborted) { + // operation_aborted can occur if the read timeout is reached or + // if we're shutting down, so shutting_down_ is needed to + // disambiguate. + if (shutting_down_) { + // End the chain of async operations. + return; + } + errors_(Error::ReadTimeout); + return async_backoff( + "aborting read of response body (timeout)"); + } + return async_backoff(ec.what()); + } + + // The server can indicate that the chunk encoded response is done + // by sending a final chunk + CRLF. The body parser will have + // detected this. + if (body_parser_->is_done()) { + return async_backoff("receiving final chunk"); + } + + log_and_update_last_read(amount); + return session_->async_read_some( + *body_parser_, beast::bind_front_handler(&FoxyClient::on_read_body, + shared_from_this())); + } + + /** Logs a message indicating that an async_read_some operation + * on the response's body has completed, and how long that operation took + * (using a monotonic clock.) + * + * This is useful for debugging timeout-related issues, like receiving a + * heartbeat message. + */ + void log_and_update_last_read(std::size_t amount) { + if (last_read_) { + auto sec_since_last_read = + std::chrono::duration_cast( + std::chrono::steady_clock::now() - *last_read_) + .count(); + logger_("read (" + std::to_string(amount) + ") bytes in (" + + std::to_string(sec_since_last_read) + ") sec"); + } else { + logger_("read (" + std::to_string(amount) + ") bytes"); } - do_backoff(ec.what()); + last_read_ = std::chrono::steady_clock::now(); } void async_shutdown(std::function completion) override { - boost::asio::post(session_.get_executor(), + // Get on the session's executor, otherwise the code in the completion + // handler could race. + boost::asio::post(session_->get_executor(), beast::bind_front_handler(&FoxyClient::do_shutdown, shared_from_this(), std::move(completion))); } void do_shutdown(std::function completion) { - session_.async_shutdown(beast::bind_front_handler( + shutting_down_ = true; + backoff_timer_.cancel(); + session_->async_shutdown(beast::bind_front_handler( &FoxyClient::on_shutdown, std::move(completion))); } static void on_shutdown(std::function completion, boost::system::error_code ec) { + // Because do_shutdown doesn't use shared_from_this() when initiating + // the async_shutdown op, the client may already be destroyed - hence + // this static method. boost::ignore_unused(ec); if (completion) { completion(); } } + // Some client errors are considered recoverable, meaning they could be + // resolved by reconnecting. Returns true if the status is one of those. static bool recoverable_client_error(beast::http::status status) { return (status == beast::http::status::bad_request || status == beast::http::status::request_timeout || status == beast::http::status::too_many_requests); } + // A naive comparison of content-type to 'text/event-stream' is incorrect + // because multiple content types may be present. static bool correct_content_type(FoxyClient::response const& response) { if (auto content_type = response.find("content-type"); content_type != response.end()) { @@ -303,12 +375,16 @@ class FoxyClient : public Client, return false; } + // If the server redirects, ensure the location header is present. static bool can_redirect(FoxyClient::response const& response) { return (response.result() == beast::http::status::moved_permanently || response.result() == beast::http::status::temporary_redirect) && response.find("location") != response.end(); } + // Generates a redirect URL from a base and provided location. Since the + // location might not be absolute, it may be necessary to resolve it against + // the base. static std::optional redirect_url( std::string orig_base, std::string orig_location) { @@ -330,25 +406,77 @@ class FoxyClient : public Client, } private: + // Optional, but necessary for establishing TLS connections. If it is + // omitted and the scheme is https://, the connection will fail. Can be + // reused across connections. std::optional ssl_context_; + std::string host_; std::string port_; + // If present, the max amount of time that can be spent resolving DNS + // and setting up the initial connection. This doesn't include writing the + // request nor receiving the response. std::optional connect_timeout_; + + // If present, the max amount of time that can be spent after reading bytes + // before more bytes must be read. Applies after sending the request, but + // before receiving the response headers. Intended to terminate connections + // where the sender has hung up. LaunchDarkly sends a heartbeat every 180 + // seconds, so the read timeout must be greater than this. std::optional read_timeout_; + + // If present, the max amount of time that can be spent sending the request. std::optional write_timeout_; + // The request sent to the server, which persists across reconnection + // attempts. http::request req_; + // Callback which executed whenever an SSE event is received. This is + // passed into the body_parser whenever it is constructed, so it needs to be + // stored here for future use. Builder::EventReceiver event_receiver_; + + // Callback executed when log messages must be generated. The provider of + // the callback dictates the log level, which at the moment of writing is + // 'debug'. Builder::LogCallback logger_; + + // Callback executed to report errors. This is the primary mechanism by + // which the client communicates error conditions to the user. Builder::ErrorCallback errors_; + // Customized parser (see parser.hpp) which repeatedly receives chunks of + // data and parses them into SSE events. It cannot be reused across + // connections, hence the optional so it can be destroyed easily. std::optional> body_parser_; - launchdarkly::foxy::client_session session_; + + // The primary network primitive used to read/write to the server. It is our + // responsibility to ensure it is called from only one thread at a time. It + // cannot be reused across connections, similar to the body_parser. + std::optional session_; + + // Stores the last known SSE event ID, which can be provided to the server + // upon reconnection. std::optional last_event_id_; + + // Computes the backoff delays used whenever the connection drops + // unnaturally. Backoff backoff_; + + // Used in concert with the backoff member to delay reconnection attempts. + // The timer must be cancelled when shutdown is requested, because an + // outstanding backoff delay might be in progress. boost::asio::steady_timer backoff_timer_; + + // Stores the timestamp of the last read in order to augment debug logging. + std::optional last_read_; + + // Upon completing a read, a new async read is generally initiated. This + // determines whether to return and thus end the operation gracefully (true) + // or to perform backoff (false). + bool shutting_down_; }; Builder::Builder(net::any_io_executor ctx, std::string url) diff --git a/vendor/foxy/include/foxy/impl/session/async_read.impl.hpp b/vendor/foxy/include/foxy/impl/session/async_read.impl.hpp index e187bef50..bcbb2d05a 100644 --- a/vendor/foxy/include/foxy/impl/session/async_read.impl.hpp +++ b/vendor/foxy/include/foxy/impl/session/async_read.impl.hpp @@ -36,6 +36,28 @@ basic_session::async_read(Parser& parser, ReadHandler&& h *this, std::forward(handler)); } +template +template +auto +basic_session::async_read_some(Parser& parser, ReadHandler&& handler) & -> + typename boost::asio::async_result, + void(boost::system::error_code, std::size_t)>::return_type +{ + return ::launchdarkly::foxy::detail::async_timer( + [&parser, self = this, coro = boost::asio::coroutine()]( + auto& cb, boost::system::error_code ec = {}, std::size_t bytes_transferrred = 0) mutable { + BOOST_ASIO_CORO_REENTER(coro) + { + BOOST_ASIO_CORO_YIELD boost::beast::http::async_read_some(self->stream, self->buffer, parser, + std::move(cb)); + + cb.complete(ec, bytes_transferrred); + } + }, + *this, std::forward(handler)); +} + + } // namespace launchdarkly::foxy #endif // FOXY_IMPL_SESSION_ASYNC_READ_IMPL_HPP_ diff --git a/vendor/foxy/include/foxy/session.hpp b/vendor/foxy/include/foxy/session.hpp index 17fb94b95..ed1d212af 100644 --- a/vendor/foxy/include/foxy/session.hpp +++ b/vendor/foxy/include/foxy/session.hpp @@ -76,6 +76,12 @@ struct basic_session async_read(Parser& parser, ReadHandler&& handler) & -> typename boost::asio::async_result, void(boost::system::error_code, std::size_t)>::return_type; + + template + auto + async_read_some(Parser& parser, ReadHandler&& handler) & -> + typename boost::asio::async_result, + void(boost::system::error_code, std::size_t)>::return_type; template auto