Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/server.yml
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All tests now pass, so no suppressions are needed.

Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ jobs:
with:
# Inform the test harness of test service's port.
test_service_port: ${{ env.TEST_SERVICE_PORT }}
extra_params: '-skip-from ./contract-tests/server-contract-tests/test-suppressions.txt'
token: ${{ secrets.GITHUB_TOKEN }}

contract-tests-curl:
Expand All @@ -54,7 +53,6 @@ jobs:
with:
# Inform the test harness of test service's port.
test_service_port: ${{ env.TEST_SERVICE_PORT }}
extra_params: '-skip-from ./contract-tests/server-contract-tests/test-suppressions.txt'
token: ${{ secrets.GITHUB_TOKEN }}

build-test-server:
Expand Down
7 changes: 0 additions & 7 deletions contract-tests/server-contract-tests/test-suppressions.txt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ tl::expected<DataSourceEventHandler::Patch, JsonError> Patch(
if (!data.has_value()) {
return tl::unexpected(JsonError::kSchemaFailure);
}
// Check if the optional is empty (indicates null data)
if (!data->has_value()) {
Copy link
Member Author

@kinyoklion kinyoklion Oct 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actual root fix.

return tl::unexpected(JsonError::kSchemaFailure);
}
return DataSourceEventHandler::Patch{
TStreamingDataKind::Key(path),
data_model::ItemDescriptor<TData>(data->value())};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#include "streaming_data_source.hpp"

#include <launchdarkly/network/http_requester.hpp>
Expand Down Expand Up @@ -127,9 +127,18 @@

client_builder.receiver([weak_self](launchdarkly::sse::Event const& event) {
if (auto self = weak_self.lock()) {
self->event_handler_->HandleMessage(event.type(), event.data());
// TODO: Use the result of handle message to restart the
// event source if we got bad data. sc-204387
auto status =
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Secondary handle strange JSON and reconnect.

self->event_handler_->HandleMessage(event.type(), event.data());
if (status == DataSourceEventHandler::MessageStatus::kInvalidMessage) {
// Invalid data received - restart the connection with backoff
// to get a fresh stream. The backoff mechanism prevents rapid
// reconnection attempts.
LD_LOG(self->logger_, LogLevel::kWarn)
<< "Received invalid data from stream, restarting connection";
if (self->client_) {
self->client_->async_restart("invalid data in stream");
}
}
}
});

Expand Down
273 changes: 273 additions & 0 deletions libs/server-sdk/tests/data_source_event_handler_test.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#include <gtest/gtest.h>

#include "data_components/memory_store/memory_store.hpp"
Expand Down Expand Up @@ -203,3 +203,276 @@

ASSERT_FALSE(store->GetSegment("segmentA")->item);
}

TEST(DataSourceEventHandlerTests, HandlesPatchWithNullDataForFlag) {
auto logger = launchdarkly::logging::NullLogger();
auto store = std::make_shared<MemoryStore>();
DataSourceStatusManager manager;
DataSourceEventHandler event_handler(*store, logger, manager);

// Initialize the store
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");

// Null data should be treated as invalid, not crash the application
auto res = event_handler.HandleMessage(
"patch", R"({"path": "/flags/flagA", "data": null})");

ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
// The error should be recorded, but we stay in Valid state after a previous successful PUT
EXPECT_EQ(DataSourceStatus::DataSourceState::kValid,
manager.Status().State());
ASSERT_TRUE(manager.Status().LastError().has_value());
EXPECT_EQ(DataSourceStatus::ErrorInfo::ErrorKind::kInvalidData,
manager.Status().LastError()->Kind());
}

TEST(DataSourceEventHandlerTests, HandlesPatchWithNullDataForSegment) {
auto logger = launchdarkly::logging::NullLogger();
auto store = std::make_shared<MemoryStore>();
DataSourceStatusManager manager;
DataSourceEventHandler event_handler(*store, logger, manager);

// Initialize the store
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");

// Null data should be treated as invalid, not crash the application
auto res = event_handler.HandleMessage(
"patch", R"({"path": "/segments/segmentA", "data": null})");

ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
// The error should be recorded, but we stay in Valid state after a previous successful PUT
EXPECT_EQ(DataSourceStatus::DataSourceState::kValid,
manager.Status().State());
ASSERT_TRUE(manager.Status().LastError().has_value());
EXPECT_EQ(DataSourceStatus::ErrorInfo::ErrorKind::kInvalidData,
manager.Status().LastError()->Kind());
}

TEST(DataSourceEventHandlerTests, HandlesPatchWithMissingDataField) {
auto logger = launchdarkly::logging::NullLogger();
auto store = std::make_shared<MemoryStore>();
DataSourceStatusManager manager;
DataSourceEventHandler event_handler(*store, logger, manager);

// Initialize the store
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");

// Missing data field should also be treated as invalid
auto res = event_handler.HandleMessage(
"patch", R"({"path": "/flags/flagA"})");

ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
}

