Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,20 @@ struct FDv2SourceResult {
*/
struct ChangeSet {
data_model::ChangeSet<ChangeSetData> change_set;
/** If true, the server signaled that the client should fall back to
* FDv1. */
bool fdv1_fallback;
};

/**
* A transient error occurred; the source may recover.
*/
struct Interrupted {
ErrorInfo error;
bool fdv1_fallback;
};

/**
* A non-recoverable error occurred; the source should not be retried.
*/
struct TerminalError {
ErrorInfo error;
bool fdv1_fallback;
};

/**
Expand All @@ -53,13 +48,18 @@ struct FDv2SourceResult {
*/
struct Goodbye {
std::optional<std::string> reason;
bool fdv1_fallback;
};

using Value =
std::variant<ChangeSet, Interrupted, TerminalError, Shutdown, Goodbye>;

Value value;

/**
* If true, the server signaled (via the X-LD-FD-Fallback response header)
* that the client should fall back to FDv1.
*/
bool fdv1_fallback = false;
};

} // namespace launchdarkly::server_side::data_interfaces
70 changes: 41 additions & 29 deletions libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
#include <launchdarkly/network/http_error_messages.hpp>
#include <launchdarkly/server_side/config/builders/all_builders.hpp>

#include <boost/algorithm/string/predicate.hpp>
#include <boost/json.hpp>
#include <boost/url/parse.hpp>
#include <boost/url/url.hpp>

namespace launchdarkly::server_side::data_systems {

static char const* const kFDv2PollPath = "/sdk/poll";
static char const* const kFDv1FallbackHeader = "X-LD-FD-Fallback";

static char const* const kErrorParsingBody =
"Could not parse FDv2 polling response";
Expand All @@ -32,6 +34,12 @@ static ErrorInfo MakeError(ErrorKind kind,
std::chrono::system_clock::now()};
}

static bool ReadFDv1FallbackDirective(
network::HttpResult::HeadersType const& headers) {
auto const it = headers.find(kFDv1FallbackHeader);
return it != headers.end() && boost::iequals(it->second, "true");
}

network::HttpRequest MakeFDv2PollRequest(
config::built::ServiceEndpoints const& endpoints,
config::built::HttpProperties const& http_properties,
Expand Down Expand Up @@ -88,15 +96,13 @@ static FDv2SourceResult ParseFDv2PollEvents(
auto typed = TranslateChangeSet(*change_set, logger);
if (!typed) {
return FDv2SourceResult{FDv2SourceResult::Interrupted{
MakeError(ErrorKind::kInvalidData, 0, kErrorTranslation),
false}};
MakeError(ErrorKind::kInvalidData, 0, kErrorTranslation)}};
}
return FDv2SourceResult{
FDv2SourceResult::ChangeSet{std::move(*typed), false}};
FDv2SourceResult::ChangeSet{std::move(*typed)}};
}
if (auto* goodbye = std::get_if<Goodbye>(&result)) {
return FDv2SourceResult{
FDv2SourceResult::Goodbye{goodbye->reason, false}};
return FDv2SourceResult{FDv2SourceResult::Goodbye{goodbye->reason}};
}
if (auto* error = std::get_if<FDv2ProtocolHandler::Error>(&result)) {
if (error->kind == FDv2ProtocolHandler::Error::Kind::kServerError) {
Expand All @@ -107,16 +113,15 @@ static FDv2SourceResult ParseFDv2PollEvents(
id.value_or("") + "' with reason: '" + error->message +
"'. Automatic retry will occur.";
return FDv2SourceResult{FDv2SourceResult::Interrupted{
MakeError(ErrorKind::kErrorResponse, 0, std::move(msg)),
false}};
MakeError(ErrorKind::kErrorResponse, 0, std::move(msg))}};
}
return FDv2SourceResult{FDv2SourceResult::Interrupted{
MakeError(ErrorKind::kInvalidData, 0, error->message), false}};
MakeError(ErrorKind::kInvalidData, 0, error->message)}};
}
}

return FDv2SourceResult{FDv2SourceResult::Interrupted{
MakeError(ErrorKind::kInvalidData, 0, kErrorIncompletePayload), false}};
MakeError(ErrorKind::kInvalidData, 0, kErrorIncompletePayload)}};
}

static FDv2SourceResult ParseFDv2PollResponse(
Expand All @@ -127,25 +132,25 @@ static FDv2SourceResult ParseFDv2PollResponse(
auto parsed = boost::json::parse(body, ec);
if (ec) {
return FDv2SourceResult{FDv2SourceResult::Interrupted{
MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody), false}};
MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody)}};
}

auto const* obj = parsed.if_object();
if (!obj) {
return FDv2SourceResult{FDv2SourceResult::Interrupted{
MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody), false}};
MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody)}};
}

auto const* events_val = obj->if_contains("events");
if (!events_val) {
return FDv2SourceResult{FDv2SourceResult::Interrupted{
MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents), false}};
MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents)}};
}

