diff --git a/.github/workflows/server.yml b/.github/workflows/server.yml index 7740e1585..3542c8b50 100644 --- a/.github/workflows/server.yml +++ b/.github/workflows/server.yml @@ -32,7 +32,6 @@ jobs: with: # Inform the test harness of test service's port. test_service_port: ${{ env.TEST_SERVICE_PORT }} - extra_params: '-skip-from ./contract-tests/server-contract-tests/test-suppressions.txt' token: ${{ secrets.GITHUB_TOKEN }} contract-tests-curl: @@ -54,7 +53,6 @@ jobs: with: # Inform the test harness of test service's port. test_service_port: ${{ env.TEST_SERVICE_PORT }} - extra_params: '-skip-from ./contract-tests/server-contract-tests/test-suppressions.txt' token: ${{ secrets.GITHUB_TOKEN }} build-test-server: diff --git a/contract-tests/server-contract-tests/test-suppressions.txt b/contract-tests/server-contract-tests/test-suppressions.txt deleted file mode 100644 index 460d051db..000000000 --- a/contract-tests/server-contract-tests/test-suppressions.txt +++ /dev/null @@ -1,7 +0,0 @@ -# SC-204387 -streaming/validation/drop and reconnect if stream event has malformed JSON/put event -streaming/validation/drop and reconnect if stream event has malformed JSON/patch event -streaming/validation/drop and reconnect if stream event has malformed JSON/delete event -streaming/validation/drop and reconnect if stream event has well-formed JSON not matching schema/put event -streaming/validation/drop and reconnect if stream event has well-formed JSON not matching schema/patch event -streaming/validation/drop and reconnect if stream event has well-formed JSON not matching schema/delete event diff --git a/libs/server-sdk/src/data_systems/background_sync/sources/streaming/event_handler.cpp b/libs/server-sdk/src/data_systems/background_sync/sources/streaming/event_handler.cpp index 194fa895c..078f00ca8 100644 --- a/libs/server-sdk/src/data_systems/background_sync/sources/streaming/event_handler.cpp +++ b/libs/server-sdk/src/data_systems/background_sync/sources/streaming/event_handler.cpp @@ -41,6 +41,10 @@ tl::expected Patch( if (!data.has_value()) { return tl::unexpected(JsonError::kSchemaFailure); } + // Check if the optional is empty (indicates null data) + if (!data->has_value()) { + return tl::unexpected(JsonError::kSchemaFailure); + } return DataSourceEventHandler::Patch{ TStreamingDataKind::Key(path), data_model::ItemDescriptor(data->value())}; diff --git a/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.cpp b/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.cpp index 4c433c12b..5f7a27a7b 100644 --- a/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.cpp +++ b/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.cpp @@ -127,9 +127,18 @@ void StreamingDataSource::StartAsync( client_builder.receiver([weak_self](launchdarkly::sse::Event const& event) { if (auto self = weak_self.lock()) { - self->event_handler_->HandleMessage(event.type(), event.data()); - // TODO: Use the result of handle message to restart the - // event source if we got bad data. sc-204387 + auto status = + self->event_handler_->HandleMessage(event.type(), event.data()); + if (status == DataSourceEventHandler::MessageStatus::kInvalidMessage) { + // Invalid data received - restart the connection with backoff + // to get a fresh stream. The backoff mechanism prevents rapid + // reconnection attempts. + LD_LOG(self->logger_, LogLevel::kWarn) + << "Received invalid data from stream, restarting connection"; + if (self->client_) { + self->client_->async_restart("invalid data in stream"); + } + } } }); diff --git a/libs/server-sdk/tests/data_source_event_handler_test.cpp b/libs/server-sdk/tests/data_source_event_handler_test.cpp index 90c43ba66..d2591fe8e 100644 --- a/libs/server-sdk/tests/data_source_event_handler_test.cpp +++ b/libs/server-sdk/tests/data_source_event_handler_test.cpp @@ -203,3 +203,276 @@ TEST(DataSourceEventHandlerTests, HandlesDeleteSegment) { ASSERT_FALSE(store->GetSegment("segmentA")->item); } + +TEST(DataSourceEventHandlerTests, HandlesPatchWithNullDataForFlag) { + auto logger = launchdarkly::logging::NullLogger(); + auto store = std::make_shared(); + DataSourceStatusManager manager; + DataSourceEventHandler event_handler(*store, logger, manager); + + // Initialize the store + event_handler.HandleMessage("put", R"({"path":"/", "data":{}})"); + + // Null data should be treated as invalid, not crash the application + auto res = event_handler.HandleMessage( + "patch", R"({"path": "/flags/flagA", "data": null})"); + + ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res); + // The error should be recorded, but we stay in Valid state after a previous successful PUT + EXPECT_EQ(DataSourceStatus::DataSourceState::kValid, + manager.Status().State()); + ASSERT_TRUE(manager.Status().LastError().has_value()); + EXPECT_EQ(DataSourceStatus::ErrorInfo::ErrorKind::kInvalidData, + manager.Status().LastError()->Kind()); +} + +TEST(DataSourceEventHandlerTests, HandlesPatchWithNullDataForSegment) { + auto logger = launchdarkly::logging::NullLogger(); + auto store = std::make_shared(); + DataSourceStatusManager manager; + DataSourceEventHandler event_handler(*store, logger, manager); + + // Initialize the store + event_handler.HandleMessage("put", R"({"path":"/", "data":{}})"); + + // Null data should be treated as invalid, not crash the application + auto res = event_handler.HandleMessage( + "patch", R"({"path": "/segments/segmentA", "data": null})"); + + ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res); + // The error should be recorded, but we stay in Valid state after a previous successful PUT + EXPECT_EQ(DataSourceStatus::DataSourceState::kValid, + manager.Status().State()); + ASSERT_TRUE(manager.Status().LastError().has_value()); + EXPECT_EQ(DataSourceStatus::ErrorInfo::ErrorKind::kInvalidData, + manager.Status().LastError()->Kind()); +} + +TEST(DataSourceEventHandlerTests, HandlesPatchWithMissingDataField) { + auto logger = launchdarkly::logging::NullLogger(); + auto store = std::make_shared(); + DataSourceStatusManager manager; + DataSourceEventHandler event_handler(*store, logger, manager); + + // Initialize the store + event_handler.HandleMessage("put", R"({"path":"/", "data":{}})"); + + // Missing data field should also be treated as invalid + auto res = event_handler.HandleMessage( + "patch", R"({"path": "/flags/flagA"})"); + + ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res); +} + +TEST(DataSourceEventHandlerTests, HandlesPutWithNullData) { + auto logger = launchdarkly::logging::NullLogger(); + auto store = std::make_shared(); + DataSourceStatusManager manager; + DataSourceEventHandler event_handler(*store, logger, manager); + + // PUT with null data should also be handled safely + auto res = event_handler.HandleMessage( + "put", R"({"path":"/", "data": null})"); + + // PUT handles this differently - it may succeed with empty data + // The important thing is it doesn't crash + ASSERT_TRUE(res == DataSourceEventHandler::MessageStatus::kMessageHandled || + res == DataSourceEventHandler::MessageStatus::kInvalidMessage); +} + +// Tests for wrong data types (schema validation errors) + +TEST(DataSourceEventHandlerTests, HandlesPatchWithBooleanData) { + auto logger = launchdarkly::logging::NullLogger(); + auto store = std::make_shared(); + DataSourceStatusManager manager; + DataSourceEventHandler event_handler(*store, logger, manager); + + // Initialize the store + event_handler.HandleMessage("put", R"({"path":"/", "data":{}})"); + + // Boolean data instead of object should be treated as invalid + auto res = event_handler.HandleMessage( + "patch", R"({"path": "/flags/flagA", "data": true})"); + + ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res); +} + +TEST(DataSourceEventHandlerTests, HandlesPatchWithStringData) { + auto logger = launchdarkly::logging::NullLogger(); + auto store = std::make_shared(); + DataSourceStatusManager manager; + DataSourceEventHandler event_handler(*store, logger, manager); + + // Initialize the store + event_handler.HandleMessage("put", R"({"path":"/", "data":{}})"); + + // String data instead of object should be treated as invalid + auto res = event_handler.HandleMessage( + "patch", R"({"path": "/flags/flagA", "data": "not an object"})"); + + ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res); +} + +TEST(DataSourceEventHandlerTests, HandlesPatchWithArrayData) { + auto logger = launchdarkly::logging::NullLogger(); + auto store = std::make_shared(); + DataSourceStatusManager manager; + DataSourceEventHandler event_handler(*store, logger, manager); + + // Initialize the store + event_handler.HandleMessage("put", R"({"path":"/", "data":{}})"); + + // Array data instead of object should be treated as invalid + auto res = event_handler.HandleMessage( + "patch", R"({"path": "/flags/flagA", "data": []})"); + + ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res); +} + +TEST(DataSourceEventHandlerTests, HandlesPatchWithNumberData) { + auto logger = launchdarkly::logging::NullLogger(); + auto store = std::make_shared(); + DataSourceStatusManager manager; + DataSourceEventHandler event_handler(*store, logger, manager); + + // Initialize the store + event_handler.HandleMessage("put", R"({"path":"/", "data":{}})"); + + // Number data instead of object should be treated as invalid + auto res = event_handler.HandleMessage( + "patch", R"({"path": "/flags/flagA", "data": 42})"); + + ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res); +} + +TEST(DataSourceEventHandlerTests, HandlesDeleteWithStringVersion) { + auto logger = launchdarkly::logging::NullLogger(); + auto store = std::make_shared(); + DataSourceStatusManager manager; + DataSourceEventHandler event_handler(*store, logger, manager); + + // Initialize the store + event_handler.HandleMessage("put", R"({"path":"/", "data":{}})"); + + // String version instead of number should be treated as invalid + auto res = event_handler.HandleMessage( + "delete", R"({"path": "/flags/flagA", "version": "not a number"})"); + + ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res); +} + +TEST(DataSourceEventHandlerTests, HandlesPutWithInvalidFlagsType) { + auto logger = launchdarkly::logging::NullLogger(); + auto store = std::make_shared(); + DataSourceStatusManager manager; + DataSourceEventHandler event_handler(*store, logger, manager); + + // Flags should be an object, not a boolean + auto res = event_handler.HandleMessage( + "put", R"({"path": "/", "data": {"flags": true, "segments": {}}})"); + + ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res); +} + +TEST(DataSourceEventHandlerTests, HandlesPutWithInvalidSegmentsType) { + auto logger = launchdarkly::logging::NullLogger(); + auto store = std::make_shared(); + DataSourceStatusManager manager; + DataSourceEventHandler event_handler(*store, logger, manager); + + // Segments should be an object, not an array + auto res = event_handler.HandleMessage( + "put", R"({"path": "/", "data": {"flags": {}, "segments": []}})"); + + ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res); +} + +// Tests for additional malformed JSON variants + +TEST(DataSourceEventHandlerTests, HandlesUnterminatedString) { + auto logger = launchdarkly::logging::NullLogger(); + auto store = std::make_shared(); + DataSourceStatusManager manager; + DataSourceEventHandler event_handler(*store, logger, manager); + + // Unterminated string should be treated as malformed JSON + auto res = event_handler.HandleMessage( + "patch", R"({"path": "/flags/x", "data": "unterminated)"); + + ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res); +} + +TEST(DataSourceEventHandlerTests, HandlesTrailingComma) { + auto logger = launchdarkly::logging::NullLogger(); + auto store = std::make_shared(); + DataSourceStatusManager manager; + DataSourceEventHandler event_handler(*store, logger, manager); + + // Trailing comma should be treated as malformed JSON + auto res = event_handler.HandleMessage( + "patch", R"({"path": "/flags/x", "data": {},})"); + + ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res); +} + +// Tests for missing required fields + +TEST(DataSourceEventHandlerTests, HandlesDeleteWithMissingPath) { + auto logger = launchdarkly::logging::NullLogger(); + auto store = std::make_shared(); + DataSourceStatusManager manager; + DataSourceEventHandler event_handler(*store, logger, manager); + + // Initialize the store + event_handler.HandleMessage("put", R"({"path":"/", "data":{}})"); + + // Missing path field should be treated as invalid + auto res = event_handler.HandleMessage( + "delete", R"({"version": 1})"); + + ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res); +} + +TEST(DataSourceEventHandlerTests, HandlesDeleteWithMissingVersion) { + auto logger = launchdarkly::logging::NullLogger(); + auto store = std::make_shared(); + DataSourceStatusManager manager; + DataSourceEventHandler event_handler(*store, logger, manager); + + // Initialize the store + event_handler.HandleMessage("put", R"({"path":"/", "data":{}})"); + + // Missing version field should be treated as invalid + auto res = event_handler.HandleMessage( + "delete", R"({"path": "/flags/flagA"})"); + + ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res); +} + +TEST(DataSourceEventHandlerTests, HandlesPutWithMissingPath) { + auto logger = launchdarkly::logging::NullLogger(); + auto store = std::make_shared(); + DataSourceStatusManager manager; + DataSourceEventHandler event_handler(*store, logger, manager); + + // Missing/empty path is treated as unrecognized (safely ignored) + // This provides forward compatibility + auto res = event_handler.HandleMessage( + "put", R"({"data": {}})"); + + ASSERT_EQ(DataSourceEventHandler::MessageStatus::kMessageHandled, res); +} + +TEST(DataSourceEventHandlerTests, HandlesEmptyJsonObject) { + auto logger = launchdarkly::logging::NullLogger(); + auto store = std::make_shared(); + DataSourceStatusManager manager; + DataSourceEventHandler event_handler(*store, logger, manager); + + // Empty JSON object with missing path is treated as unrecognized (safely ignored) + // This provides forward compatibility with future event types + auto res = event_handler.HandleMessage("patch", "{}"); + + ASSERT_EQ(DataSourceEventHandler::MessageStatus::kMessageHandled, res); +} diff --git a/libs/server-sent-events/include/launchdarkly/sse/client.hpp b/libs/server-sent-events/include/launchdarkly/sse/client.hpp index ca0b93a7a..500467fd0 100644 --- a/libs/server-sent-events/include/launchdarkly/sse/client.hpp +++ b/libs/server-sent-events/include/launchdarkly/sse/client.hpp @@ -207,6 +207,15 @@ class Client { virtual ~Client() = default; virtual void async_connect() = 0; virtual void async_shutdown(std::function completion) = 0; + + /** + * Restart the connection with exponential backoff. This should be called + * when the SDK detects invalid data from the stream and needs to + * reconnect. The backoff mechanism prevents rapid reconnection attempts + * that could overload the service. + * @param reason A description of why the restart was triggered (for logging) + */ + virtual void async_restart(std::string const& reason) = 0; }; } // namespace launchdarkly::sse diff --git a/libs/server-sent-events/src/client.cpp b/libs/server-sent-events/src/client.cpp index 46246f3b5..99e8fdb10 100644 --- a/libs/server-sent-events/src/client.cpp +++ b/libs/server-sent-events/src/client.cpp @@ -161,6 +161,29 @@ class FoxyClient : public Client, beast::bind_front_handler(&FoxyClient::do_run, shared_from_this())); } + void async_restart(std::string const& reason) override { + boost::asio::post( + session_->get_executor(), + beast::bind_front_handler(&FoxyClient::do_restart, + shared_from_this(), reason)); + } + + void do_restart(std::string const& reason) { + // Cancel any ongoing read operations + try { + if (session_->stream.is_ssl()) { + session_->stream.ssl().next_layer().cancel(); + } else { + session_->stream.plain().cancel(); + } + } catch (boost::system::system_error const& err) { + logger_("exception canceling stream during restart: " + + std::string(err.what())); + } + // Trigger backoff and reconnect + async_backoff(reason); + } + void do_run() { session_->async_connect( host_, port_, diff --git a/libs/server-sent-events/src/curl_client.cpp b/libs/server-sent-events/src/curl_client.cpp index c410db004..c99b6830a 100644 --- a/libs/server-sent-events/src/curl_client.cpp +++ b/libs/server-sent-events/src/curl_client.cpp @@ -76,6 +76,17 @@ void CurlClient::async_connect() { [self = shared_from_this()]() { self->do_run(); }); } +void CurlClient::async_restart(std::string const& reason) { + boost::asio::post(backoff_timer_.get_executor(), + [self = shared_from_this(), reason]() { + // Close the socket to abort the current transfer. + // CURL will detect the error and call the completion + // handler, which will trigger backoff and reconnection. + self->log_message("async_restart: aborting transfer due to " + reason); + self->request_context_->abort_transfer(); + }); +} + void CurlClient::do_run() { if (request_context_->is_shutting_down()) { return; diff --git a/libs/server-sent-events/src/curl_client.hpp b/libs/server-sent-events/src/curl_client.hpp index a3ab9e173..e210791d1 100644 --- a/libs/server-sent-events/src/curl_client.hpp +++ b/libs/server-sent-events/src/curl_client.hpp @@ -170,6 +170,21 @@ class CurlClient final : public Client, curl_socket_ = curl_socket; } + void abort_transfer() { + std::lock_guard lock(mutex_); + if (shutting_down_) { + return; + } + if (curl_socket_ != CURL_SOCKET_BAD) { +#ifdef _WIN32 + closesocket(curl_socket_); +#else + close(curl_socket_); +#endif + curl_socket_ = CURL_SOCKET_BAD; + } + } + void shutdown() { std::lock_guard lock(mutex_); shutting_down_ = true; @@ -232,6 +247,7 @@ class CurlClient final : public Client, void async_connect() override; void async_shutdown(std::function completion) override; + void async_restart(std::string const& reason) override; private: void do_run();