From 0cc15bee2be20c2b165c454f7a301ecbf1abbe9f Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Wed, 22 Nov 2023 10:16:54 -0800 Subject: [PATCH 1/4] chore: update IDataSynchronizer's interface --- .../source/idata_synchronizer.hpp | 31 ++++++++----- .../background_sync_system.cpp | 13 +++--- .../sources/noop/null_data_source.cpp | 5 +-- .../sources/noop/null_data_source.hpp | 4 +- .../sources/polling/polling_data_source.cpp | 33 +++++++------- .../sources/polling/polling_data_source.hpp | 40 +++++++++++------ .../streaming/streaming_data_source.cpp | 43 +++++++++---------- .../streaming/streaming_data_source.hpp | 34 ++++++--------- 8 files changed, 108 insertions(+), 95 deletions(-) diff --git a/libs/server-sdk/src/data_interfaces/source/idata_synchronizer.hpp b/libs/server-sdk/src/data_interfaces/source/idata_synchronizer.hpp index ea2facf25..c9df0f2d7 100644 --- a/libs/server-sdk/src/data_interfaces/source/idata_synchronizer.hpp +++ b/libs/server-sdk/src/data_interfaces/source/idata_synchronizer.hpp @@ -6,6 +6,8 @@ #include #include +#include "../destination/idestination.hpp" + namespace launchdarkly::server_side::data_interfaces { /** @@ -15,18 +17,25 @@ namespace launchdarkly::server_side::data_interfaces { class IDataSynchronizer { public: /** - * \brief Initialize the source, optionally with an initial data set. Init - * will be called before Start. - * \param initial_data Initial set of SDK data. - */ - virtual void Init(std::optional initial_data) = 0; - - /** - * \brief Starts the synchronization mechanism. Start will be called only - * once after Init; the source is responsible for maintaining a persistent - * connection. Start should not block. + * @brief Starts synchronizing data into the given IDestination. + * + * + * The second parameter, boostrap_data, may be nullptr meaning no bootstrap + * data is present in the SDK and a full synchronization must be initiated. + * + * If bootstrap_data is not nullptr, then it contains data obtained by the + * SDK out-of-band from the source's mechanism. The pointer is valid only + * for this call. + * + * The data may be used to optimize the synchronization process, e.g. by + * obtaining a diff rather than a full dataset. + * @param destination The destination to synchronize data into. Pointer is + * invalid after the ShutdownAsync completion handler is called. + * @param bootstrap_data Optional bootstrap data. + * Pointer is valid only for this call. */ - virtual void StartAsync() = 0; + virtual void StartAsync(IDestination* destination, + data_model::SDKDataSet const* bootstrap_data) = 0; /** * \brief Stops the synchronization mechanism. Stop will be called only once diff --git a/libs/server-sdk/src/data_systems/background_sync/background_sync_system.cpp b/libs/server-sdk/src/data_systems/background_sync/background_sync_system.cpp index ae2682a63..433773078 100644 --- a/libs/server-sdk/src/data_systems/background_sync/background_sync_system.cpp +++ b/libs/server-sdk/src/data_systems/background_sync/background_sync_system.cpp @@ -21,15 +21,15 @@ BackgroundSync::BackgroundSync( config::built::BackgroundSyncConfig:: StreamingConfig>) { synchronizer_ = std::make_shared( - endpoints, method_config, http_properties, ioc, - change_notifier_, status_manager, logger); + ioc, logger, status_manager, endpoints, method_config, + http_properties); } else if constexpr (std::is_same_v< T, config::built::BackgroundSyncConfig:: PollingConfig>) { synchronizer_ = std::make_shared( - endpoints, method_config, http_properties, ioc, - change_notifier_, status_manager, logger); + ioc, logger, status_manager, endpoints, method_config, + http_properties); } }, background_sync_config.synchronizer_); @@ -43,9 +43,8 @@ BackgroundSync::BackgroundSync( synchronizer_(std::make_shared(ioc, status_manager)) {} void BackgroundSync::Initialize() { - // TODO: if there was any data from bootstrapping, then add it: - // synchronizer_->Init(data); - synchronizer_->StartAsync(); + synchronizer_->StartAsync(&change_notifier_, + nullptr /* no bootstrap data supported yet */); } std::string const& BackgroundSync::Identity() const { diff --git a/libs/server-sdk/src/data_systems/background_sync/sources/noop/null_data_source.cpp b/libs/server-sdk/src/data_systems/background_sync/sources/noop/null_data_source.cpp index 28f07ab5c..a6b70f2c7 100644 --- a/libs/server-sdk/src/data_systems/background_sync/sources/noop/null_data_source.cpp +++ b/libs/server-sdk/src/data_systems/background_sync/sources/noop/null_data_source.cpp @@ -4,7 +4,8 @@ namespace launchdarkly::server_side::data_systems { -void NullDataSource::StartAsync() { +void NullDataSource::StartAsync(data_interfaces::IDestination* destination, + data_model::SDKDataSet const* initial_data) { status_manager_.SetState(DataSourceStatus::DataSourceState::kValid); } @@ -12,8 +13,6 @@ void NullDataSource::ShutdownAsync(std::function complete) { boost::asio::post(exec_, complete); } -void NullDataSource::Init(std::optional initial_data) {} - std::string const& NullDataSource::Identity() const { static std::string const identity = "no-op data source"; return identity; diff --git a/libs/server-sdk/src/data_systems/background_sync/sources/noop/null_data_source.hpp b/libs/server-sdk/src/data_systems/background_sync/sources/noop/null_data_source.hpp index 695eb7198..657cf49d2 100644 --- a/libs/server-sdk/src/data_systems/background_sync/sources/noop/null_data_source.hpp +++ b/libs/server-sdk/src/data_systems/background_sync/sources/noop/null_data_source.hpp @@ -13,8 +13,8 @@ class NullDataSource : public data_interfaces::IDataSynchronizer { boost::asio::any_io_executor exec, data_components::DataSourceStatusManager& status_manager); - void Init(std::optional initial_data) override; - void StartAsync() override; + void StartAsync(data_interfaces::IDestination* destination, + data_model::SDKDataSet const* initial_data) override; void ShutdownAsync(std::function) override; [[nodiscard]] std::string const& Identity() const override; diff --git a/libs/server-sdk/src/data_systems/background_sync/sources/polling/polling_data_source.cpp b/libs/server-sdk/src/data_systems/background_sync/sources/polling/polling_data_source.cpp index 0f0876d48..f88eb5cc9 100644 --- a/libs/server-sdk/src/data_systems/background_sync/sources/polling/polling_data_source.cpp +++ b/libs/server-sdk/src/data_systems/background_sync/sources/polling/polling_data_source.cpp @@ -13,7 +13,6 @@ #include namespace launchdarkly::server_side::data_systems { - static char const* const kErrorParsingPut = "Could not parse polling payload"; static char const* const kErrorPutInvalid = "Polling payload contained invalid data"; @@ -44,22 +43,20 @@ std::string const& PollingDataSource::Identity() const { } PollingDataSource::PollingDataSource( + boost::asio::any_io_executor const& ioc, + Logger const& logger, + data_components::DataSourceStatusManager& status_manager, config::built::ServiceEndpoints const& endpoints, config::built::BackgroundSyncConfig::PollingConfig const& data_source_config, - config::built::HttpProperties const& http_properties, - boost::asio::any_io_executor const& ioc, - data_interfaces::IDestination& handler, - data_components::DataSourceStatusManager& status_manager, - Logger const& logger) - : ioc_(ioc), - logger_(logger), + config::built::HttpProperties const& http_properties) + : logger_(logger), status_manager_(status_manager), - update_sink_(handler), requester_(ioc), - timer_(ioc), polling_interval_(data_source_config.poll_interval), - request_(MakeRequest(data_source_config, endpoints, http_properties)) { + request_(MakeRequest(data_source_config, endpoints, http_properties)), + timer_(ioc), + sink_(nullptr) { if (polling_interval_ < data_source_config.min_polling_interval) { LD_LOG(logger_, LogLevel::kWarn) << "Polling interval too frequent, defaulting to " @@ -72,10 +69,6 @@ PollingDataSource::PollingDataSource( } } -void PollingDataSource::Init( - std::optional initial_data) { - // TODO: implement -} void PollingDataSource::DoPoll() { last_poll_start_ = std::chrono::system_clock::now(); @@ -136,7 +129,7 @@ void PollingDataSource::HandlePollResult(network::HttpResult const& res) { tl::expected>(parsed); if (poll_result.has_value()) { - update_sink_.Init(std::move(*poll_result)); + sink_->Init(std::move(*poll_result)); status_manager_.SetState( DataSourceStatus::DataSourceState::kValid); return; @@ -211,7 +204,13 @@ void PollingDataSource::StartPollingTimer() { }); } -void PollingDataSource::StartAsync() { +void PollingDataSource::StartAsync( + data_interfaces::IDestination* dest, + data_model::SDKDataSet const* bootstrap_data) { + boost::ignore_unused(bootstrap_data); + + sink_ = dest; + status_manager_.SetState(DataSourceStatus::DataSourceState::kInitializing); if (!request_.Valid()) { LD_LOG(logger_, LogLevel::kError) << kCouldNotParseEndpoint; diff --git a/libs/server-sdk/src/data_systems/background_sync/sources/polling/polling_data_source.hpp b/libs/server-sdk/src/data_systems/background_sync/sources/polling/polling_data_source.hpp index 5dd668cb5..f7eaa8db8 100644 --- a/libs/server-sdk/src/data_systems/background_sync/sources/polling/polling_data_source.hpp +++ b/libs/server-sdk/src/data_systems/background_sync/sources/polling/polling_data_source.hpp @@ -19,17 +19,17 @@ class PollingDataSource : public data_interfaces::IDataSynchronizer, public std::enable_shared_from_this { public: - PollingDataSource(config::built::ServiceEndpoints const& endpoints, + PollingDataSource(boost::asio::any_io_executor const& ioc, + Logger const& logger, + data_components::DataSourceStatusManager& status_manager, + config::built::ServiceEndpoints const& endpoints, config::built::BackgroundSyncConfig::PollingConfig const& data_source_config, - config::built::HttpProperties const& http_properties, - boost::asio::any_io_executor const& ioc, - data_interfaces::IDestination& handler, - data_components::DataSourceStatusManager& status_manager, - Logger const& logger); + config::built::HttpProperties const& http_properties); + + void StartAsync(data_interfaces::IDestination* dest, + data_model::SDKDataSet const* bootstrap_data) override; - void Init(std::optional initial_data) override; - void StartAsync() override; void ShutdownAsync(std::function completion) override; [[nodiscard]] std::string const& Identity() const override; @@ -38,19 +38,35 @@ class PollingDataSource void DoPoll(); void HandlePollResult(network::HttpResult const& res); + Logger const& logger_; + + // Status manager is used to report the status of the data source. It must + // outlive the source. This source performs asynchronous + // operations, so a completion handler might invoke the status manager after + // the it has been destroyed. data_components::DataSourceStatusManager& status_manager_; - std::string polling_endpoint_; + // Responsible for performing HTTP requests using boost::asio. network::AsioRequester requester_; - Logger const& logger_; - boost::asio::any_io_executor ioc_; + + // How long to wait beteween individual polling requests. std::chrono::seconds polling_interval_; + + // Cached request arguments used in the polling request. network::HttpRequest request_; + + // Etag can be sent in HTTP request to save bandwidth if the server knows + // the response is unchanged. std::optional etag_; + // Used with polling_interval to schedule polling requests. boost::asio::steady_timer timer_; + + // The last time the polling HTTP request is initiated. std::chrono::time_point last_poll_start_; - data_interfaces::IDestination& update_sink_; + + // Destination for all data obtained via polling. + data_interfaces::IDestination* sink_; void StartPollingTimer(); }; diff --git a/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.cpp b/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.cpp index 7749135ba..b5a62d0b4 100644 --- a/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.cpp +++ b/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.cpp @@ -14,7 +14,7 @@ namespace launchdarkly::server_side::data_systems { static char const* const kCouldNotParseEndpoint = "Could not parse streaming endpoint URL"; -static char const* DataSourceErrorToString(launchdarkly::sse::Error error) { +static char const* DataSourceErrorToString(sse::Error const error) { switch (error) { case sse::Error::NoContent: return "server responded 204 (No Content), will not attempt to " @@ -34,28 +34,26 @@ std::string const& StreamingDataSource::Identity() const { } StreamingDataSource::StreamingDataSource( - config::built::ServiceEndpoints const& endpoints, - config::built::BackgroundSyncConfig::StreamingConfig const& - data_source_config, - config::built::HttpProperties http_properties, - boost::asio::any_io_executor ioc, - data_interfaces::IDestination& handler, + boost::asio::any_io_executor io, + Logger const& logger, data_components::DataSourceStatusManager& status_manager, - Logger const& logger) - : exec_(std::move(ioc)), + config::built::ServiceEndpoints const& endpoints, + config::built::BackgroundSyncConfig::StreamingConfig const& streaming, + config::built::HttpProperties const& http_properties) + : io_(std::move(io)), logger_(logger), status_manager_(status_manager), - data_source_handler_(handler, logger, status_manager_), - http_config_(std::move(http_properties)), - streaming_config_(data_source_config), + http_config_(http_properties), + streaming_config_(streaming), streaming_endpoint_(endpoints.StreamingBaseUrl()) {} -void StreamingDataSource::Init( - std::optional initial_data) { - // TODO: implement -} +void StreamingDataSource::StartAsync( + data_interfaces::IDestination* dest, + data_model::SDKDataSet const* bootstrap_data) { + boost::movelib::ignore(bootstrap_data); + + event_handler_.emplace(*dest, logger_, status_manager_); -void StreamingDataSource::StartAsync() { status_manager_.SetState(DataSourceStatus::DataSourceState::kInitializing); auto updated_url = network::AppendUrl(streaming_endpoint_, @@ -85,7 +83,7 @@ void StreamingDataSource::StartAsync() { boost::urls::url url = uri_components.value(); - auto client_builder = launchdarkly::sse::Builder(exec_, url.buffer()); + auto client_builder = launchdarkly::sse::Builder(io_, url.buffer()); client_builder.method(boost::beast::http::verb::get); @@ -101,16 +99,15 @@ void StreamingDataSource::StartAsync() { client_builder.initial_reconnect_delay( streaming_config_.initial_reconnect_delay); - for (auto const& header : http_config_.BaseHeaders()) { - client_builder.header(header.first, header.second); + for (auto const& [key, value] : http_config_.BaseHeaders()) { + client_builder.header(key, value); } auto weak_self = weak_from_this(); client_builder.receiver([weak_self](launchdarkly::sse::Event const& event) { if (auto self = weak_self.lock()) { - self->data_source_handler_.HandleMessage(event.type(), - event.data()); + self->event_handler_->HandleMessage(event.type(), event.data()); // TODO: Use the result of handle message to restart the // event source if we got bad data. sc-204387 } @@ -153,7 +150,7 @@ void StreamingDataSource::ShutdownAsync(std::function completion) { return client_->async_shutdown(std::move(completion)); } if (completion) { - boost::asio::post(exec_, completion); + boost::asio::post(io_, completion); } } } // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.hpp b/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.hpp index 9c868fce0..647eb3319 100644 --- a/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.hpp +++ b/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.hpp @@ -6,17 +6,12 @@ #include "../../../../data_interfaces/destination/idestination.hpp" #include "../../../../data_interfaces/source/idata_synchronizer.hpp" -#include - #include +#include #include #include -#include - -using namespace std::chrono_literals; - namespace launchdarkly::server_side::data_systems { class StreamingDataSource final @@ -24,32 +19,31 @@ class StreamingDataSource final public std::enable_shared_from_this { public: StreamingDataSource( - config::built::ServiceEndpoints const& endpoints, - config::built::BackgroundSyncConfig::StreamingConfig const& - data_source_config, - config::built::HttpProperties http_properties, - boost::asio::any_io_executor ioc, - data_interfaces::IDestination& handler, + boost::asio::any_io_executor io, + Logger const& logger, data_components::DataSourceStatusManager& status_manager, - Logger const& logger); + config::built::ServiceEndpoints const& endpoints, + config::built::BackgroundSyncConfig::StreamingConfig const& streaming, + config::built::HttpProperties const& http_properties); - void Init(std::optional initial_data) override; - void StartAsync() override; + void StartAsync(data_interfaces::IDestination* dest, + data_model::SDKDataSet const* bootstrap_data) override; void ShutdownAsync(std::function completion) override; [[nodiscard]] std::string const& Identity() const override; private: - boost::asio::any_io_executor exec_; + boost::asio::any_io_executor io_; + Logger const& logger_; + data_components::DataSourceStatusManager& status_manager_; - DataSourceEventHandler data_source_handler_; + config::built::HttpProperties http_config_; + + std::optional event_handler_; std::string streaming_endpoint_; config::built::BackgroundSyncConfig::StreamingConfig streaming_config_; - config::built::HttpProperties http_config_; - - Logger const& logger_; std::shared_ptr client_; }; } // namespace launchdarkly::server_side::data_systems From 42f73bcf39ac73c1886ca908e6585ad59c5d6b39 Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Wed, 22 Nov 2023 10:23:32 -0800 Subject: [PATCH 2/4] doc wording --- .../src/data_interfaces/source/idata_synchronizer.hpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/libs/server-sdk/src/data_interfaces/source/idata_synchronizer.hpp b/libs/server-sdk/src/data_interfaces/source/idata_synchronizer.hpp index c9df0f2d7..ea1250dfb 100644 --- a/libs/server-sdk/src/data_interfaces/source/idata_synchronizer.hpp +++ b/libs/server-sdk/src/data_interfaces/source/idata_synchronizer.hpp @@ -12,7 +12,7 @@ namespace launchdarkly::server_side::data_interfaces { /** * \brief IDataSynchronizer obtains data via a background synchronization - * mechanism, updating a local cache whenever changes are made upstream. + * mechanism, updating an IDestination whenever changes arrive from upstream. */ class IDataSynchronizer { public: @@ -24,11 +24,12 @@ class IDataSynchronizer { * data is present in the SDK and a full synchronization must be initiated. * * If bootstrap_data is not nullptr, then it contains data obtained by the - * SDK out-of-band from the source's mechanism. The pointer is valid only + * SDK during the bootstrap process. The pointer is valid only * for this call. * * The data may be used to optimize the synchronization process, e.g. by * obtaining a diff rather than a full dataset. + * * @param destination The destination to synchronize data into. Pointer is * invalid after the ShutdownAsync completion handler is called. * @param bootstrap_data Optional bootstrap data. @@ -46,7 +47,7 @@ class IDataSynchronizer { virtual void ShutdownAsync(std::function complete) = 0; /** - * \return Identity of the source. Used in logs. + * \return Identity of the synchronizer. Used in logs. */ [[nodiscard]] virtual std::string const& Identity() const = 0; From 3afa464d9fc5a8a05b89c4e7aa3ad19a3502436f Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Wed, 22 Nov 2023 10:28:29 -0800 Subject: [PATCH 3/4] compilation fixes --- .../data_interfaces/sources/iserialized_data_reader.hpp | 1 + .../background_sync/sources/streaming/streaming_data_source.cpp | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/libs/server-sdk/include/launchdarkly/server_side/data_interfaces/sources/iserialized_data_reader.hpp b/libs/server-sdk/include/launchdarkly/server_side/data_interfaces/sources/iserialized_data_reader.hpp index 88e0fd0f2..8b193ded8 100644 --- a/libs/server-sdk/include/launchdarkly/server_side/data_interfaces/sources/iserialized_data_reader.hpp +++ b/libs/server-sdk/include/launchdarkly/server_side/data_interfaces/sources/iserialized_data_reader.hpp @@ -7,6 +7,7 @@ #include #include +#include namespace launchdarkly::server_side::data_interfaces { diff --git a/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.cpp b/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.cpp index b5a62d0b4..3828ae5d7 100644 --- a/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.cpp +++ b/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.cpp @@ -50,7 +50,7 @@ StreamingDataSource::StreamingDataSource( void StreamingDataSource::StartAsync( data_interfaces::IDestination* dest, data_model::SDKDataSet const* bootstrap_data) { - boost::movelib::ignore(bootstrap_data); + boost::ignore_unused(bootstrap_data); event_handler_.emplace(*dest, logger_, status_manager_); From 0e85fb6732936f1f9355d571cf42d6fa38741f47 Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Mon, 27 Nov 2023 09:18:46 -0800 Subject: [PATCH 4/4] Update libs/server-sdk/src/data_systems/background_sync/sources/polling/polling_data_source.hpp Co-authored-by: Matthew M. Keeler --- .../background_sync/sources/polling/polling_data_source.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/server-sdk/src/data_systems/background_sync/sources/polling/polling_data_source.hpp b/libs/server-sdk/src/data_systems/background_sync/sources/polling/polling_data_source.hpp index f7eaa8db8..b9492c755 100644 --- a/libs/server-sdk/src/data_systems/background_sync/sources/polling/polling_data_source.hpp +++ b/libs/server-sdk/src/data_systems/background_sync/sources/polling/polling_data_source.hpp @@ -43,7 +43,7 @@ class PollingDataSource // Status manager is used to report the status of the data source. It must // outlive the source. This source performs asynchronous // operations, so a completion handler might invoke the status manager after - // the it has been destroyed. + // it has been destroyed. data_components::DataSourceStatusManager& status_manager_; // Responsible for performing HTTP requests using boost::asio.