Merge 'Add concurrency control and workload isolation for S3 client' from Pavel Emelyanov

In its current state, the s3 client uses a single default-configured http client, so the workloads of different sched classes compete with each other for sockets to make requests on. There is an attempt to handle that in the upload-sink implementation, which limits itself to a small number of concurrent PUT requests, but that doesn't help much, as the many sinks don't share this limit with each other.

This PR makes the S3 client maintain a set of http clients, one per sched-group, configures the maximum number of TCP connections proportionally to each group's shares, and removes the artificial limit from the sinks, so that they share the group's http concurrency limit.
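
To make the proportionality concrete, here is a rough, standalone sketch of the shares-to-connections mapping described above (illustrative only -- the helper name is made up and is not part of this PR):

    // Sketch: per-group HTTP connection cap derived from scheduling-group shares.
    #include <algorithm>
    #include <cstdio>

    static unsigned max_connections_for_shares(unsigned shares) {
        // 100 shares -> 1 connection, 500 -> 5, 1000 -> 10; never fewer than 1.
        return std::max(shares / 100u, 1u);
    }

    int main() {
        for (unsigned shares : {50u, 100u, 200u, 500u, 1000u}) {
            std::printf("shares=%4u -> max_connections=%u\n",
                        shares, max_connections_for_shares(shares));
        }
        return 0;
    }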

As a side effect, the upload-sink fixes the no-writes-after-flush protection -- if it is violated, the write will result in an exception, whereas currently it just hangs on a semaphore forever.
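
For context on that behavior change, a minimal sketch of the gate-based protection follows (illustrative names, not the actual sink code): once flush() closes the gate, a later attempt to enter it throws seastar::gate_closed_exception instead of blocking.

    // Sketch: gate-based "no writes after flush" protection.
    #include <seastar/core/gate.hh>
    #include <seastar/core/future.hh>
    #include <seastar/core/sleep.hh>
    #include <chrono>

    struct sink_sketch {
        seastar::gate _bg_flushes;

        // Stand-in for the real background PUT of one part.
        seastar::future<> do_background_upload() {
            return seastar::sleep(std::chrono::milliseconds(1));
        }

        seastar::future<> write_part() {
            // Throws seastar::gate_closed_exception if flush() has already
            // closed the gate, instead of hanging forever on a semaphore.
            auto gh = _bg_flushes.hold();
            return do_background_upload().finally([gh = std::move(gh)] {});
        }

        seastar::future<> flush() {
            // Waits for in-flight background uploads; later write_part() calls fail.
            return _bg_flushes.close();
        }
    };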

fixes: #13458
fixes: #13320
fixes: #13021

Closes #14187

* github.com:scylladb/scylladb:
  s3/client: Replace sink flush semaphore with gate
  s3/client: Configure different max-connections on http clients
  s3/client: Maintain several http clients on-board
  s3/client: Remove now unused http reference from sink and file
  s3/client: Add make_request() method
denesb committed Jun 20, 2023
2 parents 7deba4f + c1c1752 commit ddf8547
Showing 2 changed files with 53 additions and 42 deletions.
90 changes: 49 additions & 41 deletions utils/s3/client.cc
@@ -12,10 +12,11 @@
#include <boost/algorithm/string/classification.hpp>
#include <boost/range/adaptor/map.hpp>
#include <seastar/core/coroutine.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/shared_future.hh>
#include <seastar/coroutine/all.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/net/dns.hh>
#include <seastar/net/tls.hh>
#include <seastar/util/short_streams.hh>
@@ -110,7 +111,6 @@ class dns_connection_factory : public http::experimental::connection_factory {
client::client(std::string host, endpoint_config_ptr cfg, global_factory gf, private_tag)
: _host(std::move(host))
, _cfg(std::move(cfg))
, _http(std::make_unique<dns_connection_factory>(_host, _cfg->port, _cfg->use_https))
, _gf(std::move(gf))
{
}
@@ -167,11 +167,28 @@ void client::authorize(http::request& req) {
req._headers["Authorization"] = format("AWS4-HMAC-SHA256 Credential={}/{}/{}/s3/aws4_request,SignedHeaders={},Signature={}", _cfg->aws->key, time_point_st, _cfg->aws->region, signed_headers_list, sig);
}

future<> client::make_request(http::request req, http::experimental::client::reply_handler handle, http::reply::status_type expected) {
authorize(req);
auto sg = current_scheduling_group();
auto it = _https.find(sg);
if (it == _https.end()) [[unlikely]] {
auto factory = std::make_unique<dns_connection_factory>(_host, _cfg->port, _cfg->use_https);
// Limit the maximum number of connections this group's http client
// may have proportional to its shares. Shares are typically in the
// range of 100...1000, thus resulting in 1..10 connections
auto max_connections = std::max((unsigned)(sg.get_shares() / 100), 1u);
it = _https.emplace(std::piecewise_construct,
std::forward_as_tuple(sg),
std::forward_as_tuple(std::move(factory), max_connections)
).first;
}
return it->second.make_request(std::move(req), std::move(handle), expected);
}

future<> client::get_object_header(sstring object_name, http::experimental::client::reply_handler handler) {
s3l.trace("HEAD {}", object_name);
auto req = http::request::make("HEAD", _host, object_name);
authorize(req);
return _http.make_request(std::move(req), std::move(handler));
return make_request(std::move(req), std::move(handler));
}

future<uint64_t> client::get_object_size(sstring object_name) {
@@ -220,8 +237,7 @@ future<temporary_buffer<char>> client::get_object_contiguous(sstring object_name

size_t off = 0;
std::optional<temporary_buffer<char>> ret;
authorize(req);
co_await _http.make_request(std::move(req), [&off, &ret, &object_name] (const http::reply& rep, input_stream<char>&& in_) mutable -> future<> {
co_await make_request(std::move(req), [&off, &ret, &object_name] (const http::reply& rep, input_stream<char>&& in_) mutable -> future<> {
auto in = std::move(in_);
ret = temporary_buffer<char>(rep.content_length);
s3l.trace("Consume {} bytes for {}", ret->size(), object_name);
@@ -261,8 +277,7 @@ future<> client::put_object(sstring object_name, temporary_buffer<char> buf) {
co_await coroutine::return_exception_ptr(std::move(ex));
}
});
authorize(req);
co_await _http.make_request(std::move(req), ignore_reply);
co_await make_request(std::move(req));
}

future<> client::put_object(sstring object_name, ::memory_data_sink_buffers bufs) {
@@ -285,27 +300,22 @@ future<> client::put_object(sstring object_name, ::memory_data_sink_buffers bufs
co_await coroutine::return_exception_ptr(std::move(ex));
}
});
authorize(req);
co_await _http.make_request(std::move(req), ignore_reply);
co_await make_request(std::move(req));
}

future<> client::delete_object(sstring object_name) {
s3l.trace("DELETE {}", object_name);
auto req = http::request::make("DELETE", _host, object_name);
authorize(req);
co_await _http.make_request(std::move(req), ignore_reply, http::reply::status_type::no_content);
co_await make_request(std::move(req), ignore_reply, http::reply::status_type::no_content);
}

class client::upload_sink_base : public data_sink_impl {
static constexpr int flush_concurrency = 3;

protected:
shared_ptr<client> _client;
http::experimental::client& _http;
sstring _object_name;
sstring _upload_id;
utils::chunked_vector<sstring> _part_etags;
semaphore _flush_sem{flush_concurrency};
gate _bg_flushes;

future<> start_upload();
future<> finalize_upload();
Expand All @@ -320,7 +330,6 @@ class client::upload_sink_base : public data_sink_impl {
public:
upload_sink_base(shared_ptr<client> cln, sstring object_name)
: _client(std::move(cln))
, _http(_client->_http)
, _object_name(std::move(object_name))
{
}
@@ -422,8 +431,7 @@ future<> client::upload_sink_base::start_upload() {
s3l.trace("POST uploads {}", _object_name);
auto rep = http::request::make("POST", _client->_host, _object_name);
rep.query_parameters["uploads"] = "";
_client->authorize(rep);
co_await _http.make_request(std::move(rep), [this] (const http::reply& rep, input_stream<char>&& in_) -> future<> {
co_await _client->make_request(std::move(rep), [this] (const http::reply& rep, input_stream<char>&& in_) -> future<> {
auto in = std::move(in_);
auto body = co_await util::read_entire_stream_contiguous(in);
_upload_id = parse_multipart_upload_id(body);
@@ -465,36 +473,38 @@ future<> client::upload_sink_base::upload_part(memory_data_sink_buffers bufs) {
});

// Do upload in the background so that several parts could go in parallel.
// The semaphore is used for two things -- control the concurrency and let
// the finalize_upload() wait in any background activity before checking
// the progress.
// The gate lets the finalize_upload() wait for any background activity
// to complete before checking the progress.
//
// Upload parallelism is managed per sched-group -- the client maintains a set
// of http clients, each with its own max-connections. When an upload happens, it
// will naturally be limited by the relevant http client's connection
// limit, without affecting other groups' request concurrency.
//
// In case part upload goes wrong and doesn't happen, the _part_etags[part]
// is not set, so the finalize_upload() sees it and aborts the whole thing.
_client->authorize(req);
auto units = co_await get_units(_flush_sem, 1);
(void)_http.make_request(std::move(req), [this, part_number] (const http::reply& rep, input_stream<char>&& in_) mutable -> future<> {
auto gh = _bg_flushes.hold();
(void)_client->make_request(std::move(req), [this, part_number] (const http::reply& rep, input_stream<char>&& in_) mutable -> future<> {
auto etag = rep.get_header("ETag");
s3l.trace("uploaded {} part data -> etag = {} (upload id {})", part_number, etag, _upload_id);
_part_etags[part_number] = std::move(etag);
return make_ready_future<>();
}).handle_exception([this, part_number] (auto ex) {
// ... the exact exception only remains in logs
s3l.warn("couldn't upload part {}: {} (upload id {})", part_number, ex, _upload_id);
}).finally([units = std::move(units)] {});
}).finally([gh = std::move(gh)] {});
}

future<> client::upload_sink_base::abort_upload() {
s3l.trace("DELETE upload {}", _upload_id);
auto req = http::request::make("DELETE", _client->_host, _object_name);
req.query_parameters["uploadId"] = std::exchange(_upload_id, ""); // now upload_started() returns false
_client->authorize(req);
co_await _http.make_request(std::move(req), ignore_reply, http::reply::status_type::no_content);
co_await _client->make_request(std::move(req), ignore_reply, http::reply::status_type::no_content);
}

future<> client::upload_sink_base::finalize_upload() {
s3l.trace("wait for {} parts to complete (upload id {})", _part_etags.size(), _upload_id);
co_await _flush_sem.wait(flush_concurrency);
co_await _bg_flushes.close();

unsigned parts_xml_len = prepare_multipart_upload_parts(_part_etags);
if (parts_xml_len == 0) {
@@ -508,8 +518,7 @@ future<> client::upload_sink_base::finalize_upload() {
req.write_body("xml", parts_xml_len, [this] (output_stream<char>&& out) -> future<> {
return dump_multipart_upload_parts(std::move(out), _part_etags);
});
_client->authorize(req);
co_await _http.make_request(std::move(req), ignore_reply);
co_await _client->make_request(std::move(req));
}

future<> client::upload_sink_base::close() {
@@ -518,9 +527,9 @@ future<> client::upload_sink_base::close() {
// If we got here, we need to pick up any background activity as it may
still be trying to handle a successful request, and 'this' should remain alive
//
// The semaphore is not waited by finalize_upload() (i.e. -- no self-lock),
// The gate is not closed by finalize_upload() (i.e. -- no double-close),
// because otherwise the upload_started() would return false
co_await _flush_sem.wait(flush_concurrency);
co_await _bg_flushes.close();
co_await abort_upload();
} else {
s3l.trace("closing multipart upload");
@@ -581,17 +590,16 @@ future<> client::upload_sink_base::upload_part(std::unique_ptr<upload_sink> piec
req._headers["x-amz-copy-source"] = piece._object_name;

// See comment in upload_part(memory_data_sink_buffers) overload regarding the
// _flush_sem usage and _part_etags assignments
// _bg_flushes usage and _part_etags assignments
//
// Before the piece's object can be copied into the target one, it should be
// flushed and closed. After the object is copied, it can be removed. If copy
// goes wrong, the object should be removed anyway.
_client->authorize(req);
auto units = co_await get_units(_flush_sem, 1);
auto gh = _bg_flushes.hold();
(void)piece.flush().then([&piece] () {
return piece.close();
}).then([this, part_number, req = std::move(req)] () mutable {
return _http.make_request(std::move(req), [this, part_number] (const http::reply& rep, input_stream<char>&& in_) mutable -> future<> {
return _client->make_request(std::move(req), [this, part_number] (const http::reply& rep, input_stream<char>&& in_) mutable -> future<> {
return do_with(std::move(in_), [this, part_number] (auto& in) mutable {
return util::read_entire_stream_contiguous(in).then([this, part_number] (auto body) mutable {
auto etag = parse_multipart_copy_upload_etag(body);
Expand All @@ -611,7 +619,7 @@ future<> client::upload_sink_base::upload_part(std::unique_ptr<upload_sink> piec
return _client->delete_object(piece._object_name).handle_exception([&piece] (auto ex) {
s3l.warn("failed to remove copy-upload piece {}", piece._object_name);
});
}).finally([units = std::move(units), piece_ptr = std::move(piece_ptr)] {});
}).finally([gh = std::move(gh), piece_ptr = std::move(piece_ptr)] {});
}

class client::upload_jumbo_sink final : public upload_sink_base {
@@ -675,7 +683,6 @@ data_sink client::make_upload_jumbo_sink(sstring object_name, std::optional<unsi

class client::readable_file : public file_impl {
shared_ptr<client> _client;
http::experimental::client& _http;
sstring _object_name;

[[noreturn]] void unsupported() {
@@ -685,7 +692,6 @@ class client::readable_file : public file_impl {
public:
readable_file(shared_ptr<client> cln, sstring object_name)
: _client(std::move(cln))
, _http(_client->_http)
, _object_name(std::move(object_name))
{
}
@@ -775,7 +781,9 @@ file client::make_readable_file(sstring object_name) {
}

future<> client::close() {
co_await _http.close();
co_await coroutine::parallel_for_each(_https, [] (auto& it) -> future<> {
co_await it.second.close();
});
}

} // s3 namespace
5 changes: 4 additions & 1 deletion utils/s3/client.hh
@@ -22,20 +22,23 @@ struct range {
size_t len;
};

future<> ignore_reply(const http::reply& rep, input_stream<char>&& in_);

class client : public enable_shared_from_this<client> {
class upload_sink_base;
class upload_sink;
class upload_jumbo_sink;
class readable_file;
std::string _host;
endpoint_config_ptr _cfg;
http::experimental::client _http;
std::unordered_map<seastar::scheduling_group, http::experimental::client> _https;
using global_factory = std::function<shared_ptr<client>(std::string)>;
global_factory _gf;

struct private_tag {};

void authorize(http::request&);
future<> make_request(http::request req, http::experimental::client::reply_handler handle = ignore_reply, http::reply::status_type expected = http::reply::status_type::ok);

future<> get_object_header(sstring object_name, http::experimental::client::reply_handler handler);
public:
