Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <optional>
#include <string>
#include <unordered_map>

namespace launchdarkly::server_side::data_interfaces {

Expand Down
36 changes: 23 additions & 13 deletions libs/server-sdk/src/data_interfaces/source/idata_synchronizer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,37 @@
#include <optional>
#include <string>

#include "../destination/idestination.hpp"

namespace launchdarkly::server_side::data_interfaces {

/**
* \brief IDataSynchronizer obtains data via a background synchronization
* mechanism, updating a local cache whenever changes are made upstream.
* mechanism, updating an IDestination whenever changes arrive from upstream.
*/
class IDataSynchronizer {
public:
/**
* \brief Initialize the source, optionally with an initial data set. Init
* will be called before Start.
* \param initial_data Initial set of SDK data.
*/
virtual void Init(std::optional<data_model::SDKDataSet> initial_data) = 0;

/**
* \brief Starts the synchronization mechanism. Start will be called only
* once after Init; the source is responsible for maintaining a persistent
* connection. Start should not block.
* @brief Starts synchronizing data into the given IDestination.
*
*
* The second parameter, boostrap_data, may be nullptr meaning no bootstrap
* data is present in the SDK and a full synchronization must be initiated.
*
* If bootstrap_data is not nullptr, then it contains data obtained by the
* SDK during the bootstrap process. The pointer is valid only
* for this call.
*
* The data may be used to optimize the synchronization process, e.g. by
* obtaining a diff rather than a full dataset.
*
* @param destination The destination to synchronize data into. Pointer is
* invalid after the ShutdownAsync completion handler is called.
* @param bootstrap_data Optional bootstrap data.
* Pointer is valid only for this call.
*/
virtual void StartAsync() = 0;
virtual void StartAsync(IDestination* destination,
data_model::SDKDataSet const* bootstrap_data) = 0;

/**
* \brief Stops the synchronization mechanism. Stop will be called only once
Expand All @@ -37,7 +47,7 @@ class IDataSynchronizer {
virtual void ShutdownAsync(std::function<void()> complete) = 0;

/**
* \return Identity of the source. Used in logs.
* \return Identity of the synchronizer. Used in logs.
*/
[[nodiscard]] virtual std::string const& Identity() const = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ BackgroundSync::BackgroundSync(
config::built::BackgroundSyncConfig::
StreamingConfig>) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-arranged the constructor arguments to follow our C++ standards.

synchronizer_ = std::make_shared<StreamingDataSource>(
endpoints, method_config, http_properties, ioc,
change_notifier_, status_manager, logger);
ioc, logger, status_manager, endpoints, method_config,
http_properties);

} else if constexpr (std::is_same_v<
T, config::built::BackgroundSyncConfig::
PollingConfig>) {
synchronizer_ = std::make_shared<PollingDataSource>(
endpoints, method_config, http_properties, ioc,
change_notifier_, status_manager, logger);
ioc, logger, status_manager, endpoints, method_config,
http_properties);
}
},
background_sync_config.synchronizer_);
Expand All @@ -43,9 +43,8 @@ BackgroundSync::BackgroundSync(
synchronizer_(std::make_shared<NullDataSource>(ioc, status_manager)) {}

void BackgroundSync::Initialize() {
// TODO: if there was any data from bootstrapping, then add it:
// synchronizer_->Init(data);
synchronizer_->StartAsync();
synchronizer_->StartAsync(&change_notifier_,
nullptr /* no bootstrap data supported yet */);
}

std::string const& BackgroundSync::Identity() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@

namespace launchdarkly::server_side::data_systems {

void NullDataSource::StartAsync() {
void NullDataSource::StartAsync(data_interfaces::IDestination* destination,
data_model::SDKDataSet const* initial_data) {
status_manager_.SetState(DataSourceStatus::DataSourceState::kValid);
}

void NullDataSource::ShutdownAsync(std::function<void()> complete) {
boost::asio::post(exec_, complete);
}

void NullDataSource::Init(std::optional<data_model::SDKDataSet> initial_data) {}

std::string const& NullDataSource::Identity() const {
static std::string const identity = "no-op data source";
return identity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ class NullDataSource : public data_interfaces::IDataSynchronizer {
boost::asio::any_io_executor exec,
data_components::DataSourceStatusManager& status_manager);

void Init(std::optional<data_model::SDKDataSet> initial_data) override;
void StartAsync() override;
void StartAsync(data_interfaces::IDestination* destination,
data_model::SDKDataSet const* initial_data) override;
void ShutdownAsync(std::function<void()>) override;

[[nodiscard]] std::string const& Identity() const override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include <boost/json.hpp>

namespace launchdarkly::server_side::data_systems {

static char const* const kErrorParsingPut = "Could not parse polling payload";
static char const* const kErrorPutInvalid =
"Polling payload contained invalid data";
Expand Down Expand Up @@ -44,22 +43,20 @@ std::string const& PollingDataSource::Identity() const {
}

PollingDataSource::PollingDataSource(
boost::asio::any_io_executor const& ioc,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes follow from re-arranging the constructor args.

Logger const& logger,
data_components::DataSourceStatusManager& status_manager,
config::built::ServiceEndpoints const& endpoints,
config::built::BackgroundSyncConfig::PollingConfig const&
data_source_config,
config::built::HttpProperties const& http_properties,
boost::asio::any_io_executor const& ioc,
data_interfaces::IDestination& handler,
data_components::DataSourceStatusManager& status_manager,
Logger const& logger)
: ioc_(ioc),
logger_(logger),
config::built::HttpProperties const& http_properties)
: logger_(logger),
status_manager_(status_manager),
update_sink_(handler),
requester_(ioc),
timer_(ioc),
polling_interval_(data_source_config.poll_interval),
request_(MakeRequest(data_source_config, endpoints, http_properties)) {
request_(MakeRequest(data_source_config, endpoints, http_properties)),
timer_(ioc),
sink_(nullptr) {
if (polling_interval_ < data_source_config.min_polling_interval) {
LD_LOG(logger_, LogLevel::kWarn)
<< "Polling interval too frequent, defaulting to "
Expand All @@ -72,10 +69,6 @@ PollingDataSource::PollingDataSource(
}
}

void PollingDataSource::Init(
std::optional<data_model::SDKDataSet> initial_data) {
// TODO: implement
}
void PollingDataSource::DoPoll() {
last_poll_start_ = std::chrono::system_clock::now();

Expand Down Expand Up @@ -136,7 +129,7 @@ void PollingDataSource::HandlePollResult(network::HttpResult const& res) {
tl::expected<data_model::SDKDataSet, JsonError>>(parsed);

if (poll_result.has_value()) {
update_sink_.Init(std::move(*poll_result));
sink_->Init(std::move(*poll_result));
status_manager_.SetState(
DataSourceStatus::DataSourceState::kValid);
return;
Expand Down Expand Up @@ -211,7 +204,13 @@ void PollingDataSource::StartPollingTimer() {
});
}

void PollingDataSource::StartAsync() {
void PollingDataSource::StartAsync(
data_interfaces::IDestination* dest,
data_model::SDKDataSet const* bootstrap_data) {
boost::ignore_unused(bootstrap_data);

sink_ = dest;

status_manager_.SetState(DataSourceStatus::DataSourceState::kInitializing);
if (!request_.Valid()) {
LD_LOG(logger_, LogLevel::kError) << kCouldNotParseEndpoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ class PollingDataSource
: public data_interfaces::IDataSynchronizer,
public std::enable_shared_from_this<PollingDataSource> {
public:
PollingDataSource(config::built::ServiceEndpoints const& endpoints,
PollingDataSource(boost::asio::any_io_executor const& ioc,
Logger const& logger,
data_components::DataSourceStatusManager& status_manager,
config::built::ServiceEndpoints const& endpoints,
config::built::BackgroundSyncConfig::PollingConfig const&
data_source_config,
config::built::HttpProperties const& http_properties,
boost::asio::any_io_executor const& ioc,
data_interfaces::IDestination& handler,
data_components::DataSourceStatusManager& status_manager,
Logger const& logger);
config::built::HttpProperties const& http_properties);

void StartAsync(data_interfaces::IDestination* dest,
data_model::SDKDataSet const* bootstrap_data) override;

void Init(std::optional<data_model::SDKDataSet> initial_data) override;
void StartAsync() override;
void ShutdownAsync(std::function<void()> completion) override;

[[nodiscard]] std::string const& Identity() const override;
Expand All @@ -38,19 +38,35 @@ class PollingDataSource
void DoPoll();
void HandlePollResult(network::HttpResult const& res);

Logger const& logger_;

// Status manager is used to report the status of the data source. It must
// outlive the source. This source performs asynchronous
// operations, so a completion handler might invoke the status manager after
// it has been destroyed.
data_components::DataSourceStatusManager& status_manager_;
std::string polling_endpoint_;

// Responsible for performing HTTP requests using boost::asio.
network::AsioRequester requester_;
Logger const& logger_;
boost::asio::any_io_executor ioc_;

// How long to wait beteween individual polling requests.
std::chrono::seconds polling_interval_;

// Cached request arguments used in the polling request.
network::HttpRequest request_;

// Etag can be sent in HTTP request to save bandwidth if the server knows
// the response is unchanged.
std::optional<std::string> etag_;

// Used with polling_interval to schedule polling requests.
boost::asio::steady_timer timer_;

// The last time the polling HTTP request is initiated.
std::chrono::time_point<std::chrono::system_clock> last_poll_start_;
data_interfaces::IDestination& update_sink_;

// Destination for all data obtained via polling.
data_interfaces::IDestination* sink_;

void StartPollingTimer();
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace launchdarkly::server_side::data_systems {
static char const* const kCouldNotParseEndpoint =
"Could not parse streaming endpoint URL";

static char const* DataSourceErrorToString(launchdarkly::sse::Error error) {
static char const* DataSourceErrorToString(sse::Error const error) {
switch (error) {
case sse::Error::NoContent:
return "server responded 204 (No Content), will not attempt to "
Expand All @@ -34,28 +34,26 @@ std::string const& StreamingDataSource::Identity() const {
}

StreamingDataSource::StreamingDataSource(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follows from re-arranging constructor args.

config::built::ServiceEndpoints const& endpoints,
config::built::BackgroundSyncConfig::StreamingConfig const&
data_source_config,
config::built::HttpProperties http_properties,
boost::asio::any_io_executor ioc,
data_interfaces::IDestination& handler,
boost::asio::any_io_executor io,
Logger const& logger,
data_components::DataSourceStatusManager& status_manager,
Logger const& logger)
: exec_(std::move(ioc)),
config::built::ServiceEndpoints const& endpoints,
config::built::BackgroundSyncConfig::StreamingConfig const& streaming,
config::built::HttpProperties const& http_properties)
: io_(std::move(io)),
logger_(logger),
status_manager_(status_manager),
data_source_handler_(handler, logger, status_manager_),
http_config_(std::move(http_properties)),
streaming_config_(data_source_config),
http_config_(http_properties),
streaming_config_(streaming),
streaming_endpoint_(endpoints.StreamingBaseUrl()) {}

void StreamingDataSource::Init(
std::optional<data_model::SDKDataSet> initial_data) {
// TODO: implement
}
void StreamingDataSource::StartAsync(
data_interfaces::IDestination* dest,
data_model::SDKDataSet const* bootstrap_data) {
boost::ignore_unused(bootstrap_data);

event_handler_.emplace(*dest, logger_, status_manager_);

void StreamingDataSource::StartAsync() {
status_manager_.SetState(DataSourceStatus::DataSourceState::kInitializing);

auto updated_url = network::AppendUrl(streaming_endpoint_,
Expand Down Expand Up @@ -85,7 +83,7 @@ void StreamingDataSource::StartAsync() {

boost::urls::url url = uri_components.value();

auto client_builder = launchdarkly::sse::Builder(exec_, url.buffer());
auto client_builder = launchdarkly::sse::Builder(io_, url.buffer());

client_builder.method(boost::beast::http::verb::get);

Expand All @@ -101,16 +99,15 @@ void StreamingDataSource::StartAsync() {
client_builder.initial_reconnect_delay(
streaming_config_.initial_reconnect_delay);

for (auto const& header : http_config_.BaseHeaders()) {
client_builder.header(header.first, header.second);
for (auto const& [key, value] : http_config_.BaseHeaders()) {
client_builder.header(key, value);
}

auto weak_self = weak_from_this();

client_builder.receiver([weak_self](launchdarkly::sse::Event const& event) {
if (auto self = weak_self.lock()) {
self->data_source_handler_.HandleMessage(event.type(),
event.data());
self->event_handler_->HandleMessage(event.type(), event.data());
// TODO: Use the result of handle message to restart the
// event source if we got bad data. sc-204387
}
Expand Down Expand Up @@ -153,7 +150,7 @@ void StreamingDataSource::ShutdownAsync(std::function<void()> completion) {
return client_->async_shutdown(std::move(completion));
}
if (completion) {
boost::asio::post(exec_, completion);
boost::asio::post(io_, completion);
}
}
} // namespace launchdarkly::server_side::data_systems
Loading