[v23.2.x] cloud_storage: Make wait for hydration abortable #15732

Merged
117 changes: 82 additions & 35 deletions src/v/cloud_storage/remote_segment.cc
@@ -308,10 +308,12 @@ remote_segment::offset_data_stream(
kafka::offset start,
kafka::offset end,
std::optional<model::timestamp> first_timestamp,
-ss::io_priority_class io_priority) {
ss::io_priority_class io_priority,
storage::opt_abort_source_t as) {
vlog(_ctxlog.debug, "remote segment file input stream at offset {}", start);
ss::gate::holder g(_gate);
-co_await hydrate();

co_await hydrate(as);

std::optional<offset_index::find_result> indexed_pos;
std::optional<uint16_t> prefetch_override = std::nullopt;
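The new parameter threads an optional abort source from the read path into hydration. For orientation, `storage::opt_abort_source_t` behaves as an optional, non-owning reference to a Seastar abort source; a minimal sketch of the shape implied by its usage in this diff (`as->get().abort_requested()` below, `as.value_or(...)` in `hydrate()`), assuming the real alias lives in storage/types.h:

#include <seastar/core/abort_source.hh>

#include <functional>
#include <optional>

namespace ss = seastar;

namespace storage {
// Optional, non-owning handle to an abort source; callers that cannot be
// aborted pass std::nullopt. Shape inferred from usage in this diff, not
// copied verbatim from the tree.
using opt_abort_source_t
  = std::optional<std::reference_wrapper<ss::abort_source>>;
} // namespace storage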
@@ -891,7 +893,73 @@ ss::future<> remote_segment::run_hydrate_bg() {
_hydration_loop_running = false;
}

-ss::future<> remote_segment::hydrate() {
namespace {

void log_hydration_abort_cause(
const retry_chain_logger& logger,
const ss::lowres_clock::time_point& deadline,
storage::opt_abort_source_t as) {
if (ss::lowres_clock::now() > deadline) {
vlog(logger.warn, "timed out while waiting for hydration");
} else if (as.has_value() && as->get().abort_requested()) {
// TODO it might be useful to be able to log the client info here from
// log reader config.
vlog(logger.debug, "consumer disconnected during hydration");
}
}

} // namespace

ss::future<> remote_segment::do_hydrate(
ss::abort_source& as, ss::lowres_clock::time_point deadline) {
ss::promise<ss::file> p;
auto fut = ssx::with_timeout_abortable(p.get_future(), deadline, as);

_wait_list.push_back(std::move(p), ss::lowres_clock::time_point::max());
_bg_cvar.signal();

return fut
.handle_exception_type(
[this, deadline, &as](const ss::timed_out_error& ex) {
log_hydration_abort_cause(_ctxlog, deadline, as);
return ss::make_exception_future<ss::file>(ex);
})
.handle_exception_type(
[this, deadline, &as](const ss::abort_requested_exception& ex) {
log_hydration_abort_cause(_ctxlog, deadline, as);
return ss::make_exception_future<ss::file>(ex);
})
.handle_exception_type(
[this, deadline, &as](const download_exception& ex) {
// If we are working with an index-only format and the index
// download failed, we may not be able to progress, so we fall
// back to the old format, where the full segment is downloaded,
// and try to hydrate again.
if (ex.path == _index_path && !_fallback_mode) {
vlog(
_ctxlog.info,
"failed to download index with error [{}], switching to "
"fallback mode and retrying hydration.",
ex);
_fallback_mode = fallback_mode::yes;
return do_hydrate(as, deadline).then([] {
// This is an empty file to match the type returned by
// `fut`. The result is discarded immediately so it is
// unused.
return ss::file{};
});
}

// If the download failure was something other than the index,
// or if we are already in fallback mode, or if we are working
// with the old format, rethrow the exception and let the
// upper layer handle it.
return ss::make_exception_future<ss::file>(ex);
})
.discard_result();
}

ss::future<> remote_segment::hydrate(storage::opt_abort_source_t as) {
if (!_hydration_loop_running) {
vlog(
_ctxlog.error,
@@ -905,37 +973,15 @@ ss::future<> remote_segment::hydrate() {

gate_guard g{_gate};
vlog(_ctxlog.debug, "segment {} hydration requested", _path);
-ss::promise<ss::file> p;
-auto fut = p.get_future();
-_wait_list.push_back(std::move(p), ss::lowres_clock::time_point::max());
-_bg_cvar.signal();
-return fut
-.handle_exception_type([this](const download_exception& ex) {
-// If we are working with an index-only format, and index download
-// failed, we may not be able to progress. So we fallback to old
-// format where the full segment was downloaded, and try to hydrate
-// again.
-if (ex.path == _index_path && !_fallback_mode) {
-vlog(
-_ctxlog.info,
-"failed to download index with error [{}], switching to "
-"fallback mode and retrying hydration.",
-ex);
-_fallback_mode = fallback_mode::yes;
-return hydrate().then([] {
-// This is an empty file to match the type returned by `fut`.
-// The result is discarded immediately so it is unused.
-return ss::file{};
-});
-}
-
-// If the download failure was something other than the index, OR
-// if we are in the fallback mode already or if we are working
-// with old format, rethrow the exception and let the upper layer
-// handle it.
-throw;
-})
-.discard_result();

// A no-op abort source is created on the heap so that `do_hydrate` can
// call `with_timeout_abortable` if we do not have an input abort source.
return ss::do_with(ss::abort_source{}, [this, as](ss::abort_source& noop) {
return do_hydrate(
as.value_or(noop),
ss::lowres_clock::now()
+ config::shard_local_cfg().cloud_storage_hydration_timeout_ms());
});
}
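Above, `hydrate()` substitutes a no-op abort source when the caller does not provide one, so `do_hydrate` can always accept a plain `ss::abort_source&`. A standalone sketch of that pattern, reusing the `opt_abort_source_t` alias sketched earlier:

// Standalone illustration: when the caller supplies no abort source,
// substitute a local no-op one so downstream code can take a plain
// ss::abort_source&. value_or() yields a std::reference_wrapper, which
// binds to the reference.
bool abort_requested(opt_abort_source_t as) {
    ss::abort_source noop;
    ss::abort_source& effective = as.value_or(noop);
    return effective.abort_requested();
}

In `hydrate()` itself the no-op source has to outlive an asynchronous wait, which is why the diff wraps it in `ss::do_with` rather than leaving it on the stack; the stack local above is only safe because this sketch is synchronous.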

ss::future<> remote_segment::hydrate_chunk(segment_chunk_range range) {
@@ -1385,7 +1431,8 @@ remote_segment_batch_reader::init_parser() {
model::offset_cast(_config.start_offset),
model::offset_cast(_config.max_offset),
_config.first_timestamp,
-priority_manager::local().shadow_indexing_priority());
priority_manager::local().shadow_indexing_priority(),
_config.abort_source);
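This is where the abort source enters the read path: the batch reader forwards the optional abort source carried by its reader config into `offset_data_stream`. A hypothetical caller-level sketch of handling the two ways the hydration wait can now end early (the function name and setup are illustrative, not from the tree, and assume Redpanda's usual `ss` alias for the seastar namespace):

// Hypothetical usage sketch: wait for hydration with an abort source tied
// to the consumer connection. Assumes a remote_segment& seg and an
// ss::abort_source& as owned by the read path.
ss::future<> hydrate_or_give_up(remote_segment& seg, ss::abort_source& as) {
    return seg.hydrate(as)
      .handle_exception_type([](const ss::timed_out_error&) {
          // cloud_storage_hydration_timeout_ms elapsed before the download
          // finished; the background hydration keeps running.
      })
      .handle_exception_type([](const ss::abort_requested_exception&) {
          // The consumer disconnected mid-hydration; stop waiting.
      });
}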

vlog(
_ctxlog.debug,
13 changes: 11 additions & 2 deletions src/v/cloud_storage/remote_segment.h
@@ -31,6 +31,7 @@
#include <seastar/core/condition-variable.hh>
#include <seastar/core/expiring_fifo.hh>
#include <seastar/core/io_priority_class.hh>
#include <seastar/core/shared_future.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/temporary_buffer.hh>

@@ -124,14 +125,22 @@ class remote_segment final {
kafka::offset start,
kafka::offset end,
std::optional<model::timestamp>,
-ss::io_priority_class);
ss::io_priority_class,
storage::opt_abort_source_t as);

/// Hydrates the segment, index, or tx-range, depending on segment meta
/// version, returning a future that the caller can use to wait for the
/// download to finish. If the abort source is triggered or the deadline is
/// reached before the download finishes, the wait is aborted; the download
/// itself still completes, but the caller must handle the exception.
ss::future<> do_hydrate(ss::abort_source&, ss::lowres_clock::time_point);

/// Hydrate the segment for segment meta version v2 or lower. For v3 or
/// higher, only hydrate the index. If the index hydration fails, fall back
/// to old mode where the full segment is hydrated. For v3 or higher
/// versions, the actual segment data is hydrated by the data source
/// implementation, but the index is still required to be present first.
-ss::future<> hydrate();
ss::future<> hydrate(storage::opt_abort_source_t as = std::nullopt);
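Because the new parameter defaults to `std::nullopt`, existing call sites compile unchanged. A two-line sketch of the call shapes, assuming a `remote_segment& seg` and an `ss::abort_source as` in scope inside a coroutine:

co_await seg.hydrate();   // existing call sites: no consumer abort, but the
                          // wait is still bounded by the hydration timeout
co_await seg.hydrate(as); // new call sites: the wait also ends if `as` fires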

/// Hydrate a part of a segment, identified by the given range. The range
/// can contain data for multiple contiguous chunks, in which case multiple
81 changes: 79 additions & 2 deletions src/v/cloud_storage/tests/remote_segment_test.cc
@@ -31,6 +31,7 @@
#include "storage/types.h"
#include "test_utils/async.h"
#include "test_utils/fixture.h"
#include "test_utils/scoped_config.h"
#include "utils/retry_chain_node.h"

#include <seastar/core/future.hh>
@@ -580,6 +581,7 @@ FIXTURE_TEST(test_remote_segment_chunk_read, cloud_storage_fixture) {
probe,
ts_probe);

ss::abort_source as{};
// The offset data stream uses an implementation which will iterate over all
// chunks in the segment.
auto stream = segment
@@ -589,7 +591,8 @@ FIXTURE_TEST(test_remote_segment_chunk_read, cloud_storage_fixture) {
// over the entire segment in chunks.
kafka::offset{100000000},
std::nullopt,
-ss::default_priority_class())
ss::default_priority_class(),
as)
.get()
.stream;

@@ -644,12 +647,14 @@ FIXTURE_TEST(test_remote_segment_chunk_read_fallback, cloud_storage_fixture) {
probe,
ts_probe);

ss::abort_source as;
auto stream = segment
.offset_data_stream(
m.get(key)->base_kafka_offset(),
kafka::offset{100000000},
std::nullopt,
-ss::default_priority_class())
ss::default_priority_class(),
as)
.get()
.stream;

@@ -1151,3 +1156,75 @@ FIXTURE_TEST(test_chunk_prefetch, cloud_storage_fixture) {
BOOST_REQUIRE(prefetched_chunk.handle.has_value());
}
}

FIXTURE_TEST(test_abort_hydration_timeout, cloud_storage_fixture) {
scoped_config reset;
reset.get("cloud_storage_hydration_timeout_ms").set_value(0ms);

auto key = model::offset(1);
retry_chain_node fib(never_abort, 300s, 200ms);
iobuf segment_bytes = generate_segment(model::offset(1), 300);

auto m = chunk_read_baseline(*this, key, fib, segment_bytes.copy());
auto meta = *m.get(key);
partition_probe probe(manifest_ntp);
auto& ts_probe = api.local().materialized().get_read_path_probe();
remote_segment segment(
api.local(),
cache.local(),
bucket,
m.generate_segment_path(meta),
m.get_ntp(),
meta,
fib,
probe,
ts_probe);

auto stop_segment = ss::defer([&segment] { segment.stop().get(); });
ss::abort_source as;
BOOST_REQUIRE_THROW(
segment
.offset_data_stream(
m.get(key)->base_kafka_offset(),
kafka::offset{100000000},
std::nullopt,
ss::default_priority_class(),
as)
.get(),
ss::timed_out_error);
}

FIXTURE_TEST(test_abort_hydration_triggered_externally, cloud_storage_fixture) {
auto key = model::offset(1);
retry_chain_node fib(never_abort, 300s, 200ms);
iobuf segment_bytes = generate_segment(model::offset(1), 300);

auto m = chunk_read_baseline(*this, key, fib, segment_bytes.copy());
auto meta = *m.get(key);
partition_probe probe(manifest_ntp);
auto& ts_probe = api.local().materialized().get_read_path_probe();
remote_segment segment(
api.local(),
cache.local(),
bucket,
m.generate_segment_path(meta),
m.get_ntp(),
meta,
fib,
probe,
ts_probe);

auto stop_segment = ss::defer([&segment] { segment.stop().get(); });
ss::abort_source as;
as.request_abort();
BOOST_REQUIRE_THROW(
segment
.offset_data_stream(
m.get(key)->base_kafka_offset(),
kafka::offset{100000000},
std::nullopt,
ss::default_priority_class(),
as)
.get(),
ss::abort_requested_exception);
}
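The two tests above pin down both early-exit paths deterministically: a zero timeout forces `ss::timed_out_error`, and a pre-fired abort source forces `ss::abort_requested_exception`. A third variant, aborting while the wait is already in flight, would look like the sketch below, but it races against a fast download, which is presumably why the merged tests avoid it (hypothetical, untested):

// Hypothetical racy variant (sketch only): start the stream, then abort.
// If the download wins the race, no exception is thrown and the assertion
// fails, so this is illustrative rather than a mergeable test.
ss::abort_source as;
auto fut = segment.offset_data_stream(
  m.get(key)->base_kafka_offset(),
  kafka::offset{100000000},
  std::nullopt,
  ss::default_priority_class(),
  as);
as.request_abort();
BOOST_REQUIRE_THROW(std::move(fut).get(), ss::abort_requested_exception);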
8 changes: 8 additions & 0 deletions src/v/config/configuration.cc
@@ -1600,6 +1600,14 @@ configuration::configuration()
"option should be disabled.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
true)
, cloud_storage_hydration_timeout_ms(
*this,
"cloud_storage_hydration_timeout_ms",
"Duration to wait for a hydration request to be fulfilled, if hydration "
"is not completed within this time, the consumer will be notified with a "
"timeout error.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
600s)
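The deadline passed to `do_hydrate` is derived from this property when each wait starts, mirroring the call in `remote_segment::hydrate()` above; since the property is declared with `needs_restart::no`, it can be retuned at runtime. Sketch:

// Deadline for a single hydration wait, computed when the wait starts;
// a later config change affects only subsequent requests.
auto deadline = ss::lowres_clock::now()
                + config::shard_local_cfg().cloud_storage_hydration_timeout_ms();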
, cloud_storage_azure_storage_account(
*this,
"cloud_storage_azure_storage_account",
1 change: 1 addition & 0 deletions src/v/config/configuration.h
@@ -312,6 +312,7 @@
cloud_storage_topic_purge_grace_period_ms;
property<bool> cloud_storage_disable_upload_consistency_checks;
property<bool> cloud_storage_disable_metadata_consistency_checks;
property<std::chrono::milliseconds> cloud_storage_hydration_timeout_ms;

// Azure Blob Storage
property<std::optional<ss::sstring>> cloud_storage_azure_storage_account;