Clear topics orphan files #8396
Conversation
Force-pushed from 4ff1f95 to 850b0cb
src/v/cluster/controller_backend.cc
Outdated
            }
        }
    }
    auto last_applied_revision = _topics.local().last_applied_revision();
last_applied_revision is updated as the topic table applies the deltas. So by the time this cleanup runs, it may not have applied all the deltas, and we may potentially be operating on a not-yet-up-to-date snapshot of the topics, no?
similar question: what are the guarantees that a node has the correct information it needs to make this orphan file removal decision independently?
After controller backend bootstrap we must have reconciled all messages in the controller_log and all topic_table deltas that were on the node before restart.
Orphan files might be left only for topics that existed before the node restarted, so all deltas for topics that may have orphan files must have been reconciled at this point.
If we run deletion of topic files while the node is running, we retry it until success, so we cannot get orphan files at runtime.
Orphan files might be left only for topics that existed before the node restarted, so all deltas for topics that may have orphan files must have been reconciled at this point.
Ah yes, last_applied_revision is guaranteed to be at least the dirty offset of the controller log at bootstrap.
src/v/storage/log_manager.cc
Outdated
  ss::sstring topic_directory_path,
  model::topic_namespace nt,
  const log_manager::ntp_to_revision& topics_on_node,
  model::revision_id& last_applied_revision) {
nit: revision_id is trivially copyable, pass by value.
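A minimal sketch of what the nit suggests, written as a diff against the parameter list quoted above:
-  model::revision_id& last_applied_revision) {
+  model::revision_id last_applied_revision) { // trivially copyable, take by value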
nit: revision_id is trivially copyable, pass by value.
agree
fixed
src/v/storage/log_manager.cc
Outdated
  model::revision_id& last_applied_revision) {
    return directory_walker::walk(
      topic_directory_path,
      [&topics_on_node, topic_directory_path, nt, &last_applied_revision](
nit: pass revision by value.
fixed
        co_return;
    }

    for (const auto& ns : namespaces) {
think this loop may affect startup time on nodes with a lot of topics * partitions. We are recursing through every ns/topic/partition on the node. wondering if this can work in the background.
fixed
src/v/cluster/controller_backend.cc
Outdated
@@ -473,6 +476,43 @@ ss::future<> controller_backend::fetch_deltas() {
    });
}

ss::future<> controller_backend::clear_orphan_topic_files() {
    if (ss::this_shard_id() != 0) {
is there a shard alias you can use instead of hard-coding 0?
I didn't find any alias for that.
It seems like the common approach for such things.
src/v/cluster/controller_backend.cc
Outdated
    auto topics = _topics.local().all_topics();
    storage::log_manager::ntp_to_revision topics_on_node;
    // Init with default namespace to clean if there is no topics
    absl::flat_hash_set<model::ns> namespaces = {{model::ns("kafka")}};
use existing pre-defined constant for namespace name.
fixed
Force-pushed from d63ee17 to f0ecfcb
CI failure #8383
lgtm, just a clarifying question and some nits.
src/v/cluster/controller_backend.cc
Outdated
@@ -297,6 +298,10 @@ void controller_backend::setup_metrics() {
ss::future<> controller_backend::start() {
    setup_metrics();
    return bootstrap_controller_backend().then([this] {
        if (ss::this_shard_id() == ss::shard_id{0}) {
think Noah meant cluster::controller_stm_shard instead of 0.
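A sketch of what that guard could look like, assuming cluster::controller_stm_shard is the existing constant the reviewer refers to:
// replace the hard-coded 0 with the named controller shard constant
if (ss::this_shard_id() == cluster::controller_stm_shard) {
    ssx::spawn_with_gate(
      _gate, [this] { return clear_orphan_topic_files(); });
}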
fixed
src/v/cluster/controller_backend.cc
Outdated
    for (const auto& t : topics) {
        namespaces.emplace(t.ns);
        auto meta = _topics.local().get_topic_assignments(t);
        if (meta) {
nit: we can add a check if (_gate.is_closed()) break; for each topic iteration to abort early if needed.
Also, can you invert the check to reduce indents:
if (!meta) {
    continue;
}
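Putting both suggestions together, the loop body might look roughly like this (a sketch; the per-topic work is elided, as in the quoted diff):
for (const auto& t : topics) {
    if (_gate.is_closed()) {
        break; // abort early if the backend is shutting down
    }
    namespaces.emplace(t.ns);
    auto meta = _topics.local().get_topic_assignments(t);
    if (!meta) {
        continue; // inverted check keeps the common path un-indented
    }
    // ... collect ntp -> revision entries for this topic ...
}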
src/v/cluster/controller_backend.cc
Outdated
@@ -473,6 +478,40 @@ ss::future<> controller_backend::fetch_deltas() {
    });
}

ss::future<> controller_backend::clear_orphan_topic_files() {
    auto topics = _topics.local().all_topics();
    storage::log_manager::ntp_to_revision topics_on_node;
q: This is a map of ntp -> revision of all the ntps in the system, which can potentially be big. What is the use of constructing this up front? When recursing through the topic/partition directories, can't we just look up the topic table? Something like:
ntp foo = partition_path::parse_partition_directory();
if (foo.revision_id < last_applied_id_snapshot && !topic_table.contains() && !topic_table.previous_replicas.contains()) {
    cleanup();
}
Just trying to understand the use of creating a snapshot of all the ntps in the system.
Previously I was thinking about creating this map to get a snapshot of the topic table right after bootstrap.
But I agree that we don't need it now.
+1 on using the topic table directly.
fixed
Force-pushed from f0ecfcb to 1e9e4fc
Rebase dev
Largely makes sense to me. The snapshotting of the topic table at the beginning of the operation makes me a bit uneasy.
Also, I found myself wondering if this couldn't be done while replaying the controller log. Perhaps we could write a new message type to the controller log (something like delete_complete) once deletion is complete. When looking for the starting point for the replay we could look for that instead of the deletion. If we don't find delete_complete, it means we might have left around orphaned files.
src/v/cluster/controller_backend.cc
Outdated
        ssx::spawn_with_gate(
          _gate, [this] { return clear_orphan_topic_files(); });
    }
clear_topic_orphan_files can throw std::filesystem::filesystem_error. It would be nice to log something in that case. Also, if I recall correctly, seastar doesn't like background failed futures.
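One way to address both points (a sketch, assuming the vlog/clusterlog helpers used elsewhere in cluster/; not necessarily the exact code that landed): handle the exception inside the backgrounded task so the gate never ends up holding a failed future.
ssx::spawn_with_gate(_gate, [this] {
    return clear_orphan_topic_files().handle_exception(
      [](const std::exception_ptr& e) {
          // std::filesystem::filesystem_error (or anything else) lands here;
          // log it instead of letting a backgrounded future fail silently.
          vlog(clusterlog.error, "Error removing orphan topic files - {}", e);
      });
});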
fixed
src/v/cluster/controller_backend.cc
Outdated
@@ -473,6 +478,40 @@ ss::future<> controller_backend::fetch_deltas() {
    });
}

ss::future<> controller_backend::clear_orphan_topic_files() {
    auto topics = _topics.local().all_topics();
    storage::log_manager::ntp_to_revision topics_on_node;
+1 on using the topic table directly.
We have discussed this multiple times with Michal and John.
Thanks for the context. The reasoning makes sense to me. I think this approach is fine.
0befbee
to
a149c60
Compare
src/v/storage/CMakeLists.txt
Outdated
@@ -47,6 +47,7 @@ v_cc_library(
    v::compression
    v::rprandom
    v::resource_mgmt
    v::cluster
Don't think we should make storage depend on cluster, that creates a circular dependency.
I believe you added it to pass _topic_table into storage. To avoid this, how about making the function definition into
ss::future<> log_manager::remove_orphan_files(
ss::sstring data_directory_path,
absl::flat_hash_set<model::ns> namespaces,
ss::noncopyable_function<bool(model::ntp, partition_path::metadata)> orphan_filter
)
and then pass the filter as a lambda from the controller backend that captures all the needed context, something like the following (some code needs to move around):
ss::future<> controller_backend::clear_orphan_topic_files(
  model::revision_id bootstrap_revision) {
    // Init with default namespace to clean if there is no topics
    absl::flat_hash_set<model::ns> namespaces = {{model::kafka_namespace}};
    for (const auto& t : _topics.local().all_topics()) {
        namespaces.emplace(t.ns);
    }
    return _storage.local().log_mgr().remove_orphan_files(
      _data_directory,
      std::move(namespaces),
      [&, bootstrap_revision](
        model::ntp ntp, storage::partition_path::metadata p) {
          return topic_files_are_orphan(
            ntp, p, _topics, bootstrap_revision, _self);
      });
}
Great idea!
Thank you
Force-pushed from a149c60 to fa5427c
/ci-repeat 3
Force-pushed from fa5427c to 0902a5d
/ci-repeat 5
@ZeDRoman Can you confirm whether the following failures are related to this patch? (Check the debug builds in the last repeat run; they failed more than once.) Release build failures are #8679.
/ci-repeat 5
I have checked the logs. The test failures don't seem related to the changes in this PR.
Force-pushed from 0902a5d to a3901c4
When redpanda is restarted while a delete operation is not finished, partition files might be left on disk. We need to clean up orphan partition files after bootstrap.
Force-pushed from a3901c4 to b044630
/ci-repeat 5
When a node is restarted while it is executing a partition delete operation, the operation may not finish and orphan files for that partition are left on the device.
After restart the node no longer has that partition's data in memory, so it cannot retry the delete operation and will not clean up the partition files during reconciliation.
This PR brings a garbage-collection mechanism that force-deletes such partition files after node bootstrap: we gather all existing ntps, walk all data directories, and check whether each topic directory corresponds to an existing ntp; if it doesn't, we delete it (see the sketch below).
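Roughly, the per-directory decision described above boils down to the check sketched here (simplified; the parameter list and topic_table lookups are illustrative, and the real topic_files_are_orphan helper also consults previous replicas and the local node id):
// A partition directory is orphan when its revision predates the revision the
// topic table had already applied at bootstrap, and the topic table no longer
// owns the ntp on this node.
bool topic_files_are_orphan(
  const model::ntp& ntp,
  storage::partition_path::metadata md,
  model::revision_id bootstrap_revision,
  const cluster::topic_table& topics) {
    if (md.revision_id >= bootstrap_revision) {
        return false; // created after bootstrap; normal reconciliation covers it
    }
    if (topics.contains(model::topic_namespace_view(ntp), ntp.tp.partition)) {
        return false; // the topic/partition still exists, nothing to clean
    }
    return true; // no owner recorded -> safe to force-delete after bootstrap
}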
Fixes #7895
Backports Required
UX Changes
Release Notes
Bug Fixes