diff --git a/.github/workflows/sse.yml b/.github/workflows/sse.yml index 982c198c4..713ae37f5 100644 --- a/.github/workflows/sse.yml +++ b/.github/workflows/sse.yml @@ -36,4 +36,3 @@ jobs: with: repo: 'sse-contract-tests' test_service_port: ${{ env.TEST_SERVICE_PORT }} - extra_params: '-skip HTTP' diff --git a/apps/sse-contract-tests/include/definitions.hpp b/apps/sse-contract-tests/include/definitions.hpp index 34539a279..0925fad9b 100644 --- a/apps/sse-contract-tests/include/definitions.hpp +++ b/apps/sse-contract-tests/include/definitions.hpp @@ -84,3 +84,10 @@ struct CommentMessage { }; NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE_WITH_DEFAULT(CommentMessage, kind, comment); + +struct ErrorMessage { + std::string kind; + std::string comment; +}; + +NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE_WITH_DEFAULT(ErrorMessage, kind, comment); diff --git a/apps/sse-contract-tests/include/event_outbox.hpp b/apps/sse-contract-tests/include/event_outbox.hpp index a8598bc44..20e609051 100644 --- a/apps/sse-contract-tests/include/event_outbox.hpp +++ b/apps/sse-contract-tests/include/event_outbox.hpp @@ -12,6 +12,7 @@ #include #include +#include namespace beast = boost::beast; namespace http = beast::http; @@ -44,23 +45,34 @@ class EventOutbox : public std::enable_shared_from_this { * @param callback_url Target URL. */ EventOutbox(net::any_io_executor executor, std::string callback_url); + /** - * Enqueues an event, which will be posted to the server + * Queues an event, which will be posted to the server * later. * @param event Event to post. */ void post_event(launchdarkly::sse::Event event); + + /** + * Queues an error, which will be posted to the server later. + * @param error Error to post. + */ + void post_error(launchdarkly::sse::Error error); + /** * Begins an async operation to connect to the server. */ void run(); + /** * Begins an async operation to disconnect from the server. */ void stop(); private: - RequestType build_request(std::size_t counter, launchdarkly::sse::Event ev); + RequestType build_request( + std::size_t counter, + std::variant ev); void on_resolve(beast::error_code ec, tcp::resolver::results_type results); void on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type); diff --git a/apps/sse-contract-tests/src/entity_manager.cpp b/apps/sse-contract-tests/src/entity_manager.cpp index 215d43940..8c9fd1d7f 100644 --- a/apps/sse-contract-tests/src/entity_manager.cpp +++ b/apps/sse-contract-tests/src/entity_manager.cpp @@ -43,6 +43,9 @@ std::optional EntityManager::create(ConfigParams const& params) { copy->post_event(std::move(e)); }); + client_builder.errors( + [copy = poster](launchdarkly::sse::Error e) { copy->post_error(e); }); + auto client = client_builder.build(); if (!client) { LD_LOG(logger_, LogLevel::kWarn) diff --git a/apps/sse-contract-tests/src/event_outbox.cpp b/apps/sse-contract-tests/src/event_outbox.cpp index aac1901c6..e1c62d25b 100644 --- a/apps/sse-contract-tests/src/event_outbox.cpp +++ b/apps/sse-contract-tests/src/event_outbox.cpp @@ -40,6 +40,12 @@ void EventOutbox::post_event(launchdarkly::sse::Event event) { flush_timer_.expires_from_now(boost::posix_time::milliseconds(0)); } +void EventOutbox::post_error(launchdarkly::sse::Error error) { + auto http_request = build_request(callback_counter_++, error); + outbox_.push(http_request); + flush_timer_.expires_from_now(boost::posix_time::milliseconds(0)); +} + void EventOutbox::run() { resolver_.async_resolve(callback_host_, callback_port_, beast::bind_front_handler(&EventOutbox::on_resolve, @@ -57,7 +63,7 @@ void EventOutbox::stop() { EventOutbox::RequestType EventOutbox::build_request( std::size_t counter, - launchdarkly::sse::Event ev) { + std::variant ev) { RequestType req; req.set(http::field::host, callback_host_); @@ -66,11 +72,34 @@ EventOutbox::RequestType EventOutbox::build_request( nlohmann::json json; - if (ev.type() == "comment") { - json = CommentMessage{"comment", std::move(ev).take()}; - } else { - json = EventMessage{"event", Event{ev}}; - } + std::visit( + [&](auto&& arg) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + if (arg.type() == "comment") { + json = CommentMessage{"comment", std::move(arg).take()}; + } else { + json = EventMessage{"event", Event{std::move(arg)}}; + } + } else if constexpr (std::is_same_v) { + using launchdarkly::sse::Error; + auto msg = ErrorMessage{"error"}; + switch (arg) { + case Error::NoContent: + msg.comment = "no content"; + break; + case Error::InvalidRedirectLocation: + msg.comment = "invalid redirect location"; + break; + case Error::UnrecoverableClientError: + msg.comment = "unrecoverable client error"; + default: + msg.comment = "unspecified error"; + } + json = msg; + } + }, + std::move(ev)); req.body() = json.dump(); req.prepare_payload(); diff --git a/libs/server-sent-events/include/launchdarkly/sse/client.hpp b/libs/server-sent-events/include/launchdarkly/sse/client.hpp index bb2271763..1d7046c08 100644 --- a/libs/server-sent-events/include/launchdarkly/sse/client.hpp +++ b/libs/server-sent-events/include/launchdarkly/sse/client.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -29,8 +30,9 @@ class Client; */ class Builder { public: - using EventReceiver = std::function; + using EventReceiver = std::function; using LogCallback = std::function; + using ErrorCallback = std::function; /** * Create a builder for the given URL. If the port is omitted, 443 is @@ -114,6 +116,13 @@ class Builder { */ Builder& logger(LogCallback callback); + /** + * Specify an error reporting callback for the Client. + * @param callback Callback to receive an error from the Client. + * @return Reference to this builder. + */ + Builder& errors(ErrorCallback callback); + /** * Builds a Client. The shared pointer is necessary to extend the lifetime * of the Client to encompass each asynchronous operation that it performs. @@ -131,6 +140,7 @@ class Builder { std::optional connect_timeout_; LogCallback logging_cb_; EventReceiver receiver_; + ErrorCallback error_cb_; }; /** diff --git a/libs/server-sent-events/include/launchdarkly/sse/error.hpp b/libs/server-sent-events/include/launchdarkly/sse/error.hpp new file mode 100644 index 000000000..f9ac4b293 --- /dev/null +++ b/libs/server-sent-events/include/launchdarkly/sse/error.hpp @@ -0,0 +1,10 @@ +#pragma once + +namespace launchdarkly::sse { + +enum class Error { + NoContent = 1, + InvalidRedirectLocation = 2, + UnrecoverableClientError = 3, +}; +} diff --git a/libs/server-sent-events/src/client.cpp b/libs/server-sent-events/src/client.cpp index 74a9086b9..bdefce742 100644 --- a/libs/server-sent-events/src/client.cpp +++ b/libs/server-sent-events/src/client.cpp @@ -14,7 +14,7 @@ #include #include - +#include #include #include #include @@ -58,6 +58,7 @@ class FoxyClient : public Client, std::optional write_timeout, Builder::EventReceiver receiver, Builder::LogCallback logger, + Builder::ErrorCallback errors, std::optional maybe_ssl) : ssl_context_(std::move(maybe_ssl)), host_(std::move(host)), @@ -74,12 +75,13 @@ class FoxyClient : public Client, last_event_id_(std::nullopt), backoff_timer_(session_.get_executor()), event_receiver_(std::move(receiver)), - logger_(std::move(logger)) { + logger_(std::move(logger)), + errors_(std::move(errors)) { create_parser(); } - /** The body parser is recreated each time a connection is made because its - * internal state cannot be explicitly reset. + /** The body parser is recreated each time a connection is made because + * its internal state cannot be explicitly reset. * * Since SSE body will never end unless * an error occurs, the body size limit must be removed. @@ -100,6 +102,12 @@ class FoxyClient : public Client, void do_backoff(std::string const& reason) { backoff_.fail(); + if (auto id = body_parser_->get().body().last_event_id()) { + if (!id->empty()) { + last_event_id_ = id; + } + } + std::stringstream msg; msg << "backing off in (" << std::chrono::duration_cast( @@ -109,7 +117,6 @@ class FoxyClient : public Client, logger_(msg.str()); - last_event_id_ = body_parser_->get().body().last_event_id(); create_parser(); backoff_timer_.expires_from_now(backoff_.delay()); backoff_timer_.async_wait(beast::bind_front_handler( @@ -138,7 +145,7 @@ class FoxyClient : public Client, return do_backoff(ec.what()); } - if (last_event_id_ && !last_event_id_->empty()) { + if (last_event_id_) { req_.set("last-event-id", *last_event_id_); } else { req_.erase("last-event-id"); @@ -186,6 +193,10 @@ class FoxyClient : public Client, auto status_class = beast::http::to_status_class(response.result()); if (status_class == beast::http::status_class::successful) { + if (response.result() == beast::http::status::no_content) { + errors_(Error::NoContent); + return; + } if (!correct_content_type(response)) { return do_backoff("invalid Content-Type"); } @@ -197,13 +208,30 @@ class FoxyClient : public Client, shared_from_this())); } + if (status_class == beast::http::status_class::redirection) { + if (can_redirect(response)) { + auto new_url = + redirect_url("base", response.find("location")->value()); + + if (!new_url) { + errors_(Error::InvalidRedirectLocation); + return; + } + + req_.set(http::field::host, new_url->host()); + req_.target(new_url->encoded_target()); + } else { + errors_(Error::InvalidRedirectLocation); + return; + } + } + if (status_class == beast::http::status_class::client_error) { if (recoverable_client_error(response.result())) { return do_backoff(backoff_reason(response.result())); } - // TODO: error callback - + errors_(Error::UnrecoverableClientError); return; } @@ -244,11 +272,6 @@ class FoxyClient : public Client, } } - void fail(boost::system::error_code ec, std::string const& what) { - logger_("sse-client: " + what + ": " + ec.message()); - async_shutdown(nullptr); - } - static bool recoverable_client_error(beast::http::status status) { return (status == beast::http::status::bad_request || status == beast::http::status::request_timeout || @@ -264,21 +287,52 @@ class FoxyClient : public Client, return false; } + static bool can_redirect(FoxyClient::response const& response) { + return (response.result() == beast::http::status::moved_permanently || + response.result() == beast::http::status::temporary_redirect) && + response.find("location") != response.end(); + } + + static std::optional redirect_url( + std::string orig_base, + std::string orig_location) { + auto location = boost::urls::parse_uri(orig_location); + if (!location) { + return std::nullopt; + } + if (location->has_scheme()) { + return location.value(); + } + + boost::urls::url base(orig_base); + auto result = base.resolve(*location); + if (!result) { + return std::nullopt; + } + + return base; + } + private: std::optional ssl_context_; std::string host_; std::string port_; + std::optional connect_timeout_; std::optional read_timeout_; std::optional write_timeout_; + http::request req_; + Builder::EventReceiver event_receiver_; - std::optional > body_parser_; + Builder::LogCallback logger_; + Builder::ErrorCallback errors_; + + std::optional> body_parser_; launchdarkly::foxy::client_session session_; std::optional last_event_id_; Backoff backoff_; boost::asio::steady_timer backoff_timer_; - Builder::LogCallback logger_; }; Builder::Builder(net::any_io_executor ctx, std::string url) @@ -287,9 +341,9 @@ Builder::Builder(net::any_io_executor ctx, std::string url) read_timeout_{std::nullopt}, write_timeout_{std::nullopt}, connect_timeout_{std::nullopt}, - logging_cb_([](auto msg) {}) { - receiver_ = [](launchdarkly::sse::Event const&) {}; - + logging_cb_([](auto msg) {}), + receiver_([](launchdarkly::sse::Event const&) {}), + error_cb_([](auto err) {}) { request_.version(11); request_.set(http::field::user_agent, kDefaultUserAgent); request_.method(http::verb::get); @@ -332,11 +386,16 @@ Builder& Builder::receiver(EventReceiver receiver) { return *this; } -Builder& Builder::logger(std::function callback) { +Builder& Builder::logger(LogCallback callback) { logging_cb_ = std::move(callback); return *this; } +Builder& Builder::errors(ErrorCallback callback) { + error_cb_ = std::move(callback); + return *this; +} + std::shared_ptr Builder::build() { auto uri_components = boost::urls::parse_uri(url_); if (!uri_components) { @@ -376,7 +435,8 @@ std::shared_ptr Builder::build() { return std::make_shared( net::make_strand(executor_), request, host, service, connect_timeout_, - read_timeout_, write_timeout_, receiver_, logging_cb_, std::move(ssl)); + read_timeout_, write_timeout_, receiver_, logging_cb_, error_cb_, + std::move(ssl)); } } // namespace launchdarkly::sse