Skip to content

Commit

Permalink
sstables: delete_with_pending_deletion_log: batch sync_directory
Browse files Browse the repository at this point in the history
When deleting multiple sstables with the same prefix
the deletion atomicity is ensured by the pending_delete_log file,
so if scylla crashes in the middle, deletions will be replyed on
restart.

Therefore, we don't have to ensure atomicity of each individual
`unlink`.  We just need to sync the directory once, before
removing the pending_delete_log file.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes #14967
  • Loading branch information
bhalevy authored and avikivity committed Aug 6, 2023
1 parent 6c1e44e commit 6f03754
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 14 deletions.
4 changes: 3 additions & 1 deletion sstables/sstable_directory.cc
Expand Up @@ -527,9 +527,11 @@ future<> sstable_directory::delete_with_pending_deletion_log(std::vector<shared_
}

parallel_for_each(ssts, [] (shared_sstable sst) {
return sst->unlink();
return sst->unlink(sstables::storage::sync_dir::no);
}).get();

sync_directory(first->_storage->prefix()).get();

// Once all sstables are deleted, the log file can be removed.
// Note: the log file will be removed also if unlink failed to remove
// any sstable and ignored the error.
Expand Down
14 changes: 9 additions & 5 deletions sstables/sstables.cc
Expand Up @@ -2609,15 +2609,17 @@ std::optional<std::pair<uint64_t, uint64_t>> sstable::get_sample_indexes_for_ran
return std::nullopt;
}