TEST(DataSourceEventHandlerTests, HandlesPutWithNullData) {
auto logger = launchdarkly::logging::NullLogger();
auto store = std::make_shared<MemoryStore>();
DataSourceStatusManager manager;
DataSourceEventHandler event_handler(*store, logger, manager);

// PUT with null data should also be handled safely
auto res = event_handler.HandleMessage(
"put", R"({"path":"/", "data": null})");

// PUT handles this differently - it may succeed with empty data
// The important thing is it doesn't crash
ASSERT_TRUE(res == DataSourceEventHandler::MessageStatus::kMessageHandled ||
res == DataSourceEventHandler::MessageStatus::kInvalidMessage);
}

// Tests for wrong data types (schema validation errors)

TEST(DataSourceEventHandlerTests, HandlesPatchWithBooleanData) {
auto logger = launchdarkly::logging::NullLogger();
auto store = std::make_shared<MemoryStore>();
DataSourceStatusManager manager;
DataSourceEventHandler event_handler(*store, logger, manager);

// Initialize the store
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");

// Boolean data instead of object should be treated as invalid
auto res = event_handler.HandleMessage(
"patch", R"({"path": "/flags/flagA", "data": true})");

ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
}

TEST(DataSourceEventHandlerTests, HandlesPatchWithStringData) {
auto logger = launchdarkly::logging::NullLogger();
auto store = std::make_shared<MemoryStore>();
DataSourceStatusManager manager;
DataSourceEventHandler event_handler(*store, logger, manager);

// Initialize the store
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");

// String data instead of object should be treated as invalid
auto res = event_handler.HandleMessage(
"patch", R"({"path": "/flags/flagA", "data": "not an object"})");

ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
}

TEST(DataSourceEventHandlerTests, HandlesPatchWithArrayData) {
auto logger = launchdarkly::logging::NullLogger();
auto store = std::make_shared<MemoryStore>();
DataSourceStatusManager manager;
DataSourceEventHandler event_handler(*store, logger, manager);

// Initialize the store
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");

// Array data instead of object should be treated as invalid
auto res = event_handler.HandleMessage(
"patch", R"({"path": "/flags/flagA", "data": []})");

ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
}

TEST(DataSourceEventHandlerTests, HandlesPatchWithNumberData) {
auto logger = launchdarkly::logging::NullLogger();
auto store = std::make_shared<MemoryStore>();
DataSourceStatusManager manager;
DataSourceEventHandler event_handler(*store, logger, manager);

// Initialize the store
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");

// Number data instead of object should be treated as invalid
auto res = event_handler.HandleMessage(
"patch", R"({"path": "/flags/flagA", "data": 42})");

ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
}

TEST(DataSourceEventHandlerTests, HandlesDeleteWithStringVersion) {
auto logger = launchdarkly::logging::NullLogger();
auto store = std::make_shared<MemoryStore>();
DataSourceStatusManager manager;
DataSourceEventHandler event_handler(*store, logger, manager);

// Initialize the store
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");

// String version instead of number should be treated as invalid
auto res = event_handler.HandleMessage(
"delete", R"({"path": "/flags/flagA", "version": "not a number"})");

ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
}

TEST(DataSourceEventHandlerTests, HandlesPutWithInvalidFlagsType) {
auto logger = launchdarkly::logging::NullLogger();
auto store = std::make_shared<MemoryStore>();
DataSourceStatusManager manager;
DataSourceEventHandler event_handler(*store, logger, manager);

// Flags should be an object, not a boolean
auto res = event_handler.HandleMessage(
"put", R"({"path": "/", "data": {"flags": true, "segments": {}}})");

ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
}

TEST(DataSourceEventHandlerTests, HandlesPutWithInvalidSegmentsType) {
auto logger = launchdarkly::logging::NullLogger();
auto store = std::make_shared<MemoryStore>();
DataSourceStatusManager manager;
DataSourceEventHandler event_handler(*store, logger, manager);

// Segments should be an object, not an array
auto res = event_handler.HandleMessage(
"put", R"({"path": "/", "data": {"flags": {}, "segments": []}})");

ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
}

// Tests for additional malformed JSON variants

TEST(DataSourceEventHandlerTests, HandlesUnterminatedString) {
auto logger = launchdarkly::logging::NullLogger();
auto store = std::make_shared<MemoryStore>();
DataSourceStatusManager manager;
DataSourceEventHandler event_handler(*store, logger, manager);

// Unterminated string should be treated as malformed JSON
auto res = event_handler.HandleMessage(
"patch", R"({"path": "/flags/x", "data": "unterminated)");

ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
}

TEST(DataSourceEventHandlerTests, HandlesTrailingComma) {
auto logger = launchdarkly::logging::NullLogger();
auto store = std::make_shared<MemoryStore>();
DataSourceStatusManager manager;
DataSourceEventHandler event_handler(*store, logger, manager);

// Trailing comma should be treated as malformed JSON
auto res = event_handler.HandleMessage(
"patch", R"({"path": "/flags/x", "data": {},})");

ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
}

