Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/sse.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,3 @@ jobs:
with:
repo: 'sse-contract-tests'
test_service_port: ${{ env.TEST_SERVICE_PORT }}
extra_params: '-skip HTTP'
7 changes: 7 additions & 0 deletions apps/sse-contract-tests/include/definitions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,10 @@ struct CommentMessage {
};

NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE_WITH_DEFAULT(CommentMessage, kind, comment);

struct ErrorMessage {
std::string kind;
std::string comment;
};

NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE_WITH_DEFAULT(ErrorMessage, kind, comment);
16 changes: 14 additions & 2 deletions apps/sse-contract-tests/include/event_outbox.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include <memory>
#include <string>
#include <variant>

namespace beast = boost::beast;
namespace http = beast::http;
Expand Down Expand Up @@ -44,23 +45,34 @@ class EventOutbox : public std::enable_shared_from_this<EventOutbox> {
* @param callback_url Target URL.
*/
EventOutbox(net::any_io_executor executor, std::string callback_url);

/**
* Enqueues an event, which will be posted to the server
* Queues an event, which will be posted to the server
* later.
* @param event Event to post.
*/
void post_event(launchdarkly::sse::Event event);

/**
* Queues an error, which will be posted to the server later.
* @param error Error to post.
*/
void post_error(launchdarkly::sse::Error error);

/**
* Begins an async operation to connect to the server.
*/
void run();

/**
* Begins an async operation to disconnect from the server.
*/
void stop();

private:
RequestType build_request(std::size_t counter, launchdarkly::sse::Event ev);
RequestType build_request(
std::size_t counter,
std::variant<launchdarkly::sse::Event, launchdarkly::sse::Error> ev);
void on_resolve(beast::error_code ec, tcp::resolver::results_type results);
void on_connect(beast::error_code ec,
tcp::resolver::results_type::endpoint_type);
Expand Down
3 changes: 3 additions & 0 deletions apps/sse-contract-tests/src/entity_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ std::optional<std::string> EntityManager::create(ConfigParams const& params) {
copy->post_event(std::move(e));
});

client_builder.errors(
[copy = poster](launchdarkly::sse::Error e) { copy->post_error(e); });