future<> remove_by_toc_name(sstring sstable_toc_name) {
future<> remove_by_toc_name(sstring sstable_toc_name, storage::sync_dir sync) {
sstring prefix = sstable_toc_name.substr(0, sstable_toc_name.size() - sstable_version_constants::TOC_SUFFIX.size());
sstring new_toc_name = prefix + sstable_version_constants::TEMPORARY_TOC_SUFFIX;

sstlog.debug("Removing by TOC name: {}", sstable_toc_name);
if (co_await sstable_io_check(sstable_write_error_handler, file_exists, sstable_toc_name)) {
// If new_toc_name exists it will be atomically replaced. See rename(2)
co_await sstable_io_check(sstable_write_error_handler, rename_file, sstable_toc_name, new_toc_name);
co_await sstable_io_check(sstable_write_error_handler, sync_directory, parent_path(new_toc_name));
if (sync) {
co_await sstable_io_check(sstable_write_error_handler, sync_directory, parent_path(new_toc_name));
}
} else {
if (!co_await sstable_io_check(sstable_write_error_handler, file_exists, new_toc_name)) {
sstlog.warn("Unable to delete {} because it doesn't exist.", sstable_toc_name);
Expand Down Expand Up @@ -2653,7 +2655,9 @@ future<> remove_by_toc_name(sstring sstable_toc_name) {
sstlog.debug("Forgiving ENOENT when deleting file {}", fname);
}
});
co_await sstable_io_check(sstable_write_error_handler, sync_directory, parent_path(new_toc_name));
if (sync) {
co_await sstable_io_check(sstable_write_error_handler, sync_directory, parent_path(new_toc_name));
}
co_await sstable_io_check(sstable_write_error_handler, remove_file, new_toc_name);
}

Expand Down Expand Up @@ -2791,8 +2795,8 @@ utils::hashed_key sstable::make_hashed_key(const schema& s, const partition_key&
}

future<>
sstable::unlink() noexcept {
auto remove_fut = _storage->wipe(*this);
sstable::unlink(storage::sync_dir sync) noexcept {
auto remove_fut = _storage->wipe(*this, sync);

try {
co_await get_large_data_handler().maybe_delete_large_data_entries(shared_from_this());
Expand Down
8 changes: 6 additions & 2 deletions sstables/sstables.hh
Expand Up @@ -388,7 +388,9 @@ public:

// Delete the sstable by unlinking all sstable files
// Ignores all errors.
future<> unlink() noexcept;
// Caller may pass sync_dir::no for batching multiple deletes in the same directory,
// and make sure the directory is sync'ed on or after the last call.
future<> unlink(storage::sync_dir sync = storage::sync_dir::yes) noexcept;

db::large_data_handler& get_large_data_handler() {
return _large_data_handler;
Expand Down Expand Up @@ -958,6 +960,8 @@ public:
future<> remove_table_directory_if_has_no_snapshots(fs::path table_dir);

// similar to sstable::unlink, but works on a TOC file name
future<> remove_by_toc_name(sstring sstable_toc_name);
// Caller may pass sync_dir::no for batching multiple deletes in the same directory,
// and make sure the directory is sync'ed on or after the last call.
future<> remove_by_toc_name(sstring sstable_toc_name, storage::sync_dir sync = storage::sync_dir::yes);

} // namespace sstables
10 changes: 5 additions & 5 deletions sstables/storage.cc
Expand Up @@ -62,7 +62,7 @@ class filesystem_storage final : public sstables::storage {
virtual future<> change_state(const sstable& sst, sstring to, generation_type generation, delayed_commit_changes* delay) override;
// runs in async context
virtual void open(sstable& sst) override;
virtual future<> wipe(const sstable& sst) noexcept override;
virtual future<> wipe(const sstable& sst, sync_dir) noexcept override;
virtual future<file> open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) override;
virtual future<data_sink> make_data_or_index_sink(sstable& sst, component_type type) override;
virtual future<data_sink> make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) override;
Expand Down Expand Up @@ -387,7 +387,7 @@ future<> filesystem_storage::change_state(const sstable& sst, sstring to, genera
co_await move(sst, path.native(), std::move(new_generation), delay_commit);
}

future<> filesystem_storage::wipe(const sstable& sst) noexcept {
future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
// We must be able to generate toc_filename()
// in order to delete the sstable.
// Running out of memory here will terminate.
Expand All @@ -397,7 +397,7 @@ future<> filesystem_storage::wipe(const sstable& sst) noexcept {
}();

try {
co_await remove_by_toc_name(name);
co_await remove_by_toc_name(name, sync);
} catch (...) {
// Log and ignore the failure since there is nothing much we can do about it at this point.
// a. Compaction will retry deleting the sstable in the next pass, and
Expand Down Expand Up @@ -446,7 +446,7 @@ class s3_storage : public sstables::storage {
virtual future<> change_state(const sstable& sst, sstring to, generation_type generation, delayed_commit_changes* delay) override;
// runs in async context
virtual void open(sstable& sst) override;
virtual future<> wipe(const sstable& sst) noexcept override;
virtual future<> wipe(const sstable& sst, sync_dir) noexcept override;
virtual future<file> open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) override;
virtual future<data_sink> make_data_or_index_sink(sstable& sst, component_type type) override;
virtual future<data_sink> make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) override;
Expand Down Expand Up @@ -518,7 +518,7 @@ future<> s3_storage::change_state(const sstable& sst, sstring to, generation_typ
co_await coroutine::return_exception(std::runtime_error("Moving S3 objects not implemented"));
}

future<> s3_storage::wipe(const sstable& sst) noexcept {
future<> s3_storage::wipe(const sstable& sst, sync_dir) noexcept {
auto& sys_ks = sst.manager().system_keyspace();

co_await sys_ks.sstables_registry_update_entry_status(_location, sst.generation(), status_removing);
Expand Down
3 changes: 2 additions & 1 deletion sstables/storage.hh
Expand Up @@ -48,13 +48,14 @@ public:
virtual ~storage() {}

using absolute_path = bool_class<class absolute_path_tag>; // FIXME -- should go away eventually
using sync_dir = bool_class<struct sync_dir_tag>; // meaningful only to filesystem storage

virtual future<> seal(const sstable& sst) = 0;
virtual future<> snapshot(const sstable& sst, sstring dir, absolute_path abs) const = 0;
virtual future<> change_state(const sstable& sst, sstring to, generation_type generation, delayed_commit_changes* delay) = 0;
// runs in async context
virtual void open(sstable& sst) = 0;
virtual future<> wipe(const sstable& sst) noexcept = 0;
virtual future<> wipe(const sstable& sst, sync_dir) noexcept = 0;
virtual future<file> open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) = 0;
virtual future<data_sink> make_data_or_index_sink(sstable& sst, component_type type) = 0;
virtual future<data_sink> make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) = 0;
Expand Down

0 comments on commit 6f03754

Please sign in to comment.