Skip to content

Commit

Permalink
Merge pull request #18446 from WillemKauf/s3_client_max_keys_backport…
Browse files Browse the repository at this point in the history
…_v23.2.x

[v23.2.x] cloud_storage: correct `list_object()` request headers and parameters (manual backport)
  • Loading branch information
piyushredpanda committed May 13, 2024
2 parents 3066923 + 0262f50 commit 4e619eb
Show file tree
Hide file tree
Showing 11 changed files with 254 additions and 63 deletions.
23 changes: 19 additions & 4 deletions src/v/cloud_storage/remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,9 @@ ss::future<remote::list_result> remote::list_objects(
retry_chain_node& parent,
std::optional<cloud_storage_clients::object_key> prefix,
std::optional<char> delimiter,
std::optional<cloud_storage_clients::client::item_filter> item_filter) {
std::optional<cloud_storage_clients::client::item_filter> item_filter,
std::optional<size_t> max_keys,
std::optional<ss::sstring> continuation_token) {
ss::gate::holder gh{_gate};
retry_chain_node fib(&parent);
retry_chain_logger ctxlog(cst_log, fib);
Expand All @@ -1026,18 +1028,23 @@ ss::future<remote::list_result> remote::list_objects(
std::optional<list_result> result;

bool items_remaining = true;
std::optional<ss::sstring> continuation_token = std::nullopt;

// Gathers the items from a series of successful ListObjectsV2 calls
cloud_storage_clients::client::list_bucket_result list_bucket_result;

// Keep iterating until the ListObjectsV2 calls has more items to return
const auto caller_handle_truncation = max_keys.has_value();

if (caller_handle_truncation) {
vassert(max_keys.value() > 0, "Max keys must be greater than 0.");
}

// Keep iterating while the ListObjectsV2 calls has more items to return
while (!_gate.is_closed() && permit.is_allowed && !result) {
auto res = co_await lease.client->list_objects(
bucket,
prefix,
std::nullopt,
std::nullopt,
max_keys,
continuation_token,
fib.get_timeout(),
delimiter,
Expand Down Expand Up @@ -1070,6 +1077,14 @@ ss::future<remote::list_result> remote::list_objects(

// Continue to list the remaining items
if (items_remaining) {
// But, return early if max_keys was specified (caller will
// handle truncation)
if (caller_handle_truncation) {
list_bucket_result.is_truncated = true;
list_bucket_result.next_continuation_token
= continuation_token.value();
co_return list_bucket_result;
}
continue;
}

Expand Down
20 changes: 17 additions & 3 deletions src/v/cloud_storage/remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,15 +325,29 @@ class remote : public ss::peering_sharded_service<remote> {
/// \param prefix Optional prefix to restrict listing of objects
/// \param delimiter A character to use as a delimiter when grouping list
/// results
/// \param item_filter Optional filter to apply to items before
/// collecting
/// \param max_keys The maximum number of keys to return. If left
/// unspecified, all object keys that fulfill the request will be collected,
/// and the result will not be truncated (truncation not allowed). If
/// specified, it will be up to the user to deal with a possibly-truncated
/// result (using list_result.is_truncated) at the call site, most likely in
/// a while loop. The continuation-token generated by that request will be
/// available through list_result.next_continuation_token for future
/// requests. It is also important to note that the value for max_keys will
/// be capped by the cloud provider default (which may vary between
/// providers, e.g AWS has a limit of 1000 keys per ListObjects request).
/// \param continuation_token The token hopefully passed back to the user
/// from a prior list_objects() request, in the case that they are handling
/// a truncated result manually.
/// \param item_filter Optional filter to apply to items before collecting
ss::future<list_result> list_objects(
const cloud_storage_clients::bucket_name& name,
retry_chain_node& parent,
std::optional<cloud_storage_clients::object_key> prefix = std::nullopt,
std::optional<char> delimiter = std::nullopt,
std::optional<cloud_storage_clients::client::item_filter> item_filter
= std::nullopt);
= std::nullopt,
std::optional<size_t> max_keys = std::nullopt,
std::optional<ss::sstring> continuation_token = std::nullopt);

/// \brief Upload small objects to bucket. Suitable for uploading simple
/// strings, does not check for leadership before upload like the segment
Expand Down
125 changes: 124 additions & 1 deletion src/v/cloud_storage/tests/remote_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,130 @@ FIXTURE_TEST(test_list_bucket, remote_fixture) {
}
}

FIXTURE_TEST(test_list_bucket_with_max_keys, remote_fixture) {
set_expectations_and_listen({});
cloud_storage_clients::bucket_name bucket{"test"};
retry_chain_node fib(never_abort, 10s, 20ms);

const auto s3_imposter_max_keys = s3_imposter_fixture::default_max_keys;
const auto size = s3_imposter_max_keys + 50;
for (int i = 0; i < size; i++) {
cloud_storage_clients::object_key path{fmt::format("{}", i)};
auto result
= remote.local().upload_object(bucket, path, iobuf{}, fib).get();
BOOST_REQUIRE_EQUAL(cloud_storage::upload_result::success, result);
}

{
// Passing max_keys indicates we, as a user, will handle truncation
// results. Here, we know that that size > s3_imposter_max_keys, and the
// result will end up truncated.
auto max_keys = s3_imposter_max_keys;
auto result
= remote.local()
.list_objects(
bucket, fib, std::nullopt, std::nullopt, std::nullopt, max_keys)
.get();
BOOST_REQUIRE(result.has_value());
BOOST_REQUIRE(result.value().is_truncated);
// This continuation token is /54 because objects are sorted
// lexicographically.
BOOST_REQUIRE_EQUAL(result.value().next_continuation_token, "/54");
BOOST_REQUIRE_EQUAL(
result.value().contents.size(), s3_imposter_max_keys);
BOOST_REQUIRE(result.value().common_prefixes.empty());

// Now, we can use the next_continuation_token from the previous,
// truncated result in order to query for the rest of the objects. We
// should expect to get the rest of the objects in "storage", and that
// this request is not truncated.
auto next_result = remote.local()
.list_objects(
bucket,
fib,
std::nullopt,
std::nullopt,
std::nullopt,
max_keys,
result.value().next_continuation_token)
.get();
BOOST_REQUIRE(next_result.has_value());
BOOST_REQUIRE(!next_result.value().is_truncated);
BOOST_REQUIRE_EQUAL(
next_result.value().contents.size(), size - s3_imposter_max_keys);
BOOST_REQUIRE(next_result.value().common_prefixes.empty());
}
{
// On the other hand, passing max_keys as std::nullopt means
// truncation will be handled by the remote API, (all object keys will
// be read in a loop, we should expect no truncation in the return
// value), and the result contents should be full.
auto max_keys = std::nullopt;
auto result
= remote.local()
.list_objects(
bucket, fib, std::nullopt, std::nullopt, std::nullopt, max_keys)
.get();
BOOST_REQUIRE(result.has_value());
BOOST_REQUIRE(!result.value().is_truncated);
BOOST_REQUIRE_EQUAL(result.value().contents.size(), size);
BOOST_REQUIRE(result.value().common_prefixes.empty());
}
{
auto max_keys = 2;
auto result
= remote.local()
.list_objects(
bucket, fib, std::nullopt, std::nullopt, std::nullopt, max_keys)
.get();
BOOST_REQUIRE(result.has_value());
BOOST_REQUIRE(result.value().is_truncated);
// This continuation token is /10 because objects are sorted
// lexicographically.
BOOST_REQUIRE_EQUAL(result.value().next_continuation_token, "/10");
const auto& contents = result.value().contents;
BOOST_REQUIRE_EQUAL(contents.size(), max_keys);
BOOST_REQUIRE_EQUAL(contents[0].key, "0");
BOOST_REQUIRE_EQUAL(contents[1].key, "1");
BOOST_REQUIRE(result.value().common_prefixes.empty());
}
{
// This will also be truncated, since size > s3_imposter_max_keys.
auto max_keys = size;
auto result
= remote.local()
.list_objects(
bucket, fib, std::nullopt, std::nullopt, std::nullopt, max_keys)
.get();
BOOST_REQUIRE(result.has_value());
BOOST_REQUIRE(result.value().is_truncated);
BOOST_REQUIRE_EQUAL(
result.value().contents.size(), s3_imposter_max_keys);
// This continuation token is /54 because objects are sorted
// lexicographically.
BOOST_REQUIRE_EQUAL(result.value().next_continuation_token, "/54");
BOOST_REQUIRE(result.value().common_prefixes.empty());

// Reissue another request with continuation-token. This should capture
// the rest of the object keys, we expect a non-truncated result.
auto next_result = remote.local()
.list_objects(
bucket,
fib,
std::nullopt,
std::nullopt,
std::nullopt,
max_keys,
result.value().next_continuation_token)
.get();
BOOST_REQUIRE(next_result.has_value());
BOOST_REQUIRE(!next_result.value().is_truncated);
BOOST_REQUIRE_EQUAL(
next_result.value().contents.size(), size - s3_imposter_max_keys);
BOOST_REQUIRE(next_result.value().common_prefixes.empty());
}
}

FIXTURE_TEST(test_list_bucket_with_prefix, remote_fixture) {
set_expectations_and_listen({});
cloud_storage_clients::bucket_name bucket{"test"};
Expand Down Expand Up @@ -668,7 +792,6 @@ FIXTURE_TEST(test_list_bucket_with_prefix, remote_fixture) {
BOOST_REQUIRE_EQUAL(request.method, "GET");
BOOST_REQUIRE_EQUAL(request.q_list_type, "2");
BOOST_REQUIRE_EQUAL(request.q_prefix, "x/");
BOOST_REQUIRE_EQUAL(request.h_prefix, "x/");
}

FIXTURE_TEST(test_list_bucket_with_filter, remote_fixture) {
Expand Down
67 changes: 56 additions & 11 deletions src/v/cloud_storage/tests/s3_imposter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,39 @@ uint16_t unit_test_httpd_port_number() { return 4442; }

namespace {

using expectation_map_t
= std::map<ss::sstring, s3_imposter_fixture::expectation>;

// Takes the input map of keys to expectations and returns a stringified XML
// corresponding to the appropriate S3 response.
ss::sstring list_objects_resp(
const std::map<ss::sstring, s3_imposter_fixture::expectation>& objects,
const expectation_map_t& objects,
ss::sstring prefix,
ss::sstring delimiter) {
ss::sstring delimiter,
std::optional<size_t> max_keys_opt,
std::optional<ss::sstring> continuation_token_opt) {
std::map<ss::sstring, size_t> content_key_to_size;
std::set<ss::sstring> common_prefixes;
// Filter by prefix and group by the substring between the prefix and first
// delimiter.
for (const auto& [_, expectation] : objects) {
auto max_keys = max_keys_opt.has_value()
? std::min(
max_keys_opt.value(),
s3_imposter_fixture::default_max_keys)
: s3_imposter_fixture::default_max_keys;
auto it = (continuation_token_opt.has_value())
? objects.find(continuation_token_opt.value())
: objects.begin();
auto end_it = objects.end();
ss::sstring next_continuation_token = "";
for (; it != end_it; ++it) {
const auto& expectation = it->second;

if (content_key_to_size.size() == max_keys) {
next_continuation_token = it->first;
break;
}

auto key = expectation.url;
if (!key.empty() && key[0] == '/') {
// Remove / character that S3 client adds
Expand Down Expand Up @@ -89,6 +111,8 @@ ss::sstring list_objects_resp(
prefix
+ key.substr(prefix.size(), delimiter_pos - prefix.size() + 1));
}

const bool is_truncated = (it != end_it);
// Populate the returned XML.
ss::sstring ret;
ret += fmt::format(
Expand All @@ -97,14 +121,17 @@ ss::sstring list_objects_resp(
<Name>test-bucket</Name>
<Prefix>{}</Prefix>
<KeyCount>{}</KeyCount>
<MaxKeys>1000</MaxKeys>
<MaxKeys>{}</MaxKeys>
<Delimiter>{}</Delimiter>
<IsTruncated>false</IsTruncated>
<NextContinuationToken>next</NextContinuationToken>
<IsTruncated>{}</IsTruncated>
<NextContinuationToken>{}</NextContinuationToken>
)xml",
prefix,
content_key_to_size.size(),
delimiter);
max_keys,
delimiter,
is_truncated,
next_continuation_token);
for (const auto& [key, size] : content_key_to_size) {
ret += fmt::format(
R"xml(
Expand Down Expand Up @@ -241,15 +268,33 @@ void s3_imposter_fixture::set_routes(
if (
fixture._search_on_get_list
&& request.get_query_param("list-type") == "2") {
auto prefix = request.get_header("prefix");
auto delimiter = request.get_header("delimiter");
auto prefix = request.get_query_param("prefix");
auto delimiter = request.get_query_param("delimiter");
auto max_keys_str = request.get_query_param("max-keys");
auto continuation_token_str = request.get_query_param(
"continuation-token");
std::optional<size_t> max_keys = (max_keys_str.empty())
? std::optional<size_t>{}
: std::stoi(
max_keys_str);
std::optional<ss::sstring> continuation_token
= (continuation_token_str.empty())
? std::optional<ss::sstring>{}
: continuation_token_str;
vlog(
fixt_log.trace,
"S3 imposter list request {} - {} - {}",
"S3 imposter list request {} - {} - {} - {} - {}",
prefix,
delimiter,
max_keys,
continuation_token,
request._method);
return list_objects_resp(expectations, prefix, delimiter);
return list_objects_resp(
expectations,
prefix,
delimiter,
max_keys,
continuation_token);
}
auto it = expectations.find(request._url);
if (it == expectations.end() || !it->second.body.has_value()) {
Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_storage/tests/s3_imposter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
/// be retrieved using the GET request or deleted using the DELETE request.
class s3_imposter_fixture {
public:
static constexpr size_t default_max_keys = 100;
uint16_t httpd_port_number();
static constexpr const char* httpd_host_name = "127.0.0.1";

Expand Down

0 comments on commit 4e619eb

Please sign in to comment.