auto client = client_builder.build();
if (!client) {
LD_LOG(logger_, LogLevel::kWarn)
Expand Down
41 changes: 35 additions & 6 deletions apps/sse-contract-tests/src/event_outbox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ void EventOutbox::post_event(launchdarkly::sse::Event event) {
flush_timer_.expires_from_now(boost::posix_time::milliseconds(0));
}

void EventOutbox::post_error(launchdarkly::sse::Error error) {
auto http_request = build_request(callback_counter_++, error);
outbox_.push(http_request);
flush_timer_.expires_from_now(boost::posix_time::milliseconds(0));
}

void EventOutbox::run() {
resolver_.async_resolve(callback_host_, callback_port_,
beast::bind_front_handler(&EventOutbox::on_resolve,
Expand All @@ -57,7 +63,7 @@ void EventOutbox::stop() {

EventOutbox::RequestType EventOutbox::build_request(
std::size_t counter,
launchdarkly::sse::Event ev) {
std::variant<launchdarkly::sse::Event, launchdarkly::sse::Error> ev) {
RequestType req;

req.set(http::field::host, callback_host_);
Expand All @@ -66,11 +72,34 @@ EventOutbox::RequestType EventOutbox::build_request(

nlohmann::json json;

if (ev.type() == "comment") {
json = CommentMessage{"comment", std::move(ev).take()};
} else {
json = EventMessage{"event", Event{ev}};
}
std::visit(
[&](auto&& arg) {
using T = std::decay_t<decltype(arg)>;
if constexpr (std::is_same_v<T, launchdarkly::sse::Event>) {
if (arg.type() == "comment") {
json = CommentMessage{"comment", std::move(arg).take()};
} else {
json = EventMessage{"event", Event{std::move(arg)}};
}
} else if constexpr (std::is_same_v<T, launchdarkly::sse::Error>) {
using launchdarkly::sse::Error;
auto msg = ErrorMessage{"error"};
switch (arg) {
case Error::NoContent:
msg.comment = "no content";
break;
case Error::InvalidRedirectLocation:
msg.comment = "invalid redirect location";
break;
case Error::UnrecoverableClientError:
msg.comment = "unrecoverable client error";
default:
msg.comment = "unspecified error";
}
json = msg;
}
},
std::move(ev));

req.body() = json.dump();
req.prepare_payload();
Expand Down
12 changes: 11 additions & 1 deletion libs/server-sent-events/include/launchdarkly/sse/client.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <launchdarkly/sse/error.hpp>
#include <launchdarkly/sse/event.hpp>

#include <boost/asio/any_io_executor.hpp>
Expand Down Expand Up @@ -29,8 +30,9 @@ class Client;
*/
class Builder {
public:
using EventReceiver = std::function<void(launchdarkly::sse::Event)>;
using EventReceiver = std::function<void(Event)>;
using LogCallback = std::function<void(std::string)>;
using ErrorCallback = std::function<void(Error)>;

/**
* Create a builder for the given URL. If the port is omitted, 443 is
Expand Down Expand Up @@ -114,6 +116,13 @@ class Builder {
*/
Builder& logger(LogCallback callback);

/**
* Specify an error reporting callback for the Client.
* @param callback Callback to receive an error from the Client.
* @return Reference to this builder.
*/
Builder& errors(ErrorCallback callback);

/**
* Builds a Client. The shared pointer is necessary to extend the lifetime
* of the Client to encompass each asynchronous operation that it performs.
Expand All @@ -131,6 +140,7 @@ class Builder {
std::optional<std::chrono::milliseconds> connect_timeout_;
LogCallback logging_cb_;
EventReceiver receiver_;
ErrorCallback error_cb_;
};

/**
Expand Down
10 changes: 10 additions & 0 deletions libs/server-sent-events/include/launchdarkly/sse/error.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#pragma once

namespace launchdarkly::sse {

enum class Error {
NoContent = 1,
InvalidRedirectLocation = 2,
UnrecoverableClientError = 3,
};
}
100 changes: 80 additions & 20 deletions libs/server-sent-events/src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#include <boost/beast/version.hpp>

#include <boost/url/parse.hpp>

#include <boost/url/url.hpp>
#include <chrono>
#include <iostream>
#include <memory>
Expand Down Expand Up @@ -58,6 +58,7 @@ class FoxyClient : public Client,
std::optional<std::chrono::milliseconds> write_timeout,
Builder::EventReceiver receiver,
Builder::LogCallback logger,
Builder::ErrorCallback errors,
std::optional<net::ssl::context> maybe_ssl)
: ssl_context_(std::move(maybe_ssl)),
host_(std::move(host)),
Expand All @@ -74,12 +75,13 @@ class FoxyClient : public Client,
last_event_id_(std::nullopt),
backoff_timer_(session_.get_executor()),
event_receiver_(std::move(receiver)),
logger_(std::move(logger)) {
logger_(std::move(logger)),
errors_(std::move(errors)) {
create_parser();
}

/** The body parser is recreated each time a connection is made because its
* internal state cannot be explicitly reset.
/** The body parser is recreated each time a connection is made because
* its internal state cannot be explicitly reset.
*
* Since SSE body will never end unless
* an error occurs, the body size limit must be removed.
Expand All @@ -100,6 +102,12 @@ class FoxyClient : public Client,
void do_backoff(std::string const& reason) {
backoff_.fail();

if (auto id = body_parser_->get().body().last_event_id()) {
if (!id->empty()) {
last_event_id_ = id;
}
}

std::stringstream msg;
msg << "backing off in ("
<< std::chrono::duration_cast<std::chrono::seconds>(
Expand All @@ -109,7 +117,6 @@ class FoxyClient : public Client,

logger_(msg.str());

last_event_id_ = body_parser_->get().body().last_event_id();
create_parser();
backoff_timer_.expires_from_now(backoff_.delay());
backoff_timer_.async_wait(beast::bind_front_handler(
Expand Down Expand Up @@ -138,7 +145,7 @@ class FoxyClient : public Client,
return do_backoff(ec.what());
}

if (last_event_id_ && !last_event_id_->empty()) {
if (last_event_id_) {
req_.set("last-event-id", *last_event_id_);
} else {
req_.erase("last-event-id");
Expand Down Expand Up @@ -186,6 +193,10 @@ class FoxyClient : public Client,
auto status_class = beast::http::to_status_class(response.result());

if (status_class == beast::http::status_class::successful) {
if (response.result() == beast::http::status::no_content) {
errors_(Error::NoContent);
return;
}
if (!correct_content_type(response)) {
return do_backoff("invalid Content-Type");
}
Expand All @@ -197,13 +208,30 @@ class FoxyClient : public Client,
shared_from_this()));
}

if (status_class == beast::http::status_class::redirection) {
if (can_redirect(response)) {
auto new_url =
redirect_url("base", response.find("location")->value());

if (!new_url) {
errors_(Error::InvalidRedirectLocation);
return;
}

req_.set(http::field::host, new_url->host());
req_.target(new_url->encoded_target());
} else {
errors_(Error::InvalidRedirectLocation);
return;
}
}

if (status_class == beast::http::status_class::client_error) {
if (recoverable_client_error(response.result())) {
return do_backoff(backoff_reason(response.result()));
}

// TODO: error callback

errors_(Error::UnrecoverableClientError);
return;
}

Expand Down Expand Up @@ -244,11 +272,6 @@ class FoxyClient : public Client,
}
}

void fail(boost::system::error_code ec, std::string const& what) {
logger_("sse-client: " + what + ": " + ec.message());
async_shutdown(nullptr);
}

static bool recoverable_client_error(beast::http::status status) {
return (status == beast::http::status::bad_request ||
status == beast::http::status::request_timeout ||
Expand All @@ -264,21 +287,52 @@ class FoxyClient : public Client,
return false;
}

static bool can_redirect(FoxyClient::response const& response) {
return (response.result() == beast::http::status::moved_permanently ||
response.result() == beast::http::status::temporary_redirect) &&
response.find("location") != response.end();
}

static std::optional<boost::urls::url> redirect_url(
std::string orig_base,
std::string orig_location) {
auto location = boost::urls::parse_uri(orig_location);
if (!location) {
return std::nullopt;
}
if (location->has_scheme()) {
return location.value();
}

boost::urls::url base(orig_base);
auto result = base.resolve(*location);
if (!result) {
return std::nullopt;
}

return base;
}
Copy link
Contributor Author

@cwaldren-ld cwaldren-ld May 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the AppendUrl function in common, but I'm not sure I need it here since these redirects are probably not human input.

I think it suffices to do:

  • If it has a scheme, it's absolute, so that's the new location
  • Otherwise, it's relative so try and resolve it against the base URL.

https://www.rfc-editor.org/rfc/rfc9110#field.location

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, it probably will not be a problem, but a relative URL is relative to the target URI, not the base URL. It is not uncommon for the redirect to have "../" and require dot resolution.

https://www.rfc-editor.org/rfc/rfc3986#section-5.2

"agent MUST process the redirection as if the value inherits the fragment component of the URI reference used to generate the target URI "

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not as easy to real-world test as by far the most common redirect is http to https.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(If you have a bunch of instances, and you change an endpoint name, and the endpoints are already segregated by a fragment, then you just use a relative fragment. Not just user input case. I would be comfortable for client SDKs I think. But for server SDKs I would want to solve this.)


private:
std::optional<net::ssl::context> ssl_context_;
std::string host_;
std::string port_;

std::optional<std::chrono::milliseconds> connect_timeout_;
std::optional<std::chrono::milliseconds> read_timeout_;
std::optional<std::chrono::milliseconds> write_timeout_;

http::request<http::string_body> req_;

Builder::EventReceiver event_receiver_;
std::optional<http::response_parser<body> > body_parser_;
Builder::LogCallback logger_;
Builder::ErrorCallback errors_;

std::optional<http::response_parser<body>> body_parser_;
launchdarkly::foxy::client_session session_;
std::optional<std::string> last_event_id_;
Backoff backoff_;
boost::asio::steady_timer backoff_timer_;
Builder::LogCallback logger_;
};

Builder::Builder(net::any_io_executor ctx, std::string url)
Expand All @@ -287,9 +341,9 @@ Builder::Builder(net::any_io_executor ctx, std::string url)
read_timeout_{std::nullopt},
write_timeout_{std::nullopt},
connect_timeout_{std::nullopt},
logging_cb_([](auto msg) {}) {
receiver_ = [](launchdarkly::sse::Event const&) {};

logging_cb_([](auto msg) {}),
receiver_([](launchdarkly::sse::Event const&) {}),
error_cb_([](auto err) {}) {
request_.version(11);
request_.set(http::field::user_agent, kDefaultUserAgent);
request_.method(http::verb::get);
Expand Down Expand Up @@ -332,11 +386,16 @@ Builder& Builder::receiver(EventReceiver receiver) {
return *this;
}

Builder& Builder::logger(std::function<void(std::string)> callback) {
Builder& Builder::logger(LogCallback callback) {
logging_cb_ = std::move(callback);
return *this;
}

Builder& Builder::errors(ErrorCallback callback) {
error_cb_ = std::move(callback);
return *this;
}

std::shared_ptr<Client> Builder::build() {
auto uri_components = boost::urls::parse_uri(url_);
if (!uri_components) {
Expand Down Expand Up @@ -376,7 +435,8 @@ std::shared_ptr<Client> Builder::build() {

return std::make_shared<FoxyClient>(
net::make_strand(executor_), request, host, service, connect_timeout_,
read_timeout_, write_timeout_, receiver_, logging_cb_, std::move(ssl));
read_timeout_, write_timeout_, receiver_, logging_cb_, error_cb_,
std::move(ssl));
}

} // namespace launchdarkly::sse