auto const* events_arr = events_val->if_array();
if (!events_arr) {
return FDv2SourceResult{FDv2SourceResult::Interrupted{
MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents), false}};
MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents)}};
}

return ParseFDv2PollEvents(*events_arr, protocol_handler, logger);
Expand All @@ -161,24 +166,28 @@ data_interfaces::FDv2SourceResult HandleFDv2PollResponse(
std::string error_msg = msg.has_value() ? *msg : "unknown error";
LD_LOG(logger, LogLevel::kWarn) << identity << ": " << error_msg;
return FDv2SourceResult{FDv2SourceResult::Interrupted{
MakeError(ErrorKind::kNetworkError, 0, std::move(error_msg)),
false}};
MakeError(ErrorKind::kNetworkError, 0, std::move(error_msg))}};
}

bool const fdv1_fallback = ReadFDv1FallbackDirective(res.Headers());

if (res.Status() == 304) {
return FDv2SourceResult{FDv2SourceResult::ChangeSet{
data_model::ChangeSet<data_interfaces::ChangeSetData>{
data_model::ChangeSetType::kNone, {}, data_model::Selector{}},
false}};
return FDv2SourceResult{
FDv2SourceResult::ChangeSet{
data_model::ChangeSet<data_interfaces::ChangeSetData>{
data_model::ChangeSetType::kNone,
{},
data_model::Selector{}}},
fdv1_fallback};
}

if (res.Status() == 200) {
auto const& body = res.Body();
if (!body) {
return FDv2SourceResult{FDv2SourceResult::Interrupted{
MakeError(ErrorKind::kInvalidData, 0,
"polling response contained no body"),
false}};
return FDv2SourceResult{FDv2SourceResult::Interrupted{MakeError(
ErrorKind::kInvalidData, 0,
"polling response contained no body")},
fdv1_fallback};
}

auto result = ParseFDv2PollResponse(*body, protocol_handler, logger);
Expand All @@ -192,24 +201,27 @@ data_interfaces::FDv2SourceResult HandleFDv2PollResponse(
<< identity << ": " << interrupted->error.Message();
}
}
result.fdv1_fallback = fdv1_fallback;
return result;
}

if (network::IsRecoverableStatus(res.Status())) {
std::string msg = network::ErrorForStatusCode(
res.Status(), "FDv2 polling request", "will retry");
LD_LOG(logger, LogLevel::kWarn) << identity << ": " << msg;
return FDv2SourceResult{FDv2SourceResult::Interrupted{
MakeError(ErrorKind::kErrorResponse, res.Status(), std::move(msg)),
false}};
return FDv2SourceResult{
FDv2SourceResult::Interrupted{MakeError(
ErrorKind::kErrorResponse, res.Status(), std::move(msg))},
fdv1_fallback};
}

std::string msg = network::ErrorForStatusCode(
res.Status(), "FDv2 polling request", std::nullopt);
LD_LOG(logger, LogLevel::kError) << identity << ": " << msg;
return FDv2SourceResult{FDv2SourceResult::TerminalError{
MakeError(ErrorKind::kErrorResponse, res.Status(), std::move(msg)),
false}};
return FDv2SourceResult{
FDv2SourceResult::TerminalError{
MakeError(ErrorKind::kErrorResponse, res.Status(), std::move(msg))},
fdv1_fallback};
}

} // namespace launchdarkly::server_side::data_systems
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ async::Future<FDv2SourceResult> FDv2PollingInitializer::Run() {
FDv2SourceResult{FDv2SourceResult::TerminalError{
ErrorInfo{ErrorInfo::ErrorKind::kUnknown, 0,
"invalid polling endpoint URL",
std::chrono::system_clock::now()},
false}});
std::chrono::system_clock::now()}}});
}

// Promisify the callback-based HTTP request.
Expand Down
61 changes: 36 additions & 25 deletions libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <launchdarkly/async/timer.hpp>

#include <boost/algorithm/string/predicate.hpp>
#include <boost/json.hpp>
#include <boost/url/parse.hpp>
#include <boost/url/url.hpp>
Expand Down Expand Up @@ -67,8 +68,7 @@ void FDv2StreamingSynchronizer::State::EnsureStarted(
<< kIdentity << ": could not parse streaming endpoint URL";
Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{
MakeError(ErrorKind::kNetworkError, 0,
"could not parse streaming endpoint URL"),
false}});
"could not parse streaming endpoint URL")}});
return;
}

