diff --git a/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp b/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp index 63c807705..73329f348 100644 --- a/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp +++ b/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp @@ -22,9 +22,6 @@ struct FDv2SourceResult { */ struct ChangeSet { data_model::ChangeSet change_set; - /** If true, the server signaled that the client should fall back to - * FDv1. */ - bool fdv1_fallback; }; /** @@ -32,7 +29,6 @@ struct FDv2SourceResult { */ struct Interrupted { ErrorInfo error; - bool fdv1_fallback; }; /** @@ -40,7 +36,6 @@ struct FDv2SourceResult { */ struct TerminalError { ErrorInfo error; - bool fdv1_fallback; }; /** @@ -53,13 +48,18 @@ struct FDv2SourceResult { */ struct Goodbye { std::optional reason; - bool fdv1_fallback; }; using Value = std::variant; Value value; + + /** + * If true, the server signaled (via the X-LD-FD-Fallback response header) + * that the client should fall back to FDv1. + */ + bool fdv1_fallback = false; }; } // namespace launchdarkly::server_side::data_interfaces diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp index 54b9dc867..260153737 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -11,6 +12,7 @@ namespace launchdarkly::server_side::data_systems { static char const* const kFDv2PollPath = "/sdk/poll"; +static char const* const kFDv1FallbackHeader = "X-LD-FD-Fallback"; static char const* const kErrorParsingBody = "Could not parse FDv2 polling response"; @@ -32,6 +34,12 @@ static ErrorInfo MakeError(ErrorKind kind, std::chrono::system_clock::now()}; } +static bool ReadFDv1FallbackDirective( + network::HttpResult::HeadersType const& headers) { + auto const it = headers.find(kFDv1FallbackHeader); + return it != headers.end() && boost::iequals(it->second, "true"); +} + network::HttpRequest MakeFDv2PollRequest( config::built::ServiceEndpoints const& endpoints, config::built::HttpProperties const& http_properties, @@ -88,15 +96,13 @@ static FDv2SourceResult ParseFDv2PollEvents( auto typed = TranslateChangeSet(*change_set, logger); if (!typed) { return FDv2SourceResult{FDv2SourceResult::Interrupted{ - MakeError(ErrorKind::kInvalidData, 0, kErrorTranslation), - false}}; + MakeError(ErrorKind::kInvalidData, 0, kErrorTranslation)}}; } return FDv2SourceResult{ - FDv2SourceResult::ChangeSet{std::move(*typed), false}}; + FDv2SourceResult::ChangeSet{std::move(*typed)}}; } if (auto* goodbye = std::get_if(&result)) { - return FDv2SourceResult{ - FDv2SourceResult::Goodbye{goodbye->reason, false}}; + return FDv2SourceResult{FDv2SourceResult::Goodbye{goodbye->reason}}; } if (auto* error = std::get_if(&result)) { if (error->kind == FDv2ProtocolHandler::Error::Kind::kServerError) { @@ -107,16 +113,15 @@ static FDv2SourceResult ParseFDv2PollEvents( id.value_or("") + "' with reason: '" + error->message + "'. Automatic retry will occur."; return FDv2SourceResult{FDv2SourceResult::Interrupted{ - MakeError(ErrorKind::kErrorResponse, 0, std::move(msg)), - false}}; + MakeError(ErrorKind::kErrorResponse, 0, std::move(msg))}}; } return FDv2SourceResult{FDv2SourceResult::Interrupted{ - MakeError(ErrorKind::kInvalidData, 0, error->message), false}}; + MakeError(ErrorKind::kInvalidData, 0, error->message)}}; } } return FDv2SourceResult{FDv2SourceResult::Interrupted{ - MakeError(ErrorKind::kInvalidData, 0, kErrorIncompletePayload), false}}; + MakeError(ErrorKind::kInvalidData, 0, kErrorIncompletePayload)}}; } static FDv2SourceResult ParseFDv2PollResponse( @@ -127,25 +132,25 @@ static FDv2SourceResult ParseFDv2PollResponse( auto parsed = boost::json::parse(body, ec); if (ec) { return FDv2SourceResult{FDv2SourceResult::Interrupted{ - MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody), false}}; + MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody)}}; } auto const* obj = parsed.if_object(); if (!obj) { return FDv2SourceResult{FDv2SourceResult::Interrupted{ - MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody), false}}; + MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody)}}; } auto const* events_val = obj->if_contains("events"); if (!events_val) { return FDv2SourceResult{FDv2SourceResult::Interrupted{ - MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents), false}}; + MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents)}}; } auto const* events_arr = events_val->if_array(); if (!events_arr) { return FDv2SourceResult{FDv2SourceResult::Interrupted{ - MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents), false}}; + MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents)}}; } return ParseFDv2PollEvents(*events_arr, protocol_handler, logger); @@ -161,24 +166,28 @@ data_interfaces::FDv2SourceResult HandleFDv2PollResponse( std::string error_msg = msg.has_value() ? *msg : "unknown error"; LD_LOG(logger, LogLevel::kWarn) << identity << ": " << error_msg; return FDv2SourceResult{FDv2SourceResult::Interrupted{ - MakeError(ErrorKind::kNetworkError, 0, std::move(error_msg)), - false}}; + MakeError(ErrorKind::kNetworkError, 0, std::move(error_msg))}}; } + bool const fdv1_fallback = ReadFDv1FallbackDirective(res.Headers()); + if (res.Status() == 304) { - return FDv2SourceResult{FDv2SourceResult::ChangeSet{ - data_model::ChangeSet{ - data_model::ChangeSetType::kNone, {}, data_model::Selector{}}, - false}}; + return FDv2SourceResult{ + FDv2SourceResult::ChangeSet{ + data_model::ChangeSet{ + data_model::ChangeSetType::kNone, + {}, + data_model::Selector{}}}, + fdv1_fallback}; } if (res.Status() == 200) { auto const& body = res.Body(); if (!body) { - return FDv2SourceResult{FDv2SourceResult::Interrupted{ - MakeError(ErrorKind::kInvalidData, 0, - "polling response contained no body"), - false}}; + return FDv2SourceResult{FDv2SourceResult::Interrupted{MakeError( + ErrorKind::kInvalidData, 0, + "polling response contained no body")}, + fdv1_fallback}; } auto result = ParseFDv2PollResponse(*body, protocol_handler, logger); @@ -192,6 +201,7 @@ data_interfaces::FDv2SourceResult HandleFDv2PollResponse( << identity << ": " << interrupted->error.Message(); } } + result.fdv1_fallback = fdv1_fallback; return result; } @@ -199,17 +209,19 @@ data_interfaces::FDv2SourceResult HandleFDv2PollResponse( std::string msg = network::ErrorForStatusCode( res.Status(), "FDv2 polling request", "will retry"); LD_LOG(logger, LogLevel::kWarn) << identity << ": " << msg; - return FDv2SourceResult{FDv2SourceResult::Interrupted{ - MakeError(ErrorKind::kErrorResponse, res.Status(), std::move(msg)), - false}}; + return FDv2SourceResult{ + FDv2SourceResult::Interrupted{MakeError( + ErrorKind::kErrorResponse, res.Status(), std::move(msg))}, + fdv1_fallback}; } std::string msg = network::ErrorForStatusCode( res.Status(), "FDv2 polling request", std::nullopt); LD_LOG(logger, LogLevel::kError) << identity << ": " << msg; - return FDv2SourceResult{FDv2SourceResult::TerminalError{ - MakeError(ErrorKind::kErrorResponse, res.Status(), std::move(msg)), - false}}; + return FDv2SourceResult{ + FDv2SourceResult::TerminalError{ + MakeError(ErrorKind::kErrorResponse, res.Status(), std::move(msg))}, + fdv1_fallback}; } } // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp b/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp index 174edd040..05620161d 100644 --- a/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp @@ -37,8 +37,7 @@ async::Future FDv2PollingInitializer::Run() { FDv2SourceResult{FDv2SourceResult::TerminalError{ ErrorInfo{ErrorInfo::ErrorKind::kUnknown, 0, "invalid polling endpoint URL", - std::chrono::system_clock::now()}, - false}}); + std::chrono::system_clock::now()}}}); } // Promisify the callback-based HTTP request. diff --git a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp index e4a5f0a6f..a87a648f2 100644 --- a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp @@ -3,6 +3,7 @@ #include +#include #include #include #include @@ -67,8 +68,7 @@ void FDv2StreamingSynchronizer::State::EnsureStarted( << kIdentity << ": could not parse streaming endpoint URL"; Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{ MakeError(ErrorKind::kNetworkError, 0, - "could not parse streaming endpoint URL"), - false}}); + "could not parse streaming endpoint URL")}}); return; } @@ -116,6 +116,12 @@ void FDv2StreamingSynchronizer::State::EnsureStarted( s->OnConnect(req); } }); + client_builder.on_response( + [weak](boost::beast::http::response_header<> const& headers) { + if (auto s = weak.lock()) { + s->OnResponse(headers); + } + }); client_builder.receiver([weak](sse::Event const& event) { if (auto s = weak.lock()) { s->OnEvent(event); @@ -137,10 +143,8 @@ void FDv2StreamingSynchronizer::State::EnsureStarted( // started_ intentionally left true: same reasoning as above. LD_LOG(logger_, LogLevel::kError) << kIdentity << ": could not build SSE client"; - Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{ - MakeError(ErrorKind::kNetworkError, 0, - "could not build SSE client"), - false}}); + Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{MakeError( + ErrorKind::kNetworkError, 0, "could not build SSE client")}}); return; } @@ -169,6 +173,15 @@ void FDv2StreamingSynchronizer::State::OnConnect(HttpRequest* req) { req->target(u.encoded_target()); } +void FDv2StreamingSynchronizer::State::OnResponse( + HttpResponseHeader const& headers) { + auto const it = headers.find("X-LD-FD-Fallback"); + bool const directive = + it != headers.end() && boost::iequals(it->value(), "true"); + std::lock_guard lock(mutex_); + latest_fdv1_fallback_ = directive; +} + void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) { boost::system::error_code ec; auto data = boost::json::parse(event.data(), ec); @@ -177,7 +190,7 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) { std::string msg = "could not parse FDv2 streaming event payload"; LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg; Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{ - MakeError(ErrorKind::kInvalidData, 0, std::move(msg)), false}}); + MakeError(ErrorKind::kInvalidData, 0, std::move(msg))}}); return; } @@ -195,21 +208,20 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) { "FDv2 streaming changeset could not be translated"; LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg; - Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{ - MakeError(ErrorKind::kInvalidData, 0, std::move(msg)), - false}}); + Notify(FDv2SourceResult{ + FDv2SourceResult::Interrupted{MakeError( + ErrorKind::kInvalidData, 0, std::move(msg))}}); return; } Notify(FDv2SourceResult{ - FDv2SourceResult::ChangeSet{std::move(*typed), false}}); + FDv2SourceResult::ChangeSet{std::move(*typed)}}); } else if constexpr (std::is_same_v) { LD_LOG(logger_, LogLevel::kInfo) << kIdentity << ": Goodbye was received from the LaunchDarkly " "connection with reason: '" << r.reason.value_or("") << "'."; - Notify(FDv2SourceResult{ - FDv2SourceResult::Goodbye{r.reason, false}}); + Notify(FDv2SourceResult{FDv2SourceResult::Goodbye{r.reason}}); // Drop the current connection and reconnect; the protocol // handler is reset so the new connection starts in a clean // state. @@ -229,15 +241,15 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) { "'. Automatic retry will occur."; LD_LOG(logger_, LogLevel::kInfo) << kIdentity << ": " << msg; - Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{ - MakeError(ErrorKind::kErrorResponse, 0, std::move(msg)), - false}}); + Notify(FDv2SourceResult{ + FDv2SourceResult::Interrupted{MakeError( + ErrorKind::kErrorResponse, 0, std::move(msg))}}); return; } LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << r.message; Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{ - MakeError(ErrorKind::kInvalidData, 0, r.message), false}}); + MakeError(ErrorKind::kInvalidData, 0, r.message)}}); } else { static_assert(always_false_v, "non-exhaustive visitor"); } @@ -253,7 +265,7 @@ void FDv2StreamingSynchronizer::State::OnError(sse::Error const& error) { if (sse::IsRecoverable(error)) { LD_LOG(logger_, LogLevel::kWarn) << kIdentity << ": " << msg; Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{ - MakeError(ErrorKind::kNetworkError, 0, std::move(msg)), false}}); + MakeError(ErrorKind::kNetworkError, 0, std::move(msg))}}); return; } @@ -261,23 +273,22 @@ void FDv2StreamingSynchronizer::State::OnError(sse::Error const& error) { if (auto const* client_error = std::get_if(&error)) { - Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{ - MakeError( - ErrorKind::kErrorResponse, - static_cast(client_error->status), - std::move(msg)), - false}}); + Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{MakeError( + ErrorKind::kErrorResponse, + static_cast(client_error->status), + std::move(msg))}}); return; } Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{ - MakeError(ErrorKind::kNetworkError, 0, std::move(msg)), false}}); + MakeError(ErrorKind::kNetworkError, 0, std::move(msg))}}); } void FDv2StreamingSynchronizer::State::Notify(FDv2SourceResult result) { std::optional> promise; { std::lock_guard lock(mutex_); + result.fdv1_fallback = latest_fdv1_fallback_; if (pending_promise_) { promise = std::move(pending_promise_); pending_promise_.reset(); diff --git a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.hpp b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.hpp index 95974ca87..1abed1f64 100644 --- a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.hpp +++ b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.hpp @@ -106,6 +106,7 @@ class FDv2StreamingSynchronizer final private: using HttpRequest = boost::beast::http::request; + using HttpResponseHeader = boost::beast::http::response_header<>; /** * Starts the SSE client if not already running, and updates the stored @@ -121,13 +122,9 @@ class FDv2StreamingSynchronizer final */ void Notify(data_interfaces::FDv2SourceResult result); - /** - * Per-connect hook for the SSE client. Overwrites the request target - * with the streaming path plus query parameters built from the latest - * stored selector and the (immutable) filter key. - */ + // SSE client callbacks. void OnConnect(HttpRequest* req); - + void OnResponse(HttpResponseHeader const& headers); void OnEvent(sse::Event const& event); void OnError(sse::Error const& error); @@ -149,6 +146,8 @@ class FDv2StreamingSynchronizer final std::mutex mutex_; bool started_ = false; bool closed_ = false; + // FDv1 fallback directive from the most recent SSE response. + bool latest_fdv1_fallback_ = false; data_model::Selector latest_selector_; std::optional base_url_; std::shared_ptr sse_client_; diff --git a/libs/server-sdk/tests/conditions_test.cpp b/libs/server-sdk/tests/conditions_test.cpp index d32ca8522..4a9976055 100644 --- a/libs/server-sdk/tests/conditions_test.cpp +++ b/libs/server-sdk/tests/conditions_test.cpp @@ -54,7 +54,6 @@ TEST(FallbackConditionTest, InterruptedArmsTimerWhichFiresAfterTimeout) { FDv2SourceResult::ErrorInfo{ FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, /*status_code=*/0, "boom", std::chrono::system_clock::now()}, - false, }}); auto result = future.WaitForResult(1s); @@ -74,7 +73,6 @@ TEST(FallbackConditionTest, ChangeSetCancelsActiveTimer) { FDv2SourceResult::ErrorInfo{ FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, /*status_code=*/0, "boom", std::chrono::system_clock::now()}, - false, }}); condition.Inform(FDv2SourceResult{FDv2SourceResult::ChangeSet{ launchdarkly::data_model::ChangeSet{ @@ -82,7 +80,6 @@ TEST(FallbackConditionTest, ChangeSetCancelsActiveTimer) { {}, launchdarkly::data_model::Selector{}, }, - false, }}); // Wait well past the 100ms threshold; future should remain unresolved. @@ -99,7 +96,6 @@ TEST(FallbackConditionTest, CloseCancelsActiveTimerAndResolvesWithCancelled) { FDv2SourceResult::ErrorInfo{ FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, /*status_code=*/0, "boom", std::chrono::system_clock::now()}, - false, }}); condition.Close(); @@ -133,7 +129,6 @@ TEST(RecoveryConditionTest, InformDoesNotAffectTimer) { FDv2SourceResult::ErrorInfo{ FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, /*status_code=*/0, "boom", std::chrono::system_clock::now()}, - false, }}); condition.Inform(FDv2SourceResult{FDv2SourceResult::ChangeSet{ launchdarkly::data_model::ChangeSet{ @@ -141,7 +136,6 @@ TEST(RecoveryConditionTest, InformDoesNotAffectTimer) { {}, launchdarkly::data_model::Selector{}, }, - false, }}); auto result = future.WaitForResult(1s); @@ -206,7 +200,6 @@ TEST(ConditionsTest, InformForwardsToAllUnderlyingConditions) { FDv2SourceResult::ErrorInfo{ FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, /*status_code=*/0, "boom", std::chrono::system_clock::now()}, - false, }}); auto result = conditions.GetFuture().WaitForResult(1s); diff --git a/libs/server-sdk/tests/fdv2_data_system_test.cpp b/libs/server-sdk/tests/fdv2_data_system_test.cpp index e11fc7a15..48d9b49d1 100644 --- a/libs/server-sdk/tests/fdv2_data_system_test.cpp +++ b/libs/server-sdk/tests/fdv2_data_system_test.cpp @@ -236,7 +236,6 @@ FDv2SourceResult MakeFullChangeSetResult(std::vector items, std::move(items), std::move(selector), }, - false, }}; } @@ -339,7 +338,6 @@ TEST(FDv2DataSystemTest, InitializerInterrupted_AdvancesToNextInitializer) { FDv2SourceResult::ErrorInfo{ FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, "boom", std::chrono::system_clock::now()}, - false, }}); auto first_factory = std::make_unique(std::move(first)); @@ -413,7 +411,6 @@ TEST(FDv2DataSystemTest, }, MakeSelector(1, "state-1"), }, - false, }}); auto second_factory = std::make_unique(std::move(second)); @@ -487,7 +484,6 @@ TEST(FDv2DataSystemTest, InitializerOnly_AllFail_TransitionsToOff) { FDv2SourceResult::ErrorInfo{ FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, "fail", std::chrono::system_clock::now()}, - false, }}); std::vector> initializers; @@ -561,7 +557,7 @@ TEST(FDv2DataSystemTest, SynchronizerGoodbye_StaysOnSameSynchronizer) { // (reconnecting); the orchestrator must NOT rotate. auto first = std::make_unique(std::vector{ - FDv2SourceResult{FDv2SourceResult::Goodbye{std::nullopt, false}}}); + FDv2SourceResult{FDv2SourceResult::Goodbye{std::nullopt}}}); auto first_factory = std::make_unique(std::move(first)); auto* first_factory_ptr = first_factory.get(); @@ -604,7 +600,6 @@ TEST(FDv2DataSystemTest, SynchronizerInterrupted_RetriesSameSynchronizer) { FDv2SourceResult::ErrorInfo{ FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, "transient", std::chrono::system_clock::now()}, - false, }}); results.push_back( MakeFullChangeSetResult(ChangeSetData{}, MakeSelector(1, "state-1"))); @@ -654,7 +649,6 @@ TEST(FDv2DataSystemTest, SynchronizerNext_ReceivesUpdatedSelector) { ChangeSetData{}, MakeSelector(2, "state-2"), }, - false, }}); auto sync = std::make_unique( std::move(results), /*closed_flag=*/nullptr, &next_calls); @@ -711,10 +705,9 @@ TEST(FDv2DataSystemTest, SynchronizerGoodbye_PreservesSelectorOnNextCall) { ChangeSetData{}, MakeSelector(2, "state-2"), }, - false, }}); results.push_back( - FDv2SourceResult{FDv2SourceResult::Goodbye{std::nullopt, false}}); + FDv2SourceResult{FDv2SourceResult::Goodbye{std::nullopt}}); auto sync = std::make_unique( std::move(results), /*closed_flag=*/nullptr, &next_calls); @@ -762,7 +755,6 @@ TEST(FDv2DataSystemTest, FDv2SourceResult::ErrorInfo{ FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse, 401, "unauthorized", std::chrono::system_clock::now()}, - false, }}); auto first = std::make_unique(std::move(first_results)); auto first_factory = @@ -811,7 +803,6 @@ TEST(FDv2DataSystemTest, SynchronizerCycledExhaustion_TransitionsToOff) { FDv2SourceResult::ErrorInfo{ FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse, 401, "unauthorized", std::chrono::system_clock::now()}, - false, }}}); std::vector> synchronizers; @@ -850,7 +841,6 @@ TEST(FDv2DataSystemTest, FallbackConditionFires_AdvancesToNextSynchronizer) { FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, /*status_code=*/0, "boom", std::chrono::system_clock::now()}, - false, }}, }, /*closed_flag=*/nullptr, /*next_calls=*/nullptr, @@ -901,7 +891,6 @@ TEST(FDv2DataSystemTest, FallbackConditionOnLastSynchronizerWrapsToPrimary) { FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, /*status_code=*/0, "boom", std::chrono::system_clock::now()}, - false, }}, }, /*closed_flag=*/nullptr, /*next_calls=*/nullptr, @@ -919,7 +908,6 @@ TEST(FDv2DataSystemTest, FallbackConditionOnLastSynchronizerWrapsToPrimary) { FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, /*status_code=*/0, "boom", std::chrono::system_clock::now()}, - false, }}, }, /*closed_flag=*/nullptr, /*next_calls=*/nullptr, @@ -965,7 +953,6 @@ TEST(FDv2DataSystemTest, RecoveryConditionResetsToFirstAvailable) { FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, /*status_code=*/0, "boom", std::chrono::system_clock::now()}, - false, }}, }, /*closed_flag=*/nullptr, /*next_calls=*/nullptr, @@ -983,7 +970,6 @@ TEST(FDv2DataSystemTest, RecoveryConditionResetsToFirstAvailable) { FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, /*status_code=*/0, "boom", std::chrono::system_clock::now()}, - false, }}, }, /*closed_flag=*/nullptr, /*next_calls=*/nullptr, @@ -1031,7 +1017,6 @@ TEST(FDv2DataSystemTest, TerminalErrorsOnEverySynchronizerExhaustToOff) { FDv2SourceResult::ErrorInfo{ FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse, 401, "unauthorized", std::chrono::system_clock::now()}, - false, }}}); auto primary_factory = std::make_unique(std::move(primary_sync)); @@ -1042,7 +1027,6 @@ TEST(FDv2DataSystemTest, TerminalErrorsOnEverySynchronizerExhaustToOff) { FDv2SourceResult::ErrorInfo{ FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse, 401, "unauthorized", std::chrono::system_clock::now()}, - false, }}}); auto secondary_factory = std::make_unique(std::move(secondary_sync)); @@ -1080,7 +1064,6 @@ TEST(FDv2DataSystemTest, SingleSynchronizerHasNoFallbackArmed) { FDv2SourceResult::ErrorInfo{ FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, /*status_code=*/0, "boom", std::chrono::system_clock::now()}, - false, }}); auto sync = std::make_unique( std::move(results), /*closed_flag=*/nullptr, /*next_calls=*/nullptr, diff --git a/libs/server-sdk/tests/fdv2_polling_impl_test.cpp b/libs/server-sdk/tests/fdv2_polling_impl_test.cpp new file mode 100644 index 000000000..b12407b5e --- /dev/null +++ b/libs/server-sdk/tests/fdv2_polling_impl_test.cpp @@ -0,0 +1,123 @@ +#include + +#include + +#include +#include +#include + +using namespace launchdarkly; +using namespace launchdarkly::server_side::data_interfaces; +using namespace launchdarkly::server_side::data_systems; + +static Logger MakeNullLogger() { + struct NullBackend : ILogBackend { + bool Enabled(LogLevel) noexcept override { return false; } + void Write(LogLevel, std::string) noexcept override {} + }; + return Logger{std::make_shared()}; +} + +TEST(HandleFDv2PollResponseTest, NotModifiedSetsFdv1FallbackWhenHeaderPresent) { + auto logger = MakeNullLogger(); + FDv2ProtocolHandler handler; + + network::HttpResult::HeadersType headers{{"X-LD-FD-Fallback", "true"}}; + network::HttpResult res{304, std::nullopt, std::move(headers)}; + + auto result = HandleFDv2PollResponse(res, &handler, logger, "test"); + + EXPECT_TRUE( + std::holds_alternative(result.value)); + EXPECT_TRUE(result.fdv1_fallback); +} + +TEST(HandleFDv2PollResponseTest, FlagIsFalseWhenHeaderAbsent) { + auto logger = MakeNullLogger(); + FDv2ProtocolHandler handler; + + network::HttpResult res{304, std::nullopt, {}}; + + auto result = HandleFDv2PollResponse(res, &handler, logger, "test"); + + EXPECT_FALSE(result.fdv1_fallback); +} + +TEST(HandleFDv2PollResponseTest, RecoverableErrorPropagatesFlag) { + auto logger = MakeNullLogger(); + FDv2ProtocolHandler handler; + + network::HttpResult::HeadersType headers{{"x-ld-fd-fallback", "true"}}; + network::HttpResult res{400, std::nullopt, std::move(headers)}; + + auto result = HandleFDv2PollResponse(res, &handler, logger, "test"); + + EXPECT_TRUE( + std::holds_alternative(result.value)); + EXPECT_TRUE(result.fdv1_fallback); +} + +TEST(HandleFDv2PollResponseTest, TerminalErrorPropagatesFlag) { + auto logger = MakeNullLogger(); + FDv2ProtocolHandler handler; + + network::HttpResult::HeadersType headers{{"X-LD-FD-Fallback", "true"}}; + network::HttpResult res{401, std::nullopt, std::move(headers)}; + + auto result = HandleFDv2PollResponse(res, &handler, logger, "test"); + + EXPECT_TRUE( + std::holds_alternative(result.value)); + EXPECT_TRUE(result.fdv1_fallback); +} + +TEST(HandleFDv2PollResponseTest, OkWithMissingBodyPropagatesFlag) { + auto logger = MakeNullLogger(); + FDv2ProtocolHandler handler; + + network::HttpResult::HeadersType headers{{"X-LD-FD-Fallback", "true"}}; + network::HttpResult res{200, std::nullopt, std::move(headers)}; + + auto result = HandleFDv2PollResponse(res, &handler, logger, "test"); + + EXPECT_TRUE( + std::holds_alternative(result.value)); + EXPECT_TRUE(result.fdv1_fallback); +} + +TEST(HandleFDv2PollResponseTest, HeaderValueIsCaseInsensitive) { + auto logger = MakeNullLogger(); + FDv2ProtocolHandler handler; + + network::HttpResult::HeadersType headers{{"X-LD-FD-Fallback", "TRUE"}}; + network::HttpResult res{304, std::nullopt, std::move(headers)}; + + auto result = HandleFDv2PollResponse(res, &handler, logger, "test"); + + EXPECT_TRUE(result.fdv1_fallback); +} + +TEST(HandleFDv2PollResponseTest, HeaderValueOtherThanTrueDoesNotSetFlag) { + auto logger = MakeNullLogger(); + FDv2ProtocolHandler handler; + + network::HttpResult::HeadersType headers{{"X-LD-FD-Fallback", "false"}}; + network::HttpResult res{304, std::nullopt, std::move(headers)}; + + auto result = HandleFDv2PollResponse(res, &handler, logger, "test"); + + EXPECT_FALSE(result.fdv1_fallback); +} + +TEST(HandleFDv2PollResponseTest, NetworkErrorDoesNotSetFlag) { + auto logger = MakeNullLogger(); + FDv2ProtocolHandler handler; + + network::HttpResult res{std::optional{"connection refused"}}; + + auto result = HandleFDv2PollResponse(res, &handler, logger, "test"); + + EXPECT_TRUE( + std::holds_alternative(result.value)); + EXPECT_FALSE(result.fdv1_fallback); +} diff --git a/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp b/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp index 5a0f166f5..b8e9b017f 100644 --- a/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp +++ b/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp @@ -41,6 +41,11 @@ class FDv2StreamingSynchronizerTestPeer { boost::beast::http::request* req) { sync.state_->OnConnect(req); } + static void OnResponse( + FDv2StreamingSynchronizer& sync, + boost::beast::http::response_header<> const& headers) { + sync.state_->OnResponse(headers); + } static void MarkStarted(FDv2StreamingSynchronizer& sync) { std::lock_guard lock(sync.state_->mutex_); sync.state_->started_ = true; @@ -675,3 +680,74 @@ TEST(FDv2StreamingSynchronizerTest, RecoverableReadTimeoutReturnsInterrupted) { EXPECT_EQ(interrupted->error.Kind(), FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError); } + +// ============================================================================ +// FDv1 fallback directive +// ============================================================================ + +TEST(FDv2StreamingSynchronizerTest, OnResponseDirectivePropagatesToChangeSet) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + boost::beast::http::response_header<> headers; + headers.result(200); + headers.set("X-LD-FD-Fallback", "true"); + + FDv2StreamingSynchronizerTestPeer::OnResponse(synchronizer, headers); + FDv2StreamingSynchronizerTestPeer::OnEvent( + synchronizer, + sse::Event("server-intent", R"({"payloads":[{"id":"p1","target":1,)" + R"("intentCode":"xfer-full"}]})")); + FDv2StreamingSynchronizerTestPeer::OnEvent( + synchronizer, + sse::Event( + "put-object", + R"({"version":1,"kind":"flag","key":"my-flag","object":)" + R"({"key":"my-flag","on":true,"fallthrough":{"variation":0},)" + R"("variations":[true,false],"version":1}})")); + FDv2StreamingSynchronizerTestPeer::OnEvent( + synchronizer, + sse::Event("payload-transferred", R"({"state":"abc","version":1})")); + + auto result = synchronizer.Next(data_model::Selector{}).WaitForResult(2s); + + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE( + std::holds_alternative(result->value)); + EXPECT_TRUE(result->fdv1_fallback); +} + +TEST(FDv2StreamingSynchronizerTest, SecondResponseWithoutDirectiveClearsFlag) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + boost::beast::http::response_header<> first; + first.result(200); + first.set("X-LD-FD-Fallback", "true"); + FDv2StreamingSynchronizerTestPeer::OnResponse(synchronizer, first); + + boost::beast::http::response_header<> second; + second.result(200); + FDv2StreamingSynchronizerTestPeer::OnResponse(synchronizer, second); + + FDv2StreamingSynchronizerTestPeer::OnError( + synchronizer, + sse::Error{sse::errors::ReadTimeout{std::chrono::milliseconds(0)}}); + + auto result = synchronizer.Next(data_model::Selector{}).WaitForResult(2s); + + ASSERT_TRUE(result.has_value()); + EXPECT_FALSE(result->fdv1_fallback); +}