Skip to content

Commit

Permalink
Merge pull request #1449 from jthiltges/curlstate
Browse files Browse the repository at this point in the history
[XrdTpc] Do not modify curl handle after curl_easy_cleanup()
  • Loading branch information
abh3 committed May 8, 2021
2 parents 8f84293 + 2e3166a commit 70066fe
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 30 deletions.
13 changes: 5 additions & 8 deletions src/XrdTpc/XrdTpcMultistream.cc
Expand Up @@ -58,12 +58,6 @@ class MultiCurlHandler {
it != m_active_handles.end();
it++) {
curl_multi_remove_handle(m_handle, *it);
curl_easy_cleanup(*it);
}
for (std::vector<CURL *>::const_iterator it = m_avail_handles.begin();
it != m_avail_handles.end();
it++) {
curl_easy_cleanup(*it);
}
curl_multi_cleanup(m_handle);
}
Expand Down Expand Up @@ -263,7 +257,8 @@ class MultiCurlHandler {


int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
size_t streams, std::vector<State*> &handles, TPCLogRecord &rec)
size_t streams, std::vector<State*> &handles,
std::vector<ManagedCurlHandle> &curl_handles, TPCLogRecord &rec)
{
int result;
bool success;
Expand All @@ -283,6 +278,7 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
handles[0]->Move(state);
for (size_t idx = 1; idx < concurrency; idx++) {
handles.push_back(handles[0]->Duplicate());
curl_handles.emplace_back(handles.back()->GetHandle());
}

// Create the multi-handle and add in the current transfer to it.
Expand Down Expand Up @@ -496,9 +492,10 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state,
size_t streams, TPCLogRecord &rec)
{
std::vector<ManagedCurlHandle> curl_handles;
std::vector<State*> handles;
try {
int retval = RunCurlWithStreamsImpl(req, state, streams, handles, rec);
int retval = RunCurlWithStreamsImpl(req, state, streams, handles, curl_handles, rec);
for (std::vector<State*>::iterator state_iter = handles.begin();
state_iter != handles.end();
state_iter++) {
Expand Down
1 change: 1 addition & 0 deletions src/XrdTpc/XrdTpcState.hh
Expand Up @@ -3,6 +3,7 @@
*
* Helper class for managing the state of a single TPC request.
*/
#pragma once

#include <memory>
#include <vector>
Expand Down
34 changes: 12 additions & 22 deletions src/XrdTpc/XrdTpcTPC.cc
Expand Up @@ -32,6 +32,12 @@ XrdSysMutex TPCHandler::m_monid_mutex;
XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC);


void CurlDeleter::operator()(CURL *curl)
{
if (curl) curl_easy_cleanup(curl);
}


// We need to utilize the full URL (including the query string), not just the
// resource name. The query portion is hidden in the `xrd-http-query` header;
// we take this out and combine it with the resource name.
Expand Down Expand Up @@ -272,22 +278,19 @@ int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state,
ss << "Remote server failed request: " << curl_easy_strerror(res);
rec.status = 500;
logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str());
curl_easy_cleanup(curl);
return req.SendSimpleResp(rec.status, NULL, NULL, const_cast<char *>(curl_easy_strerror(res)), 0);
} else if (state.GetStatusCode() >= 400) {
std::stringstream ss;
ss << "Remote side failed with status code " << state.GetStatusCode();
rec.status = 500;
logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str());
curl_easy_cleanup(curl);
return req.SendSimpleResp(rec.status, NULL, NULL, const_cast<char *>(ss.str().c_str()), 0);
} else if (res) {
std::stringstream ss;
ss << "HTTP library failed: " << curl_easy_strerror(res);
rec.status = 500;
logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str());
char msg[] = "Unknown internal transfer failure";
curl_easy_cleanup(curl);
return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0);
}
std::stringstream ss;
Expand Down Expand Up @@ -373,7 +376,6 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL",
"Failed to initialize a libcurl multi-handle");
char msg[] = "Failed to initialize internal server memory";
curl_easy_cleanup(curl);
return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0);
}

Expand All @@ -387,15 +389,13 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
ss << "Failed to add transfer to libcurl multi-handle: " << curl_multi_strerror(mres);
logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL", ss.str());
char msg[] = "Failed to initialize internal server handle";
curl_easy_cleanup(curl);
curl_multi_cleanup(multi_handle);
return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0);
}

// Start response to client prior to the first call to curl_multi_perform
int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain");
if (retval) {
curl_easy_cleanup(curl);
curl_multi_cleanup(multi_handle);
logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
"Failed to send the initial response to the TPC client");
Expand Down Expand Up @@ -425,7 +425,6 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
}
if (SendPerfMarker(req, rec, state)) {
curl_multi_remove_handle(multi_handle, curl);
curl_easy_cleanup(curl);
curl_multi_cleanup(multi_handle);
logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL",
"Failed to send a perf marker to the TPC client");
Expand All @@ -438,7 +437,6 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
ss << "Transfer failed because no bytes have been received in " << timeout << " seconds.";
state.SetErrorMessage(ss.str());
curl_multi_remove_handle(multi_handle, curl);
curl_easy_cleanup(curl);
curl_multi_cleanup(multi_handle);
break;
}
Expand All @@ -465,7 +463,6 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
CURL *easy_handle = msg->easy_handle;
res = msg->data.result;
curl_multi_remove_handle(multi_handle, easy_handle);
curl_easy_cleanup(easy_handle);
}
} while (msg);

