diff --git a/.gitignore b/.gitignore index c2568c1af..6d3f92716 100644 --- a/.gitignore +++ b/.gitignore @@ -198,3 +198,4 @@ mooncake-wheel/mooncake/transfer_engine_bench # Claude Code Memory CLAUDE.md +_codeql_detected_source_root diff --git a/mooncake-store/src/transfer_task.cpp b/mooncake-store/src/transfer_task.cpp index 158487d5b..63986a49e 100644 --- a/mooncake-store/src/transfer_task.cpp +++ b/mooncake-store/src/transfer_task.cpp @@ -3,7 +3,10 @@ #include #include +#include +#include #include +#include #include "transfer_engine.h" namespace mooncake { @@ -299,26 +302,57 @@ void TransferEngineOperationState::wait_for_completion() { const int64_t start_ts = getCurrentTimeInNano(); + // Adaptive backoff parameters: start with 1ms, max at 100ms + constexpr auto kInitialBackoffMs = std::chrono::milliseconds(1); + constexpr auto kMaxBackoffMs = std::chrono::milliseconds(100); + constexpr double kBackoffMultiplier = 1.5; + auto current_backoff = kInitialBackoffMs; + while (true) { - if (getCurrentTimeInNano() - start_ts > - timeout_seconds * kOneSecondInNano) { + int64_t elapsed_ns = getCurrentTimeInNano() - start_ts; + if (elapsed_ns > timeout_seconds * kOneSecondInNano) { LOG(ERROR) << "Failed to complete transfers after " << timeout_seconds << " seconds for batch " << batch_id_; set_result_internal(ErrorCode::TRANSFER_FAIL); return; } - std::unique_lock lock(mutex_); - check_task_status(); - if (result_.has_value()) { - VLOG(1) << "Transfer engine operation completed for batch " - << batch_id_ - << " with result: " << static_cast(result_.value()); - break; + { + std::unique_lock lock(mutex_); + check_task_status(); + if (result_.has_value()) { + VLOG(1) << "Transfer engine operation completed for batch " + << batch_id_ + << " with result: " << static_cast(result_.value()); + return; + } + + // Calculate remaining timeout for condition variable wait + auto remaining_timeout = std::chrono::nanoseconds( + timeout_seconds * kOneSecondInNano - elapsed_ns); + auto wait_duration = std::min(current_backoff, + std::chrono::duration_cast(remaining_timeout)); + + // Wait with adaptive backoff using condition variable + // This allows early wake-up if set_result_internal is called + if (cv_.wait_for(lock, wait_duration, + [this] { return result_.has_value(); })) { + VLOG(1) << "Transfer engine operation completed for batch " + << batch_id_ + << " with result: " << static_cast(result_.value()); + return; + } } - // Continue polling + + // Increase backoff for next iteration (exponential backoff with cap) + int64_t new_backoff_ms = static_cast( + std::ceil(current_backoff.count() * kBackoffMultiplier)); + current_backoff = std::chrono::milliseconds( + std::min(new_backoff_ms, kMaxBackoffMs.count())); + VLOG(1) << "Transfer engine operation still pending for batch " - << batch_id_; + << batch_id_ << ", backing off for " + << current_backoff.count() << "ms"; } }