diff --git a/CMakeLists.txt b/CMakeLists.txt index dad650f44..9cd988916 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,11 +11,20 @@ project( LANGUAGES CXX C ) +if (${CMAKE_VERSION} VERSION_GREATER_EQUAL "3.24") + # Affects robustness of timestamp checking on FetchContent dependencies. + cmake_policy(SET CMP0135 NEW) +endif () + +# All projects in this repo should share the same version of 3rd party depends. +# It's the only way to remain sane. +set(CMAKE_FILES "${CMAKE_CURRENT_SOURCE_DIR}/cmake") set(CMAKE_CXX_STANDARD 17) + option(BUILD_TESTING "Enable C++ unit tests." ON) -if(BUILD_TESTING) +if (BUILD_TESTING) include(FetchContent) FetchContent_Declare( googletest @@ -26,7 +35,7 @@ if(BUILD_TESTING) FetchContent_MakeAvailable(googletest) enable_testing() -endif() +endif () add_subdirectory(libs/common) diff --git a/apps/CMakeLists.txt b/apps/CMakeLists.txt index aef466b75..9bf0d3db2 100644 --- a/apps/CMakeLists.txt +++ b/apps/CMakeLists.txt @@ -1 +1,3 @@ +#add_subdirectory(hello-c) +add_subdirectory(sse-contract-tests) add_subdirectory(hello-cpp) diff --git a/apps/hello-cpp/main.cpp b/apps/hello-cpp/main.cpp index 5b6e285a9..d0f72e70c 100644 --- a/apps/hello-cpp/main.cpp +++ b/apps/hello-cpp/main.cpp @@ -1,10 +1,14 @@ -#include #include -#include -#include +#include + +#include + #include "console_backend.hpp" #include "logger.hpp" +#include +#include + namespace net = boost::asio; // from using launchdarkly::ConsoleBackend; @@ -22,12 +26,23 @@ int main() { net::io_context ioc; - // curl "https://stream-stg.launchdarkly.com/all?filter=even-flags-2" -H - // "Authorization: sdk-66a5dbe0-8b26-445a-9313-761e7e3d381b" -v + char const* key = std::getenv("STG_SDK_KEY"); + if (!key) { + std::cout << "Set environment variable STG_SDK_KEY to the sdk key\n"; + return 1; + } auto client = - launchdarkly::sse::builder(ioc, + launchdarkly::sse::Builder(ioc.get_executor(), "https://stream-stg.launchdarkly.com/all") - .header("Authorization", "sdk-66a5dbe0-8b26-445a-9313-761e7e3d381b") + .header("Authorization", key) + .receiver([&](launchdarkly::sse::Event ev) { + LD_LOG(logger, LogLevel::kInfo) << "event: " << ev.type(); + LD_LOG(logger, LogLevel::kInfo) + << "data: " << std::move(ev).take(); + }) + .logger([&](std::string msg) { + LD_LOG(logger, LogLevel::kDebug) << std::move(msg); + }) .build(); if (!client) { @@ -35,7 +50,6 @@ int main() { return 1; } - std::thread t([&]() { ioc.run(); }); - client->run(); + ioc.run(); } diff --git a/apps/sse-contract-tests/CMakeLists.txt b/apps/sse-contract-tests/CMakeLists.txt new file mode 100644 index 000000000..68b97dcc2 --- /dev/null +++ b/apps/sse-contract-tests/CMakeLists.txt @@ -0,0 +1,29 @@ +# Required for Apple Silicon support. +cmake_minimum_required(VERSION 3.19) + +project( + LaunchDarklyCPPSSETestHarness + VERSION 0.1 + DESCRIPTION "LaunchDarkly CPP SSE Test Harness" + LANGUAGES CXX +) + +include(${CMAKE_FILES}/json.cmake) + +add_executable(sse-tests + src/main.cpp + src/server.cpp + src/entity_manager.cpp + src/session.cpp + src/event_outbox.cpp + ) + +target_link_libraries(sse-tests PRIVATE + launchdarkly::sse + launchdarkly::common + nlohmann_json::nlohmann_json + ) + +target_include_directories(sse-tests PUBLIC include) + +#add_definitions(-DBOOST_ASIO_ENABLE_HANDLER_TRACKING) diff --git a/apps/sse-contract-tests/README.md b/apps/sse-contract-tests/README.md new file mode 100644 index 000000000..7533c42a8 --- /dev/null +++ b/apps/sse-contract-tests/README.md @@ -0,0 +1,39 @@ +## SSE contract tests + +Contract tests have a "test service" on one side, and the "test harness" on +the other. + +This project implements the test service for the C++ EventSource client. + +**session (session.hpp)** + +This provides a simple REST API for creating/destroying +test entities. Examples: + +`GET /` - returns the capabilities of this service. + +`DELETE /` - shutdown the service. + +`POST /` - create a new test entity, and return its ID. + +`DELETE /entity/1` - delete the an entity identified by `1`. + +**entity manager (entity_manager.hpp)** + +This manages "entities" - the combination of an SSE client, and an outbox that posts events _received_ from the stream +_back to_ the test harness. + +The point is to allow the test harness to assert that events were parsed and dispatched as expected. + +**event outbox (event_outbox.hpp)** + +The 2nd half of an "entity". It receives events from the SSE client, pushes them into a queue, +and then periodically flushes the queue out to the test harness. + +**definitions (definitions.hpp)** + +Contains JSON definitions that are used to communicate with the test harness. + +**server (server.hpp)** + +Glues everything together, mainly providing the TCP acceptor that spawns new sessions. diff --git a/apps/sse-contract-tests/include/definitions.hpp b/apps/sse-contract-tests/include/definitions.hpp new file mode 100644 index 000000000..34539a279 --- /dev/null +++ b/apps/sse-contract-tests/include/definitions.hpp @@ -0,0 +1,86 @@ +#pragma once + +#include +#include +#include +#include +#include "nlohmann/json.hpp" + +namespace nlohmann { + +template +struct adl_serializer> { + static void to_json(json& j, std::optional const& opt) { + if (opt == std::nullopt) { + j = nullptr; + } else { + j = *opt; // this will call adl_serializer::to_json which will + // find the free function to_json in T's namespace! + } + } + + static void from_json(json const& j, std::optional& opt) { + if (j.is_null()) { + opt = std::nullopt; + } else { + opt = j.get(); // same as above, but with + // adl_serializer::from_json + } + } +}; +} // namespace nlohmann + +// Represents the initial JSON configuration sent by the test harness. +struct ConfigParams { + std::string streamUrl; + std::string callbackUrl; + std::string tag; + std::optional initialDelayMs; + std::optional readTimeoutMs; + std::optional lastEventId; + std::optional> headers; + std::optional method; + std::optional body; +}; + +NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE_WITH_DEFAULT(ConfigParams, + streamUrl, + callbackUrl, + tag, + initialDelayMs, + readTimeoutMs, + lastEventId, + headers, + method, + body); + +// Represents an event payload that this service posts back +// to the test harness. The events are originally received by this server +// via the SSE stream; they are posted back so the test harness can verify +// that we parsed and dispatched them successfully. +struct Event { + std::string type; + std::string id; + std::string data; + Event() = default; + explicit Event(launchdarkly::sse::Event event) + : type(event.type()), + id(event.id().value_or("")), + data(std::move(event).take()) {} +}; + +NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE_WITH_DEFAULT(Event, type, data, id); + +struct EventMessage { + std::string kind; + Event event; +}; + +NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE_WITH_DEFAULT(EventMessage, kind, event); + +struct CommentMessage { + std::string kind; + std::string comment; +}; + +NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE_WITH_DEFAULT(CommentMessage, kind, comment); diff --git a/apps/sse-contract-tests/include/entity_manager.hpp b/apps/sse-contract-tests/include/entity_manager.hpp new file mode 100644 index 000000000..a4d3f1db2 --- /dev/null +++ b/apps/sse-contract-tests/include/entity_manager.hpp @@ -0,0 +1,52 @@ +#pragma once + +#include "definitions.hpp" +#include "logger.hpp" + +#include + +#include + +#include +#include +#include +#include +#include + +class EventOutbox; + +class EntityManager { + using Inbox = std::shared_ptr; + using Outbox = std::shared_ptr; + using Entity = std::pair; + + std::unordered_map entities_; + + std::size_t counter_; + boost::asio::any_io_executor executor_; + + launchdarkly::Logger& logger_; + + public: + /** + * Create an entity manager, which can be used to create and destroy + * entities (SSE clients + event channel back to test harness). + * @param executor Executor. + * @param logger Logger. + */ + EntityManager(boost::asio::any_io_executor executor, + launchdarkly::Logger& logger); + /** + * Create an entity with the given configuration. + * @param params Config of the entity. + * @return An ID representing the entity, or none if the entity couldn't + * be created. + */ + std::optional create(ConfigParams params); + /** + * Destroy an entity with the given ID. + * @param id ID of the entity. + * @return True if the entity was found and destroyed. + */ + bool destroy(std::string const& id); +}; diff --git a/apps/sse-contract-tests/include/event_outbox.hpp b/apps/sse-contract-tests/include/event_outbox.hpp new file mode 100644 index 000000000..a8598bc44 --- /dev/null +++ b/apps/sse-contract-tests/include/event_outbox.hpp @@ -0,0 +1,70 @@ +#pragma once + +#include "entity_manager.hpp" + +#include + +#include +#include +#include +#include +#include + +#include +#include + +namespace beast = boost::beast; +namespace http = beast::http; +namespace net = boost::asio; +using tcp = boost::asio::ip::tcp; + +class EventOutbox : public std::enable_shared_from_this { + using RequestType = http::request; + + std::string callback_url_; + std::string callback_port_; + std::string callback_host_; + size_t callback_counter_; + + net::any_io_executor executor_; + tcp::resolver resolver_; + beast::tcp_stream event_stream_; + + boost::lockfree::spsc_queue outbox_; + + net::deadline_timer flush_timer_; + std::string id_; + + bool shutdown_; + + public: + /** + * Instantiate an outbox; events will be posted to the given URL. + * @param executor Executor. + * @param callback_url Target URL. + */ + EventOutbox(net::any_io_executor executor, std::string callback_url); + /** + * Enqueues an event, which will be posted to the server + * later. + * @param event Event to post. + */ + void post_event(launchdarkly::sse::Event event); + /** + * Begins an async operation to connect to the server. + */ + void run(); + /** + * Begins an async operation to disconnect from the server. + */ + void stop(); + + private: + RequestType build_request(std::size_t counter, launchdarkly::sse::Event ev); + void on_resolve(beast::error_code ec, tcp::resolver::results_type results); + void on_connect(beast::error_code ec, + tcp::resolver::results_type::endpoint_type); + void on_flush_timer(boost::system::error_code ec); + void on_write(beast::error_code ec, std::size_t); + void do_shutdown(beast::error_code ec, std::string what); +}; diff --git a/apps/sse-contract-tests/include/server.hpp b/apps/sse-contract-tests/include/server.hpp new file mode 100644 index 000000000..dbae43a5c --- /dev/null +++ b/apps/sse-contract-tests/include/server.hpp @@ -0,0 +1,50 @@ +#pragma once + +#include +#include +#include +#include +#include "entity_manager.hpp" +#include "logger.hpp" + +namespace net = boost::asio; // from + +#include + +using tcp = boost::asio::ip::tcp; // from + +class server : public std::enable_shared_from_this { + net::io_context& ioc_; + tcp::acceptor acceptor_; + EntityManager entity_manager_; + std::vector caps_; + launchdarkly::Logger& logger_; + + public: + /** + * Constructs a server, which stands up a REST API at the given + * port and address. + * @param ioc IO context. + * @param address Address to bind. + * @param port Port to bind. + * @param logger Logger. + */ + server(net::io_context& ioc, + std::string const& address, + std::string const& port, + launchdarkly::Logger& logger); + /** + * Advertise an optional test-harness capability, such as "comments". + * @param cap + */ + void add_capability(std::string cap); + /** + * Begins an async operation to start accepting requests. + */ + void run(); + + private: + void do_accept(); + void on_accept(boost::system::error_code const& ec, tcp::socket socket); + void fail(boost::beast::error_code ec, char const* what); +}; diff --git a/apps/sse-contract-tests/include/session.hpp b/apps/sse-contract-tests/include/session.hpp new file mode 100644 index 000000000..d0140a85e --- /dev/null +++ b/apps/sse-contract-tests/include/session.hpp @@ -0,0 +1,69 @@ +#pragma once + +#include "entity_manager.hpp" +#include "logger.hpp" + +#include +#include +#include + +namespace beast = boost::beast; // from +namespace http = beast::http; // from +namespace net = boost::asio; // from +using tcp = boost::asio::ip::tcp; // from + +class Session : public std::enable_shared_from_this { + beast::tcp_stream stream_; + beast::flat_buffer buffer_{8192}; + http::request request_; + EntityManager& manager_; + std::vector capabilities_; + std::function on_shutdown_cb_; + bool shutdown_requested_; + launchdarkly::Logger& logger_; + + public: + /** + * Constructs a session, which provides a REST API. + * @param socket Connected socket. + * @param manager Manager through which entities can be created/destroyed. + * @param caps Test service capabilities to advertise. + * @param logger Logger. + */ + explicit Session(tcp::socket&& socket, + EntityManager& manager, + std::vector caps, + launchdarkly::Logger& logger); + + ~Session(); + + /** + * Set a callback to be invoked when a REST client requests shutdown. + */ + template + void on_shutdown(Callback cb) { + on_shutdown_cb_ = cb; + } + /** + * Begin waiting for requests. + */ + void start(); + /** + * Stop waiting for requests and close the session. + */ + void stop(); + + private: + http::message_generator handle_request( + http::request&& req); + void do_read(); + + void do_stop(char const* reason); + void on_read(beast::error_code ec, std::size_t bytes_transferred); + + void send_response(http::message_generator&& msg); + + void on_write(bool keep_alive, + beast::error_code ec, + std::size_t bytes_transferred); +}; diff --git a/apps/sse-contract-tests/src/entity_manager.cpp b/apps/sse-contract-tests/src/entity_manager.cpp new file mode 100644 index 000000000..574b23b0d --- /dev/null +++ b/apps/sse-contract-tests/src/entity_manager.cpp @@ -0,0 +1,73 @@ +#include "entity_manager.hpp" +#include "event_outbox.hpp" + +using launchdarkly::LogLevel; + +EntityManager::EntityManager(boost::asio::any_io_executor executor, + launchdarkly::Logger& logger) + : entities_(), + counter_{0}, + executor_{std::move(executor)}, + logger_{logger} {} + +std::optional EntityManager::create(ConfigParams params) { + std::string id = std::to_string(counter_++); + + auto poster = std::make_shared(executor_, params.callbackUrl); + poster->run(); + + auto client_builder = + launchdarkly::sse::Builder(executor_, params.streamUrl); + + if (params.headers) { + for (auto const& h : *params.headers) { + client_builder.header(h.first, h.second); + } + } + + if (params.method) { + client_builder.method(http::string_to_verb(*params.method)); + } + + if (params.body) { + client_builder.body(std::move(*params.body)); + } + + if (params.readTimeoutMs) { + client_builder.read_timeout( + std::chrono::milliseconds(*params.readTimeoutMs)); + } + + client_builder.logger([this](std::string msg) { + LD_LOG(logger_, LogLevel::kDebug) << std::move(msg); + }); + + client_builder.receiver([copy = poster](launchdarkly::sse::Event e) { + copy->post_event(std::move(e)); + }); + + auto client = client_builder.build(); + if (!client) { + LD_LOG(logger_, LogLevel::kWarn) + << "entity_manager: couldn't build sse client"; + return std::nullopt; + } + + client->run(); + + entities_.emplace(id, std::make_pair(client, poster)); + return id; +} + +bool EntityManager::destroy(std::string const& id) { + auto it = entities_.find(id); + if (it == entities_.end()) { + return false; + } + + it->second.second->stop(); + + entities_.erase(it); + + return true; +} diff --git a/apps/sse-contract-tests/src/event_outbox.cpp b/apps/sse-contract-tests/src/event_outbox.cpp new file mode 100644 index 000000000..aac1901c6 --- /dev/null +++ b/apps/sse-contract-tests/src/event_outbox.cpp @@ -0,0 +1,132 @@ +#include "event_outbox.hpp" +#include "definitions.hpp" + +#include +#include +#include + +// Check the outbox at this interval. Normally a flush is triggered for +// every event; this is just a failsafe in case a flush is happening +// concurrently. +auto const kFlushInterval = boost::posix_time::milliseconds{500}; +auto const kOutboxCapacity = 1023; + +EventOutbox::EventOutbox(net::any_io_executor executor, + std::string callback_url) + : callback_url_{std::move(callback_url)}, + callback_port_{}, + callback_host_{}, + callback_counter_{0}, + executor_{executor}, + resolver_{executor}, + event_stream_{executor}, + outbox_{kOutboxCapacity}, + flush_timer_{executor}, + shutdown_(false) { + auto uri_components = boost::urls::parse_uri(callback_url_); + + callback_host_ = uri_components->host(); + callback_port_ = uri_components->port(); +} + +void EventOutbox::do_shutdown(beast::error_code ec, std::string what) { + event_stream_.socket().shutdown(tcp::socket::shutdown_both, ec); + flush_timer_.cancel(); +} + +void EventOutbox::post_event(launchdarkly::sse::Event event) { + auto http_request = build_request(callback_counter_++, std::move(event)); + outbox_.push(http_request); + flush_timer_.expires_from_now(boost::posix_time::milliseconds(0)); +} + +void EventOutbox::run() { + resolver_.async_resolve(callback_host_, callback_port_, + beast::bind_front_handler(&EventOutbox::on_resolve, + shared_from_this())); +} + +void EventOutbox::stop() { + beast::error_code ec = net::error::basic_errors::operation_aborted; + std::string reason = "stop"; + shutdown_ = true; + net::post(executor_, + beast::bind_front_handler(&EventOutbox::do_shutdown, + shared_from_this(), ec, reason)); +} + +EventOutbox::RequestType EventOutbox::build_request( + std::size_t counter, + launchdarkly::sse::Event ev) { + RequestType req; + + req.set(http::field::host, callback_host_); + req.method(http::verb::get); + req.target(callback_url_ + "/" + std::to_string(counter)); + + nlohmann::json json; + + if (ev.type() == "comment") { + json = CommentMessage{"comment", std::move(ev).take()}; + } else { + json = EventMessage{"event", Event{ev}}; + } + + req.body() = json.dump(); + req.prepare_payload(); + return req; +} + +void EventOutbox::on_resolve(beast::error_code ec, + tcp::resolver::results_type results) { + if (ec) { + return do_shutdown(ec, "resolve"); + } + + beast::get_lowest_layer(event_stream_) + .async_connect(results, + beast::bind_front_handler(&EventOutbox::on_connect, + shared_from_this())); +} + +void EventOutbox::on_connect(beast::error_code ec, + tcp::resolver::results_type::endpoint_type) { + if (ec) { + return do_shutdown(ec, "connect"); + } + + boost::system::error_code dummy; + net::post(executor_, beast::bind_front_handler(&EventOutbox::on_flush_timer, + shared_from_this(), dummy)); +} + +void EventOutbox::on_flush_timer(boost::system::error_code ec) { + if (ec && shutdown_) { + return do_shutdown(ec, "flush"); + } + + if (!outbox_.empty()) { + RequestType& request = outbox_.front(); + + // Flip-flop between this function and on_write; peeking an event + // and then popping it. + + http::async_write(event_stream_, request, + beast::bind_front_handler(&EventOutbox::on_write, + shared_from_this())); + return; + } + + // If the outbox is empty, wait a bit before trying again. + flush_timer_.expires_from_now(kFlushInterval); + flush_timer_.async_wait(beast::bind_front_handler( + &EventOutbox::on_flush_timer, shared_from_this())); +} + +void EventOutbox::on_write(beast::error_code ec, std::size_t) { + if (ec) { + return do_shutdown(ec, "write"); + } + outbox_.pop(); + on_flush_timer(boost::system::error_code{}); +} diff --git a/apps/sse-contract-tests/src/main.cpp b/apps/sse-contract-tests/src/main.cpp new file mode 100644 index 000000000..5b61f2f97 --- /dev/null +++ b/apps/sse-contract-tests/src/main.cpp @@ -0,0 +1,46 @@ +#include "server.hpp" + +#include "console_backend.hpp" + +#include +#include +#include + +#include + +namespace net = boost::asio; +namespace beast = boost::beast; + +using launchdarkly::ConsoleBackend; + +using launchdarkly::LogLevel; + +int main(int argc, char* argv[]) { + launchdarkly::Logger logger{ + std::make_unique("sse-contract-tests")}; + + try { + net::io_context ioc{1}; + + auto s = std::make_shared(ioc, "0.0.0.0", "8111", logger); + s->add_capability("headers"); + s->add_capability("comments"); + s->add_capability("report"); + s->add_capability("post"); + s->add_capability("read-timeout"); + s->run(); + + net::signal_set signals{ioc, SIGINT, SIGTERM}; + signals.async_wait([&](beast::error_code const&, int) { + LD_LOG(logger, LogLevel::kInfo) << "shutting down.."; + ioc.stop(); + }); + + ioc.run(); + LD_LOG(logger, LogLevel::kInfo) << "bye!"; + + } catch (std::exception const& e) { + LD_LOG(logger, LogLevel::kError) << e.what(); + return EXIT_FAILURE; + } +} diff --git a/apps/sse-contract-tests/src/server.cpp b/apps/sse-contract-tests/src/server.cpp new file mode 100644 index 000000000..52c58caed --- /dev/null +++ b/apps/sse-contract-tests/src/server.cpp @@ -0,0 +1,100 @@ +#include "server.hpp" +#include "session.hpp" + +#include +#include +#include +#include +#include + +using launchdarkly::LogLevel; + +server::server(net::io_context& ioc, + std::string const& address, + std::string const& port, + launchdarkly::Logger& logger) + : ioc_{ioc}, + acceptor_{ioc}, + entity_manager_{ioc.get_executor(), logger}, + caps_{}, + logger_{logger} { + beast::error_code ec; + + tcp::resolver resolver{ioc_}; + tcp::endpoint endpoint = *resolver.resolve(address, port, ec).begin(); + if (ec) { + fail(ec, "resolve"); + return; + } + acceptor_.open(endpoint.protocol(), ec); + if (ec) { + fail(ec, "open"); + return; + } + acceptor_.set_option(tcp::acceptor::reuse_address(true), ec); + if (ec) { + fail(ec, "set_option"); + return; + } + acceptor_.bind(endpoint, ec); + if (ec) { + fail(ec, "bind"); + return; + } + acceptor_.listen(net::socket_base::max_listen_connections, ec); + if (ec) { + fail(ec, "listen"); + return; + } + + LD_LOG(logger_, LogLevel::kInfo) + << "server: listening on " << address << ":" << port; +} + +void server::fail(beast::error_code ec, char const* what) { + LD_LOG(logger_, LogLevel::kError) + << "server: " << what << ": " << ec.message(); +} + +void server::add_capability(std::string cap) { + LD_LOG(logger_, LogLevel::kDebug) + << "server: test capability: <" << cap << ">"; + caps_.push_back(std::move(cap)); +} + +void server::run() { + LD_LOG(logger_, LogLevel::kDebug) << "server: run requested"; + net::dispatch( + acceptor_.get_executor(), + beast::bind_front_handler(&server::do_accept, shared_from_this())); +} + +void server::do_accept() { + LD_LOG(logger_, LogLevel::kDebug) << "server: waiting for connection"; + acceptor_.async_accept( + net::make_strand(ioc_), + beast::bind_front_handler(&server::on_accept, shared_from_this())); +} + +void server::on_accept(boost::system::error_code const& ec, + tcp::socket socket) { + if (!acceptor_.is_open()) { + return; + } + if (ec) { + fail(ec, "accept"); + return; + } + + auto session = std::make_shared(std::move(socket), entity_manager_, + caps_, logger_); + + session->on_shutdown([this]() { + LD_LOG(logger_, LogLevel::kDebug) << "server: terminating"; + ioc_.stop(); + }); + + session->start(); + + do_accept(); +} diff --git a/apps/sse-contract-tests/src/session.cpp b/apps/sse-contract-tests/src/session.cpp new file mode 100644 index 000000000..7237095e8 --- /dev/null +++ b/apps/sse-contract-tests/src/session.cpp @@ -0,0 +1,207 @@ +#include "session.hpp" +#include +#include +#include + +const std::string kEntityPath = "/entity/"; + +namespace net = boost::asio; + +using launchdarkly::LogLevel; + +Session::Session(tcp::socket&& socket, + EntityManager& manager, + std::vector caps, + launchdarkly::Logger& logger) + : stream_{std::move(socket)}, + manager_{manager}, + capabilities_{std::move(caps)}, + on_shutdown_cb_{}, + shutdown_requested_{false}, + logger_{logger} { + LD_LOG(logger_, LogLevel::kDebug) << "session: created"; +} + +Session::~Session() { + LD_LOG(logger_, LogLevel::kDebug) << "session: destroyed"; +} + +void Session::start() { + LD_LOG(logger_, LogLevel::kDebug) << "session: start"; + net::dispatch( + stream_.get_executor(), + beast::bind_front_handler(&Session::do_read, shared_from_this())); +} + +void Session::stop() { + LD_LOG(logger_, LogLevel::kDebug) << "session: stop"; + net::dispatch(stream_.get_executor(), + beast::bind_front_handler( + &Session::do_stop, shared_from_this(), "stop requested")); +} + +void Session::do_stop(char const* reason) { + LD_LOG(logger_, LogLevel::kDebug) + << "session: closing socket (" << reason << ")"; + stream_.close(); +} + +void Session::do_read() { + request_ = {}; + + LD_LOG(logger_, LogLevel::kDebug) << "session: awaiting request"; + http::async_read( + stream_, buffer_, request_, + beast::bind_front_handler(&Session::on_read, shared_from_this())); +} + +void Session::on_read(beast::error_code ec, std::size_t bytes_transferred) { + boost::ignore_unused(bytes_transferred); + + if (ec == http::error::end_of_stream) { + return do_stop("end of stream"); + } + + if (ec) { + return do_stop("read failed"); + } + + send_response(handle_request(std::move(request_))); +} + +void Session::send_response(http::message_generator&& msg) { + beast::async_write( + stream_, std::move(msg), + beast::bind_front_handler(&Session::on_write, shared_from_this(), + request_.keep_alive())); +} + +void Session::on_write(bool keep_alive, + beast::error_code ec, + std::size_t bytes_transferred) { + boost::ignore_unused(bytes_transferred); + + if (shutdown_requested_ && on_shutdown_cb_) { + LD_LOG(logger_, LogLevel::kDebug) + << "session: client requested server termination"; + on_shutdown_cb_(); + } + + if (ec) { + return do_stop("write failed"); + } + + if (!keep_alive) { + return do_stop("client dropped connection"); + } + + do_read(); +} + +http::message_generator Session::handle_request( + http::request&& req) { + auto const bad_request = [&req](beast::string_view why) { + http::response res{http::status::bad_request, + req.version()}; + res.set(http::field::server, BOOST_BEAST_VERSION_STRING); + res.set(http::field::content_type, "application/json"); + res.keep_alive(req.keep_alive()); + res.body() = nlohmann::json{"error", why}.dump(); + res.prepare_payload(); + return res; + }; + + auto const not_found = [&req](beast::string_view target) { + http::response res{http::status::not_found, + req.version()}; + res.set(http::field::server, BOOST_BEAST_VERSION_STRING); + res.set(http::field::content_type, "text/html"); + res.keep_alive(req.keep_alive()); + res.body() = + "The resource '" + std::string(target) + "' was not found."; + res.prepare_payload(); + return res; + }; + + auto const server_error = [&req](beast::string_view what) { + http::response res{ + http::status::internal_server_error, req.version()}; + res.set(http::field::server, BOOST_BEAST_VERSION_STRING); + res.set(http::field::content_type, "text/html"); + res.keep_alive(req.keep_alive()); + res.body() = "An error occurred: '" + std::string(what) + "'"; + res.prepare_payload(); + return res; + }; + + auto const capabilities_response = [&req](std::vector const& + caps) { + http::response res{http::status::ok, req.version()}; + res.set(http::field::content_type, "application/json"); + res.keep_alive(req.keep_alive()); + res.body() = nlohmann::json{{"capabilities", caps}}.dump(); + res.prepare_payload(); + return res; + }; + + auto const create_entity_response = [&req](std::string const& id) { + http::response res{http::status::ok, req.version()}; + res.keep_alive(req.keep_alive()); + res.set("Location", kEntityPath + id); + res.prepare_payload(); + return res; + }; + + auto const destroy_entity_response = [&req](bool erased) { + auto status = erased ? http::status::ok : http::status::not_found; + http::response res{status, req.version()}; + res.keep_alive(req.keep_alive()); + res.prepare_payload(); + return res; + }; + + auto const shutdown_server_response = [&req]() { + http::response res{http::status::ok, req.version()}; + res.keep_alive(false); + res.prepare_payload(); + return res; + }; + + if (req.method() == http::verb::get && req.target() == "/") { + return capabilities_response(capabilities_); + } + + if (req.method() == http::verb::head && req.target() == "/") { + return http::response{http::status::ok, + req.version()}; + } + + if (req.method() == http::verb::delete_ && req.target() == "/") { + shutdown_requested_ = true; + return shutdown_server_response(); + } + + if (req.method() == http::verb::post && req.target() == "/") { + try { + auto json = nlohmann::json::parse(request_.body()); + auto params = json.get(); + if (auto id = manager_.create(std::move(params))) { + return create_entity_response(*id); + } else { + return server_error("couldn't create client entity"); + } + } catch (nlohmann::json::exception& e) { + return bad_request("unable to parse config JSON"); + } + } + + if (req.method() == http::verb::delete_ && + req.target().starts_with(kEntityPath)) { + std::string id = req.target(); + boost::erase_first(id, kEntityPath); + bool erased = manager_.destroy(id); + return destroy_entity_response(erased); + } + + return not_found(req.target()); +} diff --git a/cmake/certify.cmake b/cmake/certify.cmake new file mode 100644 index 000000000..e6a87e47a --- /dev/null +++ b/cmake/certify.cmake @@ -0,0 +1,10 @@ +cmake_minimum_required(VERSION 3.11) + +include(FetchContent) + +FetchContent_Declare(certify + GIT_REPOSITORY https://github.com/djarek/certify.git + GIT_TAG 97f5eebfd99a5d6e99d07e4820240994e4e59787 +) + +FetchContent_MakeAvailable(certify) diff --git a/cmake/json.cmake b/cmake/json.cmake new file mode 100644 index 000000000..ee1c9d42d --- /dev/null +++ b/cmake/json.cmake @@ -0,0 +1,11 @@ +cmake_minimum_required(VERSION 3.11) + +include(FetchContent) + +set(JSON_ImplicitConversions OFF) + +FetchContent_Declare(json + URL https://github.com/nlohmann/json/releases/download/v3.11.2/json.tar.xz +) + +FetchContent_MakeAvailable(json) diff --git a/libs/client-sdk/CMakeLists.txt b/libs/client-sdk/CMakeLists.txt index 09f85c66b..6233cf41f 100644 --- a/libs/client-sdk/CMakeLists.txt +++ b/libs/client-sdk/CMakeLists.txt @@ -30,6 +30,6 @@ include(FetchContent) # Add main SDK sources. add_subdirectory(src) -if(BUILD_TESTING) +if (BUILD_TESTING) add_subdirectory(tests) -endif() +endif () diff --git a/libs/client-sdk/src/api.cpp b/libs/client-sdk/src/api.cpp index df0d26d09..48c153fda 100644 --- a/libs/client-sdk/src/api.cpp +++ b/libs/client-sdk/src/api.cpp @@ -4,7 +4,10 @@ #include namespace launchdarkly { + +auto const kAnswerToLifeTheUniverseAndEverything = 42; + std::optional foo() { - return 42; + return kAnswerToLifeTheUniverseAndEverything; } } // namespace launchdarkly diff --git a/libs/client-sdk/tests/CMakeLists.txt b/libs/client-sdk/tests/CMakeLists.txt index 12990c2cf..4d959d69d 100644 --- a/libs/client-sdk/tests/CMakeLists.txt +++ b/libs/client-sdk/tests/CMakeLists.txt @@ -5,7 +5,7 @@ include_directories("${PROJECT_SOURCE_DIR}/include") file(GLOB tests "${PROJECT_SOURCE_DIR}/tests/*.cpp") -set (CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) +set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) add_executable(gtest_${LIBNAME} ${tests}) diff --git a/libs/common/include/logger.hpp b/libs/common/include/logger.hpp index 03e35a506..888482304 100644 --- a/libs/common/include/logger.hpp +++ b/libs/common/include/logger.hpp @@ -15,7 +15,7 @@ namespace launchdarkly { * Logger logger(std::make_unique(LogLevel::kInfo, * "Example")); * - * // Use log macro for logging. + * // Use the macro for logging. * LD_LOG(logger, LogLevel::kInfo) << "this is a log"; * ``` */ diff --git a/libs/common/src/log_level.cpp b/libs/common/src/log_level.cpp index 5453ca49e..67d7f1e36 100644 --- a/libs/common/src/log_level.cpp +++ b/libs/common/src/log_level.cpp @@ -31,15 +31,17 @@ LogLevel GetLogLevelEnum(char const* level, LogLevel default_) { if (lowercase == "debug") { return LogLevel::kDebug; - } else if (lowercase == "info") { + } + if (lowercase == "info") { return LogLevel::kInfo; - } else if (lowercase == "warn") { + } + if (lowercase == "warn") { return LogLevel::kWarn; - } else if (lowercase == "error") { + } + if (lowercase == "error") { return LogLevel::kError; - } else { - return default_; } + return default_; } } // namespace launchdarkly diff --git a/libs/common/tests/CMakeLists.txt b/libs/common/tests/CMakeLists.txt index a9f4277a1..e83e84c70 100644 --- a/libs/common/tests/CMakeLists.txt +++ b/libs/common/tests/CMakeLists.txt @@ -5,7 +5,7 @@ include_directories("${PROJECT_SOURCE_DIR}/include") file(GLOB tests "${PROJECT_SOURCE_DIR}/tests/*.cpp") -set (CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) +set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) add_executable(gtest_${LIBNAME} ${tests}) diff --git a/libs/server-sent-events/CMakeLists.txt b/libs/server-sent-events/CMakeLists.txt index 0231eedf9..23c79e98c 100644 --- a/libs/server-sent-events/CMakeLists.txt +++ b/libs/server-sent-events/CMakeLists.txt @@ -40,6 +40,9 @@ set(Boost_USE_STATIC_RUNTIME OFF) find_package(Boost 1.80 REQUIRED) message(STATUS "LaunchDarkly: using Boost v${Boost_VERSION}") +include(${CMAKE_FILES}/certify.cmake) + + add_subdirectory(src) diff --git a/libs/server-sent-events/include/launchdarkly/sse/client.hpp b/libs/server-sent-events/include/launchdarkly/sse/client.hpp new file mode 100644 index 000000000..17400b761 --- /dev/null +++ b/libs/server-sent-events/include/launchdarkly/sse/client.hpp @@ -0,0 +1,139 @@ +#pragma once + +#include + +#include +#include +#include +#include + +#include +#include +#include + +namespace launchdarkly::sse { + +namespace beast = boost::beast; +namespace http = beast::http; +namespace net = boost::asio; + +class Client; + +/** + * Builder can be used to create an instance of Client. Minimal example: + * @code + * auto client = launchdarkly::sse::Builder(executor, + * "https://example.com").build(); + * @endcode + */ +class Builder { + public: + using EventReceiver = std::function; + using LogCallback = std::function; + + /** + * Create a builder for the given URL. If the port is omitted, 443 is + * assumed for https scheme while 80 is assumed for http scheme. + * + * Example: https://example.com:8123/endpoint + * + * @param ioc Executor for the Client. + * @param url Server-Sent-Events server URL. + */ + Builder(net::any_io_executor ioc, std::string url); + + /** + * Add a custom header to the initial request. The following headers + * are added by default and can be overridden: + * + * User-Agent: the default Boost.Beast user agent. + * Accept: text/event-stream + * Cache-Control: no-cache + * + * Note that Content-Type and + * + * @param name Header name. + * @param value Header value. + * @return Reference to this builder. + */ + Builder& header(std::string const& name, std::string const& value); + + /** + * Specifies a request body. The body is sent when the method is POST or + * REPORT. + * @return Reference to this builder. + */ + Builder& body(std::string); + + /** + * Specifies the maximum time duration between subsequent reads from the + * stream. A read counts as receiving any amount of bytes. + * @param timeout + * @return Reference to this builder. + */ + Builder& read_timeout(std::chrono::milliseconds timeout); + + /** + * Specify the method for the initial request. The default method is GET. + * @param verb The HTTP method. + * @return Reference to this builder. + */ + Builder& method(http::verb verb); + + /** + * Specify a receiver of events generated by the Client. For example: + * @code + * builder.receiver([](launchdarkly::sse::Event event) -> void { + * std::cout << event.type() << ": " << event.data() << std::endl; + * }); + * @endcode + * + * @return Reference to this builder. + */ + Builder& receiver(EventReceiver); + + /** + * Specify a logging callback for the Client. + * @param callback Callback to receive a string from the Client. + * @return Reference to this builder. + */ + Builder& logger(LogCallback callback); + + /** + * Builds a Client. The shared pointer is necessary to extend the lifetime + * of the Client to encompass each asynchronous operation that it performs. + * @return New client; call run() to kickoff the connection process and + * begin reading. + */ + std::shared_ptr build(); + + private: + std::string url_; + net::any_io_executor executor_; + http::request request_; + std::optional read_timeout_; + LogCallback logging_cb_; + EventReceiver receiver_; +}; + +/** + * Client is a long-lived Server-Sent-Events (EventSource) client which + * reads from an event stream and dispatches events to a user-specified + * receiver. + */ +class Client { + public: + virtual ~Client() = default; + /** + * Kicks off a connection to the server and begins reading the event stream. + * The provided event receiver and logging callbacks will be invoked from + * the thread that is servicing the Client's executor. + */ + virtual void run() = 0; + /** + * Closes the stream. + */ + virtual void close() = 0; +}; + +} // namespace launchdarkly::sse diff --git a/libs/server-sent-events/include/launchdarkly/sse/detail/parser.hpp b/libs/server-sent-events/include/launchdarkly/sse/detail/parser.hpp new file mode 100644 index 000000000..f0e774789 --- /dev/null +++ b/libs/server-sent-events/include/launchdarkly/sse/detail/parser.hpp @@ -0,0 +1,250 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace launchdarkly::sse::detail { + +using namespace boost::beast; + +struct Event { + std::string type; + std::string data; + std::optional id; + + Event(); + + void append_data(std::string const&); + void trim_trailing_newline(); +}; + +template +struct EventBody { + using event_type = EventReceiver; + class reader; + class value_type; +}; + +template +class EventBody::value_type { + friend class reader; + friend struct EventBody; + + EventReceiver events_; + + public: + void on_event(EventReceiver&& receiver) { events_ = std::move(receiver); } +}; + +template +struct EventBody::reader { + value_type& body_; + + std::optional buffered_line_; + std::deque complete_lines_; + bool begin_CR_; + std::optional last_event_id_; + std::optional event_; + + public: + template + reader(http::header& h, value_type& body) + : body_(body), + buffered_line_(), + complete_lines_(), + begin_CR_(false), + last_event_id_(), + event_() { + boost::ignore_unused(h); + } + + /** + * Initialize the reader. + * This is called after construction and before the first + * call to `put`. The message is valid and complete upon + * entry. + * @param content_length + * @param ec Set to the error, if any occurred. + */ + void init(boost::optional const& content_length, + error_code& ec) { + boost::ignore_unused(content_length); + + // The specification requires this to indicate "no error" + ec = {}; + } + + /** + * Store buffers. + * This is called zero or more times with parsed body octets. + * @tparam ConstBufferSequence + * @param buffers The constant buffer sequence to store. + * @param ec et to the error, if any occurred. + * @return The number of bytes transferred from the input buffers. + */ + template + std::size_t put(ConstBufferSequence const& buffers, error_code& ec) { + // The specification requires this to indicate "no error" + ec = {}; + parse_stream(buffers_to_string(buffers)); + parse_events(); + return buffer_bytes(buffers); + } + + /** + * Called when the body is complete. + * @param ec Set to the error, if any occurred. + */ + void finish(error_code& ec) { + // The specification requires this to indicate "no error" + ec = {}; + } + + private: + void complete_line() { + if (buffered_line_.has_value()) { + complete_lines_.push_back(buffered_line_.value()); + buffered_line_.reset(); + } + } + + // Appends the body to the buffered line until reaching any of the + // characters specified within the search parameter. The search parameter is + // treated as an array of search characters, not as a single token. + size_t append_up_to(boost::string_view body, std::string const& search) { + std::size_t index = body.find_first_of(search); + if (index != std::string::npos) { + body.remove_suffix(body.size() - index); + } + if (buffered_line_.has_value()) { + buffered_line_->append(body.to_string()); + } else { + buffered_line_ = std::string{body}; + } + return index == std::string::npos ? body.size() : index; + } + + void parse_stream(boost::string_view body) { + size_t i = 0; + while (i < body.size()) { + i += this->append_up_to(body.substr(i, body.length() - i), "\r\n"); + if (i == body.size()) { + break; + } + if (body.at(i) == '\r') { + complete_line(); + begin_CR_ = true; + i++; + } else if (body.at(i) == '\n') { + if (begin_CR_) { + begin_CR_ = false; + i++; + } else { + complete_line(); + i++; + } + } else { + begin_CR_ = false; + } + } + } + + static std::pair parse_field(std::string field) { + if (field.empty()) { + assert(0 && "should never parse an empty line"); + } + + size_t colon_index = field.find(':'); + switch (colon_index) { + case 0: + field.erase(0, 1); + return std::make_pair(std::string{"comment"}, std::move(field)); + case std::string::npos: + return std::make_pair(std::move(field), std::string{}); + default: + auto key = field.substr(0, colon_index); + field.erase(0, colon_index + 1); + if (field.find(' ') == 0) { + field.erase(0, 1); + } + return std::make_pair(std::move(key), std::move(field)); + } + } + + void parse_events() { + while (true) { + bool seen_empty_line = false; + + while (!complete_lines_.empty()) { + std::string line = std::move(complete_lines_.front()); + complete_lines_.pop_front(); + + if (line.empty()) { + if (event_.has_value()) { + seen_empty_line = true; + break; + } + continue; + } + + auto field = parse_field(std::move(line)); + if (field.first == "comment") { + body_.events_( + launchdarkly::sse::Event("comment", field.second)); + continue; + } + + if (!event_.has_value()) { + event_.emplace(Event{}); + event_->id = last_event_id_; + } + + if (field.first == "event") { + event_->type = field.second; + } else if (field.first == "data") { + event_->append_data(field.second); + } else if (field.first == "id") { + if (field.second.find('\0') != std::string::npos) { + // IDs with null-terminators are acceptable, but + // ignored. + continue; + } + last_event_id_ = field.second; + event_->id = last_event_id_; + } else if (field.first == "retry") { + // todo: implement + } + } + + if (seen_empty_line) { + if (event_.has_value()) { + event_->trim_trailing_newline(); + body_.events_(launchdarkly::sse::Event( + event_->type, event_->data, event_->id)); + event_.reset(); + } + continue; + } + + break; + } + } +}; + +template +std::ostream& operator<<( + std::ostream&, + http::message, Fields> const&) = delete; + +} // namespace launchdarkly::sse::detail diff --git a/libs/server-sent-events/include/launchdarkly/sse/detail/sse_stream.hpp b/libs/server-sent-events/include/launchdarkly/sse/detail/sse_stream.hpp deleted file mode 100644 index 746bb89f0..000000000 --- a/libs/server-sent-events/include/launchdarkly/sse/detail/sse_stream.hpp +++ /dev/null @@ -1,191 +0,0 @@ -#pragma once -#include -#include -#include -#include -#include - -namespace launchdarkly::sse::detail { - -namespace beast = boost::beast; // from -namespace http = beast::http; // from -namespace net = boost::asio; // from - -// A layered stream which implements the SSE protocol. -template -class sse_stream { - NextLayer m_nextLayer; - boost::optional m_bufferedLine; - bool m_lastCharWasCR; - std::vector m_completeLines; - - // This is the "initiation" object passed to async_initiate to start the - // operation - struct run_read_op { - template - void operator()(ReadHandler&& handler, - sse_stream* stream, - MutableBufferSequence const& buffers) { - using handler_type = typename std::decay::type; - - // async_base handles all of the composed operation boilerplate for - // us - using base = beast::async_base>; - - // Our composed operation is implemented as a completion handler - // object - struct op : base { - sse_stream& stream_; - - op(sse_stream& stream, - handler_type&& handler, - MutableBufferSequence const& buffers) - : base(std::move(handler), stream.get_executor()), - stream_(stream) { - // Start the asynchronous operation - stream_.next_layer().async_read_some(buffers, - std::move(*this)); - } - - void operator()(beast::error_code ec, - std::size_t bytes_transferred) { - this->complete_now(ec, bytes_transferred); - } - }; - - op(*stream, std::forward(handler), buffers); - } - }; - - // This is the "initiation" object passed to async_initiate to start the - // operation - struct run_write_op { - template - void operator()(WriteHandler&& handler, - sse_stream* stream, - ConstBufferSequence const& buffers) { - using handler_type = typename std::decay::type; - - // async_base handles all of the composed operation boilerplate for - // us - using base = beast::async_base>; - - // Our composed operation is implemented as a completion handler - // object - struct op : base { - sse_stream& stream_; - - op(sse_stream& stream, - handler_type&& handler, - ConstBufferSequence const& buffers) - : base(std::move(handler), stream.get_executor()), - stream_(stream) { - // Start the asynchronous operation - stream_.next_layer().async_write_some(buffers, - std::move(*this)); - } - - void operator()(beast::error_code ec, - std::size_t bytes_transferred) { - this->complete_now(ec, bytes_transferred); - } - }; - - op(*stream, std::forward(handler), buffers); - } - }; - - void push_buffered() { - if (m_bufferedLine) { - m_completeLines.push_back(*m_bufferedLine); - m_bufferedLine.reset(); - } - } - - void append_buffered(std::string token) { - if (m_bufferedLine) { - m_bufferedLine->append(token); - } else { - m_bufferedLine.emplace(token); - } - } - - template - void decode_and_buffer_lines(MutableBufferSequence const& chunk) { - boost::char_separator sep{"", "\r\n|\r|\n"}; - boost::tokenizer> tokens_all{chunk, sep}; - - std::vector tokens{std::begin(tokens_all), - std::end(tokens_all)}; - - for (auto tok = tokens.begin(); tok != tokens.end(); tok = tok++) { - if (*tok == "\r" || *tok == "\n" || *tok == "\r\n") { - this->push_buffered(); - } else { - this->append_buffered(*tok); - } - } - } - - public: - using executor_type = beast::executor_type; - - template - explicit sse_stream(Args&&... args) - : m_nextLayer{std::forward(args)...}, - m_lastCharWasCR{false}, - m_bufferedLine{} {} - - /// Returns an instance of the executor used to submit completion handlers - executor_type get_executor() noexcept { return m_nextLayer.get_executor(); } - - /// Returns a reference to the next layer - NextLayer& next_layer() noexcept { return m_nextLayer; } - - /// Returns a reference to the next layer - NextLayer const& next_layer() const noexcept { return m_nextLayer; } - - /// Read some data from the stream - template - std::size_t read_some(MutableBufferSequence const& buffers) { - auto const bytes_transferred = m_nextLayer.read_some(buffers); - this->decode_and_buffer_lines(buffers); - return bytes_transferred; - } - - /// Read some data from the stream - template - std::size_t read_some(MutableBufferSequence const& buffers, - beast::error_code& ec) { - auto const bytes_transferred = m_nextLayer.read_some(buffers, ec); - this->decode_and_buffer_lines(buffers); - return bytes_transferred; - } - - template > - BOOST_BEAST_ASYNC_RESULT2(ReadHandler) - async_read_some(MutableBufferSequence const& buffers, - ReadHandler&& handler = - net::default_completion_token{}) { - return net::async_initiate( - run_read_op{}, handler, this, buffers); - } - - /// Write some data to the stream asynchronously - template > - BOOST_BEAST_ASYNC_RESULT2(WriteHandler) - async_write_some(ConstBufferSequence const& buffers, - WriteHandler&& handler = - net::default_completion_token_t{}) { - return net::async_initiate( - run_write_op{}, handler, this, buffers); - } -}; - -} // namespace launchdarkly::sse::detail diff --git a/libs/server-sent-events/include/launchdarkly/sse/event.hpp b/libs/server-sent-events/include/launchdarkly/sse/event.hpp new file mode 100644 index 000000000..fb01c6edc --- /dev/null +++ b/libs/server-sent-events/include/launchdarkly/sse/event.hpp @@ -0,0 +1,24 @@ +#pragma once + +#include +#include + +namespace launchdarkly::sse { + +class Event { + public: + Event(std::string type, std::string data); + Event(std::string type, std::string data, std::string id); + Event(std::string type, std::string data, std::optional id); + std::string const& type() const; + std::string const& data() const; + std::optional const& id() const; + std::string&& take() &&; + + private: + std::string type_; + std::string data_; + std::optional id_; +}; + +} // namespace launchdarkly::sse diff --git a/libs/server-sent-events/include/launchdarkly/sse/sse.hpp b/libs/server-sent-events/include/launchdarkly/sse/sse.hpp deleted file mode 100644 index 0bf271b5d..000000000 --- a/libs/server-sent-events/include/launchdarkly/sse/sse.hpp +++ /dev/null @@ -1,91 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace launchdarkly::sse { - -namespace beast = boost::beast; // from -namespace http = beast::http; // from -namespace net = boost::asio; // from -namespace ssl = boost::asio::ssl; // from -using tcp = boost::asio::ip::tcp; // from - -class client; - -class builder { - public: - builder(net::io_context& ioc, std::string url); - builder& header(std::string const& name, std::string const& value); - builder& method(http::verb verb); - std::shared_ptr build(); - - private: - std::string m_url; - net::io_context& m_executor; - ssl::context m_ssl_ctx; - http::request m_request; -}; - -class event_data { - std::string m_type; - std::string m_data; - boost::optional m_id; - - public: - explicit event_data(boost::optional id); - void set_type(std::string); - std::string const& get_type(); - std::string const& get_data(); - void append_data(std::string const&); -}; - -using sse_event = event_data; -using sse_comment = std::string; - -using event = std::variant; - -class client : public std::enable_shared_from_this { - using parser = - http::response_parser>; - tcp::resolver m_resolver; - beast::ssl_stream m_stream; - beast::flat_buffer m_buffer; - http::request m_request; - http::response m_response; - parser m_parser; - std::string m_host; - std::string m_port; - boost::optional m_buffered_line; - std::deque m_complete_lines; - std::vector m_events; - bool m_begin_CR; - boost::optional m_event_data; - void complete_line(); - size_t append_up_to(std::string_view body, std::string const& search); - std::size_t parse_stream(std::uint64_t remain, - std::string_view body, - beast::error_code& ec); - void parse_events(); - - public: - explicit client(net::any_io_executor ex, - ssl::context& ctx, - http::request req, - std::string host, - std::string port); - void run(); -}; - -} // namespace launchdarkly::sse diff --git a/libs/server-sent-events/src/CMakeLists.txt b/libs/server-sent-events/src/CMakeLists.txt index 9f839b815..f7a2411a2 100644 --- a/libs/server-sent-events/src/CMakeLists.txt +++ b/libs/server-sent-events/src/CMakeLists.txt @@ -2,8 +2,11 @@ file(GLOB HEADER_LIST CONFIGURE_DEPENDS "${LaunchDarklySSEClient_SOURCE_DIR}/include/launchdarkly/*.hpp") # Automatic library: static or dynamic based on user config. -add_library(${LIBNAME} sse.cpp boost-url.cpp ${HEADER_LIST}) -target_link_libraries(${LIBNAME} PUBLIC OpenSSL::SSL Boost::headers) +add_library(${LIBNAME} client.cpp parser.cpp event.cpp boost-url.cpp ${HEADER_LIST}) +target_link_libraries(${LIBNAME} + PUBLIC OpenSSL::SSL Boost::headers + PRIVATE certify::core + ) add_library(launchdarkly::sse ALIAS ${LIBNAME}) diff --git a/libs/server-sent-events/src/client.cpp b/libs/server-sent-events/src/client.cpp new file mode 100644 index 000000000..d157bafe1 --- /dev/null +++ b/libs/server-sent-events/src/client.cpp @@ -0,0 +1,366 @@ +#include +#include + +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include + +namespace launchdarkly::sse { + +namespace beast = boost::beast; // from +namespace http = beast::http; // from +namespace net = boost::asio; // from +namespace ssl = boost::asio::ssl; // from +using tcp = boost::asio::ip::tcp; // from + +auto const kDefaultUserAgent = BOOST_BEAST_VERSION_STRING; + +// The allowed amount of time to connect the socket and perform +// any TLS handshake, if necessary. +const std::chrono::milliseconds kDefaultConnectTimeout = + std::chrono::seconds(15); +// Once connected, the amount of time to send a request and receive the first +// batch of bytes back. +const std::chrono::milliseconds kDefaultResponseTimeout = + std::chrono::seconds(15); + +template +class + Session { // NOLINT(cppcoreguidelines-non-private-member-variables-in-classes) + private: + Derived& derived() { return static_cast(*this); } + http::request req_; + std::chrono::milliseconds connect_timeout_; + std::chrono::milliseconds response_timeout_; + std::optional read_timeout_; + + protected: + beast::flat_buffer buffer_; + std::string host_; + std::string port_; + tcp::resolver resolver_; + Builder::LogCallback logger_; + + using cb = std::function; + using body = launchdarkly::sse::detail::EventBody; + http::response_parser parser_; + + public: + Session(net::any_io_executor const& exec, + std::string host, + std::string port, + http::request req, + std::chrono::milliseconds connect_timeout, + std::chrono::milliseconds response_timeout, + std::optional read_timeout, + Builder::EventReceiver receiver, + Builder::LogCallback logger) + : req_(std::move(req)), + resolver_(exec), + connect_timeout_(connect_timeout), + response_timeout_(response_timeout), + read_timeout_(std::move(read_timeout)), + host_(std::move(host)), + port_(std::move(port)), + logger_(std::move(logger)), + parser_() { + parser_.get().body().on_event(std::move(receiver)); + } + + void fail(beast::error_code ec, char const* what) { + logger_(std::string(what) + ": " + ec.message()); + } + + void do_resolve() { + logger_("resolving " + host_ + ":" + port_); + resolver_.async_resolve( + host_, port_, + beast::bind_front_handler(&Session::on_resolve, + derived().shared_from_this())); + } + + void on_resolve(beast::error_code ec, tcp::resolver::results_type results) { + if (ec) + return fail(ec, "resolve"); + + logger_("connecting (" + std::to_string(connect_timeout_.count()) + + " sec timeout)"); + + beast::get_lowest_layer(derived().stream()) + .expires_after(connect_timeout_); + + beast::get_lowest_layer(derived().stream()) + .async_connect(results, beast::bind_front_handler( + &Session::on_connect, + derived().shared_from_this())); + } + + void on_connect(beast::error_code ec, + tcp::resolver::results_type::endpoint_type eps) { + if (ec) { + return fail(ec, "connect"); + } + + derived().do_handshake(); + } + + void on_handshake(beast::error_code ec) { + if (ec) + return fail(ec, "handshake"); + + do_write(); + } + + void do_write() { + logger_("making request (" + std::to_string(response_timeout_.count()) + + " sec timeout)"); + + beast::get_lowest_layer(derived().stream()) + .expires_after(response_timeout_); + + http::async_write( + derived().stream(), req_, + beast::bind_front_handler(&Session::on_write, + derived().shared_from_this())); + } + + void on_write(beast::error_code ec, std::size_t) { + if (ec) + return fail(ec, "write"); + + logger_("reading response"); + + http::async_read_some( + derived().stream(), buffer_, parser_, + beast::bind_front_handler(&Session::on_read, + derived().shared_from_this())); + } + + void on_read(beast::error_code ec, std::size_t bytes_transferred) { + boost::ignore_unused(bytes_transferred); + if (ec) { + return fail(ec, "read"); + } + + if (read_timeout_) { + beast::get_lowest_layer(derived().stream()) + .expires_after(*read_timeout_); + } else { + beast::get_lowest_layer(derived().stream()).expires_never(); + }; + + http::async_read_some( + derived().stream(), buffer_, parser_, + beast::bind_front_handler(&Session::on_read, + derived().shared_from_this())); + } + + void do_close() { + logger_("closing"); + beast::get_lowest_layer(derived().stream()).cancel(); + } +}; + +class EncryptedClient : public Client, + public Session, + public std::enable_shared_from_this { + public: + EncryptedClient(net::any_io_executor ex, + ssl::context ctx, + http::request req, + std::string host, + std::string port, + std::optional read_timeout, + Builder::EventReceiver receiver, + Builder::LogCallback logger) + : Session(ex, + std::move(host), + std::move(port), + std::move(req), + kDefaultConnectTimeout, + kDefaultResponseTimeout, + std::move(read_timeout), + std::move(receiver), + std::move(logger)), + ssl_ctx_(std::move(ctx)), + stream_{ex, ssl_ctx_} {} + + virtual void run() override { + // Set SNI Hostname (many hosts need this to handshake successfully) + if (!SSL_set_tlsext_host_name(stream_.native_handle(), host_.c_str())) { + beast::error_code ec{static_cast(::ERR_get_error()), + net::error::get_ssl_category()}; + logger_("failed to set TLS host name extension: " + ec.message()); + return; + } + + do_resolve(); + } + + virtual void close() override { do_close(); } + + void do_handshake() { + stream_.async_handshake(ssl::stream_base::client, + beast::bind_front_handler( + &EncryptedClient::on_handshake, shared())); + } + + beast::ssl_stream& stream() { return stream_; } + + private: + ssl::context ssl_ctx_; + beast::ssl_stream stream_; + + std::shared_ptr shared() { + return std::static_pointer_cast(shared_from_this()); + } +}; + +class PlaintextClient : public Client, + public Session, + public std::enable_shared_from_this { + public: + PlaintextClient(net::any_io_executor ex, + http::request req, + std::string host, + std::string port, + std::optional read_timeout, + Builder::EventReceiver receiver, + Builder::LogCallback logger) + : Session(ex, + std::move(host), + std::move(port), + std::move(req), + kDefaultConnectTimeout, + kDefaultResponseTimeout, + read_timeout, + std::move(receiver), + std::move(logger)), + stream_{ex} {} + + virtual void run() override { do_resolve(); } + + void do_handshake() { + // No handshake for plaintext; immediately send the request instead. + do_write(); + } + + virtual void close() override { do_close(); } + + beast::tcp_stream& stream() { return stream_; } + + private: + beast::tcp_stream stream_; +}; + +Builder::Builder(net::any_io_executor ctx, std::string url) + : url_{std::move(url)}, + executor_{std::move(ctx)}, + read_timeout_{std::nullopt} { + receiver_ = [](launchdarkly::sse::Event const&) {}; + + request_.version(11); + request_.set(http::field::user_agent, kDefaultUserAgent); + request_.method(http::verb::get); + request_.set(http::field::accept, "text/event-stream"); + request_.set(http::field::cache_control, "no-cache"); +} + +Builder& Builder::header(std::string const& name, std::string const& value) { + request_.set(name, value); + return *this; +} + +Builder& Builder::body(std::string data) { + request_.body() = std::move(data); + return *this; +} + +Builder& Builder::read_timeout(std::chrono::milliseconds timeout) { + read_timeout_ = timeout; + return *this; +} + +Builder& Builder::method(http::verb verb) { + request_.method(verb); + return *this; +} + +Builder& Builder::receiver(EventReceiver receiver) { + receiver_ = std::move(receiver); + return *this; +} + +Builder& Builder::logger(std::function callback) { + logging_cb_ = std::move(callback); + return *this; +} + +std::shared_ptr Builder::build() { + auto uri_components = boost::urls::parse_uri(url_); + if (!uri_components) { + return nullptr; + } + + // Don't send a body unless the method is POST or REPORT + if (!(request_.method() == http::verb::post || + request_.method() == http::verb::report)) { + request_.body() = ""; + } else { + // If it is, then setup Content-Type, only if one wasn't + // specified. + if (auto it = request_.find(http::field::content_type); + it == request_.end()) { + request_.set(http::field::content_type, "text/plain"); + } + } + + request_.prepare_payload(); + + std::string host = uri_components->host(); + + request_.set(http::field::host, host); + request_.target(uri_components->path()); + + if (uri_components->scheme_id() == boost::urls::scheme::https) { + std::string port = + uri_components->has_port() ? uri_components->port() : "443"; + + ssl::context ssl_ctx{ssl::context::tlsv12_client}; + + ssl_ctx.set_verify_mode(ssl::verify_peer | + ssl::verify_fail_if_no_peer_cert); + boost::certify::enable_native_https_server_verification(ssl_ctx); + + return std::make_shared( + net::make_strand(executor_), std::move(ssl_ctx), request_, host, + port, read_timeout_, receiver_, logging_cb_); + } else { + std::string port = + uri_components->has_port() ? uri_components->port() : "80"; + + return std::make_shared( + net::make_strand(executor_), request_, host, port, read_timeout_, + receiver_, logging_cb_); + } +} + +} // namespace launchdarkly::sse diff --git a/libs/server-sent-events/src/event.cpp b/libs/server-sent-events/src/event.cpp new file mode 100644 index 000000000..7ee25c652 --- /dev/null +++ b/libs/server-sent-events/src/event.cpp @@ -0,0 +1,30 @@ +#include + +namespace launchdarkly::sse { + +Event::Event(std::string type, std::string data) + : Event(std::move(type), std::move(data), std::nullopt) {} + +Event::Event(std::string type, std::string data, std::string id) + : Event(std::move(type), + std::move(data), + std::optional{std::move(id)}) {} + +Event::Event(std::string type, std::string data, std::optional id) + : type_(std::move(type)), data_(std::move(data)), id_(std::move(id)) {} + +std::string const& Event::type() const { + return type_; +} +std::string const& Event::data() const { + return data_; +} +std::optional const& Event::id() const { + return id_; +} + +std::string&& Event::take() && { + return std::move(data_); +}; + +} // namespace launchdarkly::sse diff --git a/libs/server-sent-events/src/parser.cpp b/libs/server-sent-events/src/parser.cpp new file mode 100644 index 000000000..2fc243c02 --- /dev/null +++ b/libs/server-sent-events/src/parser.cpp @@ -0,0 +1,18 @@ +#include + +namespace launchdarkly::sse::detail { + +Event::Event() : type("message"), data(), id() {} + +void Event::append_data(std::string const& input) { + data.append(input); + data.append("\n"); +} + +void Event::trim_trailing_newline() { + if (data[data.size() - 1] == '\n') { + data.resize(data.size() - 1); + } +} + +} // namespace launchdarkly::sse::detail diff --git a/libs/server-sent-events/src/sse.cpp b/libs/server-sent-events/src/sse.cpp deleted file mode 100644 index 120bb36d7..000000000 --- a/libs/server-sent-events/src/sse.cpp +++ /dev/null @@ -1,248 +0,0 @@ -#include -#include -#include -#include -#include -#include - -namespace launchdarkly::sse { - -namespace beast = boost::beast; // from -namespace http = beast::http; // from -namespace net = boost::asio; // from -namespace ssl = boost::asio::ssl; // from -using tcp = boost::asio::ip::tcp; // from - -builder::builder(net::io_context& ctx, std::string url) - : m_url{std::move(url)}, - m_ssl_ctx{ssl::context::tlsv12_client}, - m_request{}, - m_executor{ctx} { - // This needs to be verify_peer in production!! - m_ssl_ctx.set_verify_mode(ssl::verify_none); - - m_request.version(11); - m_request.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING); - m_request.method(http::verb::get); - m_request.set("Accept", "text/event-stream"); - m_request.set("Cache-Control", "no-cache"); -} - -builder& builder::header(std::string const& name, std::string const& value) { - m_request.set(name, value); - return *this; -} - -builder& builder::method(http::verb verb) { - m_request.method(verb); - return *this; -} - -std::shared_ptr builder::build() { - boost::system::result uri_components = - boost::urls::parse_uri(m_url); - if (!uri_components) { - return nullptr; - } - std::string port; - if (!uri_components->has_port() && - uri_components->scheme_id() == boost::urls::scheme::https) { - port = "443"; - } - - m_request.set(http::field::host, uri_components->host()); - m_request.target(uri_components->path()); - - return std::make_shared(net::make_strand(m_executor), m_ssl_ctx, - m_request, uri_components->host(), port); -} - -event_data::event_data(boost::optional id) - : m_type{}, m_data{}, m_id{std::move(id)} {} - -void event_data::set_type(std::string type) { - m_type = std::move(type); -} -void event_data::append_data(std::string const& data) { - m_data.append(data); -} - -std::string const& event_data::get_type() { - return m_type; -} -std::string const& event_data::get_data() { - return m_data; -} - -client::client(net::any_io_executor ex, - ssl::context& ctx, - http::request req, - std::string host, - std::string port) - : m_resolver{ex}, - m_stream{ex, ctx}, - m_request{std::move(req)}, - m_host{std::move(host)}, - m_port{std::move(port)}, - m_parser{}, - m_buffered_line{}, - m_complete_lines{}, - m_begin_CR{false}, - m_event_data{}, - m_events{} { - // The HTTP response body is of potentially infinite length. - m_parser.body_limit(boost::none); -} - -void client::run() { - // Set SNI Hostname (many hosts need this to handshake successfully) - if (!SSL_set_tlsext_host_name(m_stream.native_handle(), m_host.c_str())) { - beast::error_code ec{static_cast(::ERR_get_error()), - net::error::get_ssl_category()}; - std::cerr << ec.message() << "\n"; - return; - } - - beast::get_lowest_layer(m_stream).expires_after(std::chrono::seconds(10)); - - auto results = m_resolver.resolve(m_host, m_port); - beast::get_lowest_layer(m_stream).connect(results); - m_stream.handshake(ssl::stream_base::client); - http::write(m_stream, m_request); - - beast::get_lowest_layer(m_stream).expires_never(); - - auto callback = [this](std::uint64_t remain, std::string_view body, - beast::error_code& ec) { - size_t read = this->parse_stream(remain, body, ec); - this->parse_events(); - return read; - }; - - m_parser.on_chunk_body(callback); - - // Blocking call until the stream terminates. - http::read(m_stream, m_buffer, m_parser); -} - -void client::complete_line() { - if (m_buffered_line.has_value()) { - m_complete_lines.push_back(m_buffered_line.value()); - std::cout << "Line: <" << m_buffered_line.value() << ">" << std::endl; - m_buffered_line.reset(); - } -} - -size_t client::append_up_to(std::string_view body, std::string const& search) { - std::size_t index = body.find_first_of(search); - if (index != std::string::npos) { - body.remove_suffix(body.size() - index); - } - if (m_buffered_line.has_value()) { - m_buffered_line->append(body); - } else { - m_buffered_line = std::string{body}; - } - return index == std::string::npos ? body.size() : index; -} - -std::size_t client::parse_stream(std::uint64_t remain, - std::string_view body, - beast::error_code& ec) { - size_t i = 0; - while (i < body.length()) { - i += this->append_up_to(body.substr(i, body.length() - i), "\r\n"); - if (body[i] == '\r') { - if (this->m_begin_CR) { - // todo: illegal token - } else { - this->m_begin_CR = true; - } - } else if (body[i] == '\n') { - this->m_begin_CR = false; - this->complete_line(); - i++; - } - } - return body.length(); -} - -boost::optional> parse_field( - std::string field) { - if (field.empty()) { - assert(0 && "should never parse an empty line"); - } - - size_t colon_index = field.find(':'); - switch (colon_index) { - case 0: - field.erase(0, 1); - return std::make_pair(std::string{"comment"}, std::move(field)); - case std::string::npos: - return std::make_pair(std::move(field), std::string{}); - default: - auto key = field.substr(0, colon_index); - field.erase(0, colon_index + 1); - if (field.find(' ') == 0) { - field.erase(0, 1); - } - return std::make_pair(std::move(key), std::move(field)); - } -} - -void client::parse_events() { - while (true) { - bool seen_empty_line = false; - - while (!m_complete_lines.empty()) { - std::string line = std::move(m_complete_lines.front()); - m_complete_lines.pop_front(); - - if (line.empty()) { - if (m_event_data.has_value()) { - seen_empty_line = true; - break; - } - continue; - } - - if (auto field = parse_field(std::move(line))) { - if (field->first == "comment") { - m_events.emplace_back(field->second); - continue; - } - - if (!m_event_data.has_value()) { - m_event_data.emplace(event_data{boost::none}); - } - - if (field->first == "event") { - m_event_data->set_type(field->second); - } else if (field->first == "data") { - m_event_data->append_data(field->second); - } else if (field->first == "id") { - std::cout << "Got ID field\n"; - } else if (field->first == "retry") { - std::cout << "Got RETRY field\n"; - } - } - } - - if (seen_empty_line) { - boost::optional data = m_event_data; - m_event_data.reset(); - - if (data.has_value()) { - std::cout << "Got event:\n"; - std::cout << "Type = <" << data->get_type() << ">, Data = <" - << data->get_data() << ">\n"; - } - - continue; - } - - break; - } -} - -} // namespace launchdarkly::sse diff --git a/libs/server-sent-events/tests/CMakeLists.txt b/libs/server-sent-events/tests/CMakeLists.txt index 4390c6f5a..d1baf87ea 100644 --- a/libs/server-sent-events/tests/CMakeLists.txt +++ b/libs/server-sent-events/tests/CMakeLists.txt @@ -5,7 +5,7 @@ include_directories("${PROJECT_SOURCE_DIR}/include") file(GLOB tests "${PROJECT_SOURCE_DIR}/tests/*.cpp") -set (CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) +set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) add_executable(gtest_${LIBNAME} ${tests})