Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,4 @@ mooncake-wheel/mooncake/transfer_engine_bench

# Claude Code Memory
CLAUDE.md
_codeql_detected_source_root
56 changes: 45 additions & 11 deletions mooncake-store/src/transfer_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
#include <glog/logging.h>

#include <algorithm>
#include <cmath>
#include <chrono>
#include <cstdlib>
#include <thread>
#include "transfer_engine.h"

namespace mooncake {
Expand Down Expand Up @@ -299,26 +302,57 @@ void TransferEngineOperationState::wait_for_completion() {

const int64_t start_ts = getCurrentTimeInNano();

// Adaptive backoff parameters: start at 1 ms, grow by a factor of 1.5 (rounded up) per iteration, cap at 100 ms
constexpr auto kInitialBackoffMs = std::chrono::milliseconds(1);
constexpr auto kMaxBackoffMs = std::chrono::milliseconds(100);
constexpr double kBackoffMultiplier = 1.5;
auto current_backoff = kInitialBackoffMs;

while (true) {
if (getCurrentTimeInNano() - start_ts >
timeout_seconds * kOneSecondInNano) {
int64_t elapsed_ns = getCurrentTimeInNano() - start_ts;
if (elapsed_ns > timeout_seconds * kOneSecondInNano) {
LOG(ERROR) << "Failed to complete transfers after "
<< timeout_seconds << " seconds for batch " << batch_id_;
set_result_internal(ErrorCode::TRANSFER_FAIL);
return;
}

std::unique_lock<std::mutex> lock(mutex_);
check_task_status();
if (result_.has_value()) {
VLOG(1) << "Transfer engine operation completed for batch "
<< batch_id_
<< " with result: " << static_cast<int>(result_.value());
break;
{
std::unique_lock<std::mutex> lock(mutex_);
check_task_status();
if (result_.has_value()) {
VLOG(1) << "Transfer engine operation completed for batch "
<< batch_id_
<< " with result: " << static_cast<int>(result_.value());
return;
}

// Calculate remaining timeout for condition variable wait
auto remaining_timeout = std::chrono::nanoseconds(
timeout_seconds * kOneSecondInNano - elapsed_ns);
auto wait_duration = std::min(current_backoff,
std::chrono::duration_cast<std::chrono::milliseconds>(remaining_timeout));

// Wait with adaptive backoff using condition variable
// This allows early wake-up if set_result_internal is called
if (cv_.wait_for(lock, wait_duration,
[this] { return result_.has_value(); })) {
VLOG(1) << "Transfer engine operation completed for batch "
<< batch_id_
<< " with result: " << static_cast<int>(result_.value());
return;
}
}
// Continue polling

// Increase backoff for next iteration (exponential backoff with cap)
int64_t new_backoff_ms = static_cast<int64_t>(
std::ceil(current_backoff.count() * kBackoffMultiplier));
current_backoff = std::chrono::milliseconds(
std::min(new_backoff_ms, kMaxBackoffMs.count()));

VLOG(1) << "Transfer engine operation still pending for batch "
<< batch_id_;
<< batch_id_ << ", backing off for "
<< current_backoff.count() << "ms";
}
}

Expand Down
Loading