Expand All @@ -491,7 +488,6 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,

char msg[] = "Internal server error due to libcurl";
curl_multi_remove_handle(multi_handle, curl);
curl_easy_cleanup(curl);
curl_multi_cleanup(multi_handle);

if ((retval = req.ChunkResp(msg, 0))) {
Expand All @@ -511,13 +507,11 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
CURL *easy_handle = msg->easy_handle;
res = msg->data.result;
curl_multi_remove_handle(multi_handle, easy_handle);
curl_easy_cleanup(easy_handle);
}
} while (msg);

if (!state.GetErrorCode() && res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
curl_multi_remove_handle(multi_handle, curl);
curl_easy_cleanup(curl);
curl_multi_cleanup(multi_handle);
char msg[] = "Internal state error in libcurl";
logTransferEvent(LogMask::Error, rec, "TRANSFER_CURL_ERROR", msg);
Expand Down Expand Up @@ -587,7 +581,6 @@ int TPCHandler::RunCurlBasic(CURL *curl, XrdHttpExtReq &req, State &state,
const char *log_prefix) {
CURLcode res;
res = curl_easy_perform(curl);
curl_easy_cleanup(curl);
state.Flush();
state.Finalize();
if (state.GetErrorCode()) {
Expand Down Expand Up @@ -627,7 +620,9 @@ int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req)
char *name = req.GetSecEntity().name;
if (name) rec.name = name;
logTransferEvent(LogMask::Info, rec, "PUSH_START", "Starting a push request");
CURL *curl = curl_easy_init();

ManagedCurlHandle curlPtr(curl_easy_init());
auto curl = curlPtr.get();
if (!curl) {
char msg[] = "Failed to initialize internal transfer resources";
rec.status = 500;
Expand All @@ -647,7 +642,6 @@ int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req)
AtomicEnd(m_monid_mutex);
std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, file_monid));
if (!fh.get()) {
curl_easy_cleanup(curl);
rec.status = 500;
logTransferEvent(LogMask::Error, rec, "OPEN_FAIL",
"Failed to initialize internal transfer file handle");
Expand All @@ -662,10 +656,8 @@ int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req)
req.GetSecEntity(), authz);
if (SFS_REDIRECT == open_results) {
int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
curl_easy_cleanup(curl);
return result;
} else if (SFS_OK != open_results) {
curl_easy_cleanup(curl);
int code;
char msg_generic[] = "Failed to open local resource";
const char *msg = fh->error.getErrText(code);
Expand Down Expand Up @@ -701,7 +693,9 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req)
char *name = req.GetSecEntity().name;
if (name) rec.name = name;
logTransferEvent(LogMask::Info, rec, "PULL_START", "Starting a push request");
CURL *curl = curl_easy_init();

ManagedCurlHandle curlPtr(curl_easy_init());
auto curl = curlPtr.get();
if (!curl) {
char msg[] = "Failed to initialize internal transfer resources";
rec.status = 500;
Expand All @@ -711,7 +705,6 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req)
curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, m_monid++));
if (!fh.get()) {
curl_easy_cleanup(curl);
char msg[] = "Failed to initialize internal transfer file handle";
rec.status = 500;
logTransferEvent(LogMask::Error, rec, "PULL_FAIL", msg);
Expand All @@ -737,7 +730,6 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req)
} catch (...) { // Handled below
}
if (stream_req < 0 || stream_req > 100) {
curl_easy_cleanup(curl);
char msg[] = "Invalid request for number of streams";
rec.status = 500;
logTransferEvent(LogMask::Info, rec, "INVALID_REQUEST", msg);
Expand All @@ -754,10 +746,8 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req)
req.GetSecEntity(), authz);
if (SFS_REDIRECT == open_result) {
int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
curl_easy_cleanup(curl);
return result;
} else if (SFS_OK != open_result) {
curl_easy_cleanup(curl);
int code;
char msg_generic[] = "Failed to open local resource";
const char *msg = fh->error.getErrText(code);
Expand Down
8 changes: 8 additions & 0 deletions src/XrdTpc/XrdTpcTPC.hh
Expand Up @@ -27,6 +27,13 @@ enum LogMask {
All = 0xff
};


struct CurlDeleter {
void operator()(CURL *curl);
};
using ManagedCurlHandle = std::unique_ptr<CURL, CurlDeleter>;


class TPCHandler : public XrdHttpExtHandler {
public:
TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv);
Expand Down Expand Up @@ -95,6 +102,7 @@ private:
size_t streams, TPCLogRecord &rec);
int RunCurlWithStreamsImpl(XrdHttpExtReq &req, TPC::State &state,
size_t streams, std::vector<TPC::State*> &streams_handles,
std::vector<ManagedCurlHandle> &curl_handles,
TPCLogRecord &rec);
#else
int RunCurlBasic(CURL *curl, XrdHttpExtReq &req, TPC::State &state,
Expand Down

0 comments on commit 70066fe

Please sign in to comment.