Skip to content

Commit

Permalink
Utilize curl pipelining when available.
Browse files Browse the repository at this point in the history
  • Loading branch information
bbockelm authored and simonmichal committed Dec 10, 2018
1 parent 0fb38b9 commit 6743c5c
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 6 deletions.
26 changes: 21 additions & 5 deletions src/XrdTpc/XrdTpcMultistream.cc
Expand Up @@ -227,17 +227,24 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
}
state.ResetAfterRequest();

handles.reserve(streams);
size_t concurrency = streams * m_pipelining_multiplier;

handles.reserve(concurrency);
handles.push_back(new State());
handles[0]->Move(state);
for (size_t idx = 1; idx < streams; idx++) {
for (size_t idx = 1; idx < concurrency; idx++) {
handles.push_back(handles[0]->Duplicate());
}

// Create the multi-handle and add in the current transfer to it.
MultiCurlHandler mch(handles);
MultiCurlHandler mch(handles, m_log);
CURLM *multi_handle = mch.Get();

#ifdef USE_PIPELINING
curl_multi_setopt(multi_handle, CURLMOPT_PIPELINING, 1);
curl_multi_setopt(multi_handle, CURLMOPT_MAX_HOST_CONNECTIONS, streams);
#endif

// Start response to client prior to the first call to curl_multi_perform
int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain");
if (retval) {
Expand Down Expand Up @@ -288,16 +295,24 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
}
} while (msg);
if (res != static_cast<CURLcode>(-1) && res != CURLE_OK) {
m_log.Emsg(log_prefix, "Breaking loop due to failed curl transfer.");
break;
}

if (running_handles < static_cast<int>(streams)) {
if (running_handles < static_cast<int>(concurrency)) {
// Issue new transfers if there is still pending work to do.
// Otherwise, continue running until there are no handles left.
if (current_offset != content_size) {
current_offset = mch.StartTransfers(current_offset, content_size,
m_block_size, running_handles);
if (!running_handles) {
std::stringstream ss;
ss << "No handles are able to run. Streams=" << streams << ", concurrency="
<< concurrency;
m_log.Emsg(log_prefix, ss.str().c_str());
}
} else if (running_handles == 0) {
m_log.Emsg(log_prefix, "Unable to start new transfers; breaking loop.");
break;
}
}
Expand Down Expand Up @@ -335,7 +350,8 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
if (msg && (msg->msg == CURLMSG_DONE)) {
CURL *easy_handle = msg->easy_handle;
mch.FinishCurlXfer(easy_handle);
res = msg->data.result; // Transfer result will be examined below.
if (res == CURLE_OK || res == static_cast<CURLcode>(-1))
res = msg->data.result; // Transfer result will be examined below.
}
} while (msg);

Expand Down
2 changes: 1 addition & 1 deletion src/XrdTpc/XrdTpcTPC.cc
Expand Up @@ -498,7 +498,7 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req)
curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str());
}
curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
Stream stream(std::move(fh), streams, m_block_size);
Stream stream(std::move(fh), streams * m_pipelining_multiplier, m_block_size, m_log);
State state(0, stream, curl, false);
state.CopyHeaders(req);

Expand Down
9 changes: 9 additions & 0 deletions src/XrdTpc/XrdTpcTPC.hh
Expand Up @@ -75,5 +75,14 @@ private:
std::unique_ptr<XrdSfsFileSystem> m_sfs;
void *m_handle_base;
void *m_handle_chained;

// 16 blocks in flight at 16 MB each, meaning that there will be up to 256MB
// in flight; this is equal to the bandwidth delay product of a 200ms transcontinental
// connection at 10Gbps.
#ifdef USE_PIPELINING
static const int m_pipelining_multiplier = 16;
#else
static const int m_pipelining_multiplier = 1;
#endif
};
}

0 comments on commit 6743c5c

Please sign in to comment.