diff --git a/libs/client-sdk/include/launchdarkly/client_side/api.hpp b/libs/client-sdk/include/launchdarkly/client_side/api.hpp index a8ffccaf9..4e9c8291f 100644 --- a/libs/client-sdk/include/launchdarkly/client_side/api.hpp +++ b/libs/client-sdk/include/launchdarkly/client_side/api.hpp @@ -12,9 +12,9 @@ #include "config/client.hpp" #include "context.hpp" #include "error.hpp" -#include "events/event_processor.hpp" #include "launchdarkly/client_side/data_source.hpp" #include "launchdarkly/client_side/data_sources/detail/data_source_status_manager.hpp" +#include "launchdarkly/client_side/event_processor.hpp" #include "launchdarkly/client_side/flag_manager/detail/flag_manager.hpp" #include "launchdarkly/client_side/flag_manager/detail/flag_updater.hpp" #include "logger.hpp" @@ -25,6 +25,11 @@ class Client { public: Client(Config config, Context context); + Client(Client&&) = delete; + Client(Client const&) = delete; + Client& operator=(Client) = delete; + Client& operator=(Client&& other) = delete; + using FlagKey = std::string; [[nodiscard]] std::unordered_map AllFlags() const; @@ -72,7 +77,7 @@ class Client { std::thread thread_; boost::asio::io_context ioc_; Context context_; - std::unique_ptr event_processor_; + std::unique_ptr event_processor_; std::unique_ptr data_source_; std::thread run_thread_; }; diff --git a/libs/common/include/events/event_processor.hpp b/libs/client-sdk/include/launchdarkly/client_side/event_processor.hpp similarity index 87% rename from libs/common/include/events/event_processor.hpp rename to libs/client-sdk/include/launchdarkly/client_side/event_processor.hpp index 35d698694..0893e6fb6 100644 --- a/libs/common/include/events/event_processor.hpp +++ b/libs/client-sdk/include/launchdarkly/client_side/event_processor.hpp @@ -2,7 +2,7 @@ #include "events/events.hpp" -namespace launchdarkly::events { +namespace launchdarkly::client_side { class IEventProcessor { public: @@ -12,7 +12,7 @@ class IEventProcessor { * capacity. * @param event InputEvent to deliver. */ - virtual void AsyncSend(InputEvent event) = 0; + virtual void AsyncSend(events::InputEvent event) = 0; /** * Asynchronously flush's the processor's events, returning as soon as * possible. Flushing may be a no-op if a flush is ongoing. @@ -34,4 +34,4 @@ class IEventProcessor { IEventProcessor() = default; }; -} // namespace launchdarkly::events +} // namespace launchdarkly::client_side diff --git a/libs/client-sdk/include/launchdarkly/client_side/event_processor/detail/event_processor.hpp b/libs/client-sdk/include/launchdarkly/client_side/event_processor/detail/event_processor.hpp new file mode 100644 index 000000000..f961d7a74 --- /dev/null +++ b/libs/client-sdk/include/launchdarkly/client_side/event_processor/detail/event_processor.hpp @@ -0,0 +1,25 @@ +#pragma once + +#include +#include "config/client.hpp" +#include "config/detail/sdks.hpp" +#include "events/detail/asio_event_processor.hpp" +#include "launchdarkly/client_side/event_processor.hpp" +#include "logger.hpp" + +namespace launchdarkly::client_side::detail { + +class EventProcessor : public IEventProcessor { + public: + EventProcessor(boost::asio::any_io_executor const& io, + Config const& config, + Logger& logger); + void AsyncSend(events::InputEvent event) override; + void AsyncFlush() override; + void AsyncClose() override; + + private: + events::detail::AsioEventProcessor impl_; +}; + +} // namespace launchdarkly::client_side::detail diff --git a/libs/client-sdk/include/launchdarkly/client_side/event_processor/detail/null_event_processor.hpp b/libs/client-sdk/include/launchdarkly/client_side/event_processor/detail/null_event_processor.hpp new file mode 100644 index 000000000..55404c9b4 --- /dev/null +++ b/libs/client-sdk/include/launchdarkly/client_side/event_processor/detail/null_event_processor.hpp @@ -0,0 +1,14 @@ +#pragma once + +#include "launchdarkly/client_side/event_processor.hpp" + +namespace launchdarkly::client_side::detail { + +class NullEventProcessor : public IEventProcessor { + public: + NullEventProcessor() = default; + void AsyncSend(events::InputEvent event) override; + void AsyncFlush() override; + void AsyncClose() override; +}; +} // namespace launchdarkly::client_side::detail diff --git a/libs/client-sdk/src/CMakeLists.txt b/libs/client-sdk/src/CMakeLists.txt index aa849e3dc..eb128eff6 100644 --- a/libs/client-sdk/src/CMakeLists.txt +++ b/libs/client-sdk/src/CMakeLists.txt @@ -6,6 +6,8 @@ file(GLOB HEADER_LIST CONFIGURE_DEPENDS "${LaunchDarklyCPPClient_SOURCE_DIR}/include/launchdarkly/client_side/data_sources/detail/*.hpp" "${LaunchDarklyCPPClient_SOURCE_DIR}/include/launchdarkly/client_side/flag_manager/*.hpp" "${LaunchDarklyCPPClient_SOURCE_DIR}/include/launchdarkly/client_side/flag_manager/detail/*.hpp" + "${LaunchDarklyCPPClient_SOURCE_DIR}/include/launchdarkly/client_side/event_processor/*.hpp" + "${LaunchDarklyCPPClient_SOURCE_DIR}/include/launchdarkly/client_side/event_processor/detail/*.hpp" ) # Automatic library: static or dynamic based on user config. @@ -23,6 +25,8 @@ add_library(${LIBNAME} flag_manager/flag_change_event.cpp data_sources/data_source_status.cpp data_sources/data_source_status_manager.cpp + event_processor/event_processor.cpp + event_processor/null_event_processor.cpp boost_signal_connection.cpp ) diff --git a/libs/client-sdk/src/api.cpp b/libs/client-sdk/src/api.cpp index dad10674f..cc232e4d8 100644 --- a/libs/client-sdk/src/api.cpp +++ b/libs/client-sdk/src/api.cpp @@ -3,9 +3,10 @@ #include #include -#include "events/detail/asio_event_processor.hpp" #include "launchdarkly/client_side/data_sources/detail/polling_data_source.hpp" #include "launchdarkly/client_side/data_sources/detail/streaming_data_source.hpp" +#include "launchdarkly/client_side/event_processor/detail/event_processor.hpp" +#include "launchdarkly/client_side/event_processor/detail/null_event_processor.hpp" namespace launchdarkly::client_side { @@ -14,7 +15,7 @@ using launchdarkly::client_side::data_sources::DataSourceStatus; static std::unique_ptr MakeDataSource( Config const& config, Context const& context, - boost::asio::any_io_executor executor, + boost::asio::any_io_executor const& executor, flag_manager::detail::FlagUpdater& flag_updater, data_sources::detail::DataSourceStatusManager& status_manager, Logger& logger) { @@ -32,13 +33,7 @@ static std::unique_ptr MakeDataSource( Client::Client(Config config, Context context) : logger_(config.Logger()), context_(std::move(context)), - event_processor_( - std::make_unique( - ioc_.get_executor(), - config.Events(), - config.ServiceEndpoints(), - config.SdkKey(), - logger_)), + event_processor_(nullptr), flag_updater_(flag_manager_), data_source_(MakeDataSource(config, context_, @@ -47,6 +42,15 @@ Client::Client(Config config, Context context) status_manager_, logger_)), initialized_(false) { + if (config.Events().Enabled()) { + event_processor_ = std::make_unique( + ioc_.get_executor(), config, logger_); + } else { + event_processor_ = std::make_unique(); + } + + data_source_->Start(); + status_manager_.OnDataSourceStatusChange([this](auto status) { if (status.State() == DataSourceStatus::DataSourceState::kValid || status.State() == DataSourceStatus::DataSourceState::kShutdown || diff --git a/libs/client-sdk/src/event_processor/event_processor.cpp b/libs/client-sdk/src/event_processor/event_processor.cpp new file mode 100644 index 000000000..5b62207e9 --- /dev/null +++ b/libs/client-sdk/src/event_processor/event_processor.cpp @@ -0,0 +1,22 @@ +#include "launchdarkly/client_side/event_processor/detail/event_processor.hpp" + +namespace launchdarkly::client_side::detail { + +EventProcessor::EventProcessor(boost::asio::any_io_executor const& io, + Config const& config, + Logger& logger) + : impl_(io, config, logger) {} + +void EventProcessor::AsyncSend(launchdarkly::events::InputEvent event) { + impl_.AsyncSend(std::move(event)); +} + +void EventProcessor::AsyncFlush() { + impl_.AsyncFlush(); +} + +void EventProcessor::AsyncClose() { + impl_.AsyncClose(); +} + +} // namespace launchdarkly::client_side::detail diff --git a/libs/client-sdk/src/event_processor/null_event_processor.cpp b/libs/client-sdk/src/event_processor/null_event_processor.cpp new file mode 100644 index 000000000..f77965ac2 --- /dev/null +++ b/libs/client-sdk/src/event_processor/null_event_processor.cpp @@ -0,0 +1,10 @@ +#include "launchdarkly/client_side/event_processor/detail/null_event_processor.hpp" + +namespace launchdarkly::client_side::detail { + +void NullEventProcessor::AsyncSend(events::InputEvent event) {} + +void NullEventProcessor::AsyncFlush() {} + +void NullEventProcessor::AsyncClose() {} +} // namespace launchdarkly::client_side::detail diff --git a/libs/common/include/config/client.hpp b/libs/common/include/config/client.hpp index 31e019b60..5d77379a5 100644 --- a/libs/common/include/config/client.hpp +++ b/libs/common/include/config/client.hpp @@ -16,7 +16,10 @@ using AppInfoBuilder = config::detail::builders::AppInfoBuilder; using EndpointsBuilder = config::detail::builders::EndpointsBuilder; using ConfigBuilder = config::detail::builders::ConfigBuilder; using EventsBuilder = config::detail::builders::EventsBuilder; +using HttpPropertiesBuilder = + config::detail::builders::HttpPropertiesBuilder; using DataSourceBuilder = config::detail::builders::DataSourceBuilder; + using Config = config::detail::Config; } // namespace launchdarkly::client_side diff --git a/libs/common/include/config/detail/builders/events_builder.hpp b/libs/common/include/config/detail/builders/events_builder.hpp index c36ce6ec4..c9d700d89 100644 --- a/libs/common/include/config/detail/builders/events_builder.hpp +++ b/libs/common/include/config/detail/builders/events_builder.hpp @@ -34,6 +34,20 @@ class EventsBuilder { */ EventsBuilder(); + /** + * Specify if event-sending should be enabled or not. By default, + * events are enabled. + * @param enabled True to enable. + * @return Reference to this builder. + */ + EventsBuilder& Enabled(bool enabled); + + /** + * Alias for Enabled(false). + * @return Reference to this builder. + */ + EventsBuilder& Disable(); + /** * Sets the capacity of the event processor. When more events are generated * within the processor's flush interval than this value, events will be diff --git a/libs/common/include/config/detail/built/events.hpp b/libs/common/include/config/detail/built/events.hpp index 93132fdab..061a56cac 100644 --- a/libs/common/include/config/detail/built/events.hpp +++ b/libs/common/include/config/detail/built/events.hpp @@ -19,6 +19,8 @@ class Events final { friend class builders::EventsBuilder; /** * Constructs configuration for the event subsystem. + * @param enabled If event-sending is enabled. If false, no events will be + * sent to LaunchDarkly. * @param capacity How many events can queue in memory before new events * are dropped. * @param flush_interval How often events are automatically flushed to @@ -31,12 +33,22 @@ class Events final { * AllAttributesPrivate is false. * @param security Whether a plaintext or encrypted client should be used * for event delivery. + * @param flush_workers How many workers to use for concurrent event + * delivery. */ - Events(std::size_t capacity, + Events(bool enabled, + std::size_t capacity, std::chrono::milliseconds flush_interval, std::string path, bool all_attributes_private, - AttributeReference::SetType private_attrs); + AttributeReference::SetType private_attrs, + std::chrono::milliseconds delivery_retry_delay, + std::size_t flush_workers); + + /** + * Returns true if event-sending is enabled. + */ + [[nodiscard]] bool Enabled() const; /** * Capacity of the event processor. @@ -48,6 +60,12 @@ class Events final { */ [[nodiscard]] std::chrono::milliseconds FlushInterval() const; + /* + * If an event payload fails to be delivered and can be retried, how long + * to wait before retrying. + */ + [[nodiscard]] std::chrono::milliseconds DeliveryRetryDelay() const; + /** * Path component of the LaunchDarkly event delivery endpoint. */ @@ -63,12 +81,20 @@ class Events final { */ [[nodiscard]] AttributeReference::SetType const& PrivateAttributes() const; + /** + * Number of flush workers used for concurrent event delivery. + */ + [[nodiscard]] std::size_t FlushWorkers() const; + private: + bool enabled_; std::size_t capacity_; std::chrono::milliseconds flush_interval_; std::string path_; bool all_attributes_private_; AttributeReference::SetType private_attributes_; + std::chrono::milliseconds delivery_retry_delay_; + std::size_t flush_workers_; }; bool operator==(Events const& lhs, Events const& rhs); diff --git a/libs/common/include/config/detail/defaults.hpp b/libs/common/include/config/detail/defaults.hpp index 5d1639ae7..ab1a219f7 100644 --- a/libs/common/include/config/detail/defaults.hpp +++ b/libs/common/include/config/detail/defaults.hpp @@ -35,8 +35,14 @@ struct Defaults { } static auto Events() -> built::Events { - return {100, std::chrono::seconds(30), "/mobile", false, - AttributeReference::SetType()}; + return {true, + 100, + std::chrono::seconds(30), + "/mobile", + false, + AttributeReference::SetType(), + std::chrono::seconds(1), + 5}; } static auto HttpProperties() -> built::HttpProperties { @@ -74,8 +80,14 @@ struct Defaults { } static auto Events() -> built::Events { - return {10000, std::chrono::seconds(5), "/bulk", false, - AttributeReference::SetType()}; + return {true, + 10000, + std::chrono::seconds(5), + "/bulk", + false, + AttributeReference::SetType(), + std::chrono::seconds(1), + 5}; } static auto HttpProperties() -> built::HttpProperties { diff --git a/libs/common/include/config/server.hpp b/libs/common/include/config/server.hpp index eb8226404..41f4f1be5 100644 --- a/libs/common/include/config/server.hpp +++ b/libs/common/include/config/server.hpp @@ -16,6 +16,8 @@ using AppInfoBuilder = config::detail::builders::AppInfoBuilder; using EndpointsBuilder = config::detail::builders::EndpointsBuilder; using ConfigBuilder = config::detail::builders::ConfigBuilder; using EventsBuilder = config::detail::builders::EventsBuilder; +using HttpPropertiesBuilder = + config::detail::builders::HttpPropertiesBuilder; using DataSourceBuilder = config::detail::builders::DataSourceBuilder; using Config = config::detail::Config; diff --git a/libs/common/include/events/detail/asio_event_processor.hpp b/libs/common/include/events/detail/asio_event_processor.hpp index 60df8b5ad..d920a6d2e 100644 --- a/libs/common/include/events/detail/asio_event_processor.hpp +++ b/libs/common/include/events/detail/asio_event_processor.hpp @@ -7,39 +7,38 @@ #include #include #include -#include "config/detail/built/events.hpp" -#include "config/detail/built/service_endpoints.hpp" +#include +#include "config/detail/config.hpp" #include "context_filter.hpp" -#include "events/detail/conn_pool.hpp" +#include "events/detail/event_batch.hpp" #include "events/detail/outbox.hpp" #include "events/detail/summarizer.hpp" -#include "events/event_processor.hpp" +#include "events/detail/worker_pool.hpp" #include "events/events.hpp" #include "logger.hpp" +#include "network/detail/http_requester.hpp" namespace launchdarkly::events::detail { -class AsioEventProcessor : public IEventProcessor { +template +class AsioEventProcessor { public: AsioEventProcessor(boost::asio::any_io_executor const& io, - config::detail::built::Events const& config, - config::detail::built::ServiceEndpoints const& endpoints, - std::string authorization, + config::detail::Config const& config, Logger& logger); - void AsyncFlush() override; + void AsyncFlush(); - void AsyncSend(InputEvent event) override; + void AsyncSend(InputEvent event); - void AsyncClose() override; + void AsyncClose(); private: + using Clock = std::chrono::system_clock; enum class FlushTrigger { Automatic = 0, Manual = 1, }; - using RequestType = - boost::beast::http::request; boost::asio::any_io_executor io_; Outbox outbox_; @@ -48,13 +47,14 @@ class AsioEventProcessor : public IEventProcessor { std::chrono::milliseconds flush_interval_; boost::asio::steady_timer timer_; - std::string host_; - std::string path_; + std::string url_; std::string authorization_; + config::detail::built::HttpProperties http_props_; + boost::uuids::random_generator uuids_; - ConnPool conns_; + WorkerPool workers_; std::size_t inbox_capacity_; std::size_t inbox_size_; @@ -62,6 +62,9 @@ class AsioEventProcessor : public IEventProcessor { bool full_outbox_encountered_; bool full_inbox_encountered_; + bool permanent_delivery_failure_; + + std::optional last_known_past_time_; launchdarkly::ContextFilter filter_; @@ -69,7 +72,7 @@ class AsioEventProcessor : public IEventProcessor { void HandleSend(InputEvent event); - std::optional MakeRequest(); + std::optional CreateBatch(); void Flush(FlushTrigger flush_type); @@ -79,6 +82,9 @@ class AsioEventProcessor : public IEventProcessor { bool InboxIncrement(); void InboxDecrement(); + + void OnEventDeliveryResult(std::size_t count, + RequestWorker::DeliveryResult); }; } // namespace launchdarkly::events::detail diff --git a/libs/common/include/events/detail/conn_pool.hpp b/libs/common/include/events/detail/conn_pool.hpp deleted file mode 100644 index 8fe259139..000000000 --- a/libs/common/include/events/detail/conn_pool.hpp +++ /dev/null @@ -1,14 +0,0 @@ -#pragma once - -#include - -namespace launchdarkly::events::detail { - -class ConnPool { - public: - using RequestType = - boost::beast::http::request; - ConnPool(); - void Deliver(RequestType request); -}; -} // namespace launchdarkly::events::detail diff --git a/libs/common/include/events/detail/event_batch.hpp b/libs/common/include/events/detail/event_batch.hpp new file mode 100644 index 000000000..a9f861dd6 --- /dev/null +++ b/libs/common/include/events/detail/event_batch.hpp @@ -0,0 +1,46 @@ +#pragma once + +#include +#include +#include "config/detail/built/http_properties.hpp" +#include "network/detail/http_requester.hpp" + +namespace launchdarkly::events::detail { + +/** + * EventBatch represents a batch of events being sent to LaunchDarkly as + * an HTTP request. + */ +class EventBatch { + public: + /** + * Constructs a new EventBatch. + * @param url Target of the request. + * @param http_props General HTTP properties for the request. + * @param events Array of events to be serialized in the request. + */ + EventBatch(std::string url, + config::detail::built::HttpProperties http_props, + boost::json::value const& events); + + /** + * Returns the number of events in the batch. + */ + [[nodiscard]] std::size_t Count() const; + + /** + * Returns the built HTTP request. + */ + [[nodiscard]] network::detail::HttpRequest const& Request() const; + + /** + * Returns the target of the request. + */ + [[nodiscard]] std::string Target() const; + + private: + std::size_t num_events_; + network::detail::HttpRequest request_; +}; + +} // namespace launchdarkly::events::detail diff --git a/libs/common/include/events/detail/null_event_processor.hpp b/libs/common/include/events/detail/null_event_processor.hpp deleted file mode 100644 index 7a0ac75bd..000000000 --- a/libs/common/include/events/detail/null_event_processor.hpp +++ /dev/null @@ -1,14 +0,0 @@ -#pragma once - -#include "events/event_processor.hpp" - -namespace launchdarkly::events::detail { - -class NullEventProcessor : public IEventProcessor { - public: - NullEventProcessor() = default; - void AsyncSend(InputEvent event) override; - void AsyncFlush() override; - void AsyncClose() override; -}; -} // namespace launchdarkly::events::detail diff --git a/libs/common/include/events/detail/parse_date_header.hpp b/libs/common/include/events/detail/parse_date_header.hpp new file mode 100644 index 000000000..96eeaddec --- /dev/null +++ b/libs/common/include/events/detail/parse_date_header.hpp @@ -0,0 +1,25 @@ +#pragma once + +#include +#include +#include +#include + +namespace launchdarkly::events::detail { + +template +static std::optional ParseDateHeader( + std::string const& datetime) { + // TODO: SC-199582 + std::tm t = {}; + std::istringstream ss(datetime); + ss.imbue(std::locale("en_US.utf-8")); + ss >> std::get_time(&t, "%a, %d %b %Y %H:%M:%S GMT"); + if (ss.fail()) { + return std::nullopt; + } + std::time_t tt = std::mktime(&t); + return Clock::from_time_t(tt); +} + +} // namespace launchdarkly::events::detail diff --git a/libs/common/include/events/detail/request_worker.hpp b/libs/common/include/events/detail/request_worker.hpp new file mode 100644 index 000000000..b8c34b344 --- /dev/null +++ b/libs/common/include/events/detail/request_worker.hpp @@ -0,0 +1,167 @@ +#pragma once +#include +#include +#include +#include +#include +#include "events/detail/event_batch.hpp" +#include "logger.hpp" +#include "network/detail/asio_requester.hpp" +#include "network/detail/http_requester.hpp" + +namespace launchdarkly::events::detail { + +enum class State { + /* Worker is ready for a new job. */ + Idle = 1, + /* Worker is performing the 1st delivery. */ + FirstChance = 2, + /* Worker is performing the 2nd (final) delivery. */ + SecondChance = 3, + /* Worker has encountered an error that cannot be recovered from. */ + PermanentlyFailed = 4, +}; + +std::ostream& operator<<(std::ostream& out, State const& s); + +enum class Action { + /* No action necessary. */ + None = 0, + /* Free the current request. */ + Reset = 1, + /* Attempt to parse a Date header out of a request, and then free it. */ + ParseDateAndReset = 2, + /* Wait and then retry delivery of the same request. */ + Retry = 3, + /* Invoke the permanent failure callback; free current request. */ + NotifyPermanentFailure = 4, +}; + +std::ostream& operator<<(std::ostream& out, Action const& s); + +/** + * Computes the next (state, action) pair from an existing state and an HTTP + * result. + * @param state Current state. + * @param result HTTP result. + * @return Next state + action to take. + */ +std::pair NextState(State, + network::detail::HttpResult const& result); + +/** + * RequestWorker is responsible for initiating HTTP requests to deliver + * event payloads to LaunchDarkly. It will attempt re-delivery once if + * an HTTP failure is deemed recoverable. + * + * RequestWorkers are meant to be instantiated once and then used repeatedly + * as they become available. + */ +class RequestWorker { + public: + /* + * A delivery request can resolve as a PermanentFailureResult, + * meaning the request could not be delivered at all. In this case, the HTTP + * status code is made available for inspection. + */ + using PermanentFailureResult = network::detail::HttpResult::StatusCode; + + /* + * A delivery request can resolve as a ServerTimeResult, meaning the request + * succeeded *and* a valid server timestamp was received. + */ + using ServerTimeResult = std::chrono::system_clock::time_point; + + using DeliveryResult = + std::variant; + + /* + * A request made with AsyncDeliver may result in invocation of the + * provided ResultCallback in exactly two cases: + * - The delivery permanently failed. The callback will be invoked + * with the number of events in the batch, and a PermanentFailureResult. + * - The delivery succeeded and the server returned a valid timestamp. The + * callback will be invoked with the number of events in the batch, and a + * ServerTimeResult. + */ + using ResultCallback = std::function; + + /** + * Constructs a new RequestWorker. + * @param io The executor used to perform HTTP requests and retry + * operations. + * @param retry_after How long to wait after a recoverable failure before + * trying to deliver events again. + * @param logger Logger. + */ + RequestWorker(boost::asio::any_io_executor io, + std::chrono::milliseconds retry_after, + Logger& logger); + + /** + * Returns true if the worker is available for delivery. + */ + bool Available() const; + + /** + * Passes an EventBatch to the worker for delivery. The delivery may be + * retried exactly once if the failure is recoverable. + * + * Completion is not guaranteed; see documentation on ResultCallback for + * info. + * + * @param batch The events to deliver. + */ + template + auto AsyncDeliver(EventBatch batch, CompletionToken&& token) { + namespace asio = boost::asio; + namespace system = boost::system; + + using Sig = void(DeliveryResult); + using Result = asio::async_result, Sig>; + using Handler = typename Result::completion_handler_type; + + Handler handler(std::forward(token)); + Result result(handler); + + state_ = State::FirstChance; + batch_ = std::move(batch); + + LD_LOG(logger_, LogLevel::kDebug) + << "posting " << batch_->Count() << " events(s) to " + << batch_->Target() << " with payload: " + << batch_->Request().Body().value_or("(no body)"); + + requester_.Request(batch_->Request(), + [this, handler](network::detail::HttpResult result) { + OnDeliveryAttempt(std::move(result), + std::move(handler)); + }); + return result.get(); + } + + private: + /* Used to wait a specific amount of time after a failed request before + * trying again. */ + boost::asio::steady_timer timer_; + + /* How long to wait before trying again. */ + std::chrono::milliseconds retry_delay_; + + /* Current state of the worker. */ + State state_; + + /* Component used to perform HTTP operations. */ + network::detail::AsioRequester requester_; + + /* Current event batch; only present if AsyncDeliver was called and + * request is in-flight or a retry is taking place. */ + std::optional batch_; + + Logger& logger_; + + void OnDeliveryAttempt(network::detail::HttpResult request, + ResultCallback cb); +}; + +} // namespace launchdarkly::events::detail diff --git a/libs/common/include/events/detail/worker_pool.hpp b/libs/common/include/events/detail/worker_pool.hpp new file mode 100644 index 000000000..6d99f1f72 --- /dev/null +++ b/libs/common/include/events/detail/worker_pool.hpp @@ -0,0 +1,72 @@ +#pragma once + +#include +#include +#include +#include +#include +#include "events/detail/request_worker.hpp" +#include "logger.hpp" +#include "network/detail/asio_requester.hpp" +#include "network/detail/http_requester.hpp" +namespace launchdarkly::events::detail { + +/** + * WorkerPool represents a pool of workers capable of delivering event payloads + * to LaunchDarkly. In practice, this means initiating overlapping HTTP POSTs, + * assuming more than one worker is active. + * + * A WorkerPool's executor is meant to be executed from a single thread. + */ +class WorkerPool { + public: + using ServerTimeCallback = + std::function; + /** + * Constructs a new WorkerPool. + * @param io The executor used for all workers. + * @param pool_size How many workers to make available. + * @param delivery_retry_delay How long a worker should wait after a failed + * delivery before trying again. + * @param logger Logger. + */ + WorkerPool(boost::asio::any_io_executor io, + std::size_t pool_size, + std::chrono::milliseconds delivery_retry_delay, + Logger& logger); + + /** + * Attempts to find a free worker. If none are available, the completion + * handler is invoked with nullptr. + */ + template + auto Get(CompletionToken&& token) { + namespace asio = boost::asio; + namespace system = boost::system; + + using Sig = void(RequestWorker*); + using Result = asio::async_result, Sig>; + using Handler = typename Result::completion_handler_type; + + Handler handler(std::forward(token)); + Result result(handler); + + boost::asio::dispatch(io_, [this, handler]() mutable { + for (auto& worker : workers_) { + if (worker->Available()) { + handler(worker.get()); + return; + } + } + handler(nullptr); + }); + + return result.get(); + } + + private: + boost::asio::any_io_executor io_; + std::vector> workers_; +}; + +} // namespace launchdarkly::events::detail diff --git a/libs/common/src/CMakeLists.txt b/libs/common/src/CMakeLists.txt index b437c49e6..0d6e0dc9b 100644 --- a/libs/common/src/CMakeLists.txt +++ b/libs/common/src/CMakeLists.txt @@ -36,12 +36,13 @@ add_library(${LIBNAME} serialization/json_evaluation_result.cpp serialization/json_value.cpp serialization/events/json_events.cpp - events/null_event_processor.cpp events/asio_event_processor.cpp events/outbox.cpp - events/conn_pool.cpp + events/worker_pool.cpp events/summarizer.cpp events/client_events.cpp + events/request_worker.cpp + events/event_batch.cpp config/http_properties.cpp config/data_source_builder.cpp config/http_properties_builder.cpp diff --git a/libs/common/src/config/events.cpp b/libs/common/src/config/events.cpp index d0c754d0e..777255e8f 100644 --- a/libs/common/src/config/events.cpp +++ b/libs/common/src/config/events.cpp @@ -2,16 +2,26 @@ namespace launchdarkly::config::detail::built { -Events::Events(std::size_t capacity, +Events::Events(bool enabled, + std::size_t capacity, std::chrono::milliseconds flush_interval, std::string path, bool all_attributes_private, - AttributeReference::SetType private_attrs) - : capacity_(capacity), + AttributeReference::SetType private_attrs, + std::chrono::milliseconds delivery_retry_delay, + std::size_t flush_workers) + : enabled_(enabled), + capacity_(capacity), flush_interval_(flush_interval), path_(std::move(path)), all_attributes_private_(all_attributes_private), - private_attributes_(std::move(private_attrs)) {} + private_attributes_(std::move(private_attrs)), + delivery_retry_delay_(delivery_retry_delay), + flush_workers_(flush_workers) {} + +bool Events::Enabled() const { + return enabled_; +} std::size_t Events::Capacity() const { return capacity_; @@ -21,6 +31,10 @@ std::chrono::milliseconds Events::FlushInterval() const { return flush_interval_; } +std::chrono::milliseconds Events::DeliveryRetryDelay() const { + return delivery_retry_delay_; +} + std::string const& Events::Path() const { return path_; } @@ -33,11 +47,17 @@ AttributeReference::SetType const& Events::PrivateAttributes() const { return private_attributes_; } +std::size_t Events::FlushWorkers() const { + return flush_workers_; +} + bool operator==(Events const& lhs, Events const& rhs) { return lhs.Path() == rhs.Path() && lhs.FlushInterval() == rhs.FlushInterval() && lhs.Capacity() == rhs.Capacity() && lhs.AllAttributesPrivate() == rhs.AllAttributesPrivate() && - lhs.PrivateAttributes() == rhs.PrivateAttributes(); + lhs.PrivateAttributes() == rhs.PrivateAttributes() && + lhs.DeliveryRetryDelay() == rhs.DeliveryRetryDelay() && + lhs.FlushWorkers() == rhs.FlushWorkers(); } } // namespace launchdarkly::config::detail::built diff --git a/libs/common/src/config/events_builder.cpp b/libs/common/src/config/events_builder.cpp index 518abcccc..f8f5ab85b 100644 --- a/libs/common/src/config/events_builder.cpp +++ b/libs/common/src/config/events_builder.cpp @@ -8,6 +8,17 @@ namespace launchdarkly::config::detail::builders { template EventsBuilder::EventsBuilder() : config_(Defaults::Events()) {} +template +EventsBuilder& EventsBuilder::Enabled(bool enabled) { + config_.enabled_ = enabled; + return *this; +} + +template +EventsBuilder& EventsBuilder::Disable() { + return Enabled(false); +} + template EventsBuilder& EventsBuilder::Capacity(std::size_t capacity) { config_.capacity_ = capacity; diff --git a/libs/common/src/events/asio_event_processor.cpp b/libs/common/src/events/asio_event_processor.cpp index 9b3656dd4..cf2b30e54 100644 --- a/libs/common/src/events/asio_event_processor.cpp +++ b/libs/common/src/events/asio_event_processor.cpp @@ -2,9 +2,11 @@ #include #include #include - #include #include +#include "config/detail/builders/http_properties_builder.hpp" +#include "config/detail/sdks.hpp" +#include "network/detail/asio_requester.hpp" #include "serialization/events/json_events.hpp" @@ -15,32 +17,51 @@ auto const kEventSchemaHeader = "X-LaunchDarkly-Event-Schema"; auto const kPayloadIdHeader = "X-LaunchDarkly-Payload-Id"; auto const kEventSchemaVersion = 4; -AsioEventProcessor::AsioEventProcessor( +// These helpers are for usage with std::visit. +template +struct overloaded : Ts... { + using Ts::operator()...; +}; +// explicit deduction guide (not needed as of C++20) +template +overloaded(Ts...) -> overloaded; + +template +AsioEventProcessor::AsioEventProcessor( boost::asio::any_io_executor const& io, - config::detail::built::Events const& config, - config::detail::built::ServiceEndpoints const& endpoints, - std::string authorization, + config::detail::Config const& config, Logger& logger) : io_(boost::asio::make_strand(io)), - outbox_(config.Capacity()), + outbox_(config.Events().Capacity()), summarizer_(std::chrono::system_clock::now()), - flush_interval_(config.FlushInterval()), + flush_interval_(config.Events().FlushInterval()), timer_(io_), - host_(endpoints.EventsBaseUrl()), // TODO: parse and use host - path_(config.Path()), - authorization_(std::move(authorization)), + url_(config.ServiceEndpoints().EventsBaseUrl() + config.Events().Path()), + http_props_(config.HttpProperties()), + authorization_(config.SdkKey()), uuids_(), - inbox_capacity_(config.Capacity()), + workers_(io_, + config.Events().FlushWorkers(), + config.Events().DeliveryRetryDelay(), + logger), + inbox_capacity_(config.Events().Capacity()), inbox_size_(0), full_outbox_encountered_(false), full_inbox_encountered_(false), - filter_(config.AllAttributesPrivate(), config.PrivateAttributes()), + permanent_delivery_failure_(false), + last_known_past_time_(std::nullopt), + filter_(config.Events().AllAttributesPrivate(), + config.Events().PrivateAttributes()), logger_(logger) { ScheduleFlush(); } -bool AsioEventProcessor::InboxIncrement() { +template +bool AsioEventProcessor::InboxIncrement() { std::lock_guard guard{inbox_mutex_}; + if (permanent_delivery_failure_) { + return false; + } if (inbox_size_ < inbox_capacity_) { inbox_size_++; return true; @@ -55,14 +76,16 @@ bool AsioEventProcessor::InboxIncrement() { return false; } -void AsioEventProcessor::InboxDecrement() { +template +void AsioEventProcessor::InboxDecrement() { std::lock_guard guard{inbox_mutex_}; if (inbox_size_ > 0) { inbox_size_--; } } -void AsioEventProcessor::AsyncSend(InputEvent input_event) { +template +void AsioEventProcessor::AsyncSend(InputEvent input_event) { if (!InboxIncrement()) { return; } @@ -72,7 +95,8 @@ void AsioEventProcessor::AsyncSend(InputEvent input_event) { }); } -void AsioEventProcessor::HandleSend(InputEvent event) { +template +void AsioEventProcessor::HandleSend(InputEvent event) { std::vector output_events = Process(std::move(event)); bool inserted = outbox_.PushDiscardingOverflow(std::move(output_events)); @@ -84,20 +108,56 @@ void AsioEventProcessor::HandleSend(InputEvent event) { full_outbox_encountered_ = !inserted; } -void AsioEventProcessor::Flush(FlushTrigger flush_type) { - if (auto request = MakeRequest()) { - conns_.Deliver(*request); - } else { - LD_LOG(logger_, LogLevel::kDebug) - << "event-processor: nothing to flush"; - } - summarizer_ = Summarizer(std::chrono::system_clock::now()); +template +void AsioEventProcessor::Flush(FlushTrigger flush_type) { + workers_.Get([this](RequestWorker* worker) { + if (!worker) { + LD_LOG(logger_, LogLevel::kDebug) + << "event-processor: no flush workers available; skipping " + "flush"; + return; + } + auto batch = CreateBatch(); + if (!batch) { + LD_LOG(logger_, LogLevel::kDebug) + << "event-processor: nothing to flush"; + return; + } + worker->AsyncDeliver( + std::move(*batch), + [this](std::size_t count, RequestWorker::DeliveryResult result) { + OnEventDeliveryResult(count, result); + }); + summarizer_ = Summarizer(Clock::now()); + }); + if (flush_type == FlushTrigger::Automatic) { ScheduleFlush(); } } -void AsioEventProcessor::ScheduleFlush() { +template +void AsioEventProcessor::OnEventDeliveryResult( + std::size_t event_count, + RequestWorker::DeliveryResult result) { + boost::ignore_unused(event_count); + + std::visit( + overloaded{[&](Clock::time_point&& server_time) { + last_known_past_time_ = std::move(server_time); + }, + [&](network::detail::HttpResult::StatusCode status) { + std::lock_guard guard{this->inbox_mutex_}; + if (!permanent_delivery_failure_) { + timer_.cancel(); + permanent_delivery_failure_ = true; + } + }}, + std::move(result)); +} + +template +void AsioEventProcessor::ScheduleFlush() { LD_LOG(logger_, LogLevel::kDebug) << "event-processor: scheduling flush in " << flush_interval_.count() << "ms"; @@ -113,19 +173,21 @@ void AsioEventProcessor::ScheduleFlush() { }); } -void AsioEventProcessor::AsyncFlush() { +template +void AsioEventProcessor::AsyncFlush() { boost::asio::post(io_, [this] { boost::system::error_code ec; Flush(FlushTrigger::Manual); }); } -void AsioEventProcessor::AsyncClose() { +template +void AsioEventProcessor::AsyncClose() { timer_.cancel(); } -std::optional -AsioEventProcessor::MakeRequest() { +template +std::optional AsioEventProcessor::CreateBatch() { if (outbox_.Empty()) { return std::nullopt; } @@ -136,34 +198,22 @@ AsioEventProcessor::MakeRequest() { events.as_array().push_back(boost::json::value_from(summarizer_)); } - LD_LOG(logger_, LogLevel::kDebug) - << "event-processor: generating http request"; - - RequestType req; + // TODO(cwaldren): Template the event processor over SDK type? Add it into + // HttpProperties? + config::detail::builders::HttpPropertiesBuilder + props(http_props_); - req.set(http::field::host, host_); - req.method(http::verb::post); - req.set(http::field::content_type, "application/json"); - req.set(http::field::authorization, authorization_); - req.set(kEventSchemaHeader, std::to_string(kEventSchemaVersion)); - req.set(kPayloadIdHeader, boost::lexical_cast(uuids_())); - req.target(host_ + path_); + props.Header(kEventSchemaHeader, std::to_string(kEventSchemaVersion)); + props.Header(kPayloadIdHeader, boost::lexical_cast(uuids_())); + props.Header(to_string(http::field::authorization), authorization_); + props.Header(to_string(http::field::content_type), "application/json"); - req.body() = boost::json::serialize(events); - req.prepare_payload(); - return req; + return EventBatch(url_, props.Build(), events); } -// These helpers are for the std::visit within AsioEventProcessor::Process. -template -struct overloaded : Ts... { - using Ts::operator()...; -}; -// explicit deduction guide (not needed as of C++20) -template -overloaded(Ts...) -> overloaded; - -std::vector AsioEventProcessor::Process(InputEvent input_event) { +template +std::vector AsioEventProcessor::Process( + InputEvent input_event) { std::vector out; std::visit( overloaded{[&](client::FeatureEventParams&& event) { @@ -178,15 +228,20 @@ std::vector AsioEventProcessor::Process(InputEvent input_event) { auto debug_until_date = event.eval_result.debug_events_until_date(); + auto max_time = std::max( + std::chrono::system_clock::now(), + last_known_past_time_.value_or( + std::chrono::system_clock::time_point{})); + bool emit_debug_event = debug_until_date && - debug_until_date.value() > - std::chrono::system_clock::now(); + debug_until_date.value() > max_time; if (emit_debug_event) { out.emplace_back(client::DebugEvent{ base, filter_.filter(event.context)}); } + out.emplace_back(client::FeatureEvent{ std::move(base), event.context.kinds_to_keys()}); }, @@ -204,4 +259,7 @@ std::vector AsioEventProcessor::Process(InputEvent input_event) { return out; } + +template class AsioEventProcessor; + } // namespace launchdarkly::events::detail diff --git a/libs/common/src/events/conn_pool.cpp b/libs/common/src/events/conn_pool.cpp deleted file mode 100644 index 64febceef..000000000 --- a/libs/common/src/events/conn_pool.cpp +++ /dev/null @@ -1,10 +0,0 @@ -#include "events/detail/conn_pool.hpp" -#include -namespace launchdarkly::events::detail { -ConnPool::ConnPool() {} - -void ConnPool::Deliver(RequestType request) { - std::cout << "making an HTTP request with body" << request.body().data() - << "\n"; -} -} // namespace launchdarkly::events::detail diff --git a/libs/common/src/events/event_batch.cpp b/libs/common/src/events/event_batch.cpp new file mode 100644 index 000000000..eb44e7298 --- /dev/null +++ b/libs/common/src/events/event_batch.cpp @@ -0,0 +1,27 @@ +#include "events/detail/event_batch.hpp" +#include + +namespace launchdarkly::events::detail { +EventBatch::EventBatch(std::string url, + config::detail::built::HttpProperties http_props, + boost::json::value const& events) + : num_events_(events.as_array().size()), + request_(url, + network::detail::HttpMethod::kPost, + http_props, + boost::json::serialize(events)) {} + +std::size_t EventBatch::Count() const { + return num_events_; +} + +network::detail::HttpRequest const& EventBatch::Request() const { + return request_; +} + +std::string EventBatch::Target() const { + return (request_.Https() ? "https://" : "http://") + request_.Host() + ":" + + request_.Port() + request_.Path(); +} + +} // namespace launchdarkly::events::detail diff --git a/libs/common/src/events/null_event_processor.cpp b/libs/common/src/events/null_event_processor.cpp deleted file mode 100644 index 87fdc0266..000000000 --- a/libs/common/src/events/null_event_processor.cpp +++ /dev/null @@ -1,9 +0,0 @@ -#include "events/detail/null_event_processor.hpp" - -namespace launchdarkly::events::detail { -void NullEventProcessor::AsyncSend(InputEvent event) {} - -void NullEventProcessor::AsyncFlush() {} - -void NullEventProcessor::AsyncClose() {} -} // namespace launchdarkly::events::detail diff --git a/libs/common/src/events/request_worker.cpp b/libs/common/src/events/request_worker.cpp new file mode 100644 index 000000000..3f5cc63b4 --- /dev/null +++ b/libs/common/src/events/request_worker.cpp @@ -0,0 +1,208 @@ +#include "events/detail/request_worker.hpp" +#include "events/detail/parse_date_header.hpp" + +namespace launchdarkly::events::detail { + +RequestWorker::RequestWorker(boost::asio::any_io_executor io, + std::chrono::milliseconds retry_after, + Logger& logger) + : timer_(io), + retry_delay_(retry_after), + state_(State::Idle), + requester_(timer_.get_executor()), + batch_(std::nullopt), + logger_(logger) {} + +bool RequestWorker::Available() const { + return state_ == State::Idle; +} + +// Returns true if the result is considered transient - meaning it should +// not stop the event processor from processing future events. +static bool IsTransientFailure(network::detail::HttpResult const& result) { + auto status = http::status(result.Status()); + + if (result.IsError() || + http::to_status_class(status) != http::status_class::client_error) { + return true; + } + + return status == http::status::bad_request || + status == http::status::request_timeout || + status == http::status::too_many_requests || + status == http::status::payload_too_large; +} + +// Returns true if the request should be retried or not. Meant to be called +// when IsTransientFailure returns true. +static bool IsRetryable(network::detail::HttpResult::StatusCode status) { + return http::status(status) != http::status::payload_too_large; +} + +static bool IsSuccess(network::detail::HttpResult const& result) { + return !result.IsError() && + http::to_status_class(http::status(result.Status())) == + http::status_class::successful; +} + +void RequestWorker::OnDeliveryAttempt(network::detail::HttpResult result, + ResultCallback callback) { + auto [next_state, action] = NextState(state_, result); + + LD_LOG(logger_, LogLevel::kDebug) << "flush-worker: " << state_ << " -> " + << next_state << ", " << action << ""; + + switch (action) { + case Action::None: + break; + case Action::Reset: + if (result.IsError()) { + LD_LOG(logger_, LogLevel::kWarn) + << "error posting " << batch_->Count() + << " event(s) (some events were dropped): " + << result.ErrorMessage().value_or("unknown IO error"); + } else { + LD_LOG(logger_, LogLevel::kWarn) + << "error posting " << batch_->Count() + << " event(s) (some events were dropped): " + "HTTP error " + << result.Status(); + } + batch_.reset(); + break; + case Action::NotifyPermanentFailure: + LD_LOG(logger_, LogLevel::kWarn) + << "error posting " << batch_->Count() + << " event(s) (giving up permanently): HTTP error " + << result.Status(); + callback(batch_->Count(), result.Status()); + batch_.reset(); + break; + case Action::ParseDateAndReset: { + auto headers = result.Headers(); + if (auto date = headers.find("Date"); date != headers.end()) { + if (auto server_time = + ParseDateHeader( + date->second)) { + callback(batch_->Count(), *server_time); + } + } + batch_.reset(); + } break; + case Action::Retry: + if (result.IsError()) { + LD_LOG(logger_, LogLevel::kWarn) + << "error posting " << batch_->Count() + << " event(s) (will retry): " + << result.ErrorMessage().value_or("unknown IO error"); + } else { + LD_LOG(logger_, LogLevel::kWarn) + << "error posting " << batch_->Count() + << " event(s) (will retry): HTTP error " << result.Status(); + } + timer_.expires_from_now(retry_delay_); + timer_.async_wait([this, callback](boost::system::error_code ec) { + if (ec) { + return; + } + requester_.Request( + batch_->Request(), + [this, callback](network::detail::HttpResult result) { + OnDeliveryAttempt(std::move(result), + std::move(callback)); + }); + }); + break; + } + + state_ = next_state; +} + +std::pair NextState(State state, + network::detail::HttpResult const& result) { + std::optional action; + std::optional next_state; + + switch (state) { + case State::Idle: + return {state, Action::None}; + case State::PermanentlyFailed: + return {state, Action::None}; + case State::FirstChance: + if (IsSuccess(result)) { + next_state = State::Idle; + action = Action::ParseDateAndReset; + } else if (IsTransientFailure(result)) { + if (IsRetryable(result.Status())) { + next_state = State::SecondChance; + action = Action::Retry; + } else { + next_state = State::Idle; + action = Action::Reset; + } + } else { + next_state = State::PermanentlyFailed; + action = Action::NotifyPermanentFailure; + } + break; + case State::SecondChance: + if (IsSuccess(result)) { + next_state = State::Idle; + action = Action::ParseDateAndReset; + } else if (IsTransientFailure(result)) { + // no more delivery attempts; payload is dropped. + next_state = State::Idle; + action = Action::Reset; + } else { + next_state = State::PermanentlyFailed; + action = Action::NotifyPermanentFailure; + } + break; + } + + assert(action.has_value() && "NextState must generate an action"); + assert(next_state.has_value() && "NextState must generate a new state"); + + return {*next_state, *action}; +} + +std::ostream& operator<<(std::ostream& out, State const& s) { + switch (s) { + case State::Idle: + out << "State::Idle"; + break; + case State::FirstChance: + out << "State::FirstChance"; + break; + case State::SecondChance: + out << "State::SecondChance"; + break; + case State::PermanentlyFailed: + out << "State::PermanentlyFailed"; + break; + } + return out; +} + +std::ostream& operator<<(std::ostream& out, Action const& s) { + switch (s) { + case Action::None: + out << "Action::None"; + break; + case Action::Reset: + out << "Action::Reset"; + break; + case Action::ParseDateAndReset: + out << "Action::ParseDateAndReset"; + break; + case Action::Retry: + out << "Action::Retry"; + break; + case Action::NotifyPermanentFailure: + out << "Action::NotifyPermanentFailure"; + break; + } + return out; +} + +} // namespace launchdarkly::events::detail diff --git a/libs/common/src/events/worker_pool.cpp b/libs/common/src/events/worker_pool.cpp new file mode 100644 index 000000000..55682be71 --- /dev/null +++ b/libs/common/src/events/worker_pool.cpp @@ -0,0 +1,18 @@ +#include "events/detail/worker_pool.hpp" +#include +#include "events/detail/request_worker.hpp" + +namespace launchdarkly::events::detail { + +WorkerPool::WorkerPool(boost::asio::any_io_executor io, + std::size_t pool_size, + std::chrono::milliseconds delivery_retry_delay, + Logger& logger) + : io_(io), workers_() { + for (std::size_t i = 0; i < pool_size; i++) { + workers_.emplace_back( + std::make_unique(io_, delivery_retry_delay, logger)); + } +} + +} // namespace launchdarkly::events::detail diff --git a/libs/common/tests/event_processor_test.cpp b/libs/common/tests/event_processor_test.cpp index b5b17ff6c..e2fc8eb73 100644 --- a/libs/common/tests/event_processor_test.cpp +++ b/libs/common/tests/event_processor_test.cpp @@ -7,8 +7,10 @@ #include "context_builder.hpp" #include "events/client_events.hpp" #include "events/detail/asio_event_processor.hpp" +#include "events/detail/worker_pool.hpp" using namespace launchdarkly::events::detail; +using namespace launchdarkly::network::detail; static std::chrono::system_clock::time_point TimeZero() { return std::chrono::system_clock::time_point{}; @@ -18,6 +20,40 @@ static std::chrono::system_clock::time_point Time1000() { return std::chrono::system_clock::from_time_t(1); } +TEST(WorkerPool, PoolReturnsAvailableWorker) { + using namespace launchdarkly; + Logger logger{std::make_unique(LogLevel::kDebug, "test")}; + boost::asio::io_context ioc; + + auto work = boost::asio::make_work_guard(ioc); + std::thread t([&]() { ioc.run(); }); + + WorkerPool pool(ioc.get_executor(), 1, std::chrono::seconds(1), logger); + + RequestWorker* worker = pool.Get(boost::asio::use_future).get(); + ASSERT_TRUE(worker); + + work.reset(); + t.join(); +} + +TEST(WorkerPool, PoolReturnsNullptrWhenNoWorkerAvaialable) { + using namespace launchdarkly; + Logger logger{std::make_unique(LogLevel::kDebug, "test")}; + boost::asio::io_context ioc; + + auto work = boost::asio::make_work_guard(ioc); + std::thread t([&]() { ioc.run(); }); + + WorkerPool pool(ioc.get_executor(), 0, std::chrono::seconds(1), logger); + + RequestWorker* worker = pool.Get(boost::asio::use_future).get(); + ASSERT_FALSE(worker); + + work.reset(); + t.join(); +} + // This test is a temporary test that exists only to ensure the event processor // compiles; it should be replaced by more robust tests (and contract tests.) TEST(EventProcessorTests, ProcessorCompiles) { @@ -26,15 +62,16 @@ TEST(EventProcessorTests, ProcessorCompiles) { Logger logger{std::make_unique(LogLevel::kDebug, "test")}; boost::asio::io_context ioc; - auto config = client_side::EventsBuilder() - .Capacity(10) - .FlushInterval(std::chrono::seconds(1)) - .Build(); + auto config = + client_side::ConfigBuilder("sdk-123") + .Events(client_side::EventsBuilder().Capacity(10).FlushInterval( + std::chrono::seconds(1))) + .Build(); - auto endpoints = client_side::EndpointsBuilder().Build(); + ASSERT_TRUE(config); - events::detail::AsioEventProcessor processor( - ioc.get_executor(), *config, *endpoints, "password", logger); + events::detail::AsioEventProcessor processor(ioc.get_executor(), *config, + logger); std::thread ioc_thread([&]() { ioc.run(); }); auto context = launchdarkly::ContextBuilder().kind("org", "ld").build(); @@ -48,6 +85,7 @@ TEST(EventProcessorTests, ProcessorCompiles) { for (std::size_t i = 0; i < 10; i++) { processor.AsyncSend(identify_event); } + processor.AsyncClose(); ioc_thread.join(); } diff --git a/libs/common/tests/request_worker_test.cpp b/libs/common/tests/request_worker_test.cpp new file mode 100644 index 000000000..c4b45b6e1 --- /dev/null +++ b/libs/common/tests/request_worker_test.cpp @@ -0,0 +1,102 @@ +#include "events/detail/request_worker.hpp" +#include +#include "network/detail/http_requester.hpp" + +using namespace launchdarkly::events::detail; +using namespace launchdarkly::network::detail; + +struct TestCase { + State state; + HttpResult result; + State expected_state; + Action expected_action; +}; + +class StateMachineFixture : public ::testing::TestWithParam {}; + +TEST_P(StateMachineFixture, StatesAndActionsAreComputedCorrectly) { + auto params = GetParam(); + auto [next_state, action] = NextState(params.state, params.result); + ASSERT_EQ(params.expected_state, next_state); + ASSERT_EQ(params.expected_action, action); +} + +INSTANTIATE_TEST_SUITE_P( + SuccessfulRequests, + StateMachineFixture, + ::testing::Values( + TestCase{State::FirstChance, + HttpResult(200, "200 ok!", HttpResult::HeadersType{}), + State::Idle, Action::ParseDateAndReset}, + TestCase{State::FirstChance, + HttpResult(201, "201 created!", HttpResult::HeadersType{}), + State::Idle, Action::ParseDateAndReset}, + TestCase{State::FirstChance, + HttpResult(202, "202 accepted!", HttpResult::HeadersType{}), + State::Idle, Action::ParseDateAndReset})); + +INSTANTIATE_TEST_SUITE_P( + RecoverableErrors, + StateMachineFixture, + ::testing::Values( + TestCase{State::FirstChance, + HttpResult(400, "400 bad request!", HttpResult::HeadersType{}), + State::SecondChance, Action::Retry}, + TestCase{ + State::FirstChance, + HttpResult(408, "408 request timeout!", HttpResult::HeadersType{}), + State::SecondChance, Action::Retry}, + TestCase{State::FirstChance, + HttpResult(429, + "429 too many requests!", + HttpResult::HeadersType{}), + State::SecondChance, Action::Retry}, + TestCase{State::FirstChance, + HttpResult(500, + "500 internal server error!", + HttpResult::HeadersType{}), + State::SecondChance, Action::Retry}, + TestCase{State::FirstChance, HttpResult("generic connection error!"), + State::SecondChance, Action::Retry}, + TestCase{State::FirstChance, + HttpResult(413, "413 too large!", HttpResult::HeadersType{}), + State::Idle, Action::Reset})); + +INSTANTIATE_TEST_SUITE_P( + PermanentErrors, + StateMachineFixture, + ::testing::Values( + TestCase{State::FirstChance, + HttpResult(404, "404 not found!", HttpResult::HeadersType{}), + State::PermanentlyFailed, Action::NotifyPermanentFailure}, + TestCase{ + State::FirstChance, + HttpResult(418, "418 i'm a teapot!", HttpResult::HeadersType{}), + State::PermanentlyFailed, Action::NotifyPermanentFailure}, + TestCase{ + State::FirstChance, + HttpResult(401, "401 unauthorized!", HttpResult::HeadersType{}), + State::PermanentlyFailed, Action::NotifyPermanentFailure})); + +INSTANTIATE_TEST_SUITE_P( + SecondChanceOutcomes, + StateMachineFixture, + ::testing::Values( + TestCase{State::SecondChance, + HttpResult(404, "404 not found!", HttpResult::HeadersType{}), + State::PermanentlyFailed, Action::NotifyPermanentFailure}, + TestCase{State::SecondChance, + HttpResult(400, "400 bad request!", HttpResult::HeadersType{}), + State::Idle, Action::Reset}, + TestCase{ + State::SecondChance, + HttpResult(408, "408 request timeout!", HttpResult::HeadersType{}), + State::Idle, Action::Reset}, + TestCase{State::SecondChance, + HttpResult(429, + "429 too many requests!", + HttpResult::HeadersType{}), + State::Idle, Action::Reset}, + TestCase{State::SecondChance, + HttpResult(200, "200 ok!", HttpResult::HeadersType{}), + State::Idle, Action::ParseDateAndReset}));