diff --git a/CHANGELOG.md b/CHANGELOG.md index d7fa36526035..207bbdabd6d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +- Added `files_cleanup.max_committed_ledger_chunks` configuration option to limit the number of committed ledger chunk files retained in the main ledger directory. When the number of committed chunks exceeds this value, the oldest chunks (by sequence number) are automatically deleted, but only after verifying that an identical copy (by SHA-256 digest) exists in at least one `ledger.read_only_directories` entry. Committed ledger chunks that contain entries at or beyond the sequence number of the newest committed snapshot are never deleted, ensuring a complete ledger history from that snapshot for disaster recovery. At least one read-only ledger directory must be configured; the node will refuse to start otherwise. - Added `files_cleanup.max_snapshots` configuration option to limit the number of committed snapshot files retained on disk. When the number of committed snapshots exceeds this value, the oldest snapshots (by sequence number) are automatically deleted. The value must be at least 1 if set. - Added `files_cleanup.interval` configuration option (default `"30s"`) to periodically scan the snapshot directory and delete old committed snapshots exceeding `max_snapshots`. This ensures backup nodes (which receive snapshots via `backup_fetch`) also prune old snapshots. Only effective when `max_snapshots` is set. - Added `POST /node/snapshot:create`, gated by the `SnapshotCreate` RPC interface operator feature, to create a snapshot via an operator endpoint rather than a governance action. 
diff --git a/CMakeLists.txt b/CMakeLists.txt index 0a8efde959b4..f87ea9aafb63 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -643,6 +643,11 @@ if(BUILD_TESTS) ${CMAKE_CURRENT_SOURCE_DIR}/src/host/test/ledger.cpp ) + add_unit_test( + files_cleanup_test + ${CMAKE_CURRENT_SOURCE_DIR}/src/host/test/files_cleanup_test.cpp + ) + add_unit_test( raft_test ${CMAKE_CURRENT_SOURCE_DIR}/src/consensus/aft/test/main.cpp @@ -1231,6 +1236,7 @@ if(BUILD_TESTS) --historical-testdata ${CMAKE_SOURCE_DIR}/tests/testdata ) + set_tests_properties(schema_test PROPERTIES TIMEOUT 900) add_e2e_test( NAME snp_platform_tests diff --git a/doc/host_config_schema/cchost_config.json b/doc/host_config_schema/cchost_config.json index a0e13d951497..68c32ee6f933 100644 --- a/doc/host_config_schema/cchost_config.json +++ b/doc/host_config_schema/cchost_config.json @@ -555,10 +555,15 @@ "description": "Maximum number of committed snapshot files to retain. When the number of committed snapshots exceeds this value, the oldest snapshots are deleted. Must be at least 1 if set. If null or unset, no automated snapshot garbage collection is performed.", "minimum": 1 }, + "max_committed_ledger_chunks": { + "type": ["integer", "null"], + "default": null, + "description": "Maximum number of committed ledger chunk files to retain in the main ledger directory. When the number of committed chunks exceeds this value, the oldest chunks are deleted, but only after verifying that an identical copy (by SHA-256 digest) exists in at least one read-only ledger directory. Chunks whose entries extend to or beyond the sequence number of the newest committed snapshot are never deleted, ensuring a complete ledger history from that snapshot for disaster recovery. Requires at least one ledger.read_only_directories entry; the node will refuse to start otherwise. If null or unset, no automated ledger chunk garbage collection is performed." 
+ }, "interval": { "type": "string", "default": "30s", - "description": "Time interval at which to scan the snapshot directory and delete old committed snapshots in excess of max_snapshots. This periodic cleanup executes regardless of the node's status (primary or backup)." + "description": "Time interval at which to scan and delete old committed files (snapshots and ledger chunks) that exceed the configured retention limits. This periodic cleanup executes regardless of the node's status (primary or backup)." } }, "description": "This section includes configuration for periodic cleanup of old files (snapshots, ledger chunks)", diff --git a/doc/operations/ledger_snapshot.rst b/doc/operations/ledger_snapshot.rst index 079635bd8f98..81f3909928aa 100644 --- a/doc/operations/ledger_snapshot.rst +++ b/doc/operations/ledger_snapshot.rst @@ -27,6 +27,8 @@ Ledger files that still contain some uncommitted entries are named ``ledger__.committed Uncommitted snapshot files, i.e. those whose evidence has not yet been committed, are named ``snapshot__``. These files will be ignored by CCF when joining or recovering a service as no evidence can attest of their validity. -.. note:: The ``files_cleanup.max_snapshots`` configuration entry can be used to limit the number of committed snapshot files retained on disk. When the number of committed snapshots exceeds this value, the oldest snapshots (by sequence number) are automatically deleted. This is useful to control the local persistent storage footprint of a node. The value must be at least 1 if set. +.. note:: The ``files_cleanup.max_snapshots`` configuration entry can be used to limit the number of committed snapshot files retained on disk. When the number of committed snapshots exceeds this value, the oldest snapshots (by sequence number) are automatically deleted. This is useful to control the local persistent storage footprint of a node. The value must be at least 1 if set. 
Snapshot cleanup runs as part of the same periodic cleanup cycle as ledger chunk cleanup (see :ref:`operations/ledger_snapshot:Periodic File Cleanup`). Join or Recover From Snapshot ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -301,3 +303,16 @@ Invariants 3. Snapshots are always generated for the ``seqno`` of a signature transaction (but not all signature transactions trigger the generation of snapshot). 4. When a snapshot is generated, it must coincide with the end of a ledger file. Since a node can join using solely a snapshot, the first ledger file on that node will start just after the ``seqno`` of the snapshot. By 2., all nodes must have the same ledger files, so the generation of that snapshot on the primary must trigger the creation of a new ledger file starting at the next ``seqno`` to ensure the primary's ledger files are consistent with the joining node's files. + +Periodic File Cleanup +--------------------- + +Both snapshot and committed ledger chunk retention are managed by a single periodic cleanup cycle, controlled by the ``files_cleanup`` configuration section. The cleanup interval is set by ``files_cleanup.interval`` (default: ``30s``). On each cycle, the node checks whether committed snapshots or committed ledger chunks exceed their configured retention limits (``files_cleanup.max_snapshots`` and ``files_cleanup.max_committed_ledger_chunks`` respectively) and deletes the oldest files that qualify for removal. + +Snapshots qualify for removal if their number is in excess of the limit, starting from the ones with the lowest sequence numbers. + +Ledger chunks qualify for removal if their number is in excess of the limit, and if two other conditions apply. First, there must be at least one identical file in a read-only ledger directory (contents are captured in a SHA-256 digest and compared). Second, as a safety measure, ledger chunks whose entries extend to or beyond the sequence number of the newest committed snapshot never qualify.
This ensures that a complete ledger history is always available from the newest snapshot onwards, which is required for disaster recovery. + +If no committed snapshots exist, no ledger chunks are protected by this rule, but the existing backup-verification requirement still applies. + +Only one cleanup cycle can run at a time. If a cleanup task is still in progress when the next timer fires, the new cycle is skipped and a failure-level log message is emitted. This prevents overlapping cleanup operations, which could be wasteful, cause contention on the filesystem and produce spurious failures in the log. Under normal conditions each cleanup cycle completes well within the configured interval, so skipped cycles indicate that the interval may be too short or the node has an unusually large number of files to process. diff --git a/include/ccf/node/startup_config.h b/include/ccf/node/startup_config.h index 241805559855..527e96461e61 100644 --- a/include/ccf/node/startup_config.h +++ b/include/ccf/node/startup_config.h @@ -120,6 +120,7 @@ namespace ccf struct FilesCleanup { std::optional max_snapshots = std::nullopt; + std::optional max_committed_ledger_chunks = std::nullopt; ccf::ds::TimeString interval = {"30s"}; bool operator==(const FilesCleanup&) const = default; diff --git a/src/common/configuration.h b/src/common/configuration.h index 45ec58d670ca..a46469bd33d1 100644 --- a/src/common/configuration.h +++ b/src/common/configuration.h @@ -117,7 +117,10 @@ namespace ccf DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCFConfig::FilesCleanup); DECLARE_JSON_REQUIRED_FIELDS(CCFConfig::FilesCleanup); DECLARE_JSON_OPTIONAL_FIELDS( - CCFConfig::FilesCleanup, max_snapshots, interval); + CCFConfig::FilesCleanup, + max_snapshots, + max_committed_ledger_chunks, + interval); DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCFConfig); DECLARE_JSON_REQUIRED_FIELDS(CCFConfig, network); diff --git a/src/host/files_cleanup_timer.h b/src/host/files_cleanup_timer.h new file mode 100644 index 
000000000000..7b8123fc1513 --- /dev/null +++ b/src/host/files_cleanup_timer.h @@ -0,0 +1,544 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache 2.0 License. +#pragma once + +#include "ccf/crypto/hash_provider.h" +#include "ccf/crypto/sha256_hash.h" +#include "ledger_filenames.h" +#include "snapshots/filenames.h" +#include "timer.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace asynchost +{ + // Pure helper functions for file cleanup, extracted for testability. + namespace files_cleanup + { + // Return type for check_digest_against_read_only_dirs(), distinguishing + // between a verified digest match, no match, and concurrent deletion. + enum class DigestCheckResult : std::uint8_t + { + match_found, // An identical copy exists in a read-only directory + no_match, // File exists locally but no matching copy was found + file_gone // Local file was concurrently deleted (benign) + }; + static constexpr size_t HASH_READ_CHUNK_SIZE = size_t{64} * 1024; // 64 KB + + // Returns committed ledger chunks in the given directory, sorted ascending + // by start index. Each entry is (start_idx, path). 
+    inline std::vector>
+    find_committed_ledger_chunks(const std::filesystem::path& dir)
+    {
+      namespace fs = std::filesystem;
+      std::vector> result;
+
+      // NOTE: directory_iterator and is_regular_file() are used in their
+      // throwing forms here, so filesystem errors propagate to the caller.
+      for (const auto& entry : fs::directory_iterator(dir))
+      {
+        if (!entry.is_regular_file())
+        {
+          continue;
+        }
+
+        auto file_name = entry.path().filename().string();
+
+        if (!is_ledger_file_name_committed(file_name))
+        {
+          continue;
+        }
+
+        // A committed-looking name whose start index cannot be parsed is
+        // ignored (logged at debug level) rather than aborting the scan.
+        try
+        {
+          auto start_idx = get_start_idx_from_file_name(file_name);
+          result.emplace_back(start_idx, entry.path());
+        }
+        catch (const std::exception& e)
+        {
+          LOG_DEBUG_FMT(
+            "Skipping ledger file {} during cleanup: {}", file_name, e.what());
+        }
+      }
+
+      // Sort ascending by start index (oldest first)
+      std::sort(result.begin(), result.end(), [](const auto& a, const auto& b) {
+        return a.first < b.first;
+      });
+
+      return result;
+    }
+
+    // Compute SHA-256 digest of a file by reading it in chunks, without
+    // loading the entire file into memory.
+    // Returns nullopt if the file cannot be opened, or if the stream reports
+    // a bad state once reading has stopped.
+    inline std::optional hash_file(
+      const std::filesystem::path& path)
+    {
+      std::ifstream f(path, std::ios::binary);
+      if (!f)
+      {
+        return std::nullopt;
+      }
+
+      auto hasher = ccf::crypto::make_incremental_sha256();
+      std::vector buf(HASH_READ_CHUNK_SIZE);
+      // read() leaves the stream in a failed state on the final short read,
+      // so gcount() is also checked in order to hash the trailing partial
+      // buffer before leaving the loop.
+      while (f.read(reinterpret_cast(buf.data()), buf.size()) ||
+             f.gcount() > 0)
+      {
+        hasher->update_hash({buf.data(), static_cast(f.gcount())});
+        if (f.eof())
+        {
+          break;
+        }
+      }
+
+      if (f.bad())
+      {
+        return std::nullopt;
+      }
+
+      return hasher->finalise();
+    }
+
+    // Checks whether a file with the same name and the same SHA-256 digest
+    // as local_path exists in any of read_only_dirs.
+    //  - match_found: a candidate's digest equals the local digest.
+    //  - file_gone: the local file could not be hashed and no longer exists
+    //    (or is no longer a regular file).
+    //  - no_match: every other outcome, including failures to stat or read
+    //    the local file.
+    inline DigestCheckResult check_digest_against_read_only_dirs(
+      const std::filesystem::path& local_path,
+      const std::vector& read_only_dirs)
+    {
+      namespace fs = std::filesystem;
+
+      auto local_hash = hash_file(local_path);
+      if (!local_hash.has_value())
+      {
+        // Distinguish between a concurrent deletion (benign) and a genuine
+        // read error on an existing file. Use non-throwing overloads to
+        // avoid exceptions from permission issues or broken mounts.
+        std::error_code ec;
+        const auto exists = fs::exists(local_path, ec);
+        if (ec)
+        {
+          LOG_FAIL_FMT(
+            "Failed to query existence of ledger chunk {}: {}. "
+            "Skipping deletion.",
+            local_path.filename(),
+            ec.message());
+          return DigestCheckResult::no_match;
+        }
+        if (!exists)
+        {
+          LOG_INFO_FMT(
+            "Ledger chunk {} no longer exists, skipping",
+            local_path.filename());
+          return DigestCheckResult::file_gone;
+        }
+
+        ec.clear();
+        const auto is_reg = fs::is_regular_file(local_path, ec);
+        if (ec)
+        {
+          LOG_FAIL_FMT(
+            "Failed to query type of ledger chunk {}: {}. "
+            "Skipping deletion.",
+            local_path.filename(),
+            ec.message());
+          return DigestCheckResult::no_match;
+        }
+        if (!is_reg)
+        {
+          LOG_INFO_FMT(
+            "Ledger chunk {} is no longer a regular file, skipping",
+            local_path.filename());
+          return DigestCheckResult::file_gone;
+        }
+
+        LOG_FAIL_FMT(
+          "Ledger chunk {} exists but could not be read, skipping deletion",
+          local_path.filename());
+        return DigestCheckResult::no_match;
+      }
+
+      auto file_name = local_path.filename();
+
+      for (const auto& ro_dir : read_only_dirs)
+      {
+        auto candidate = ro_dir / file_name;
+        std::error_code ec;
+        // Directories without a readable regular file of this name are
+        // silently skipped.
+        if (
+          !fs::exists(candidate, ec) || ec ||
+          !fs::is_regular_file(candidate, ec) || ec)
+        {
+          continue;
+        }
+
+        try
+        {
+          auto ro_hash = hash_file(candidate);
+          if (!ro_hash.has_value())
+          {
+            LOG_DEBUG_FMT(
+              "Ledger chunk {} in read-only directory {} could not be read",
+              file_name,
+              ro_dir);
+            continue;
+          }
+          if (local_hash.value() == ro_hash.value())
+          {
+            return DigestCheckResult::match_found;
+          }
+
+          // A mismatch in one directory is logged, but the remaining
+          // directories are still checked for a matching copy.
+          LOG_FAIL_FMT(
+            "Ledger chunk {} found in read-only directory {} but digest "
+            "does not match (local: {}, read-only: {}). Skipping deletion.",
+            file_name,
+            ro_dir,
+            local_hash.value().hex_str(),
+            ro_hash.value().hex_str());
+        }
+        catch (const std::exception& e)
+        {
+          LOG_FAIL_FMT(
+            "Failed to read ledger chunk {} from read-only directory {}: "
+            "{}. Skipping deletion.",
+            file_name,
+            ro_dir,
+            e.what());
+        }
+      }
+
+      return DigestCheckResult::no_match;
+    }
+
+    // Lists committed snapshots in the given directory. Returns them sorted
+    // descending by snapshot index (newest first). Returns nullopt on error
+    // to allow callers to distinguish "no snapshots" from "listing failed".
+    inline std::optional>>
+    find_committed_snapshots(const std::filesystem::path& dir)
+    {
+      std::vector directories{dir};
+      try
+      {
+        return snapshots::find_committed_snapshots_in_directories(directories);
+      }
+      catch (const std::filesystem::filesystem_error& e)
+      {
+        LOG_FAIL_FMT(
+          "Failed to list committed snapshots in {}: {}", dir, e.what());
+      }
+      catch (const std::exception& e)
+      {
+        LOG_FAIL_FMT(
+          "Unexpected error while listing committed snapshots in {}: {}",
+          dir,
+          e.what());
+      }
+      // Only reached when one of the handlers above ran.
+      return std::nullopt;
+    }
+
+    // Returns the sequence number of the newest committed snapshot from a
+    // pre-gathered list, or nullopt if the list is empty.
+    inline std::optional highest_committed_snapshot_seqno(
+      const std::vector>&
+        committed_snapshots)
+    {
+      if (!committed_snapshots.empty())
+      {
+        // Sorted descending by snapshot index; first is newest
+        return committed_snapshots.front().first;
+      }
+      return std::nullopt;
+    }
+
+    // Deletes the oldest committed snapshots so that at most max_retained
+    // remain. Expects committed_snapshots to be sorted descending by
+    // snapshot index. A failed deletion is logged and does not stop the
+    // remaining deletions.
+    inline void cleanup_old_snapshots(
+      const std::vector>&
+        committed_snapshots,
+      size_t max_retained)
+    {
+      if (committed_snapshots.size() > max_retained)
+      {
+        // committed_snapshots is sorted descending by snapshot index, so the
+        // oldest are at the end
+        for (auto it = committed_snapshots.rbegin();
+             it != committed_snapshots.rend() - max_retained;
+             ++it)
+        {
+          const auto& path = it->second;
+          LOG_INFO_FMT(
+            "Deleting old snapshot {} (retaining {})",
+            path.filename(),
+            max_retained);
+          std::error_code ec;
+          std::filesystem::remove(path, ec);
+          if (ec)
+          {
+            LOG_FAIL_FMT(
+              "Failed to delete old snapshot {}: {}",
+              path.filename(),
+              ec.message());
+          }
+        }
+      }
+    }
+
+    inline void cleanup_old_ledger_chunks(
+      const std::filesystem::path& main_dir,
+      const std::vector& read_only_dirs,
+      size_t max_retained,
+      std::optional snapshot_watermark = std::nullopt)
+    {
+      // Deletes the oldest committed ledger chunks in main_dir so that at
+      // most max_retained remain, subject to two safety conditions:
+      //  1. If snapshot_watermark is set, a chunk is only eligible when its
+      //     end seqno (parsed from the file name) is strictly below the
+      //     watermark. A chunk whose end seqno is missing or unparsable is
+      //     kept, since it cannot be shown to precede the snapshot.
+      //  2. A copy with an identical digest must exist in one of
+      //     read_only_dirs.
+      // Listing failures are logged and abort the whole pass without
+      // deleting anything. This function does not let file-name parsing
+      // errors escape to the caller.
+      std::vector> committed;
+      try
+      {
+        committed = find_committed_ledger_chunks(main_dir);
+      }
+      catch (const std::filesystem::filesystem_error& e)
+      {
+        LOG_FAIL_FMT(
+          "Failed to list committed ledger chunks in {}: {}",
+          main_dir,
+          e.what());
+        return;
+      }
+      catch (const std::exception& e)
+      {
+        LOG_FAIL_FMT(
+          "Unexpected error while listing committed ledger chunks in {}: {}",
+          main_dir,
+          e.what());
+        return;
+      }
+
+      if (committed.size() <= max_retained)
+      {
+        return;
+      }
+
+      if (snapshot_watermark.has_value())
+      {
+        LOG_DEBUG_FMT(
+          "Ledger chunk cleanup: snapshot watermark is {}",
+          snapshot_watermark.value());
+      }
+
+      // committed is sorted ascending by start index; the oldest are at the
+      // front. Delete from front, keeping the last max_retained entries.
+      size_t to_delete = committed.size() - max_retained;
+      for (size_t i = 0; i < to_delete; ++i)
+      {
+        const auto& path = committed[i].second;
+
+        // Never delete a chunk that ends at or after the newest committed
+        // snapshot - we must preserve a complete ledger from that snapshot
+        // onwards for disaster recovery.
+        if (snapshot_watermark.has_value())
+        {
+          // get_last_idx_from_file_name() parses with std::stoull, which
+          // throws on a malformed name (e.g. "ledger_1-abc.committed").
+          // Such a name still passes the start-index check used when
+          // listing, so the failure must be handled here rather than being
+          // allowed to propagate out of the cleanup task.
+          try
+          {
+            const auto end_idx =
+              get_last_idx_from_file_name(path.filename().string());
+            if (!end_idx.has_value())
+            {
+              // Without an end seqno the chunk cannot be compared against
+              // the watermark, so err on the side of keeping it.
+              LOG_FAIL_FMT(
+                "Keeping ledger chunk {} because its end seqno could not be "
+                "determined from its file name",
+                path.filename());
+              continue;
+            }
+            if (end_idx.value() >= snapshot_watermark.value())
+            {
+              LOG_DEBUG_FMT(
+                "Keeping ledger chunk {} (end seqno {} >= snapshot "
+                "watermark {})",
+                path.filename(),
+                end_idx.value(),
+                snapshot_watermark.value());
+              continue;
+            }
+          }
+          catch (const std::exception& e)
+          {
+            LOG_FAIL_FMT(
+              "Keeping ledger chunk {} because its end seqno could not be "
+              "parsed: {}",
+              path.filename(),
+              e.what());
+            continue;
+          }
+        }
+
+        auto digest_result =
+          check_digest_against_read_only_dirs(path, read_only_dirs);
+        if (digest_result == DigestCheckResult::file_gone)
+        {
+          // File was concurrently deleted — nothing to do.
+          continue;
+        }
+        if (digest_result == DigestCheckResult::no_match)
+        {
+          LOG_FAIL_FMT(
+            "Keeping ledger chunk {} because no matching copy was found "
+            "in any read-only ledger directory",
+            path.filename());
+          continue;
+        }
+
+        // Only reached when the digest check returned match_found.
+        LOG_INFO_FMT(
+          "Deleting old committed ledger chunk {} (retaining {})",
+          path.filename(),
+          max_retained);
+        std::error_code ec;
+        std::filesystem::remove(path, ec);
+        if (ec)
+        {
+          if (ec == std::errc::no_such_file_or_directory)
+          {
+            LOG_INFO_FMT(
+              "Ledger chunk {} was already removed", path.filename());
+          }
+          else
+          {
+            LOG_FAIL_FMT(
+              "Failed to delete committed ledger chunk {}: {}",
+              path.filename(),
+              ec.message());
+          }
+        }
+      }
+    }
+  } // namespace files_cleanup
+
+  // Timer implementation that prunes old committed snapshots and committed
+  // ledger chunks. Each on_timer() call copies the configuration into a
+  // CleanupWork and queues it with uv_queue_work, so the filesystem work
+  // is done by on_cleanup_work rather than inside on_timer() itself.
+  class FilesCleanupImpl
+  {
+  private:
+    // Snapshot cleanup config
+    std::filesystem::path snapshots_dir;
+    std::optional max_snapshots;
+
+    // Ledger chunk cleanup config
+    std::filesystem::path ledger_dir;
+    std::vector read_only_ledger_dirs;
+    std::optional max_committed_ledger_chunks;
+
+    // Guard against overlapping cleanup tasks. Shared between the impl and
+    // any in-flight CleanupWork so the flag remains valid even if the timer
+    // is destroyed while a cleanup task is still running on the thread pool.
+    std::shared_ptr> cleanup_in_progress =
+      std::make_shared>(false);
+
+    // Copy of the configuration attached to a queued request via req->data.
+    // The callbacks below read only from this struct, never from the
+    // FilesCleanupImpl instance.
+    struct CleanupWork
+    {
+      std::filesystem::path snapshots_dir;
+      std::optional max_snapshots;
+
+      std::filesystem::path ledger_dir;
+      std::vector read_only_ledger_dirs;
+      std::optional max_committed_ledger_chunks;
+
+      std::shared_ptr> cleanup_in_progress;
+    };
+
+    // Work callback passed to uv_queue_work. Lists committed snapshots
+    // once, then runs snapshot cleanup and/or ledger chunk cleanup
+    // depending on which limits are set.
+    static void on_cleanup_work(uv_work_t* req)
+    {
+      auto* work = static_cast(req->data);
+      LOG_DEBUG_FMT("Files cleanup started");
+
+      // Gather committed snapshots once - used by both snapshot cleanup
+      // and as a watermark for ledger chunk cleanup.
+      auto committed_snapshots_opt =
+        files_cleanup::find_committed_snapshots(work->snapshots_dir);
+
+      if (!committed_snapshots_opt.has_value())
+      {
+        // Snapshot listing failed. Skip both snapshot and ledger cleanup
+        // to avoid deleting ledger chunks without a valid watermark.
+        LOG_FAIL_FMT(
+          "Skipping all file cleanup because committed snapshot listing "
+          "failed");
+        return;
+      }
+
+      auto& committed_snapshots = committed_snapshots_opt.value();
+
+      if (work->max_snapshots.has_value())
+      {
+        files_cleanup::cleanup_old_snapshots(
+          committed_snapshots, work->max_snapshots.value());
+      }
+      if (work->max_committed_ledger_chunks.has_value())
+      {
+        // The watermark is taken from the list gathered before snapshot
+        // cleanup ran, i.e. from its first (newest) entry.
+        auto snapshot_watermark =
+          files_cleanup::highest_committed_snapshot_seqno(committed_snapshots);
+        files_cleanup::cleanup_old_ledger_chunks(
+          work->ledger_dir,
+          work->read_only_ledger_dirs,
+          work->max_committed_ledger_chunks.value(),
+          snapshot_watermark);
+      }
+    }
+
+    // Completion callback passed to uv_queue_work. Clears the in-progress
+    // flag so the next timer tick may queue work again, then frees the
+    // CleanupWork and the request allocated in on_timer().
+    static void on_cleanup_work_done(uv_work_t* req, int /*status*/)
+    {
+      auto* work = static_cast(req->data);
+      work->cleanup_in_progress->store(false);
+      LOG_DEBUG_FMT("Files cleanup completed");
+      delete work; // NOLINT(cppcoreguidelines-owning-memory)
+      delete req; // NOLINT(cppcoreguidelines-owning-memory)
+    }
+
+  public:
+    // Throws std::logic_error if max_snapshots_ is set to a value below 1,
+    // or if max_committed_ledger_chunks_ is set while
+    // read_only_ledger_dirs_ is empty.
+    FilesCleanupImpl(
+      const std::string& snapshots_dir_,
+      std::optional max_snapshots_,
+      const std::string& ledger_dir_,
+      const std::vector& read_only_ledger_dirs_,
+      std::optional max_committed_ledger_chunks_) :
+      snapshots_dir(snapshots_dir_),
+      max_snapshots(max_snapshots_),
+      ledger_dir(ledger_dir_),
+      max_committed_ledger_chunks(max_committed_ledger_chunks_)
+    {
+      for (const auto& d : read_only_ledger_dirs_)
+      {
+        read_only_ledger_dirs.emplace_back(d);
+      }
+
+      if (max_snapshots.has_value() && max_snapshots.value() < 1)
+      {
+        throw std::logic_error(fmt::format(
+          "files_cleanup.max_snapshots must be at least 1, got {}",
+          max_snapshots.value()));
+      }
+      if (
+        max_committed_ledger_chunks.has_value() &&
+        read_only_ledger_dirs.empty())
+      {
+        throw std::logic_error(
+          "files_cleanup.max_committed_ledger_chunks requires at least one "
+          "ledger.read_only_directories entry. 
Committed ledger chunks are " + "only deleted after verifying an identical copy exists in a " + "read-only directory."); + } + } + + void on_timer() + { + bool expected = false; + if (!cleanup_in_progress->compare_exchange_strong(expected, true)) + { + LOG_FAIL_FMT( + "Skipping files cleanup: previous cleanup task is still running"); + return; + } + + // NOLINTNEXTLINE(cppcoreguidelines-owning-memory) + auto* work = new CleanupWork{ + .snapshots_dir = snapshots_dir, + .max_snapshots = max_snapshots, + .ledger_dir = ledger_dir, + .read_only_ledger_dirs = read_only_ledger_dirs, + .max_committed_ledger_chunks = max_committed_ledger_chunks, + .cleanup_in_progress = cleanup_in_progress}; + // NOLINTNEXTLINE(cppcoreguidelines-owning-memory) + auto* req = new uv_work_t; + req->data = work; + int rc = uv_queue_work( + uv_default_loop(), req, &on_cleanup_work, &on_cleanup_work_done); + if (rc < 0) + { + LOG_FAIL_FMT("Failed to queue files cleanup work: {}", uv_strerror(rc)); + cleanup_in_progress->store(false); + delete work; // NOLINT(cppcoreguidelines-owning-memory) + delete req; // NOLINT(cppcoreguidelines-owning-memory) + } + } + }; + + using FilesCleanupTimer = proxy_ptr>; +} diff --git a/src/host/ledger.h b/src/host/ledger.h index 474986f0baf5..fb2b821827e1 100644 --- a/src/host/ledger.h +++ b/src/host/ledger.h @@ -11,6 +11,7 @@ #include "ds/serialized.h" #include "kv/kv_types.h" #include "kv/serialised_entry_format.h" +#include "ledger_filenames.h" #include "time_bound_logger.h" #include @@ -30,76 +31,6 @@ namespace asynchost { static constexpr size_t ledger_max_read_cache_files_default = 5; - static constexpr auto ledger_committed_suffix = "committed"; - static constexpr auto ledger_start_idx_delimiter = "_"; - static constexpr auto ledger_last_idx_delimiter = "-"; - static constexpr auto ledger_recovery_file_suffix = "recovery"; - static constexpr auto ledger_ignored_file_suffix = "ignored"; - - static inline size_t get_start_idx_from_file_name( - const 
std::string& file_name) - { - auto pos = file_name.find(ledger_start_idx_delimiter); - if (pos == std::string::npos) - { - throw std::logic_error(fmt::format( - "Ledger file name {} does not contain a start seqno", file_name)); - } - - return std::stol(file_name.substr(pos + 1)); - } - - static inline std::optional get_last_idx_from_file_name( - const std::string& file_name) - { - auto pos = file_name.find(ledger_last_idx_delimiter); - if (pos == std::string::npos) - { - // Non-committed file names do not contain a last idx - return std::nullopt; - } - - return std::stol(file_name.substr(pos + 1)); - } - - static inline bool is_ledger_file_name_committed(const std::string& file_name) - { - return file_name.ends_with(ledger_committed_suffix); - } - - static inline bool is_ledger_file_name_recovery(const std::string& file_name) - { - return file_name.ends_with(ledger_recovery_file_suffix); - } - - static inline bool is_ledger_file_name_ignored(const std::string& file_name) - { - return file_name.ends_with(ledger_ignored_file_suffix); - } - - static inline bool is_ledger_file_ignored(const std::string& file_name) - { - // Catch-all for all files that should be ignored - return is_ledger_file_name_recovery(file_name) || - is_ledger_file_name_ignored(file_name); - } - - static inline fs::path remove_suffix( - std::string_view file_name, const std::string& suffix) - { - if (file_name.ends_with(suffix)) - { - file_name.remove_suffix(suffix.size()); - } - return file_name; - } - - static inline fs::path remove_recovery_suffix(std::string_view file_name) - { - return remove_suffix( - file_name, fmt::format(".{}", ledger_recovery_file_suffix)); - } - static std::optional get_file_name_with_idx( const std::string& dir, size_t idx, bool allow_recovery_files) { @@ -184,7 +115,7 @@ namespace asynchost if (recovery) { file_name = - fmt::format("{}.{}", file_name.string(), ledger_recovery_file_suffix); + fmt::format("{}{}", file_name.string(), ledger_recovery_file_suffix); } auto 
file_path = dir / file_name; @@ -690,7 +621,7 @@ namespace asynchost } auto committed_file_name = fmt::format( - "{}_{}-{}.{}", + "{}_{}-{}{}", file_name_prefix, start_idx, get_last_idx(), @@ -698,8 +629,8 @@ namespace asynchost if (recovery) { - committed_file_name = fmt::format( - "{}.{}", committed_file_name, ledger_recovery_file_suffix); + committed_file_name = + fmt::format("{}{}", committed_file_name, ledger_recovery_file_suffix); } if (!rename(committed_file_name)) @@ -978,7 +909,7 @@ namespace asynchost } auto ignored_file_name = - fmt::format("{}.{}", file_name, ledger_ignored_file_suffix); + fmt::format("{}{}", file_name, ledger_ignored_file_suffix); files::rename(ledger_dir / file_name, ledger_dir / ignored_file_name); } @@ -1267,7 +1198,7 @@ namespace asynchost remove_suffix( file_name.string(), fmt::format( - "{}{}.{}", + "{}{}{}", ledger_last_idx_delimiter, last_idx_file.value(), ledger_committed_suffix))); diff --git a/src/host/ledger_filenames.h b/src/host/ledger_filenames.h new file mode 100644 index 000000000000..d544ea78f600 --- /dev/null +++ b/src/host/ledger_filenames.h @@ -0,0 +1,84 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache 2.0 License. 
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace asynchost
+{
+  namespace fs = std::filesystem;
+
+  // File name suffixes. Each one includes its leading '.', so callers
+  // append or strip them without adding a separator of their own.
+  static constexpr auto ledger_committed_suffix = ".committed";
+  static constexpr auto ledger_start_idx_delimiter = "_";
+  static constexpr auto ledger_last_idx_delimiter = "-";
+  static constexpr auto ledger_recovery_file_suffix = ".recovery";
+  static constexpr auto ledger_ignored_file_suffix = ".ignored";
+
+  // Parses the number that follows the first start-index delimiter in
+  // file_name. Throws std::logic_error if the delimiter is absent;
+  // std::stoull throws if no number can be parsed after it.
+  static inline size_t get_start_idx_from_file_name(
+    const std::string& file_name)
+  {
+    auto pos = file_name.find(ledger_start_idx_delimiter);
+    if (pos == std::string::npos)
+    {
+      throw std::logic_error(fmt::format(
+        "Ledger file name {} does not contain a start seqno", file_name));
+    }
+
+    return std::stoull(file_name.substr(pos + 1));
+  }
+
+  // Parses the number that follows the first last-index delimiter in
+  // file_name, or returns nullopt if the delimiter is absent.
+  // std::stoull throws if the delimiter is present but no number can be
+  // parsed after it.
+  static inline std::optional get_last_idx_from_file_name(
+    const std::string& file_name)
+  {
+    auto pos = file_name.find(ledger_last_idx_delimiter);
+    if (pos == std::string::npos)
+    {
+      // Non-committed file names do not contain a last idx
+      return std::nullopt;
+    }
+
+    return std::stoull(file_name.substr(pos + 1));
+  }
+
+  // The three predicates below test only the trailing suffix of the name.
+  static inline bool is_ledger_file_name_committed(const std::string& file_name)
+  {
+    return file_name.ends_with(ledger_committed_suffix);
+  }
+
+  static inline bool is_ledger_file_name_recovery(const std::string& file_name)
+  {
+    return file_name.ends_with(ledger_recovery_file_suffix);
+  }
+
+  static inline bool is_ledger_file_name_ignored(const std::string& file_name)
+  {
+    return file_name.ends_with(ledger_ignored_file_suffix);
+  }
+
+  static inline bool is_ledger_file_ignored(const std::string& file_name)
+  {
+    // Catch-all for all files that should be ignored
+    return is_ledger_file_name_recovery(file_name) ||
+      is_ledger_file_name_ignored(file_name);
+  }
+
+  // Returns file_name with suffix stripped from its end when present,
+  // otherwise file_name unchanged.
+  static inline fs::path remove_suffix(
+    std::string_view file_name, const std::string& suffix)
+  {
+    if (file_name.ends_with(suffix))
+    {
+      file_name.remove_suffix(suffix.size());
+    }
+    return 
file_name; + } + + static inline fs::path remove_recovery_suffix(std::string_view file_name) + { + return remove_suffix(file_name, ledger_recovery_file_suffix); + } +} diff --git a/src/host/run.cpp b/src/host/run.cpp index 0513e9e3e256..594298fa9264 100644 --- a/src/host/run.cpp +++ b/src/host/run.cpp @@ -30,7 +30,7 @@ #include "enclave/entry_points.h" #include "handle_ring_buffer.h" #include "host/env.h" -#include "host/snapshot_cleanup_timer.h" +#include "host/files_cleanup_timer.h" #include "http/curl.h" #include "json_schema.h" #include "lfs_file_handler.h" @@ -621,16 +621,20 @@ namespace ccf config.snapshots.read_only_directory); snapshots.register_message_handlers(buffer_processor.get_dispatcher()); - std::optional snapshot_cleanup; + std::optional files_cleanup; if ( - config.files_cleanup.max_snapshots.has_value() && + (config.files_cleanup.max_snapshots.has_value() || + config.files_cleanup.max_committed_ledger_chunks.has_value()) && std::chrono::milliseconds(config.files_cleanup.interval) > std::chrono::milliseconds::zero()) { - snapshot_cleanup.emplace( + files_cleanup.emplace( std::chrono::milliseconds(config.files_cleanup.interval), config.snapshots.directory, - config.files_cleanup.max_snapshots.value()); + config.files_cleanup.max_snapshots, + config.ledger.directory, + config.ledger.read_only_directories, + config.files_cleanup.max_committed_ledger_chunks); } // handle LFS-related messages from the enclave diff --git a/src/host/snapshot_cleanup_timer.h b/src/host/snapshot_cleanup_timer.h deleted file mode 100644 index e56a415ca55e..000000000000 --- a/src/host/snapshot_cleanup_timer.h +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the Apache 2.0 License. 
-#pragma once - -#include "snapshots/filenames.h" -#include "timer.h" - -#include -#include - -namespace asynchost -{ - class SnapshotCleanupImpl - { - private: - std::filesystem::path dir; - size_t max_retained; - - struct CleanupWork - { - std::filesystem::path dir; - size_t max_retained; - }; - - static void cleanup_old_snapshots( - const std::filesystem::path& dir, size_t max_retained) - { - std::vector directories{dir}; - decltype(snapshots::find_committed_snapshots_in_directories( - directories)) committed; - try - { - committed = - snapshots::find_committed_snapshots_in_directories(directories); - } - catch (const std::filesystem::filesystem_error& e) - { - LOG_FAIL_FMT( - "Failed to list committed snapshots in {}: {}", dir, e.what()); - return; - } - catch (const std::exception& e) - { - LOG_FAIL_FMT( - "Unexpected error while listing committed snapshots in {}: {}", - dir, - e.what()); - return; - } - - if (committed.size() > max_retained) - { - // committed is sorted descending by snapshot index, so the - // oldest are at the end - for (auto it = committed.rbegin(); - it != committed.rend() - max_retained; - ++it) - { - const auto& path = it->second; - LOG_INFO_FMT( - "Deleting old snapshot {} (retaining {})", - path.filename(), - max_retained); - std::error_code ec; - std::filesystem::remove(path, ec); - if (ec) - { - LOG_FAIL_FMT( - "Failed to delete old snapshot {}: {}", - path.filename(), - ec.message()); - } - } - } - } - - static void on_cleanup_work(uv_work_t* req) - { - auto* work = static_cast(req->data); - cleanup_old_snapshots(work->dir, work->max_retained); - } - - static void on_cleanup_work_done(uv_work_t* req, int /*status*/) - { - auto* work = static_cast(req->data); - LOG_DEBUG_FMT("Snapshot cleanup completed"); - delete work; // NOLINT(cppcoreguidelines-owning-memory) - delete req; // NOLINT(cppcoreguidelines-owning-memory) - } - - public: - SnapshotCleanupImpl(const std::string& dir_, size_t max_retained_) : - dir(dir_), - 
max_retained(max_retained_) - { - if (max_retained < 1) - { - throw std::logic_error(fmt::format( - "files_cleanup.max_snapshots must be at least 1, got {}", - max_retained)); - } - } - - void on_timer() - { - // NOLINTNEXTLINE(cppcoreguidelines-owning-memory) - auto* work = new CleanupWork{.dir = dir, .max_retained = max_retained}; - // NOLINTNEXTLINE(cppcoreguidelines-owning-memory) - auto* req = new uv_work_t; - req->data = work; - int rc = uv_queue_work( - uv_default_loop(), req, &on_cleanup_work, &on_cleanup_work_done); - if (rc < 0) - { - LOG_FAIL_FMT( - "Failed to queue snapshot cleanup work: {}", uv_strerror(rc)); - delete work; // NOLINT(cppcoreguidelines-owning-memory) - delete req; // NOLINT(cppcoreguidelines-owning-memory) - } - } - }; - - using SnapshotCleanupTimer = proxy_ptr>; -} diff --git a/src/host/test/files_cleanup_test.cpp b/src/host/test/files_cleanup_test.cpp new file mode 100644 index 000000000000..c1951b28630b --- /dev/null +++ b/src/host/test/files_cleanup_test.cpp @@ -0,0 +1,724 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache 2.0 License. + +#include "ds/files.h" +#include "ds/internal_logger.h" +#include "host/files_cleanup_timer.h" + +#define DOCTEST_CONFIG_IMPLEMENT +#include +#include +#include +#include + +namespace fs = std::filesystem; +using namespace asynchost; +using namespace asynchost::files_cleanup; + +// Creates a unique temporary directory using mkdtemp to avoid cross-test +// interference when tests run in parallel or a prior run left files behind. 
+static fs::path make_unique_test_dir(const std::string& prefix) +{ + auto pattern = (fs::temp_directory_path() / (prefix + "_XXXXXX")).string(); + auto* result = mkdtemp(pattern.data()); + REQUIRE(result != nullptr); + return fs::path(result); +} + +static void write_file(const fs::path& path, const std::string& content) +{ + std::ofstream f(path, std::ios::binary); + REQUIRE(f.good()); + f << content; +} + +static fs::path create_committed_chunk( + const fs::path& dir, + size_t start_idx, + size_t end_idx, + const std::string& content = "data") +{ + auto name = fmt::format("ledger_{}-{}.committed", start_idx, end_idx); + auto path = dir / name; + write_file(path, content); + return path; +} + +// ---- find_committed_ledger_chunks tests ---- + +TEST_CASE("find_committed_ledger_chunks: empty directory") +{ + auto tmp = make_unique_test_dir("test_cleanup_empty"); + + auto result = find_committed_ledger_chunks(tmp); + CHECK(result.empty()); + + fs::remove_all(tmp); +} + +TEST_CASE( + "find_committed_ledger_chunks: returns only committed chunks sorted " + "ascending") +{ + auto tmp = make_unique_test_dir("test_cleanup_sorted"); + + // Create committed chunks in non-sorted order + create_committed_chunk(tmp, 300, 400); + create_committed_chunk(tmp, 100, 200); + create_committed_chunk(tmp, 200, 300); + + auto result = find_committed_ledger_chunks(tmp); + REQUIRE(result.size() == 3); + CHECK(result[0].first == 100); + CHECK(result[1].first == 200); + CHECK(result[2].first == 300); + + fs::remove_all(tmp); +} + +TEST_CASE("find_committed_ledger_chunks: skips non-committed and special files") +{ + auto tmp = make_unique_test_dir("test_cleanup_skip"); + + // Committed chunk (should be included) + create_committed_chunk(tmp, 1, 100); + + // Uncommitted file (no .committed suffix) + write_file(tmp / "ledger_101", "data"); + + // Recovery file + write_file(tmp / "ledger_1-100.committed.recovery", "data"); + + // Ignored file + write_file(tmp / "ledger_1-100.committed.ignored", 
"data"); + + // Subdirectory + fs::create_directories(tmp / "subdir"); + + // Non-ledger file + write_file(tmp / "random_file.txt", "data"); + + auto result = find_committed_ledger_chunks(tmp); + REQUIRE(result.size() == 1); + CHECK(result[0].first == 1); + + fs::remove_all(tmp); +} + +TEST_CASE("find_committed_ledger_chunks: nonexistent directory throws") +{ + auto tmp = make_unique_test_dir("test_cleanup_nonexistent"); + fs::remove_all(tmp); // mkdtemp creates it; remove so we test a missing dir + + CHECK_THROWS_AS( + find_committed_ledger_chunks(tmp), std::filesystem::filesystem_error); +} + +// ---- hash_file tests ---- + +TEST_CASE("hash_file: normal file returns a hash") +{ + auto tmp = make_unique_test_dir("test_hash_normal"); + auto path = tmp / "test_file"; + write_file(path, "hello world"); + + auto result = hash_file(path); + REQUIRE(result.has_value()); + + // Hash same content again - should be deterministic + auto result2 = hash_file(path); + REQUIRE(result2.has_value()); + CHECK(result.value() == result2.value()); + + fs::remove_all(tmp); +} + +TEST_CASE("hash_file: different content produces different hash") +{ + auto tmp = make_unique_test_dir("test_hash_different"); + + auto path_a = tmp / "file_a"; + auto path_b = tmp / "file_b"; + write_file(path_a, "content A"); + write_file(path_b, "content B"); + + auto hash_a = hash_file(path_a); + auto hash_b = hash_file(path_b); + REQUIRE(hash_a.has_value()); + REQUIRE(hash_b.has_value()); + CHECK(hash_a.value() != hash_b.value()); + + fs::remove_all(tmp); +} + +TEST_CASE("hash_file: empty file returns a hash") +{ + auto tmp = make_unique_test_dir("test_hash_empty"); + auto path = tmp / "empty_file"; + write_file(path, ""); + + auto result = hash_file(path); + REQUIRE(result.has_value()); + + fs::remove_all(tmp); +} + +TEST_CASE("hash_file: nonexistent file returns nullopt") +{ + auto tmp = make_unique_test_dir("test_hash_nosuch"); + auto path = tmp / "no_such_file"; + // path doesn't exist within the 
unique dir + + auto result = hash_file(path); + CHECK_FALSE(result.has_value()); + + fs::remove_all(tmp); +} + +// ---- check_digest_against_read_only_dirs tests ---- + +TEST_CASE("check_digest_against_read_only_dirs: matching copy in read-only dir") +{ + auto tmp = make_unique_test_dir("test_digest_match"); + auto main_dir = tmp / "main"; + auto ro_dir = tmp / "ro"; + fs::create_directories(main_dir); + fs::create_directories(ro_dir); + + auto local_path = + create_committed_chunk(main_dir, 1, 100, "identical content"); + // Copy to read-only dir with same name and content + write_file(ro_dir / local_path.filename(), "identical content"); + + std::vector ro_dirs = {ro_dir}; + CHECK( + check_digest_against_read_only_dirs(local_path, ro_dirs) == + DigestCheckResult::match_found); + + fs::remove_all(tmp); +} + +TEST_CASE("check_digest_against_read_only_dirs: mismatched digest") +{ + auto tmp = make_unique_test_dir("test_digest_mismatch"); + auto main_dir = tmp / "main"; + auto ro_dir = tmp / "ro"; + fs::create_directories(main_dir); + fs::create_directories(ro_dir); + + auto local_path = create_committed_chunk(main_dir, 1, 100, "local content"); + write_file(ro_dir / local_path.filename(), "different content"); + + std::vector ro_dirs = {ro_dir}; + CHECK( + check_digest_against_read_only_dirs(local_path, ro_dirs) == + DigestCheckResult::no_match); + + fs::remove_all(tmp); +} + +TEST_CASE("check_digest_against_read_only_dirs: no copy in read-only dir") +{ + auto tmp = make_unique_test_dir("test_digest_no_copy"); + auto main_dir = tmp / "main"; + auto ro_dir = tmp / "ro"; + fs::create_directories(main_dir); + fs::create_directories(ro_dir); + + auto local_path = create_committed_chunk(main_dir, 1, 100, "content"); + // ro_dir is empty - no matching file + + std::vector ro_dirs = {ro_dir}; + CHECK( + check_digest_against_read_only_dirs(local_path, ro_dirs) == + DigestCheckResult::no_match); + + fs::remove_all(tmp); +} + +TEST_CASE( + 
"check_digest_against_read_only_dirs: deleted local file returns file_gone") +{ + auto tmp = make_unique_test_dir("test_digest_deleted"); + auto main_dir = tmp / "main"; + auto ro_dir = tmp / "ro"; + fs::create_directories(main_dir); + fs::create_directories(ro_dir); + + auto local_path = main_dir / "ledger_1-100.committed"; + // Do not create the file - simulate concurrent deletion + + std::vector ro_dirs = {ro_dir}; + CHECK( + check_digest_against_read_only_dirs(local_path, ro_dirs) == + DigestCheckResult::file_gone); + + fs::remove_all(tmp); +} + +TEST_CASE( + "check_digest_against_read_only_dirs: match found in second read-only dir") +{ + auto tmp = make_unique_test_dir("test_digest_multi_ro"); + auto main_dir = tmp / "main"; + auto ro_dir1 = tmp / "ro1"; + auto ro_dir2 = tmp / "ro2"; + fs::create_directories(main_dir); + fs::create_directories(ro_dir1); + fs::create_directories(ro_dir2); + + auto local_path = create_committed_chunk(main_dir, 1, 100, "my data"); + // Only in second read-only dir + write_file(ro_dir2 / local_path.filename(), "my data"); + + std::vector ro_dirs = {ro_dir1, ro_dir2}; + CHECK( + check_digest_against_read_only_dirs(local_path, ro_dirs) == + DigestCheckResult::match_found); + + fs::remove_all(tmp); +} + +TEST_CASE("check_digest_against_read_only_dirs: empty read-only dirs list") +{ + auto tmp = make_unique_test_dir("test_digest_no_ro_dirs"); + auto main_dir = tmp / "main"; + fs::create_directories(main_dir); + + auto local_path = create_committed_chunk(main_dir, 1, 100, "content"); + + std::vector ro_dirs = {}; + CHECK( + check_digest_against_read_only_dirs(local_path, ro_dirs) == + DigestCheckResult::no_match); + + fs::remove_all(tmp); +} + +// ---- cleanup_old_ledger_chunks tests ---- + +TEST_CASE("cleanup_old_ledger_chunks: empty directory is a no-op") +{ + auto tmp = make_unique_test_dir("test_ledger_cleanup_empty"); + auto main_dir = tmp / "main"; + auto ro_dir = tmp / "ro"; + fs::create_directories(main_dir); + 
fs::create_directories(ro_dir); + + std::vector ro_dirs = {ro_dir}; + // Should not throw or crash + cleanup_old_ledger_chunks(main_dir, ro_dirs, 3); + + fs::remove_all(tmp); +} + +TEST_CASE("cleanup_old_ledger_chunks: deletes oldest chunks when backed up") +{ + auto tmp = make_unique_test_dir("test_ledger_cleanup_delete"); + auto main_dir = tmp / "main"; + auto ro_dir = tmp / "ro"; + fs::create_directories(main_dir); + fs::create_directories(ro_dir); + + // Create 5 committed chunks + for (size_t i = 0; i < 5; ++i) + { + auto start = i * 100 + 1; + auto end = (i + 1) * 100; + auto content = fmt::format("chunk_{}", i); + create_committed_chunk(main_dir, start, end, content); + // Also copy to read-only dir + create_committed_chunk(ro_dir, start, end, content); + } + + std::vector ro_dirs = {ro_dir}; + // Keep only 2 - should delete 3 oldest + cleanup_old_ledger_chunks(main_dir, ro_dirs, 2); + + auto remaining = find_committed_ledger_chunks(main_dir); + REQUIRE(remaining.size() == 2); + // Retained should be the newest (start_idx 301 and 401) + CHECK(remaining[0].first == 301); + CHECK(remaining[1].first == 401); + + fs::remove_all(tmp); +} + +TEST_CASE("cleanup_old_ledger_chunks: keeps chunks not backed up in read-only") +{ + auto tmp = make_unique_test_dir("test_ledger_cleanup_keep"); + auto main_dir = tmp / "main"; + auto ro_dir = tmp / "ro"; + fs::create_directories(main_dir); + fs::create_directories(ro_dir); + + // Create 4 committed chunks + for (size_t i = 0; i < 4; ++i) + { + auto start = i * 100 + 1; + auto end = (i + 1) * 100; + create_committed_chunk(main_dir, start, end, fmt::format("chunk_{}", i)); + } + + // Only back up chunk 0 (oldest) to read-only dir + create_committed_chunk(ro_dir, 1, 100, "chunk_0"); + + std::vector ro_dirs = {ro_dir}; + // Keep 2 - should try to delete 2 oldest, but only chunk 0 is backed up + cleanup_old_ledger_chunks(main_dir, ro_dirs, 2); + + auto remaining = find_committed_ledger_chunks(main_dir); + // Chunk 0 deleted 
(backed up), chunk 1 kept (not backed up), + // chunks 2-3 kept (within retention) + REQUIRE(remaining.size() == 3); + CHECK(remaining[0].first == 101); // chunk 1 (not backed up, kept) + CHECK(remaining[1].first == 201); // chunk 2 + CHECK(remaining[2].first == 301); // chunk 3 + + fs::remove_all(tmp); +} + +TEST_CASE("cleanup_old_ledger_chunks: max_retained = 0 deletes all backed up") +{ + auto tmp = make_unique_test_dir("test_ledger_cleanup_zero"); + auto main_dir = tmp / "main"; + auto ro_dir = tmp / "ro"; + fs::create_directories(main_dir); + fs::create_directories(ro_dir); + + // Create 3 committed chunks, all backed up + for (size_t i = 0; i < 3; ++i) + { + auto start = i * 100 + 1; + auto end = (i + 1) * 100; + auto content = fmt::format("chunk_{}", i); + create_committed_chunk(main_dir, start, end, content); + create_committed_chunk(ro_dir, start, end, content); + } + + std::vector ro_dirs = {ro_dir}; + cleanup_old_ledger_chunks(main_dir, ro_dirs, 0); + + auto remaining = find_committed_ledger_chunks(main_dir); + CHECK(remaining.empty()); + + fs::remove_all(tmp); +} + +TEST_CASE("cleanup_old_ledger_chunks: count within limit is a no-op") +{ + auto tmp = make_unique_test_dir("test_ledger_cleanup_within"); + auto main_dir = tmp / "main"; + auto ro_dir = tmp / "ro"; + fs::create_directories(main_dir); + fs::create_directories(ro_dir); + + // Create 2 committed chunks + create_committed_chunk(main_dir, 1, 100, "a"); + create_committed_chunk(main_dir, 101, 200, "b"); + + std::vector ro_dirs = {ro_dir}; + // max_retained = 5, only 2 chunks - no deletions + cleanup_old_ledger_chunks(main_dir, ro_dirs, 5); + + auto remaining = find_committed_ledger_chunks(main_dir); + CHECK(remaining.size() == 2); + + fs::remove_all(tmp); +} + +TEST_CASE("cleanup_old_ledger_chunks: digest mismatch prevents deletion") +{ + auto tmp = make_unique_test_dir("test_ledger_cleanup_mismatch"); + auto main_dir = tmp / "main"; + auto ro_dir = tmp / "ro"; + fs::create_directories(main_dir); 
+ fs::create_directories(ro_dir); + + // Create 3 committed chunks + for (size_t i = 0; i < 3; ++i) + { + auto start = i * 100 + 1; + auto end = (i + 1) * 100; + create_committed_chunk(main_dir, start, end, fmt::format("chunk_{}", i)); + } + + // Back up chunk 0 with corrupted content + create_committed_chunk(ro_dir, 1, 100, "CORRUPTED"); + + std::vector ro_dirs = {ro_dir}; + cleanup_old_ledger_chunks(main_dir, ro_dirs, 1); + + auto remaining = find_committed_ledger_chunks(main_dir); + // chunk 0 and 1 should both be kept (0: digest mismatch, 1: not backed up) + // chunk 2 is within retention limit + REQUIRE(remaining.size() == 3); + + fs::remove_all(tmp); +} + +// ---- find_committed_snapshots / highest_committed_snapshot_seqno tests ---- + +static fs::path create_committed_snapshot( + const fs::path& dir, size_t seqno, size_t evidence_seqno) +{ + auto name = fmt::format("snapshot_{}_{}.committed", seqno, evidence_seqno); + auto path = dir / name; + write_file(path, fmt::format("snapshot_data_{}", seqno)); + return path; +} + +TEST_CASE("highest_committed_snapshot_seqno: returns newest snapshot seqno") +{ + auto tmp = make_unique_test_dir("test_snap_watermark"); + + create_committed_snapshot(tmp, 100, 105); + create_committed_snapshot(tmp, 300, 310); + create_committed_snapshot(tmp, 200, 210); + + auto committed_opt = find_committed_snapshots(tmp); + REQUIRE(committed_opt.has_value()); + auto& committed = committed_opt.value(); + auto result = highest_committed_snapshot_seqno(committed); + REQUIRE(result.has_value()); + CHECK(result.value() == 300); + + fs::remove_all(tmp); +} + +TEST_CASE( + "highest_committed_snapshot_seqno: returns nullopt for empty directory") +{ + auto tmp = make_unique_test_dir("test_snap_watermark_empty"); + + auto committed_opt = find_committed_snapshots(tmp); + REQUIRE(committed_opt.has_value()); + auto& committed = committed_opt.value(); + auto result = highest_committed_snapshot_seqno(committed); + CHECK_FALSE(result.has_value()); + + 
fs::remove_all(tmp); +} + +TEST_CASE("highest_committed_snapshot_seqno: ignores uncommitted snapshots") +{ + auto tmp = make_unique_test_dir("test_snap_watermark_uncommitted"); + + // Uncommitted snapshot (no .committed suffix) + write_file(tmp / "snapshot_500_510", "data"); + create_committed_snapshot(tmp, 200, 210); + + auto committed_opt = find_committed_snapshots(tmp); + REQUIRE(committed_opt.has_value()); + auto& committed = committed_opt.value(); + auto result = highest_committed_snapshot_seqno(committed); + REQUIRE(result.has_value()); + CHECK(result.value() == 200); + + fs::remove_all(tmp); +} + +// ---- snapshot watermark in cleanup_old_ledger_chunks tests ---- + +TEST_CASE( + "cleanup_old_ledger_chunks: watermark prevents deletion of recent chunks") +{ + auto tmp = make_unique_test_dir("test_ledger_watermark"); + auto main_dir = tmp / "main"; + auto ro_dir = tmp / "ro"; + fs::create_directories(main_dir); + fs::create_directories(ro_dir); + + // Create 5 committed chunks: 1-100, 101-200, 201-300, 301-400, 401-500 + for (size_t i = 0; i < 5; ++i) + { + auto start = i * 100 + 1; + auto end = (i + 1) * 100; + auto content = fmt::format("chunk_{}", i); + create_committed_chunk(main_dir, start, end, content); + create_committed_chunk(ro_dir, start, end, content); + } + + std::vector ro_dirs = {ro_dir}; + // Keep only 1, but snapshot watermark at 250 protects chunks ending >= 250 + // Chunks 1-100 and 101-200 end below 250, so eligible for deletion + // Chunks 201-300, 301-400, 401-500 end >= 250, protected + cleanup_old_ledger_chunks(main_dir, ro_dirs, 1, 250); + + auto remaining = find_committed_ledger_chunks(main_dir); + // 1-100 deleted, 101-200 deleted, 201-300 kept (watermark), 301-400 kept, + // 401-500 kept (within retention) + REQUIRE(remaining.size() == 3); + CHECK(remaining[0].first == 201); + CHECK(remaining[1].first == 301); + CHECK(remaining[2].first == 401); + + fs::remove_all(tmp); +} + +TEST_CASE( + "cleanup_old_ledger_chunks: watermark at 
exact chunk boundary protects it") +{ + auto tmp = make_unique_test_dir("test_ledger_watermark_exact"); + auto main_dir = tmp / "main"; + auto ro_dir = tmp / "ro"; + fs::create_directories(main_dir); + fs::create_directories(ro_dir); + + for (size_t i = 0; i < 4; ++i) + { + auto start = i * 100 + 1; + auto end = (i + 1) * 100; + auto content = fmt::format("chunk_{}", i); + create_committed_chunk(main_dir, start, end, content); + create_committed_chunk(ro_dir, start, end, content); + } + + std::vector ro_dirs = {ro_dir}; + // Watermark at 200 (exactly matching end of chunk 101-200) + // Chunk 1-100 ends at 100 < 200, eligible for deletion + // Chunk 101-200 ends at 200 >= 200, protected + cleanup_old_ledger_chunks(main_dir, ro_dirs, 1, 200); + + auto remaining = find_committed_ledger_chunks(main_dir); + REQUIRE(remaining.size() == 3); + CHECK(remaining[0].first == 101); // kept by watermark + CHECK(remaining[1].first == 201); + CHECK(remaining[2].first == 301); // kept by retention + + fs::remove_all(tmp); +} + +TEST_CASE("cleanup_old_ledger_chunks: no watermark allows normal deletion") +{ + auto tmp = make_unique_test_dir("test_ledger_no_watermark"); + auto main_dir = tmp / "main"; + auto ro_dir = tmp / "ro"; + fs::create_directories(main_dir); + fs::create_directories(ro_dir); + + for (size_t i = 0; i < 4; ++i) + { + auto start = i * 100 + 1; + auto end = (i + 1) * 100; + auto content = fmt::format("chunk_{}", i); + create_committed_chunk(main_dir, start, end, content); + create_committed_chunk(ro_dir, start, end, content); + } + + std::vector ro_dirs = {ro_dir}; + // No watermark - all backed-up chunks eligible + cleanup_old_ledger_chunks(main_dir, ro_dirs, 1, std::nullopt); + + auto remaining = find_committed_ledger_chunks(main_dir); + REQUIRE(remaining.size() == 1); + CHECK(remaining[0].first == 301); + + fs::remove_all(tmp); +} + +// ---- FilesCleanupImpl constructor tests ---- + +TEST_CASE( + "FilesCleanupImpl: constructor rejects ledger cleanup without 
read-only dirs") +{ + CHECK_THROWS_AS( + FilesCleanupImpl( + "/tmp/snapshots", + std::nullopt, + "/tmp/ledger", + {}, // no read-only dirs + 3 // but max_committed_ledger_chunks is set + ), + std::logic_error); +} + +TEST_CASE( + "FilesCleanupImpl: constructor accepts ledger cleanup with read-only dirs") +{ + CHECK_NOTHROW(FilesCleanupImpl( + "/tmp/snapshots", std::nullopt, "/tmp/ledger", {"/tmp/ro"}, 3)); +} + +TEST_CASE("FilesCleanupImpl: constructor rejects max_snapshots < 1") +{ + CHECK_THROWS_AS( + FilesCleanupImpl( + "/tmp/snapshots", + 0, // max_snapshots = 0 + "/tmp/ledger", + {}, + std::nullopt), + std::logic_error); +} + +TEST_CASE("FilesCleanupImpl: constructor accepts both cleanup options together") +{ + CHECK_NOTHROW( + FilesCleanupImpl("/tmp/snapshots", 2, "/tmp/ledger", {"/tmp/ro"}, 3)); +} + +TEST_CASE("FilesCleanupImpl: constructor accepts all nullopt (no cleanup)") +{ + CHECK_NOTHROW(FilesCleanupImpl( + "/tmp/snapshots", std::nullopt, "/tmp/ledger", {}, std::nullopt)); +} + +// ---- ledger_filenames.h tests ---- + +TEST_CASE("get_start_idx_from_file_name: parses start index") +{ + CHECK(get_start_idx_from_file_name("ledger_42-100.committed") == 42); + CHECK(get_start_idx_from_file_name("ledger_1") == 1); + CHECK(get_start_idx_from_file_name("ledger_0") == 0); +} + +TEST_CASE("get_start_idx_from_file_name: throws on missing delimiter") +{ + CHECK_THROWS_AS( + get_start_idx_from_file_name("nodelimiter"), std::logic_error); +} + +TEST_CASE("get_last_idx_from_file_name: parses last index") +{ + auto result = get_last_idx_from_file_name("ledger_1-100.committed"); + REQUIRE(result.has_value()); + CHECK(result.value() == 100); +} + +TEST_CASE("get_last_idx_from_file_name: returns nullopt for uncommitted files") +{ + auto result = get_last_idx_from_file_name("ledger_1"); + CHECK_FALSE(result.has_value()); +} + +TEST_CASE("is_ledger_file_name_committed: detects committed suffix") +{ + CHECK(is_ledger_file_name_committed("ledger_1-100.committed")); + 
CHECK_FALSE(is_ledger_file_name_committed("ledger_1")); + CHECK_FALSE(is_ledger_file_name_committed("ledger_1-100.committed.recovery")); + CHECK_FALSE(is_ledger_file_name_committed("ledger_1-100.committed.ignored")); +} + +TEST_CASE("is_ledger_file_name_recovery: detects recovery suffix") +{ + CHECK(is_ledger_file_name_recovery("ledger_1-100.committed.recovery")); + CHECK_FALSE(is_ledger_file_name_recovery("ledger_1-100.committed")); +} + +TEST_CASE("is_ledger_file_name_ignored: detects ignored suffix") +{ + CHECK(is_ledger_file_name_ignored("ledger_1-100.committed.ignored")); + CHECK_FALSE(is_ledger_file_name_ignored("ledger_1-100.committed")); +} + +int main(int argc, char** argv) +{ + ccf::logger::config::default_init(); + doctest::Context context; + context.applyCommandLine(argc, argv); + int res = context.run(); + if (context.shouldExit()) + return res; + return res; +} diff --git a/tests/config.jinja b/tests/config.jinja index 447807d32557..d6a55eed8afb 100644 --- a/tests/config.jinja +++ b/tests/config.jinja @@ -73,10 +73,11 @@ "target_rpc_interface": "{{ backup_snapshot_fetch_target_rpc_interface }}"{% endif %}{% if backup_snapshot_fetch_max_size %}, "max_size": "{{ backup_snapshot_fetch_max_size }}"{% endif %} }{% endif %} - },{% if files_cleanup_max_snapshots or files_cleanup_interval %} + },{% if files_cleanup_max_snapshots or files_cleanup_max_committed_ledger_chunks or files_cleanup_interval %} "files_cleanup": { - {% if files_cleanup_max_snapshots %}"max_snapshots": {{ files_cleanup_max_snapshots }}{% endif %}{% if files_cleanup_max_snapshots and files_cleanup_interval %}, + {% if files_cleanup_max_snapshots %}"max_snapshots": {{ files_cleanup_max_snapshots }}{% endif %}{% if files_cleanup_max_snapshots and (files_cleanup_max_committed_ledger_chunks or files_cleanup_interval) %}, + {% endif %}{% if files_cleanup_max_committed_ledger_chunks %}"max_committed_ledger_chunks": {{ files_cleanup_max_committed_ledger_chunks }}{% endif %}{% if 
files_cleanup_max_committed_ledger_chunks and files_cleanup_interval %}, {% endif %}{% if files_cleanup_interval %}"interval": "{{ files_cleanup_interval }}"{% endif %} },{% endif %} "logging": diff --git a/tests/e2e_operations.py b/tests/e2e_operations.py index fe132ab51111..fe75ac4f6914 100644 --- a/tests/e2e_operations.py +++ b/tests/e2e_operations.py @@ -3492,12 +3492,533 @@ def run_backup_snapshot_cleanup(const_args): test_backup_snapshot_cleanup(network, args) +def test_max_committed_ledger_chunk_files(network, args, read_only_ledger_dir): + """ + Verify that the periodic cleanup timer deletes committed ledger chunks + from the main ledger directory down to the configured retention limit. + Chunks are copied to the read-only dir so the digest check passes. + """ + max_retained = args.files_cleanup_max_committed_ledger_chunks + primary, _ = network.find_primary() + main_ledger_dir = primary.get_main_ledger_dir() + + def get_committed_chunks(): + return sorted( + [ + f + for f in os.listdir(main_ledger_dir) + if f.startswith("ledger_") and ccf.ledger.is_ledger_chunk_committed(f) + ], + key=lambda f: ccf.ledger.range_from_filename(f)[0], + ) + + def copy_new_committed_to_readonly(): + """Copy any committed chunks from main dir to read-only dir.""" + for f in os.listdir(main_ledger_dir): + if f.startswith("ledger_") and ccf.ledger.is_ledger_chunk_committed(f): + dst = os.path.join(read_only_ledger_dir, f) + if not os.path.exists(dst): + shutil.copy2(os.path.join(main_ledger_dir, f), dst) + + def wait_for_cleanup(max_count, timeout=15): + end_time = time.time() + timeout + while time.time() < end_time: + committed = get_committed_chunks() + if len(committed) <= max_count: + return committed + time.sleep(0.5) + current = get_committed_chunks() + raise TimeoutError( + f"Timed out waiting for committed chunk count to drop to {max_count} in " + f"{main_ledger_dir}. 
Found {len(current)}: {sorted(current)}" + ) + + initial_committed = get_committed_chunks() + LOG.info(f"Initial committed chunks: {len(initial_committed)}") + + # Generate more committed chunks than the retention limit + num_chunks_to_create = max_retained + 4 + for i in range(num_chunks_to_create): + LOG.info(f"Forcing ledger chunk {i + 1}/{num_chunks_to_create}") + network.txs.issue(network, number_txs=3) + network.consortium.force_ledger_chunk(primary) + network.txs.issue(network, number_txs=3) + # Copy committed chunks to read-only dir so digest check passes + copy_new_committed_to_readonly() + + # Wait for committed chunks to appear then for cleanup to prune + time.sleep(2) + committed_after = get_committed_chunks() + LOG.info(f"Committed chunks after issuing: {len(committed_after)}") + + if len(committed_after) > max_retained: + committed_after = wait_for_cleanup(max_retained) + + assert len(committed_after) <= max_retained, ( + f"Expected at most {max_retained} committed chunks, " + f"found {len(committed_after)}: {committed_after}" + ) + + # Verify the retained chunks are the newest + all_start_seqnos = [ccf.ledger.range_from_filename(f)[0] for f in committed_after] + assert all_start_seqnos == sorted( + all_start_seqnos + ), f"Retained chunks are not sorted: {all_start_seqnos}" + + return network + + +def run_max_committed_ledger_chunk_files(const_args): + args = copy.deepcopy(const_args) + args.label = f"{args.label}_max_committed_ledger_chunks" + args.ledger_chunk_bytes = "20KB" + args.files_cleanup_max_committed_ledger_chunks = 3 + args.files_cleanup_interval = "1s" + + with tempfile.TemporaryDirectory() as tmp_dir: + args.common_read_only_ledger_dir = tmp_dir + + with infra.network.network( + args.nodes, + args.binary_dir, + args.debug_nodes, + pdb=args.pdb, + txs=app.LoggingTxs("user0"), + ) as network: + network.start_and_open(args) + + # Copy all existing committed chunks to read-only dir so they can be pruned + primary, _ = network.find_primary() 
+ main_ledger_dir = primary.get_main_ledger_dir() + for f in os.listdir(main_ledger_dir): + if f.startswith("ledger_") and ccf.ledger.is_ledger_chunk_committed(f): + shutil.copy2( + os.path.join(main_ledger_dir, f), + os.path.join(tmp_dir, f), + ) + + test_max_committed_ledger_chunk_files(network, args, tmp_dir) + + +def test_ledger_chunk_cleanup_with_read_only_dir(network, args): + """ + When read-only ledger directories are configured, only committed ledger + chunks that have an identical copy (by SHA-256 digest) in a read-only + directory should be deleted. Chunks not present there must be kept. + """ + max_retained = args.files_cleanup_max_committed_ledger_chunks + primary, _ = network.find_primary() + main_ledger_dir = primary.get_main_ledger_dir() + + # The read-only ledger dir for this node + read_only_ledger_dir = os.path.join( + primary.remote.remote.root, + primary.remote.read_only_ledger_dirs_names[0], + ) + os.makedirs(read_only_ledger_dir, exist_ok=True) + + def get_committed_chunks(d): + return sorted( + [ + f + for f in os.listdir(d) + if f.startswith("ledger_") and ccf.ledger.is_ledger_chunk_committed(f) + ], + key=lambda f: ccf.ledger.range_from_filename(f)[0], + ) + + # Generate lots of committed chunks + num_chunks = max_retained + 5 + for i in range(num_chunks): + network.txs.issue(network, number_txs=3) + network.consortium.force_ledger_chunk(primary) + network.txs.issue(network, number_txs=3) + + time.sleep(1) + + # Pause cleanup by collecting committed chunks before they're pruned. + # Copy only some of the oldest chunks to read-only dir. 
+ committed = get_committed_chunks(main_ledger_dir) + LOG.info(f"Committed chunks before backup: {len(committed)}") + + # Copy the 2 oldest committed chunks to read-only dir + num_to_backup = 2 + backed_up = [] + for f in committed[:num_to_backup]: + src = os.path.join(main_ledger_dir, f) + dst = os.path.join(read_only_ledger_dir, f) + shutil.copy2(src, dst) + backed_up.append(f) + LOG.info(f"Backed up {f} to read-only dir") + + # Wait for the cleanup timer to process + timeout = 20 + end_time = time.time() + timeout + while time.time() < end_time: + current = get_committed_chunks(main_ledger_dir) + if len(current) <= max_retained + ( + len(committed) - max_retained - num_to_backup + ): + break + time.sleep(0.5) + + current = get_committed_chunks(main_ledger_dir) + LOG.info(f"Committed chunks after cleanup: {len(current)}, files: {current}") + + # The backed-up chunks should have been deleted from main dir + for f in backed_up: + assert f not in current, ( + f"Chunk {f} was backed up to read-only dir but was not deleted " + f"from main dir. Current: {current}" + ) + + # Chunks not in read-only dir that were over the limit should still be present + # (they couldn't be safely deleted) + not_backed_up_over_limit = [f for f in committed if f not in backed_up][ + : len(committed) - max_retained - num_to_backup + ] + for f in not_backed_up_over_limit: + assert f in current, ( + f"Chunk {f} was NOT in read-only dir and should have been kept, " + f"but was deleted. 
def _list_committed_ledger_chunks(directory):
    """
    Return the names of the committed ledger chunk files found in
    ``directory``, ordered by the first sequence number in their file name.
    """
    return sorted(
        (
            f
            for f in os.listdir(directory)
            if f.startswith("ledger_") and ccf.ledger.is_ledger_chunk_committed(f)
        ),
        key=lambda f: ccf.ledger.range_from_filename(f)[0],
    )


def _remove_read_only_ledger_files(directory):
    """
    Best-effort removal of every entry in ``directory``. Failures are logged
    rather than raised so that cleanup never masks the real test outcome.
    """
    for f in os.listdir(directory):
        try:
            os.remove(os.path.join(directory, f))
        except OSError as e:
            LOG.warning(f"Failed to clean up read-only ledger file {f}: {e}")


def _force_ledger_chunks(network, primary, count):
    """
    Issue transactions and force a ledger chunk boundary on ``primary``
    ``count`` times, then issue one more batch of transactions.

    NOTE(review): the trailing batch is issued once, after the loop, on the
    assumption that its purpose is to get the last forced chunk committed.
    Confirm this matches the intended per-iteration transaction count.
    """
    for _ in range(count):
        network.txs.issue(network, number_txs=3)
        network.consortium.force_ledger_chunk(primary)
    network.txs.issue(network, number_txs=3)


def test_ledger_chunk_cleanup_digest_mismatch(network, args):
    """
    When a committed chunk exists in the read-only directory but its digest
    does not match the local copy, the chunk must NOT be deleted.

    Returns the network so the test can be chained with others.
    """
    max_retained = args.files_cleanup_max_committed_ledger_chunks
    primary, _ = network.find_primary()
    main_ledger_dir = primary.get_main_ledger_dir()

    read_only_ledger_dir = os.path.join(
        primary.remote.remote.root,
        primary.remote.read_only_ledger_dirs_names[0],
    )
    os.makedirs(read_only_ledger_dir, exist_ok=True)

    # Generate committed chunks
    _force_ledger_chunks(network, primary, max_retained + 3)

    time.sleep(1)
    committed = _list_committed_ledger_chunks(main_ledger_dir)
    LOG.info(f"Committed chunks: {len(committed)}")

    if len(committed) <= max_retained:
        LOG.warning("Not enough committed chunks to test cleanup, skipping")
        return network

    # Place a corrupted copy of the oldest chunk in the read-only directory.
    # The corrupted bytes are written directly, instead of copying the file
    # and corrupting it afterwards: the copy-then-corrupt sequence leaves a
    # window in which the read-only copy is identical to the original, and a
    # cleanup pass running in that window would legitimately delete the chunk.
    target_chunk = committed[0]
    src = os.path.join(main_ledger_dir, target_chunk)
    dst = os.path.join(read_only_ledger_dir, target_chunk)
    with open(src, "rb") as f:
        corrupted = bytearray(f.read())
    assert corrupted, f"Committed chunk {target_chunk} is unexpectedly empty"
    # Flip every bit of the first byte so that the digest cannot match
    corrupted[0] ^= 0xFF
    with open(dst, "wb") as f:
        f.write(corrupted)

    LOG.info(f"Corrupted read-only copy of {target_chunk}")

    try:
        # Wait for cleanup timer
        time.sleep(3)

        current = _list_committed_ledger_chunks(main_ledger_dir)
        LOG.info(f"Committed chunks after cleanup: {len(current)}")

        # The target chunk should still exist because the digest didn't match
        assert target_chunk in current, (
            f"Chunk {target_chunk} was deleted despite digest mismatch with "
            f"read-only copy. Current: {current}"
        )
    finally:
        # Clean up corrupted read-only copy so it doesn't interfere with
        # shutdown ledger validation, even when the assertion above fails
        _remove_read_only_ledger_files(read_only_ledger_dir)

    return network


def test_post_snapshot_chunks_retained(network, args, read_only_ledger_dir):
    """
    Verify that committed ledger chunks created after the latest snapshot are
    NOT deleted even when the total count exceeds max_committed_ledger_chunks.
    The cleanup logic must preserve all chunks newer than the snapshot watermark
    so that a node can still recover from that snapshot.

    ``read_only_ledger_dir`` is the directory into which committed chunks are
    copied, making them eligible for deletion from the main ledger directory.
    Returns the network so the test can be chained with others.
    """
    max_retained = args.files_cleanup_max_committed_ledger_chunks
    primary, _ = network.find_primary()
    main_ledger_dir = primary.get_main_ledger_dir()
    snapshots_dir = os.path.join(
        primary.remote.remote.root,
        primary.remote.snapshots_dir_name,
    )

    def chunk_end(name):
        return ccf.ledger.range_from_filename(name)[1]

    def ends_at_or_after(name, seqno):
        end = chunk_end(name)
        return end is not None and end >= seqno

    def copy_new_committed_to_readonly():
        for f in _list_committed_ledger_chunks(main_ledger_dir):
            dst = os.path.join(read_only_ledger_dir, f)
            if not os.path.exists(dst):
                shutil.copy2(os.path.join(main_ledger_dir, f), dst)

    def get_latest_committed_snapshot_seqno():
        seqnos = [
            ccf.ledger.snapshot_index_from_filename(f)[0]
            for f in os.listdir(snapshots_dir)
            if f.startswith("snapshot_") and ccf.ledger.is_snapshot_file_committed(f)
        ]
        return max(seqnos, default=None)

    # Step 1: Generate some chunks and trigger a snapshot so we have a watermark
    LOG.info("Generating initial chunks and triggering a snapshot")
    _force_ledger_chunks(network, primary, 3)
    copy_new_committed_to_readonly()

    primary.trigger_snapshot()
    # Issue enough txs to advance commit past the snapshot
    _force_ledger_chunks(network, primary, 1)

    # Wait for the snapshot to appear
    timeout = 10
    end_time = time.time() + timeout
    snapshot_seqno = None
    while time.time() < end_time:
        snapshot_seqno = get_latest_committed_snapshot_seqno()
        if snapshot_seqno is not None:
            break
        time.sleep(0.5)
    assert (
        snapshot_seqno is not None
    ), f"Timed out waiting for a committed snapshot in {snapshots_dir}"
    LOG.info(f"Latest committed snapshot seqno: {snapshot_seqno}")

    # Step 2: Generate many chunks AFTER the snapshot so they exceed max_retained
    num_post_snapshot_chunks = max_retained + 3
    LOG.info(
        f"Generating {num_post_snapshot_chunks} chunks after snapshot "
        f"(max_retained={max_retained})"
    )
    _force_ledger_chunks(network, primary, num_post_snapshot_chunks)
    copy_new_committed_to_readonly()

    # Step 3: Re-query the latest snapshot seqno, since new snapshots may have
    # been auto-triggered during Step 2. The C++ cleanup uses the latest
    # snapshot as its watermark, so the test must compare against the same value.
    snapshot_seqno = get_latest_committed_snapshot_seqno()
    assert snapshot_seqno is not None
    LOG.info(f"Latest committed snapshot seqno (post Step 2): {snapshot_seqno}")

    # Record what is on disk before cleanup gets a chance to run, so that a
    # deletion can actually be detected afterwards
    chunks_before_cleanup = _list_committed_ledger_chunks(main_ledger_dir)

    # Step 4: Let the cleanup timer run
    time.sleep(3)

    # Step 5: Verify that all chunks after the snapshot watermark are retained
    committed = _list_committed_ledger_chunks(main_ledger_dir)
    LOG.info(f"Committed chunks after cleanup: {len(committed)}")

    post_snapshot_chunks = [
        f for f in committed if ends_at_or_after(f, snapshot_seqno)
    ]

    LOG.info(
        f"Post-snapshot chunks (end >= {snapshot_seqno}): "
        f"{len(post_snapshot_chunks)}"
    )

    # There must be more post-snapshot chunks than max_retained to prove
    # that the watermark prevented deletion
    assert len(post_snapshot_chunks) > max_retained, (
        f"Expected more than {max_retained} post-snapshot chunks to be retained, "
        f"but only found {len(post_snapshot_chunks)}: {post_snapshot_chunks}. "
        f"Snapshot watermark: {snapshot_seqno}"
    )

    # All post-snapshot chunks must still be present. A snapshot may have been
    # committed while waiting, which moves the watermark forward; only chunks
    # at or beyond the final watermark were protected for the whole wait.
    final_watermark = get_latest_committed_snapshot_seqno()
    if final_watermark is None or final_watermark < snapshot_seqno:
        final_watermark = snapshot_seqno
    surviving = set(committed)
    deleted = [
        f
        for f in chunks_before_cleanup
        if ends_at_or_after(f, final_watermark) and f not in surviving
    ]
    assert not deleted, (
        f"Post-snapshot chunks {deleted} should not have been deleted "
        f"(snapshot watermark={final_watermark})"
    )

    return network


def run_post_snapshot_chunk_retention(const_args):
    """
    Start a network with a retention limit of a single committed ledger chunk
    and a shared read-only ledger directory, then check that chunks at or
    after the latest snapshot survive cleanup.
    """
    args = copy.deepcopy(const_args)
    args.label = f"{args.label}_post_snapshot_chunk_retention"
    args.ledger_chunk_bytes = "20KB"
    # Use a very low retention limit so cleanup would aggressively delete
    # if the snapshot watermark were not respected
    args.files_cleanup_max_committed_ledger_chunks = 1
    args.files_cleanup_interval = "1s"
    args.snapshot_tx_interval = 30

    with tempfile.TemporaryDirectory() as tmp_dir:
        args.common_read_only_ledger_dir = tmp_dir

        with infra.network.network(
            args.nodes,
            args.binary_dir,
            args.debug_nodes,
            pdb=args.pdb,
            txs=app.LoggingTxs("user0"),
        ) as network:
            network.start_and_open(args)

            # Seed the read-only directory with the chunks committed while
            # the network was being opened
            primary, _ = network.find_primary()
            main_ledger_dir = primary.get_main_ledger_dir()
            for f in _list_committed_ledger_chunks(main_ledger_dir):
                shutil.copy2(
                    os.path.join(main_ledger_dir, f),
                    os.path.join(tmp_dir, f),
                )

            test_post_snapshot_chunks_retained(network, args, tmp_dir)


def run_ledger_cleanup_no_read_only_dir_check(const_args):
    """
    Verify that a node fails to start when max_committed_ledger_chunks is
    configured but no read-only ledger directory is provided.

    Raises AssertionError if the expected error message is not found in the
    node's error log.
    """
    args = copy.deepcopy(const_args)
    args.label = f"{args.label}_ledger_cleanup_no_ro_dir"
    args.nodes = infra.e2e_args.nodes(args, 1)
    args.files_cleanup_max_committed_ledger_chunks = 3
    args.files_cleanup_interval = "1s"

    with infra.network.network(
        args.nodes,
        args.binary_dir,
        args.debug_nodes,
        pdb=args.pdb,
    ) as network:
        # Start-up is expected to fail. The outcome is judged from the node's
        # error log below, so the exception is only logged here.
        try:
            network.start(args)
        except Exception as e:
            LOG.info(f"Network failed to start: {e}")

        network.skip_verify_chunking = True
        network.ignore_errors_on_shutdown()

        node = network.nodes[0]
        expected_msg = (
            "files_cleanup.max_committed_ledger_chunks requires at least one "
            "ledger.read_only_directories entry"
        )
        _, err_path = node.get_logs()
        found = False
        if err_path is not None:
            with open(err_path, "r") as f:
                found = any(expected_msg in line for line in f)
        if not found:
            raise AssertionError(
                "Expected node error message about missing read-only ledger "
                "directory when max_committed_ledger_chunks is configured"
            )

        LOG.success(
            "Node correctly refused to start without read-only ledger directory"
        )


def run_ledger_chunk_cleanup_tests(const_args):
    """
    Start a network with committed ledger chunk cleanup enabled and a shared
    read-only ledger directory, then run the read-only directory cleanup
    tests against it.
    """
    args = copy.deepcopy(const_args)
    args.label = f"{args.label}_ledger_chunk_cleanup"
    args.ledger_chunk_bytes = "20KB"
    args.files_cleanup_max_committed_ledger_chunks = 3
    args.files_cleanup_interval = "1s"

    # Use a common read-only ledger dir for the read-only dir tests
    with tempfile.TemporaryDirectory() as tmp_dir:
        args.common_read_only_ledger_dir = tmp_dir

        with infra.network.network(
            args.nodes,
            args.binary_dir,
            args.debug_nodes,
            pdb=args.pdb,
            txs=app.LoggingTxs("user0"),
        ) as network:
            network.start_and_open(args)
            # These tests intentionally produce [fail] log lines when chunks
            # are kept because no matching read-only copy exists, or because
            # the digest does not match.
            network.ignore_error_pattern_on_shutdown("Keeping ledger chunk")
            network.ignore_error_pattern_on_shutdown("digest does not match")
            test_ledger_chunk_cleanup_with_read_only_dir(network, args)
            test_ledger_chunk_cleanup_digest_mismatch(network, args)