// Tests for missing required fields

TEST(DataSourceEventHandlerTests, HandlesDeleteWithMissingPath) {
auto logger = launchdarkly::logging::NullLogger();
auto store = std::make_shared<MemoryStore>();
DataSourceStatusManager manager;
DataSourceEventHandler event_handler(*store, logger, manager);

// Initialize the store
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");

// Missing path field should be treated as invalid
auto res = event_handler.HandleMessage(
"delete", R"({"version": 1})");

ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
}

TEST(DataSourceEventHandlerTests, HandlesDeleteWithMissingVersion) {
auto logger = launchdarkly::logging::NullLogger();
auto store = std::make_shared<MemoryStore>();
DataSourceStatusManager manager;
DataSourceEventHandler event_handler(*store, logger, manager);

// Initialize the store
event_handler.HandleMessage("put", R"({"path":"/", "data":{}})");

// Missing version field should be treated as invalid
auto res = event_handler.HandleMessage(
"delete", R"({"path": "/flags/flagA"})");

ASSERT_EQ(DataSourceEventHandler::MessageStatus::kInvalidMessage, res);
}

TEST(DataSourceEventHandlerTests, HandlesPutWithMissingPath) {
auto logger = launchdarkly::logging::NullLogger();
auto store = std::make_shared<MemoryStore>();
DataSourceStatusManager manager;
DataSourceEventHandler event_handler(*store, logger, manager);

// Missing/empty path is treated as unrecognized (safely ignored)
// This provides forward compatibility
auto res = event_handler.HandleMessage(
"put", R"({"data": {}})");

ASSERT_EQ(DataSourceEventHandler::MessageStatus::kMessageHandled, res);
}

TEST(DataSourceEventHandlerTests, HandlesEmptyJsonObject) {
auto logger = launchdarkly::logging::NullLogger();
auto store = std::make_shared<MemoryStore>();
DataSourceStatusManager manager;
DataSourceEventHandler event_handler(*store, logger, manager);

// Empty JSON object with missing path is treated as unrecognized (safely ignored)
// This provides forward compatibility with future event types
auto res = event_handler.HandleMessage("patch", "{}");

ASSERT_EQ(DataSourceEventHandler::MessageStatus::kMessageHandled, res);
}
9 changes: 9 additions & 0 deletions libs/server-sent-events/include/launchdarkly/sse/client.hpp
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we just restart the stream when we encounter invalid JSON, then we could create a thundering-herd in the case the service somehow sent invalid JSON. So we defer the restart to the event source implementation. Which allows for it to incorporate back-off.

Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#pragma once

#include <launchdarkly/sse/error.hpp>
Expand Down Expand Up @@ -207,6 +207,15 @@
virtual ~Client() = default;
virtual void async_connect() = 0;
virtual void async_shutdown(std::function<void()> completion) = 0;

/**
* Restart the connection with exponential backoff. This should be called
* when the SDK detects invalid data from the stream and needs to
* reconnect. The backoff mechanism prevents rapid reconnection attempts
* that could overload the service.
* @param reason A description of why the restart was triggered (for logging)
*/
virtual void async_restart(std::string const& reason) = 0;
};

} // namespace launchdarkly::sse
23 changes: 23 additions & 0 deletions libs/server-sent-events/src/client.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/strand.hpp>
Expand Down Expand Up @@ -161,6 +161,29 @@
beast::bind_front_handler(&FoxyClient::do_run, shared_from_this()));
}

void async_restart(std::string const& reason) override {
boost::asio::post(
session_->get_executor(),
beast::bind_front_handler(&FoxyClient::do_restart,
shared_from_this(), reason));
}

void do_restart(std::string const& reason) {
// Cancel any ongoing read operations
try {
if (session_->stream.is_ssl()) {
session_->stream.ssl().next_layer().cancel();
} else {
session_->stream.plain().cancel();
}
} catch (boost::system::system_error const& err) {
logger_("exception canceling stream during restart: " +
std::string(err.what()));
}
// Trigger backoff and reconnect
async_backoff(reason);
}

void do_run() {
session_->async_connect(
host_, port_,
Expand Down
11 changes: 11 additions & 0 deletions libs/server-sent-events/src/curl_client.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#ifdef LD_CURL_NETWORKING

#include "curl_client.hpp"
Expand Down Expand Up @@ -76,6 +76,17 @@
[self = shared_from_this()]() { self->do_run(); });
}

void CurlClient::async_restart(std::string const& reason) {
boost::asio::post(backoff_timer_.get_executor(),
[self = shared_from_this(), reason]() {
// Close the socket to abort the current transfer.
// CURL will detect the error and call the completion
// handler, which will trigger backoff and reconnection.
self->log_message("async_restart: aborting transfer due to " + reason);
self->request_context_->abort_transfer();
});
}

void CurlClient::do_run() {
if (request_context_->is_shutting_down()) {
return;
Expand Down
Loading
Loading