Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.3.x] cloud_storage: correct list_object() request headers and parameters (manual backport) #18447

23 changes: 19 additions & 4 deletions src/v/cloud_storage/remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,9 @@ ss::future<remote::list_result> remote::list_objects(
retry_chain_node& parent,
std::optional<cloud_storage_clients::object_key> prefix,
std::optional<char> delimiter,
std::optional<cloud_storage_clients::client::item_filter> item_filter) {
std::optional<cloud_storage_clients::client::item_filter> item_filter,
std::optional<size_t> max_keys,
std::optional<ss::sstring> continuation_token) {
ss::gate::holder gh{_gate};
retry_chain_node fib(&parent);
retry_chain_logger ctxlog(cst_log, fib);
Expand All @@ -1281,18 +1283,23 @@ ss::future<remote::list_result> remote::list_objects(
std::optional<list_result> result;

bool items_remaining = true;
std::optional<ss::sstring> continuation_token = std::nullopt;

// Gathers the items from a series of successful ListObjectsV2 calls
cloud_storage_clients::client::list_bucket_result list_bucket_result;

// Keep iterating until the ListObjectsV2 calls has more items to return
const auto caller_handle_truncation = max_keys.has_value();

if (caller_handle_truncation) {
vassert(max_keys.value() > 0, "Max keys must be greater than 0.");
}

// Keep iterating while the ListObjectsV2 calls have more items to return
while (!_gate.is_closed() && permit.is_allowed && !result) {
auto res = co_await lease.client->list_objects(
bucket,
prefix,
std::nullopt,
std::nullopt,
max_keys,
continuation_token,
fib.get_timeout(),
delimiter,
Expand Down Expand Up @@ -1325,6 +1332,14 @@ ss::future<remote::list_result> remote::list_objects(

// Continue to list the remaining items
if (items_remaining) {
// But, return early if max_keys was specified (caller will
// handle truncation)
if (caller_handle_truncation) {
list_bucket_result.is_truncated = true;
list_bucket_result.next_continuation_token
= continuation_token.value();
co_return list_bucket_result;
}
continue;
}

Expand Down
20 changes: 17 additions & 3 deletions src/v/cloud_storage/remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -384,15 +384,29 @@ class remote : public ss::peering_sharded_service<remote> {
/// \param prefix Optional prefix to restrict listing of objects
/// \param delimiter A character to use as a delimiter when grouping list
/// results
/// \param item_filter Optional filter to apply to items before
/// collecting
/// \param max_keys The maximum number of keys to return. If left
/// unspecified, all object keys that fulfill the request will be collected,
/// and the result will not be truncated (truncation not allowed). If
/// specified, it will be up to the user to deal with a possibly-truncated
/// result (using list_result.is_truncated) at the call site, most likely in
/// a while loop. The continuation-token generated by that request will be
/// available through list_result.next_continuation_token for future
/// requests. It is also important to note that the value for max_keys will
/// be capped by the cloud provider default (which may vary between
/// providers, e.g. AWS has a limit of 1000 keys per ListObjects request).
/// \param continuation_token The continuation token returned to the caller
/// by a prior list_objects() request, supplied when the caller is handling
/// a truncated result manually.
/// \param item_filter Optional filter to apply to items before collecting
ss::future<list_result> list_objects(
const cloud_storage_clients::bucket_name& name,
retry_chain_node& parent,
std::optional<cloud_storage_clients::object_key> prefix = std::nullopt,
std::optional<char> delimiter = std::nullopt,
std::optional<cloud_storage_clients::client::item_filter> item_filter
= std::nullopt);
= std::nullopt,
std::optional<size_t> max_keys = std::nullopt,
std::optional<ss::sstring> continuation_token = std::nullopt);

/// \brief Upload small objects to bucket. Suitable for uploading simple
/// strings, does not check for leadership before upload like the segment
Expand Down
125 changes: 124 additions & 1 deletion src/v/cloud_storage/tests/remote_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,130 @@ FIXTURE_TEST(test_list_bucket, remote_fixture) {
}
}

/// Exercises remote::list_objects() with the optional max_keys and
/// continuation_token arguments against the S3 imposter.
///
/// The test uploads `default_max_keys + 50` empty objects named "0", "1", ...
/// and then checks four scenarios:
///   1. max_keys == imposter page size: truncated first page, then the
///      remainder is fetched with the returned continuation token;
///   2. max_keys == std::nullopt: everything is returned, not truncated;
///   3. max_keys == 2: only the first two keys are returned;
///   4. max_keys > imposter page size: still capped at the imposter page size.
FIXTURE_TEST(test_list_bucket_with_max_keys, remote_fixture) {
    set_expectations_and_listen({});
    cloud_storage_clients::bucket_name bucket{"test"};
    retry_chain_node fib(never_abort, 10s, 20ms);

    const auto s3_imposter_max_keys = s3_imposter_fixture::default_max_keys;
    const auto size = s3_imposter_max_keys + 50;
    // `size` is unsigned (it derives from default_max_keys), so the loop
    // index is unsigned as well to avoid a signed/unsigned comparison.
    for (size_t i = 0; i < size; i++) {
        cloud_storage_clients::object_key path{fmt::format("{}", i)};
        auto result
          = remote.local().upload_object(bucket, path, iobuf{}, fib).get();
        BOOST_REQUIRE_EQUAL(cloud_storage::upload_result::success, result);
    }

    {
        // Passing max_keys indicates we, as a user, will handle truncation
        // results. Here, we know that size > s3_imposter_max_keys, and the
        // result will end up truncated.
        auto max_keys = s3_imposter_max_keys;
        auto result
          = remote.local()
              .list_objects(
                bucket, fib, std::nullopt, std::nullopt, std::nullopt, max_keys)
              .get();
        BOOST_REQUIRE(result.has_value());
        BOOST_REQUIRE(result.value().is_truncated);
        // This continuation token is /54 because objects are sorted
        // lexicographically: with keys "0".."149" the first 100 entries in
        // string order end at "53", so "54" is the first key not returned.
        BOOST_REQUIRE_EQUAL(result.value().next_continuation_token, "/54");
        BOOST_REQUIRE_EQUAL(
          result.value().contents.size(), s3_imposter_max_keys);
        BOOST_REQUIRE(result.value().common_prefixes.empty());

        // Now, we can use the next_continuation_token from the previous,
        // truncated result in order to query for the rest of the objects. We
        // should expect to get the rest of the objects in "storage", and that
        // this request is not truncated.
        auto next_result = remote.local()
                             .list_objects(
                               bucket,
                               fib,
                               std::nullopt,
                               std::nullopt,
                               std::nullopt,
                               max_keys,
                               result.value().next_continuation_token)
                             .get();
        BOOST_REQUIRE(next_result.has_value());
        BOOST_REQUIRE(!next_result.value().is_truncated);
        BOOST_REQUIRE_EQUAL(
          next_result.value().contents.size(), size - s3_imposter_max_keys);
        BOOST_REQUIRE(next_result.value().common_prefixes.empty());
    }
    {
        // On the other hand, passing max_keys as std::nullopt means
        // truncation will be handled by the remote API, (all object keys will
        // be read in a loop, we should expect no truncation in the return
        // value), and the result contents should be full.
        auto max_keys = std::nullopt;
        auto result
          = remote.local()
              .list_objects(
                bucket, fib, std::nullopt, std::nullopt, std::nullopt, max_keys)
              .get();
        BOOST_REQUIRE(result.has_value());
        BOOST_REQUIRE(!result.value().is_truncated);
        BOOST_REQUIRE_EQUAL(result.value().contents.size(), size);
        BOOST_REQUIRE(result.value().common_prefixes.empty());
    }
    {
        // A small, explicit page size: only the first two keys come back.
        // Typed as size_t so the comparison with contents.size() below does
        // not mix signed and unsigned operands.
        const size_t max_keys = 2;
        auto result
          = remote.local()
              .list_objects(
                bucket, fib, std::nullopt, std::nullopt, std::nullopt, max_keys)
              .get();
        BOOST_REQUIRE(result.has_value());
        BOOST_REQUIRE(result.value().is_truncated);
        // This continuation token is /10 because objects are sorted
        // lexicographically ("0", "1", "10", ...).
        BOOST_REQUIRE_EQUAL(result.value().next_continuation_token, "/10");
        const auto& contents = result.value().contents;
        BOOST_REQUIRE_EQUAL(contents.size(), max_keys);
        BOOST_REQUIRE_EQUAL(contents[0].key, "0");
        BOOST_REQUIRE_EQUAL(contents[1].key, "1");
        BOOST_REQUIRE(result.value().common_prefixes.empty());
    }
    {
        // This will also be truncated, since size > s3_imposter_max_keys.
        auto max_keys = size;
        auto result
          = remote.local()
              .list_objects(
                bucket, fib, std::nullopt, std::nullopt, std::nullopt, max_keys)
              .get();
        BOOST_REQUIRE(result.has_value());
        BOOST_REQUIRE(result.value().is_truncated);
        BOOST_REQUIRE_EQUAL(
          result.value().contents.size(), s3_imposter_max_keys);
        // This continuation token is /54 because objects are sorted
        // lexicographically.
        BOOST_REQUIRE_EQUAL(result.value().next_continuation_token, "/54");
        BOOST_REQUIRE(result.value().common_prefixes.empty());

        // Reissue another request with continuation-token. This should capture
        // the rest of the object keys, we expect a non-truncated result.
        auto next_result = remote.local()
                             .list_objects(
                               bucket,
                               fib,
                               std::nullopt,
                               std::nullopt,
                               std::nullopt,
                               max_keys,
                               result.value().next_continuation_token)
                             .get();
        BOOST_REQUIRE(next_result.has_value());
        BOOST_REQUIRE(!next_result.value().is_truncated);
        BOOST_REQUIRE_EQUAL(
          next_result.value().contents.size(), size - s3_imposter_max_keys);
        BOOST_REQUIRE(next_result.value().common_prefixes.empty());
    }
}

FIXTURE_TEST(test_list_bucket_with_prefix, remote_fixture) {
set_expectations_and_listen({});
cloud_storage_clients::bucket_name bucket{"test"};
Expand Down Expand Up @@ -668,7 +792,6 @@ FIXTURE_TEST(test_list_bucket_with_prefix, remote_fixture) {
BOOST_REQUIRE_EQUAL(request.method, "GET");
BOOST_REQUIRE_EQUAL(request.q_list_type, "2");
BOOST_REQUIRE_EQUAL(request.q_prefix, "x/");
BOOST_REQUIRE_EQUAL(request.h_prefix, "x/");
}

FIXTURE_TEST(test_list_bucket_with_filter, remote_fixture) {
Expand Down
68 changes: 56 additions & 12 deletions src/v/cloud_storage/tests/s3_imposter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,39 @@ uint16_t unit_test_httpd_port_number() { return 4442; }

namespace {

using expectation_map_t
= std::map<ss::sstring, s3_imposter_fixture::expectation>;

// Takes the input map of keys to expectations and returns a stringified XML
// corresponding to the appropriate S3 response.
ss::sstring list_objects_resp(
const std::map<ss::sstring, s3_imposter_fixture::expectation>& objects,
const expectation_map_t& objects,
ss::sstring prefix,
ss::sstring delimiter) {
ss::sstring delimiter,
std::optional<size_t> max_keys_opt,
std::optional<ss::sstring> continuation_token_opt) {
std::map<ss::sstring, size_t> content_key_to_size;
std::set<ss::sstring> common_prefixes;
// Filter by prefix and group by the substring between the prefix and first
// delimiter.
for (const auto& [_, expectation] : objects) {
auto max_keys = max_keys_opt.has_value()
? std::min(
max_keys_opt.value(),
s3_imposter_fixture::default_max_keys)
: s3_imposter_fixture::default_max_keys;
auto it = (continuation_token_opt.has_value())
? objects.find(continuation_token_opt.value())
: objects.begin();
auto end_it = objects.end();
ss::sstring next_continuation_token = "";
for (; it != end_it; ++it) {
const auto& expectation = it->second;

if (content_key_to_size.size() == max_keys) {
next_continuation_token = it->first;
break;
}

auto key = expectation.url;
if (!key.empty() && key[0] == '/') {
// Remove / character that S3 client adds
Expand Down Expand Up @@ -89,6 +111,8 @@ ss::sstring list_objects_resp(
prefix
+ key.substr(prefix.size(), delimiter_pos - prefix.size() + 1));
}

const bool is_truncated = (it != end_it);
// Populate the returned XML.
ss::sstring ret;
ret += fmt::format(
Expand All @@ -97,14 +121,17 @@ ss::sstring list_objects_resp(
<Name>test-bucket</Name>
<Prefix>{}</Prefix>
<KeyCount>{}</KeyCount>
<MaxKeys>1000</MaxKeys>
<MaxKeys>{}</MaxKeys>
<Delimiter>{}</Delimiter>
<IsTruncated>false</IsTruncated>
<NextContinuationToken>next</NextContinuationToken>
<IsTruncated>{}</IsTruncated>
<NextContinuationToken>{}</NextContinuationToken>
)xml",
prefix,
content_key_to_size.size(),
delimiter);
max_keys,
delimiter,
is_truncated,
next_continuation_token);
for (const auto& [key, size] : content_key_to_size) {
ret += fmt::format(
R"xml(
Expand Down Expand Up @@ -212,15 +239,32 @@ struct s3_imposter_fixture::content_handler {
if (
fixture._search_on_get_list
&& request.get_query_param("list-type") == "2") {
auto prefix = request.get_header("prefix");
auto delimiter = request.get_header("delimiter");
auto prefix = request.get_query_param("prefix");
auto delimiter = request.get_query_param("delimiter");
auto max_keys_str = request.get_query_param("max-keys");
auto continuation_token_str = request.get_query_param(
"continuation-token");
std::optional<size_t> max_keys = (max_keys_str.empty())
? std::optional<size_t>{}
: std::stoi(max_keys_str);
std::optional<ss::sstring> continuation_token
= (continuation_token_str.empty())
? std::optional<ss::sstring>{}
: continuation_token_str;
vlog(
fixt_log.trace,
"S3 imposter list request {} - {} - {}",
"S3 imposter list request {} - {} - {} - {} - {}",
prefix,
delimiter,
max_keys,
continuation_token,
request._method);
return list_objects_resp(expectations, prefix, delimiter);
return list_objects_resp(
expectations,
prefix,
delimiter,
max_keys,
continuation_token);
}
if (
expect_iter == expectations.end()
Expand Down Expand Up @@ -319,7 +363,7 @@ struct s3_imposter_fixture::content_handler {
RPTEST_FAIL("Unexpected request");
return "";
}
std::map<ss::sstring, s3_imposter_fixture::expectation> expectations;
expectation_map_t expectations;
s3_imposter_fixture& fixture;
std::optional<absl::flat_hash_set<ss::sstring>> headers = std::nullopt;
};
Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_storage/tests/s3_imposter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
/// be retrieved using the GET request or deleted using the DELETE request.
class s3_imposter_fixture {
public:
static constexpr size_t default_max_keys = 100;
uint16_t httpd_port_number();
static constexpr const char* httpd_host_name = "127.0.0.1";

Expand Down
Loading
Loading