Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

XrdTpc: Add a crude timeout mechanism. #1392

Merged
merged 2 commits into from
Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/XrdTpc/XrdTpcConfigure.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <dlfcn.h>
#include <fcntl.h>

#include "XrdOuc/XrdOuca2x.hh"
#include "XrdOuc/XrdOucEnv.hh"
#include "XrdOuc/XrdOucStream.hh"
#include "XrdOuc/XrdOucPinPath.hh"
Expand Down Expand Up @@ -52,6 +53,17 @@ bool TPCHandler::Configure(const char *configfn, XrdOucEnv *myEnv)
Config.Close();
return false;
}
} else if (!strcmp("tpc.timeout", val)) {
if (!(val = Config.GetWord())) {
m_log.Emsg("Config","tpc.timeout value not specified."); return false;
}
if (XrdOuca2x::a2tm(m_log, "timeout value", val, &m_timeout, 0)) return false;
// First byte timeout can be set separately from the continuous timeout.
if ((val = Config.GetWord())) {
if (XrdOuca2x::a2tm(m_log, "first byte timeout value", val, &m_first_timeout, 0)) return false;
} else {
m_first_timeout = 2*m_timeout;
}
}
}
Config.Close();
Expand Down
28 changes: 26 additions & 2 deletions src/XrdTpc/XrdTpcMultistream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,18 @@ class MultiCurlHandler {
return m_error_code;
}

void SetErrorCode(int error_code) {
m_error_code = error_code;
}

std::string GetErrorMessage() const {
return m_error_message;
}

void SetErrorMessage(const std::string &error_msg) {
m_error_message = error_msg;
}

private:

bool StartTransfer(off_t offset, size_t size) {
Expand Down Expand Up @@ -304,17 +312,33 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
// Transfer loop: use curl to actually run the transfer, but periodically
// interrupt things to send back performance updates to the client.
time_t last_marker = 0;
// Track the time since the transfer last made progress
off_t last_advance_bytes = 0;
time_t last_advance_time = time(NULL);
time_t transfer_start = last_advance_time;
CURLcode res = static_cast<CURLcode>(-1);
CURLMcode mres;
CURLMcode mres = CURLM_OK;
do {
time_t now = time(NULL);
time_t next_marker = last_marker + m_marker_period;
if (now >= next_marker) {
if (current_offset > last_advance_bytes) {
last_advance_bytes = current_offset;
last_advance_time = now;
}
if (SendPerfMarker(req, rec, handles, current_offset)) {
logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL",
"Failed to send a perf marker to the TPC client");
return -1;
}
int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
if (now > last_advance_time + timeout) {
mch.SetErrorCode(10);
std::stringstream ss;
ss << "Transfer failed because no bytes have been received in " << timeout << " seconds.";
mch.SetErrorMessage(ss.str());
break;
}
last_marker = now;
}

Expand Down Expand Up @@ -406,7 +430,7 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
}
} while (msg);

if (res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
if (!state.GetErrorCode() && res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR",
"Internal state error in libcurl");
throw std::runtime_error("Internal state error in libcurl");
Expand Down
4 changes: 4 additions & 0 deletions src/XrdTpc/XrdTpcState.hh
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,14 @@ public:

int GetErrorCode() const {return m_error_code;}

void SetErrorCode(int error_code) {m_error_code = error_code;}

int GetStatusCode() const {return m_status_code;}

std::string GetErrorMessage() const {return m_error_buf;}

void SetErrorMessage(const std::string &error_msg) {m_error_buf = error_msg;}

void ResetAfterRequest();

CURL *GetHandle() const {return m_curl;}
Expand Down
21 changes: 20 additions & 1 deletion src/XrdTpc/XrdTpcTPC.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ TPCHandler::~TPCHandler() {

TPCHandler::TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv) :
m_desthttps(false),
m_timeout(60),
m_first_timeout(120),
m_log(log->logger(), "TPC_"),
m_sfs(NULL)
{
Expand Down Expand Up @@ -389,11 +391,20 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
// interrupt things to send back performance updates to the client.
int running_handles = 1;
time_t last_marker = 0;
// Track how long it's been since the last time we recorded more bytes being transferred.
off_t last_advance_bytes = 0;
time_t last_advance_time = time(NULL);
time_t transfer_start = last_advance_time;
CURLcode res = static_cast<CURLcode>(-1);
do {
time_t now = time(NULL);
time_t next_marker = last_marker + m_marker_period;
if (now >= next_marker) {
off_t bytes_xfer = state.BytesTransferred();
if (bytes_xfer > last_advance_bytes) {
last_advance_bytes = bytes_xfer;
last_advance_time = now;
}
if (SendPerfMarker(req, rec, state)) {
curl_multi_remove_handle(multi_handle, curl);
curl_easy_cleanup(curl);
Expand All @@ -402,6 +413,14 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
"Failed to send a perf marker to the TPC client");
return -1;
}
int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
if (now > last_advance_time + timeout) {
state.SetErrorCode(10);
std::stringstream ss;
ss << "Transfer failed because no bytes have been received in " << timeout << " seconds.";
state.SetErrorMessage(ss.str());
break;
}
last_marker = now;
}
mres = curl_multi_perform(multi_handle, &running_handles);
Expand Down Expand Up @@ -475,7 +494,7 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
}
} while (msg);

if (res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
if (!state.GetErrorCode() && res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
curl_multi_remove_handle(multi_handle, curl);
curl_easy_cleanup(curl);
curl_multi_cleanup(multi_handle);
Expand Down
3 changes: 3 additions & 0 deletions src/XrdTpc/XrdTpcTPC.hh
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ private:
static size_t m_block_size;
static size_t m_small_block_size;
bool m_desthttps;
int m_timeout; // the 'timeout interval'; if no bytes have been received during this time period, abort the transfer.
int m_first_timeout; // the 'first timeout interval'; the amount of time we're willing to wait to get the first byte.
// Unless explicitly specified, this is 2x the timeout interval.
std::string m_cadir;
static XrdSysMutex m_monid_mutex;
static uint64_t m_monid;
Expand Down