Expand Down Expand Up @@ -116,6 +116,12 @@ void FDv2StreamingSynchronizer::State::EnsureStarted(
s->OnConnect(req);
}
});
client_builder.on_response(
[weak](boost::beast::http::response_header<> const& headers) {
if (auto s = weak.lock()) {
s->OnResponse(headers);
}
});
client_builder.receiver([weak](sse::Event const& event) {
if (auto s = weak.lock()) {
s->OnEvent(event);
Expand All @@ -137,10 +143,8 @@ void FDv2StreamingSynchronizer::State::EnsureStarted(
// started_ intentionally left true: same reasoning as above.
LD_LOG(logger_, LogLevel::kError)
<< kIdentity << ": could not build SSE client";
Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{
MakeError(ErrorKind::kNetworkError, 0,
"could not build SSE client"),
false}});
Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{MakeError(
ErrorKind::kNetworkError, 0, "could not build SSE client")}});
return;
}

Expand Down Expand Up @@ -169,6 +173,15 @@ void FDv2StreamingSynchronizer::State::OnConnect(HttpRequest* req) {
req->target(u.encoded_target());
}

void FDv2StreamingSynchronizer::State::OnResponse(
HttpResponseHeader const& headers) {
auto const it = headers.find("X-LD-FD-Fallback");
bool const directive =
it != headers.end() && boost::iequals(it->value(), "true");
std::lock_guard lock(mutex_);
latest_fdv1_fallback_ = directive;
}

void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) {
boost::system::error_code ec;
auto data = boost::json::parse(event.data(), ec);
Expand All @@ -177,7 +190,7 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) {
std::string msg = "could not parse FDv2 streaming event payload";
LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg;
Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{
MakeError(ErrorKind::kInvalidData, 0, std::move(msg)), false}});
MakeError(ErrorKind::kInvalidData, 0, std::move(msg))}});
return;
}

Expand All @@ -195,21 +208,20 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) {
"FDv2 streaming changeset could not be translated";
LD_LOG(logger_, LogLevel::kError)
<< kIdentity << ": " << msg;
Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{
MakeError(ErrorKind::kInvalidData, 0, std::move(msg)),
false}});
Notify(FDv2SourceResult{
FDv2SourceResult::Interrupted{MakeError(
ErrorKind::kInvalidData, 0, std::move(msg))}});
return;
}
Notify(FDv2SourceResult{
FDv2SourceResult::ChangeSet{std::move(*typed), false}});
FDv2SourceResult::ChangeSet{std::move(*typed)}});
} else if constexpr (std::is_same_v<T, Goodbye>) {
LD_LOG(logger_, LogLevel::kInfo)
<< kIdentity
<< ": Goodbye was received from the LaunchDarkly "
"connection with reason: '"
<< r.reason.value_or("") << "'.";
Notify(FDv2SourceResult{
FDv2SourceResult::Goodbye{r.reason, false}});
Notify(FDv2SourceResult{FDv2SourceResult::Goodbye{r.reason}});
// Drop the current connection and reconnect; the protocol
// handler is reset so the new connection starts in a clean
// state.
Expand All @@ -229,15 +241,15 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) {
"'. Automatic retry will occur.";
LD_LOG(logger_, LogLevel::kInfo)
<< kIdentity << ": " << msg;
Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{
MakeError(ErrorKind::kErrorResponse, 0, std::move(msg)),
false}});
Notify(FDv2SourceResult{
FDv2SourceResult::Interrupted{MakeError(
ErrorKind::kErrorResponse, 0, std::move(msg))}});
return;
}
LD_LOG(logger_, LogLevel::kError)
<< kIdentity << ": " << r.message;
Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{
MakeError(ErrorKind::kInvalidData, 0, r.message), false}});
MakeError(ErrorKind::kInvalidData, 0, r.message)}});
} else {
static_assert(always_false_v<T>, "non-exhaustive visitor");
}
Expand All @@ -253,31 +265,30 @@ void FDv2StreamingSynchronizer::State::OnError(sse::Error const& error) {
if (sse::IsRecoverable(error)) {
LD_LOG(logger_, LogLevel::kWarn) << kIdentity << ": " << msg;
Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{
MakeError(ErrorKind::kNetworkError, 0, std::move(msg)), false}});
MakeError(ErrorKind::kNetworkError, 0, std::move(msg))}});
return;
}

LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg;

if (auto const* client_error =
std::get_if<sse::errors::UnrecoverableClientError>(&error)) {
Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{
MakeError(
ErrorKind::kErrorResponse,
static_cast<ErrorInfo::StatusCodeType>(client_error->status),
std::move(msg)),
false}});
Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{MakeError(
ErrorKind::kErrorResponse,
static_cast<ErrorInfo::StatusCodeType>(client_error->status),
std::move(msg))}});
return;
}

Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{
MakeError(ErrorKind::kNetworkError, 0, std::move(msg)), false}});
MakeError(ErrorKind::kNetworkError, 0, std::move(msg))}});
}

void FDv2StreamingSynchronizer::State::Notify(FDv2SourceResult result) {
std::optional<async::Promise<FDv2SourceResult>> promise;
{
std::lock_guard lock(mutex_);
result.fdv1_fallback = latest_fdv1_fallback_;
if (pending_promise_) {
promise = std::move(pending_promise_);
pending_promise_.reset();
Expand Down
Loading
Loading