From 56d95774bff6770080735cdd8e41f7ce98a38a4f Mon Sep 17 00:00:00 2001 From: Gregor Jasny Date: Tue, 22 Jan 2019 22:23:02 +0100 Subject: [PATCH] wip --- pull/CMakeLists.txt | 2 +- pull/src/beast_exposer.cc | 15 +- pull/src/beast_listener.cpp | 94 +++++++++ pull/src/beast_listener.h | 26 +++ pull/src/beast_session.cpp | 308 +++++++++++++++++++++++++++ pull/src/beast_session.h | 359 ++----------------------------- pull/src/beast_shared_state.cpp | 8 + pull/src/beast_shared_state.h | 20 ++ pull/src/metric_collector.cpp | 28 +++ pull/src/metric_collector.h | 15 ++ pull/src/old_beast_session.h | 361 ++++++++++++++++++++++++++++++++ 11 files changed, 888 insertions(+), 348 deletions(-) create mode 100644 pull/src/beast_listener.cpp create mode 100644 pull/src/beast_listener.h create mode 100644 pull/src/beast_session.cpp create mode 100644 pull/src/beast_shared_state.cpp create mode 100644 pull/src/beast_shared_state.h create mode 100644 pull/src/metric_collector.cpp create mode 100644 pull/src/metric_collector.h create mode 100644 pull/src/old_beast_session.h diff --git a/pull/CMakeLists.txt b/pull/CMakeLists.txt index 72e9b976..d776d2fd 100644 --- a/pull/CMakeLists.txt +++ b/pull/CMakeLists.txt @@ -15,7 +15,7 @@ add_library(pull #src/handler.cc #src/handler.h #$<$:$> - src/beast_exposer.cc src/beast_session.h) + src/beast_exposer.cc src/old_beast_session.h src/beast_listener.cpp src/beast_listener.h src/beast_shared_state.h src/beast_shared_state.cpp src/beast_session.cpp src/beast_session.h src/metric_collector.cpp src/metric_collector.h) add_library(${PROJECT_NAME}::pull ALIAS pull) diff --git a/pull/src/beast_exposer.cc b/pull/src/beast_exposer.cc index b24e99dc..4ccfa258 100644 --- a/pull/src/beast_exposer.cc +++ b/pull/src/beast_exposer.cc @@ -6,12 +6,8 @@ #include -#include "beast_session.h" - -//#include "prometheus/client_metric.h" - -//#include "CivetServer.h" -//#include "handler.h" +#include "beast_listener.h" +#include "beast_shared_state.h" namespace prometheus { @@ -28,12 +24,13 @@ namespace prometheus { auto const address = boost::asio::ip::make_address(host); boost::asio::ip::tcp::endpoint endpoint{address, port}; + auto shared_state = std::make_shared(uri, collectables_); + // Create and launch a listening port - std::make_shared( + std::make_shared( ioc, endpoint, - uri_, - collectables_)->run(); + shared_state)->run(); // Run the I/O service on the requested number of threads worker_.reserve(num_threads - 1); diff --git a/pull/src/beast_listener.cpp b/pull/src/beast_listener.cpp new file mode 100644 index 00000000..611582c5 --- /dev/null +++ b/pull/src/beast_listener.cpp @@ -0,0 +1,94 @@ +#include "beast_listener.h" +#include "beast_session.h" +#include + +BeastListener:: +BeastListener( + boost::asio::io_context& ioc, + boost::asio::ip::tcp::endpoint endpoint, + std::shared_ptr const& state) + : acceptor_(ioc) + , socket_(ioc) + , state_(state) +{ + boost::beast::error_code ec; + + // Open the acceptor + acceptor_.open(endpoint.protocol(), ec); + if(ec) + { + fail(ec, "open"); + return; + } + + // Allow address reuse + acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec); + if(ec) + { + fail(ec, "set_option"); + return; + } + + // Bind to the server address + acceptor_.bind(endpoint, ec); + if(ec) + { + fail(ec, "bind"); + return; + } + + // Start listening for connections + acceptor_.listen( + boost::asio::socket_base::max_listen_connections, ec); + if(ec) + { + fail(ec, "listen"); + return; + } +} + +void +BeastListener:: +run() +{ + // Start accepting a connection + acceptor_.async_accept( + socket_, + std::bind( + &BeastListener::on_accept, + shared_from_this(), + std::placeholders::_1)); +} + +// Report a failure +void +BeastListener:: +fail(boost::beast::error_code ec, char const* what) +{ + // Don't report on canceled operations + if(ec == boost::asio::error::operation_aborted) + return; + std::cerr << what << ": " << ec.message() << "\n"; +} + +// Handle a connection +void +BeastListener:: +on_accept(boost::beast::error_code ec) +{ + if(ec) + return fail(ec, "accept"); + else + // Launch a new session for this connection + std::make_shared( + std::move(socket_), + state_)->run(); + + // Accept another connection + acceptor_.async_accept( + socket_, + std::bind( + &BeastListener::on_accept, + shared_from_this(), + std::placeholders::_1)); +} \ No newline at end of file diff --git a/pull/src/beast_listener.h b/pull/src/beast_listener.h new file mode 100644 index 00000000..83fff056 --- /dev/null +++ b/pull/src/beast_listener.h @@ -0,0 +1,26 @@ +#pragma once + +#include +#include +#include + +class BeastSharedState; + +class BeastListener : public std::enable_shared_from_this +{ + boost::asio::ip::tcp::acceptor acceptor_; + boost::asio::ip::tcp::socket socket_; + std::shared_ptr state_; + + void fail(boost::beast::error_code ec, char const* what); + void on_accept(boost::beast::error_code ec); + +public: + BeastListener( + boost::asio::io_context& ioc, + boost::asio::ip::tcp::endpoint endpoint, + std::shared_ptr const& state); + + // Start accepting incoming connections + void run(); +}; diff --git a/pull/src/beast_session.cpp b/pull/src/beast_session.cpp new file mode 100644 index 00000000..c7f3f68f --- /dev/null +++ b/pull/src/beast_session.cpp @@ -0,0 +1,308 @@ +#include + +#include + +#include "beast_shared_state.h" +#include "beast_session.h" +#include "metric_collector.h" +#include "prometheus/text_serializer.h" + +//#define BOOST_NO_CXX14_GENERIC_LAMBDAS + +using tcp = boost::asio::ip::tcp; +namespace beast = boost::beast; +namespace http = beast::http; + +static const std::string TEXT_PLAIN = "text/plain"; + +//------------------------------------------------------------------------------ +#if 0 +// Return a reasonable mime type based on the extension of a file. +beast::string_view +mime_type(beast::string_view path) +{ + using beast::iequals; + auto const ext = [&path] + { + auto const pos = path.rfind("."); + if(pos == beast::string_view::npos) + return beast::string_view{}; + return path.substr(pos); + }(); + if(iequals(ext, ".htm")) return "text/html"; + if(iequals(ext, ".html")) return "text/html"; + if(iequals(ext, ".php")) return "text/html"; + if(iequals(ext, ".css")) return "text/css"; + if(iequals(ext, ".txt")) return "text/plain"; + if(iequals(ext, ".js")) return "application/javascript"; + if(iequals(ext, ".json")) return "application/json"; + if(iequals(ext, ".xml")) return "application/xml"; + if(iequals(ext, ".swf")) return "application/x-shockwave-flash"; + if(iequals(ext, ".flv")) return "video/x-flv"; + if(iequals(ext, ".png")) return "image/png"; + if(iequals(ext, ".jpe")) return "image/jpeg"; + if(iequals(ext, ".jpeg")) return "image/jpeg"; + if(iequals(ext, ".jpg")) return "image/jpeg"; + if(iequals(ext, ".gif")) return "image/gif"; + if(iequals(ext, ".bmp")) return "image/bmp"; + if(iequals(ext, ".ico")) return "image/vnd.microsoft.icon"; + if(iequals(ext, ".tiff")) return "image/tiff"; + if(iequals(ext, ".tif")) return "image/tiff"; + if(iequals(ext, ".svg")) return "image/svg+xml"; + if(iequals(ext, ".svgz")) return "image/svg+xml"; + return "application/text"; +} + +// Append an HTTP rel-path to a local filesystem path. +// The returned path is normalized for the platform. +std::string +path_cat( + beast::string_view base, + beast::string_view path) +{ + if(base.empty()) + return path.to_string(); + std::string result = base.to_string(); +#if BOOST_MSVC + char constexpr path_separator = '\\'; + if(result.back() == path_separator) + result.resize(result.size() - 1); + result.append(path.data(), path.size()); + for(auto& c : result) + if(c == '/') + c = path_separator; +#else + char constexpr path_separator = '/'; + if(result.back() == path_separator) + result.resize(result.size() - 1); + result.append(path.data(), path.size()); +#endif + return result; +} +#endif +// This function produces an HTTP response for the given +// request. The type of the response object depends on the +// contents of the request, so the interface requires the +// caller to pass a generic lambda for receiving the response. +template< + class Body, class Allocator, + class Send> +void +handle_request( + std::shared_ptr const& state, + http::request>&& req, + Send&& send) +{ + // Returns a bad request response + auto const bad_request = + [&req](boost::beast::string_view why) + { + http::response res{http::status::bad_request, req.version()}; + res.set(http::field::server, BOOST_BEAST_VERSION_STRING); + res.set(http::field::content_type, TEXT_PLAIN); + res.keep_alive(req.keep_alive()); + res.body() = why.to_string(); + res.prepare_payload(); + return res; + }; + + // Returns a not found response + auto const not_found = + [&req](boost::beast::string_view target) + { + http::response res{http::status::not_found, req.version()}; + res.set(http::field::server, BOOST_BEAST_VERSION_STRING); + res.set(http::field::content_type, TEXT_PLAIN); + res.keep_alive(req.keep_alive()); + res.body() = "The resource '" + target.to_string() + "' was not found."; + res.prepare_payload(); + return res; + }; + + // Returns collected metrics + auto const metrics = + [&req](const std::vector>& collectables) + { + auto metrics = prometheus::CollectMetrics(collectables); + auto serializer = prometheus::TextSerializer{}; + + http::response res{http::status::ok, req.version()}; + res.set(http::field::server, BOOST_BEAST_VERSION_STRING); + res.set(http::field::content_type, TEXT_PLAIN); + res.keep_alive(req.keep_alive()); + res.body() = serializer.Serialize(metrics); + res.prepare_payload(); + return res; + }; + +// Make sure we can handle the method +if( req.method() != http::verb::get ) +return send(bad_request("Unknown HTTP-method")); + +// Request path must be absolute and not contain "..". +if( req.target().empty() || +req.target()[0] != '/' || +req.target().find("..") != boost::beast::string_view::npos) +return send(bad_request("Illegal request-target")); + +if( req.target() != state->get_uri() ) { +return send(not_found("Unknown URI")); +} + +return send(metrics(state->get_collectables())); +} + +//------------------------------------------------------------------------------ + +http_session:: +http_session( + tcp::socket socket, + std::shared_ptr const& state) + : socket_(std::move(socket)) + , state_(state) +{ +} + +void +http_session:: +run() +{ + // Read a request + http::async_read(socket_, buffer_, req_, + std::bind( + &http_session::on_read, + shared_from_this(), + std::placeholders::_1, + std::placeholders::_2)); +} + +// Report a failure +void +http_session:: +fail(beast::error_code ec, char const* what) +{ + // Don't report on canceled operations + if(ec == boost::asio::error::operation_aborted) + return; + + std::cerr << what << ": " << ec.message() << "\n"; +} + +template +void +http_session:: +send_lambda:: +operator()(http::message&& msg) const +{ + // The lifetime of the message has to extend + // for the duration of the async operation so + // we use a shared_ptr to manage it. + auto sp = std::make_shared< + http::message>(std::move(msg)); + + // Write the response + auto self = self_.shared_from_this(); + http::async_write( + self_.socket_, + *sp, + [self, sp](beast::error_code ec, std::size_t bytes) + { + self->on_write(ec, bytes, sp->need_eof()); + }); +} + +void +http_session:: +on_read(beast::error_code ec, std::size_t bytes_transferred) +{ + boost::ignore_unused(bytes_transferred); + + // This means they closed the connection + if(ec == http::error::end_of_stream) + { + socket_.shutdown(tcp::socket::shutdown_send, ec); + return; + } + + // Handle the error, if any + if(ec) + return fail(ec, "read"); + + // Send the response +#ifndef BOOST_NO_CXX14_GENERIC_LAMBDAS + // + // The following code requires generic + // lambdas, available in C++14 and later. + // + handle_request( + state_, + std::move(req_), + [this](auto&& response) + { + // The lifetime of the message has to extend + // for the duration of the async operation so + // we use a shared_ptr to manage it. + using response_type = typename std::decay::type; + auto sp = std::make_shared(std::forward(response)); + + #if 0 + // NOTE This causes an ICE in gcc 7.3 + // Write the response + http::async_write(this->socket_, *sp, + [self = shared_from_this(), sp]( + beast::error_code ec, std::size_t bytes) + { + self->on_write(ec, bytes, sp->need_eof()); + }); + #else + // Write the response + auto self = shared_from_this(); + http::async_write(this->socket_, *sp, + [self, sp]( + beast::error_code ec, std::size_t bytes) + { + self->on_write(ec, bytes, sp->need_eof()); + }); + #endif + }); +#else + // + // This code uses the function object type send_lambda in + // place of a generic lambda which is not available in C++11 + // + handle_request( + state_, + std::move(req_), + send_lambda(*this)); + +#endif +} + +void +http_session:: +on_write(boost::beast::error_code ec, std::size_t bytes_transferred, bool close) +{ + // Handle the error, if any + if(ec) + return fail(ec, "write"); + + if(close) + { + // This means we should close the connection, usually because + // the response indicated the "Connection: close" semantic. + socket_.shutdown(tcp::socket::shutdown_send, ec); + return; + } + + // Clear contents of the request message, + // otherwise the read behavior is undefined. + req_ = {}; + + // Read another request + http::async_read(socket_, buffer_, req_, + std::bind( + &http_session::on_read, + shared_from_this(), + std::placeholders::_1, + std::placeholders::_2)); +} \ No newline at end of file diff --git a/pull/src/beast_session.h b/pull/src/beast_session.h index 26b033a6..8f7fdc39 100644 --- a/pull/src/beast_session.h +++ b/pull/src/beast_session.h @@ -1,362 +1,45 @@ #pragma once -#include #include -#include -#include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include -#include "prometheus/collectable.h" +#include "beast_shared_state.h" -using tcp = boost::asio::ip::tcp; // from -namespace http = boost::beast::http; // from - -static const std::string MIME_TYPE = "text/plain"; - - - -std::vector CollectMetrics(const std::vector>& collectables) { - auto collected_metrics = std::vector{}; - - for (auto&& wcollectable : collectables) { - auto collectable = wcollectable.lock(); - if (!collectable) { - continue; - } - - auto&& metrics = collectable->Collect(); - collected_metrics.insert(collected_metrics.end(), - std::make_move_iterator(metrics.begin()), - std::make_move_iterator(metrics.end())); - } - - return collected_metrics; -} - - -// This function produces an HTTP response for the given -// request. The type of the response object depends on the -// contents of the request, so the interface requires the -// caller to pass a generic lambda for receiving the response. -template< - class Body, class Allocator, - class Send> -void -handle_request( - const std::string& uri, - const std::vector>& collectables, - http::request>&& req, - Send&& send) +/** Represents an established HTTP connection +*/ +class http_session : public std::enable_shared_from_this { - // Returns a bad request response - auto const bad_request = - [&req](boost::beast::string_view why) - { - http::response res{http::status::bad_request, req.version()}; - res.set(http::field::server, BOOST_BEAST_VERSION_STRING); - res.set(http::field::content_type, MIME_TYPE); - res.keep_alive(req.keep_alive()); - res.body() = why.to_string(); - res.prepare_payload(); - return res; - }; - - // Returns a not found response - auto const not_found = - [&req](boost::beast::string_view target) - { - http::response res{http::status::not_found, req.version()}; - res.set(http::field::server, BOOST_BEAST_VERSION_STRING); - res.set(http::field::content_type, MIME_TYPE); - res.keep_alive(req.keep_alive()); - res.body() = "The resource '" + target.to_string() + "' was not found."; - res.prepare_payload(); - return res; - }; - - // Returns collected metrics - auto const metrics = - [&req](const std::vector>& collectables) - { - auto metrics = CollectMetrics(collectables); - auto serializer = prometheus::TextSerializer{}; - - http::response res{http::status::ok, req.version()}; - res.set(http::field::server, BOOST_BEAST_VERSION_STRING); - res.set(http::field::content_type, MIME_TYPE); - res.keep_alive(req.keep_alive()); - res.body() = serializer.Serialize(metrics); - res.prepare_payload(); - return res; - }; - - // Make sure we can handle the method - if( req.method() != http::verb::get ) - return send(bad_request("Unknown HTTP-method")); - - // Request path must be absolute and not contain "..". - if( req.target().empty() || - req.target()[0] != '/' || - req.target().find("..") != boost::beast::string_view::npos) - return send(bad_request("Illegal request-target")); - - if( req.target() != uri ) { - return send(not_found("Unknown URI")); - } - - return send(metrics(collectables)); -} - -//------------------------------------------------------------------------------ - -// Report a failure -void -fail(boost::system::error_code ec, char const* what) -{ - std::cerr << what << ": " << ec.message() << "\n"; -} + boost::asio::ip::tcp::socket socket_; + boost::beast::flat_buffer buffer_; + std::shared_ptr state_; + boost::beast::http::request req_; -// Handles an HTTP server connection -class session : public std::enable_shared_from_this -{ - // This is the C++11 equivalent of a generic lambda. - // The function object is used to send an HTTP message. struct send_lambda { - session& self_; + http_session& self_; explicit - send_lambda(session& self) + send_lambda(http_session& self) : self_(self) { } template void - operator()(http::message&& msg) const - { - // The lifetime of the message has to extend - // for the duration of the async operation so - // we use a shared_ptr to manage it. - auto sp = std::make_shared< - http::message>(std::move(msg)); - - // Store a type-erased version of the shared - // pointer in the class to keep it alive. - self_.res_ = sp; - - // Write the response - http::async_write( - self_.socket_, - *sp, - boost::asio::bind_executor( - self_.strand_, - std::bind( - &session::on_write, - self_.shared_from_this(), - std::placeholders::_1, - std::placeholders::_2, - sp->need_eof()))); - } + operator()(boost::beast::http::message&& msg) const; }; - tcp::socket socket_; - boost::asio::strand< - boost::asio::io_context::executor_type> strand_; - boost::beast::flat_buffer buffer_; - const std::string& uri_; - const std::vector>& collectables_; - http::request req_; - std::shared_ptr res_; - send_lambda lambda_; + void fail(boost::beast::error_code ec, char const* what); + void on_read(boost::beast::error_code ec, std::size_t); + void on_write(boost::beast::error_code ec, std::size_t, bool close); public: - // Take ownership of the socket - explicit - session( - tcp::socket socket, - const std::string& uri, - const std::vector>& collectables) - : socket_(std::move(socket)) - , strand_(socket_.get_executor()) - , uri_(uri) - , collectables_(collectables) - , lambda_(*this) - { - } - - // Start the asynchronous operation - void - run() - { - do_read(); - } - - void - do_read() - { - // Read a request - http::async_read(socket_, buffer_, req_, - boost::asio::bind_executor( - strand_, - std::bind( - &session::on_read, - shared_from_this(), - std::placeholders::_1, - std::placeholders::_2))); - } - - void - on_read( - boost::system::error_code ec, - std::size_t bytes_transferred) - { - boost::ignore_unused(bytes_transferred); - - // This means they closed the connection - if(ec == http::error::end_of_stream) - return do_close(); - - if(ec) - return fail(ec, "read"); - - // Send the response - handle_request(uri_, collectables_, std::move(req_), lambda_); - } - - void - on_write( - boost::system::error_code ec, - std::size_t bytes_transferred, - bool close) - { - boost::ignore_unused(bytes_transferred); - - if(ec) - return fail(ec, "write"); - - if(close) - { - // This means we should close the connection, usually because - // the response indicated the "Connection: close" semantic. - return do_close(); - } - - // We're done with the response so delete it - res_ = nullptr; - - // Read another request - do_read(); - } - - void - do_close() - { - // Send a TCP shutdown - boost::system::error_code ec; - socket_.shutdown(tcp::socket::shutdown_send, ec); - - // At this point the connection is closed gracefully - } -}; - -//------------------------------------------------------------------------------ - -// Accepts incoming connections and launches the sessions -class listener : public std::enable_shared_from_this -{ - tcp::acceptor acceptor_; - tcp::socket socket_; - const std::string& uri_; - const std::vector>& collectables_; - -public: - listener( - boost::asio::io_context& ioc, - tcp::endpoint endpoint, - const std::string& uri, - const std::vector>& collectables) - : acceptor_(ioc) - , socket_(ioc), uri_(uri) - , collectables_(collectables) - { - boost::system::error_code ec; - - // Open the acceptor - acceptor_.open(endpoint.protocol(), ec); - if(ec) - { - fail(ec, "open"); - return; - } - - // Bind to the server address - acceptor_.bind(endpoint, ec); - if(ec) - { - fail(ec, "bind"); - return; - } - std::cerr << "bound to: " << endpoint << "\n"; - - // Start listening for connections - acceptor_.listen( - boost::asio::socket_base::max_listen_connections, ec); - if(ec) - { - fail(ec, "listen"); - return; - } - } - - // Start accepting incoming connections - void - run() - { - if(! acceptor_.is_open()) - return; - do_accept(); - } - - void - do_accept() - { - acceptor_.async_accept( - socket_, - std::bind( - &listener::on_accept, - shared_from_this(), - std::placeholders::_1)); - } - - void - on_accept(boost::system::error_code ec) - { - if(ec) - { - fail(ec, "accept"); - } - else - { - // Create the session and run it - std::make_shared( - std::move(socket_), uri_, - collectables_)->run(); - } + http_session( + boost::asio::ip::tcp::socket socket, + std::shared_ptr const& state); - // Accept another connection - do_accept(); - } -}; + void run(); +}; \ No newline at end of file diff --git a/pull/src/beast_shared_state.cpp b/pull/src/beast_shared_state.cpp new file mode 100644 index 00000000..ab209821 --- /dev/null +++ b/pull/src/beast_shared_state.cpp @@ -0,0 +1,8 @@ +#include "beast_shared_state.h" + + +BeastSharedState::BeastSharedState(std::string uri, std::vector>& collectables) +: uri_{uri}, collectables_{collectables} +{ +} + diff --git a/pull/src/beast_shared_state.h b/pull/src/beast_shared_state.h new file mode 100644 index 00000000..7961bea7 --- /dev/null +++ b/pull/src/beast_shared_state.h @@ -0,0 +1,20 @@ +#pragma once + +#include +#include +#include + +#include "prometheus/collectable.h" + +class BeastSharedState +{ +public: + BeastSharedState(std::string uri, std::vector>& collectables); + + const std::string& get_uri() const { return uri_; } + const std::vector>& get_collectables() const { return collectables_; } + +private: + const std::string uri_; + const std::vector>& collectables_; +}; diff --git a/pull/src/metric_collector.cpp b/pull/src/metric_collector.cpp new file mode 100644 index 00000000..a8050672 --- /dev/null +++ b/pull/src/metric_collector.cpp @@ -0,0 +1,28 @@ +// +// Created by Gregor Jasny on 2019-01-22. +// + +#include "metric_collector.h" + +namespace prometheus { + + std::vector + CollectMetrics(const std::vector> &collectables) { + auto collected_metrics = std::vector{}; + + for (auto &&wcollectable : collectables) { + auto collectable = wcollectable.lock(); + if (!collectable) { + continue; + } + + auto &&metrics = collectable->Collect(); + collected_metrics.insert(collected_metrics.end(), + std::make_move_iterator(metrics.begin()), + std::make_move_iterator(metrics.end())); + } + + return collected_metrics; + } + +} \ No newline at end of file diff --git a/pull/src/metric_collector.h b/pull/src/metric_collector.h new file mode 100644 index 00000000..e25048ce --- /dev/null +++ b/pull/src/metric_collector.h @@ -0,0 +1,15 @@ +#pragma once + +#include +#include + +#include "prometheus/collectable.h" +#include "prometheus/metric_family.h" + +class CivetServer; + +namespace prometheus { + + std::vector CollectMetrics(const std::vector>& collectables); + +} // namespace prometheus diff --git a/pull/src/old_beast_session.h b/pull/src/old_beast_session.h new file mode 100644 index 00000000..2f304a2e --- /dev/null +++ b/pull/src/old_beast_session.h @@ -0,0 +1,361 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "prometheus/collectable.h" + +using tcp = boost::asio::ip::tcp; // from +namespace http = boost::beast::http; // from + +static const std::string MIME_TYPE = "text/plain"; + + + +std::vector CollectMetrics(const std::vector>& collectables) { + auto collected_metrics = std::vector{}; + + for (auto&& wcollectable : collectables) { + auto collectable = wcollectable.lock(); + if (!collectable) { + continue; + } + + auto&& metrics = collectable->Collect(); + collected_metrics.insert(collected_metrics.end(), + std::make_move_iterator(metrics.begin()), + std::make_move_iterator(metrics.end())); + } + + return collected_metrics; +} + + +// This function produces an HTTP response for the given +// request. The type of the response object depends on the +// contents of the request, so the interface requires the +// caller to pass a generic lambda for receiving the response. +template< + class Body, class Allocator, + class Send> +void +handle_request( + const std::string& uri, + const std::vector>& collectables, + http::request>&& req, + Send&& send) +{ + // Returns a bad request response + auto const bad_request = + [&req](boost::beast::string_view why) + { + http::response res{http::status::bad_request, req.version()}; + res.set(http::field::server, BOOST_BEAST_VERSION_STRING); + res.set(http::field::content_type, MIME_TYPE); + res.keep_alive(req.keep_alive()); + res.body() = why.to_string(); + res.prepare_payload(); + return res; + }; + + // Returns a not found response + auto const not_found = + [&req](boost::beast::string_view target) + { + http::response res{http::status::not_found, req.version()}; + res.set(http::field::server, BOOST_BEAST_VERSION_STRING); + res.set(http::field::content_type, MIME_TYPE); + res.keep_alive(req.keep_alive()); + res.body() = "The resource '" + target.to_string() + "' was not found."; + res.prepare_payload(); + return res; + }; + + // Returns collected metrics + auto const metrics = + [&req](const std::vector>& collectables) + { + auto metrics = CollectMetrics(collectables); + auto serializer = prometheus::TextSerializer{}; + + http::response res{http::status::ok, req.version()}; + res.set(http::field::server, BOOST_BEAST_VERSION_STRING); + res.set(http::field::content_type, MIME_TYPE); + res.keep_alive(req.keep_alive()); + res.body() = serializer.Serialize(metrics); + res.prepare_payload(); + return res; + }; + + // Make sure we can handle the method + if( req.method() != http::verb::get ) + return send(bad_request("Unknown HTTP-method")); + + // Request path must be absolute and not contain "..". + if( req.target().empty() || + req.target()[0] != '/' || + req.target().find("..") != boost::beast::string_view::npos) + return send(bad_request("Illegal request-target")); + + if( req.target() != uri ) { + return send(not_found("Unknown URI")); + } + + return send(metrics(collectables)); +} + +//------------------------------------------------------------------------------ + +// Report a failure +void +fail(boost::system::error_code ec, char const* what) +{ + std::cerr << what << ": " << ec.message() << "\n"; +} + +// Handles an HTTP server connection +class session : public std::enable_shared_from_this +{ + // This is the C++11 equivalent of a generic lambda. + // The function object is used to send an HTTP message. + struct send_lambda + { + session& self_; + + explicit + send_lambda(session& self) + : self_(self) + { + } + + template + void + operator()(http::message&& msg) const + { + // The lifetime of the message has to extend + // for the duration of the async operation so + // we use a shared_ptr to manage it. + auto sp = std::make_shared< + http::message>(std::move(msg)); + + // Store a type-erased version of the shared + // pointer in the class to keep it alive. + self_.res_ = sp; + + // Write the response + http::async_write( + self_.socket_, + *sp, + boost::asio::bind_executor( + self_.strand_, + std::bind( + &session::on_write, + self_.shared_from_this(), + std::placeholders::_1, + std::placeholders::_2, + sp->need_eof()))); + } + }; + + tcp::socket socket_; + boost::asio::strand< + boost::asio::io_context::executor_type> strand_; + boost::beast::flat_buffer buffer_; + const std::string& uri_; + const std::vector>& collectables_; + http::request req_; + std::shared_ptr res_; + send_lambda lambda_; + +public: + // Take ownership of the socket + explicit + session( + tcp::socket socket, + std::shared_ptr const& state) + : socket_(std::move(socket)) + , strand_(socket_.get_executor()) + , uri_(uri) + , collectables_(collectables) + , lambda_(*this) + { + } + + // Start the asynchronous operation + void + run() + { + do_read(); + } + + void + do_read() + { + // Read a request + http::async_read(socket_, buffer_, req_, + boost::asio::bind_executor( + strand_, + std::bind( + &session::on_read, + shared_from_this(), + std::placeholders::_1, + std::placeholders::_2))); + } + + void + on_read( + boost::system::error_code ec, + std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + + // This means they closed the connection + if(ec == http::error::end_of_stream) + return do_close(); + + if(ec) + return fail(ec, "read"); + + // Send the response + handle_request(uri_, collectables_, std::move(req_), lambda_); + } + + void + on_write( + boost::system::error_code ec, + std::size_t bytes_transferred, + bool close) + { + boost::ignore_unused(bytes_transferred); + + if(ec) + return fail(ec, "write"); + + if(close) + { + // This means we should close the connection, usually because + // the response indicated the "Connection: close" semantic. + return do_close(); + } + + // We're done with the response so delete it + res_ = nullptr; + + // Read another request + do_read(); + } + + void + do_close() + { + // Send a TCP shutdown + boost::system::error_code ec; + socket_.shutdown(tcp::socket::shutdown_send, ec); + + // At this point the connection is closed gracefully + } +}; + +//------------------------------------------------------------------------------ + +// Accepts incoming connections and launches the sessions +class listener : public std::enable_shared_from_this +{ + tcp::acceptor acceptor_; + tcp::socket socket_; + const std::string& uri_; + const std::vector>& collectables_; + +public: + listener( + boost::asio::io_context& ioc, + tcp::endpoint endpoint, + const std::string& uri, + const std::vector>& collectables) + : acceptor_(ioc) + , socket_(ioc), uri_(uri) + , collectables_(collectables) + { + boost::system::error_code ec; + + // Open the acceptor + acceptor_.open(endpoint.protocol(), ec); + if(ec) + { + fail(ec, "open"); + return; + } + + // Bind to the server address + acceptor_.bind(endpoint, ec); + if(ec) + { + fail(ec, "bind"); + return; + } + std::cerr << "bound to: " << endpoint << "\n"; + + // Start listening for connections + acceptor_.listen( + boost::asio::socket_base::max_listen_connections, ec); + if(ec) + { + fail(ec, "listen"); + return; + } + } + + // Start accepting incoming connections + void + run() + { + if(! acceptor_.is_open()) + return; + do_accept(); + } + + void + do_accept() + { + acceptor_.async_accept( + socket_, + std::bind( + &listener::on_accept, + shared_from_this(), + std::placeholders::_1)); + } + + void + on_accept(boost::system::error_code ec) + { + if(ec) + { + fail(ec, "accept"); + } + else + { + // Create the session and run it + std::make_shared( + std::move(socket_), uri_, + collectables_)->run(); + } + + // Accept another connection + do_accept(); + } +};