Skip to content

Commit

Permalink
Merge 'Add metrics to S3 client' from Pavel Emelyanov
Browse files Browse the repository at this point in the history
The added metrics include:

- http client metrics, which include the number of connections, the number of active connections and the number of new connections made so far
- IO metrics that mimic those for traditional IO -- total number of object read/write ops, total number of get/put/uploaded bytes and individual IO request delay (round-trip, including body transfer time)

fixes: #13369

Closes #14494

* github.com:scylladb/scylladb:
  s3/client: Add IO stats metrics
  s3/client: Add HTTP client metrics
  s3/client: Split make_request()
  s3/client: Wrap http client with struct group_client
  s3/client: Move client::stats to namespace scope
  s3/client: Keep part size local variable
  • Loading branch information
denesb committed Sep 14, 2023
2 parents fa88ed7 + 308db51 commit cc16502
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 17 deletions.
2 changes: 1 addition & 1 deletion test/boost/s3_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_put_get_object) {
BOOST_REQUIRE_EQUAL(sz, 10);

testlog.info("Get object stats\n");
s3::client::stats st = cln->get_object_stats(name).get0();
s3::stats st = cln->get_object_stats(name).get0();
BOOST_REQUIRE_EQUAL(st.size, 10);
// forgive timezone difference as minio server is GMT by default
BOOST_REQUIRE(std::difftime(st.last_modified, gc_clock::to_time_t(gc_clock::now())) < 24*3600);
Expand Down
79 changes: 68 additions & 11 deletions utils/s3/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,38 @@ void client::authorize(http::request& req) {
req._headers["Authorization"] = format("AWS4-HMAC-SHA256 Credential={}/{}/{}/s3/aws4_request,SignedHeaders={},Signature={}", _cfg->aws->key, time_point_st, _cfg->aws->region, signed_headers_list, sig);
}

future<> client::make_request(http::request req, http::experimental::client::reply_handler handle, http::reply::status_type expected) {
authorize(req);
client::group_client::group_client(std::unique_ptr<http::experimental::connection_factory> f, unsigned max_conn)
: http(std::move(f), max_conn)
{
}

void client::group_client::register_metrics(std::string class_name, std::string host) {
namespace sm = seastar::metrics;
auto ep_label = sm::label("endpoint")(host);
auto sg_label = sm::label("class")(class_name);
metrics.add_group("s3", {
sm::make_gauge("nr_connections", [this] { return http.connections_nr(); },
sm::description("Total number of connections"), {ep_label, sg_label}),
sm::make_gauge("nr_active_connections", [this] { return http.connections_nr() - http.idle_connections_nr(); },
sm::description("Total number of connections with running requests"), {ep_label, sg_label}),
sm::make_counter("total_new_connections", [this] { return http.total_new_connections_nr(); },
sm::description("Total number of new connections created so far"), {ep_label, sg_label}),
sm::make_counter("total_read_requests", [this] { return read_stats.ops; },
sm::description("Total number of object read requests"), {ep_label, sg_label}),
sm::make_counter("total_write_requests", [this] { return write_stats.ops; },
sm::description("Total number of object write requests"), {ep_label, sg_label}),
sm::make_counter("total_read_bytes", [this] { return read_stats.bytes; },
sm::description("Total number of bytes read from objects"), {ep_label, sg_label}),
sm::make_counter("total_write_bytes", [this] { return write_stats.bytes; },
sm::description("Total number of bytes written to objects"), {ep_label, sg_label}),
sm::make_counter("total_read_latency_sec", [this] { return read_stats.duration.count(); },
sm::description("Total time spent reading data from objects"), {ep_label, sg_label}),
sm::make_counter("total_write_latency_sec", [this] { return write_stats.duration.count(); },
sm::description("Total time spend writing data to objects"), {ep_label, sg_label}),
});
}

client::group_client& client::find_or_create_client() {
auto sg = current_scheduling_group();
auto it = _https.find(sg);
if (it == _https.end()) [[unlikely]] {
Expand All @@ -192,8 +222,25 @@ future<> client::make_request(http::request req, http::experimental::client::rep
std::forward_as_tuple(sg),
std::forward_as_tuple(std::move(factory), max_connections)
).first;

it->second.register_metrics(sg.name(), _host);
}
return it->second.make_request(std::move(req), std::move(handle), expected);
return it->second;
}

future<> client::make_request(http::request req, http::experimental::client::reply_handler handle, http::reply::status_type expected) {
authorize(req);
auto& gc = find_or_create_client();
return gc.http.make_request(std::move(req), std::move(handle), expected);
}

future<> client::make_request(http::request req, reply_handler_ext handle_ex, http::reply::status_type expected) {
authorize(req);
auto& gc = find_or_create_client();
auto handle = [&gc, handle = std::move(handle_ex)] (const http::reply& rep, input_stream<char>&& in) {
return handle(gc, rep, std::move(in));
};
return gc.http.make_request(std::move(req), std::move(handle), expected);
}

future<> client::get_object_header(sstring object_name, http::experimental::client::reply_handler handler) {
Expand Down Expand Up @@ -224,7 +271,7 @@ static std::time_t parse_http_last_modified_time(const sstring& object_name, sst
return std::mktime(&tm);
}

future<client::stats> client::get_object_stats(sstring object_name) {
future<stats> client::get_object_stats(sstring object_name) {
struct stats st{};
co_await get_object_header(object_name, [&] (const http::reply& rep, input_stream<char>&& in_) mutable -> future<> {
st.size = rep.content_length;
Expand Down Expand Up @@ -334,7 +381,7 @@ future<temporary_buffer<char>> client::get_object_contiguous(sstring object_name

size_t off = 0;
std::optional<temporary_buffer<char>> ret;
co_await make_request(std::move(req), [&off, &ret, &object_name] (const http::reply& rep, input_stream<char>&& in_) mutable -> future<> {
co_await make_request(std::move(req), [&off, &ret, &object_name, start = s3_clock::now()] (group_client& gc, const http::reply& rep, input_stream<char>&& in_) mutable -> future<> {
auto in = std::move(in_);
ret = temporary_buffer<char>(rep.content_length);
s3l.trace("Consume {} bytes for {}", ret->size(), object_name);
Expand All @@ -349,6 +396,8 @@ future<temporary_buffer<char>> client::get_object_contiguous(sstring object_name
off += to_copy;
}
return make_ready_future<consumption_result<char>>(continue_consuming());
}).then([&gc, &off, start] {
gc.read_stats.update(off, s3_clock::now() - start);
});
}, expected);
ret->trim(off);
Expand All @@ -374,7 +423,10 @@ future<> client::put_object(sstring object_name, temporary_buffer<char> buf) {
co_await coroutine::return_exception_ptr(std::move(ex));
}
});
co_await make_request(std::move(req));
co_await make_request(std::move(req), [len, start = s3_clock::now()] (group_client& gc, const auto& rep, auto&& in) {
gc.write_stats.update(len, s3_clock::now() - start);
return ignore_reply(rep, std::move(in));
});
}

future<> client::put_object(sstring object_name, ::memory_data_sink_buffers bufs) {
Expand All @@ -397,7 +449,10 @@ future<> client::put_object(sstring object_name, ::memory_data_sink_buffers bufs
co_await coroutine::return_exception_ptr(std::move(ex));
}
});
co_await make_request(std::move(req));
co_await make_request(std::move(req), [len, start = s3_clock::now()] (group_client& gc, const auto& rep, auto&& in) {
gc.write_stats.update(len, s3_clock::now() - start);
return ignore_reply(rep, std::move(in));
});
}

future<> client::delete_object(sstring object_name) {
Expand Down Expand Up @@ -548,10 +603,11 @@ future<> client::upload_sink_base::upload_part(memory_data_sink_buffers bufs) {
_part_etags.emplace_back();
s3l.trace("PUT part {} {} bytes in {} buffers (upload id {})", part_number, bufs.size(), bufs.buffers().size(), _upload_id);
auto req = http::request::make("PUT", _client->_host, _object_name);
req._headers["Content-Length"] = format("{}", bufs.size());
auto size = bufs.size();
req._headers["Content-Length"] = format("{}", size);
req.query_parameters["partNumber"] = format("{}", part_number + 1);
req.query_parameters["uploadId"] = _upload_id;
req.write_body("bin", bufs.size(), [this, part_number, bufs = std::move(bufs)] (output_stream<char>&& out_) mutable -> future<> {
req.write_body("bin", size, [this, part_number, bufs = std::move(bufs)] (output_stream<char>&& out_) mutable -> future<> {
auto out = std::move(out_);
std::exception_ptr ex;
s3l.trace("upload {} part data (upload id {})", part_number, _upload_id);
Expand Down Expand Up @@ -581,10 +637,11 @@ future<> client::upload_sink_base::upload_part(memory_data_sink_buffers bufs) {
// In case part upload goes wrong and doesn't happen, the _part_etags[part]
// is not set, so the finalize_upload() sees it and aborts the whole thing.
auto gh = _bg_flushes.hold();
(void)_client->make_request(std::move(req), [this, part_number] (const http::reply& rep, input_stream<char>&& in_) mutable -> future<> {
(void)_client->make_request(std::move(req), [this, size, part_number, start = s3_clock::now()] (group_client& gc, const http::reply& rep, input_stream<char>&& in_) mutable -> future<> {
auto etag = rep.get_header("ETag");
s3l.trace("uploaded {} part data -> etag = {} (upload id {})", part_number, etag, _upload_id);
_part_etags[part_number] = std::move(etag);
gc.write_stats.update(size, s3_clock::now() - start);
return make_ready_future<>();
}).handle_exception([this, part_number] (auto ex) {
// ... the exact exception only remains in logs
Expand Down Expand Up @@ -879,7 +936,7 @@ file client::make_readable_file(sstring object_name) {

future<> client::close() {
co_await coroutine::parallel_for_each(_https, [] (auto& it) -> future<> {
co_await it.second.close();
co_await it.second.http.close();
});
}

Expand Down
36 changes: 31 additions & 5 deletions utils/s3/client.hh
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <seastar/core/file.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/metrics.hh>
#include <seastar/http/client.hh>
#include "utils/s3/creds.hh"

Expand All @@ -17,6 +18,8 @@ class memory_data_sink_buffers;

namespace s3 {

using s3_clock = std::chrono::steady_clock;

struct range {
uint64_t off;
size_t len;
Expand All @@ -29,6 +32,11 @@ struct tag {
};
using tag_set = std::vector<tag>;

struct stats {
uint64_t size;
std::time_t last_modified;
};

future<> ignore_reply(const http::reply& rep, input_stream<char>&& in_);

class client : public enable_shared_from_this<client> {
Expand All @@ -38,25 +46,43 @@ class client : public enable_shared_from_this<client> {
class readable_file;
std::string _host;
endpoint_config_ptr _cfg;
std::unordered_map<seastar::scheduling_group, http::experimental::client> _https;
struct io_stats {
uint64_t ops = 0;
uint64_t bytes = 0;
std::chrono::duration<double> duration = std::chrono::duration<double>(0);

void update(uint64_t len, std::chrono::duration<double> lat) {
ops++;
bytes += len;
duration += lat;
}
};
struct group_client {
http::experimental::client http;
io_stats read_stats;
io_stats write_stats;
seastar::metrics::metric_groups metrics;
group_client(std::unique_ptr<http::experimental::connection_factory> f, unsigned max_conn);
void register_metrics(std::string class_name, std::string host);
};
std::unordered_map<seastar::scheduling_group, group_client> _https;
using global_factory = std::function<shared_ptr<client>(std::string)>;
global_factory _gf;

struct private_tag {};

void authorize(http::request&);
group_client& find_or_create_client();
future<> make_request(http::request req, http::experimental::client::reply_handler handle = ignore_reply, http::reply::status_type expected = http::reply::status_type::ok);
using reply_handler_ext = noncopyable_function<future<>(group_client&, const http::reply&, input_stream<char>&& body)>;
future<> make_request(http::request req, reply_handler_ext handle, http::reply::status_type expected = http::reply::status_type::ok);

future<> get_object_header(sstring object_name, http::experimental::client::reply_handler handler);
public:
explicit client(std::string host, endpoint_config_ptr cfg, global_factory gf, private_tag);
static shared_ptr<client> make(std::string endpoint, endpoint_config_ptr cfg, global_factory gf = {});

future<uint64_t> get_object_size(sstring object_name);
struct stats {
uint64_t size;
std::time_t last_modified;
};
future<stats> get_object_stats(sstring object_name);
future<tag_set> get_object_tagging(sstring object_name);
future<> put_object_tagging(sstring object_name, tag_set tagging);
Expand Down

0 comments on commit cc16502

Please sign in to comment.