Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[XrdTpc] Do not modify curl handle after curl_easy_cleanup() #1449

Merged
merged 6 commits into from
May 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 5 additions & 8 deletions src/XrdTpc/XrdTpcMultistream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,6 @@ class MultiCurlHandler {
it != m_active_handles.end();
it++) {
curl_multi_remove_handle(m_handle, *it);
curl_easy_cleanup(*it);
}
for (std::vector<CURL *>::const_iterator it = m_avail_handles.begin();
it != m_avail_handles.end();
it++) {
curl_easy_cleanup(*it);
}
curl_multi_cleanup(m_handle);
}
Expand Down Expand Up @@ -263,7 +257,8 @@ class MultiCurlHandler {


int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
size_t streams, std::vector<State*> &handles, TPCLogRecord &rec)
size_t streams, std::vector<State*> &handles,
std::vector<ManagedCurlHandle> &curl_handles, TPCLogRecord &rec)
{
int result;
bool success;
Expand All @@ -283,6 +278,7 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
handles[0]->Move(state);
for (size_t idx = 1; idx < concurrency; idx++) {
handles.push_back(handles[0]->Duplicate());
curl_handles.emplace_back(handles.back()->GetHandle());
}

// Create the multi-handle and add in the current transfer to it.
Expand Down Expand Up @@ -496,9 +492,10 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state,
size_t streams, TPCLogRecord &rec)
{
std::vector<ManagedCurlHandle> curl_handles;
std::vector<State*> handles;
try {
int retval = RunCurlWithStreamsImpl(req, state, streams, handles, rec);
int retval = RunCurlWithStreamsImpl(req, state, streams, handles, curl_handles, rec);
for (std::vector<State*>::iterator state_iter = handles.begin();
state_iter != handles.end();
state_iter++) {
Expand Down
1 change: 1 addition & 0 deletions src/XrdTpc/XrdTpcState.hh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*
* Helper class for managing the state of a single TPC request.
*/
#pragma once

#include <memory>
#include <vector>
Expand Down
34 changes: 12 additions & 22 deletions src/XrdTpc/XrdTpcTPC.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ XrdSysMutex TPCHandler::m_monid_mutex;
XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC);


void CurlDeleter::operator()(CURL *curl)
{
if (curl) curl_easy_cleanup(curl);
}


// We need to utilize the full URL (including the query string), not just the
// resource name. The query portion is hidden in the `xrd-http-query` header;
// we take this out and combine it with the resource name.
Expand Down Expand Up @@ -272,22 +278,19 @@ int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state,
ss << "Remote server failed request: " << curl_easy_strerror(res);
rec.status = 500;
logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str());
curl_easy_cleanup(curl);
return req.SendSimpleResp(rec.status, NULL, NULL, const_cast<char *>(curl_easy_strerror(res)), 0);
} else if (state.GetStatusCode() >= 400) {
std::stringstream ss;
ss << "Remote side failed with status code " << state.GetStatusCode();
rec.status = 500;
logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str());
curl_easy_cleanup(curl);
return req.SendSimpleResp(rec.status, NULL, NULL, const_cast<char *>(ss.str().c_str()), 0);
} else if (res) {
std::stringstream ss;
ss << "HTTP library failed: " << curl_easy_strerror(res);
rec.status = 500;
logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str());
char msg[] = "Unknown internal transfer failure";
curl_easy_cleanup(curl);
return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0);
}
std::stringstream ss;
Expand Down Expand Up @@ -373,7 +376,6 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL",
"Failed to initialize a libcurl multi-handle");
char msg[] = "Failed to initialize internal server memory";
curl_easy_cleanup(curl);
return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0);
}

Expand All @@ -387,15 +389,13 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
ss << "Failed to add transfer to libcurl multi-handle: " << curl_multi_strerror(mres);
logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL", ss.str());
char msg[] = "Failed to initialize internal server handle";
curl_easy_cleanup(curl);
curl_multi_cleanup(multi_handle);
return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0);
}

// Start response to client prior to the first call to curl_multi_perform
int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain");
if (retval) {
curl_easy_cleanup(curl);
curl_multi_cleanup(multi_handle);
logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
"Failed to send the initial response to the TPC client");
Expand Down Expand Up @@ -425,7 +425,6 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
}
if (SendPerfMarker(req, rec, state)) {
curl_multi_remove_handle(multi_handle, curl);
curl_easy_cleanup(curl);
curl_multi_cleanup(multi_handle);
logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL",
"Failed to send a perf marker to the TPC client");
Expand All @@ -438,7 +437,6 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
ss << "Transfer failed because no bytes have been received in " << timeout << " seconds.";
state.SetErrorMessage(ss.str());
curl_multi_remove_handle(multi_handle, curl);
curl_easy_cleanup(curl);
curl_multi_cleanup(multi_handle);
break;
}
Expand All @@ -465,7 +463,6 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
CURL *easy_handle = msg->easy_handle;
res = msg->data.result;
curl_multi_remove_handle(multi_handle, easy_handle);
curl_easy_cleanup(easy_handle);
}
} while (msg);

Expand All @@ -491,7 +488,6 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,

char msg[] = "Internal server error due to libcurl";
curl_multi_remove_handle(multi_handle, curl);
curl_easy_cleanup(curl);
curl_multi_cleanup(multi_handle);

if ((retval = req.ChunkResp(msg, 0))) {
Expand All @@ -511,13 +507,11 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
CURL *easy_handle = msg->easy_handle;
res = msg->data.result;
curl_multi_remove_handle(multi_handle, easy_handle);
curl_easy_cleanup(easy_handle);
}
} while (msg);

if (!state.GetErrorCode() && res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
curl_multi_remove_handle(multi_handle, curl);
curl_easy_cleanup(curl);
curl_multi_cleanup(multi_handle);
char msg[] = "Internal state error in libcurl";
logTransferEvent(LogMask::Error, rec, "TRANSFER_CURL_ERROR", msg);
Expand Down Expand Up @@ -587,7 +581,6 @@ int TPCHandler::RunCurlBasic(CURL *curl, XrdHttpExtReq &req, State &state,
const char *log_prefix) {
CURLcode res;
res = curl_easy_perform(curl);
curl_easy_cleanup(curl);
state.Flush();
state.Finalize();
if (state.GetErrorCode()) {
Expand Down Expand Up @@ -627,7 +620,9 @@ int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req)
char *name = req.GetSecEntity().name;
if (name) rec.name = name;
logTransferEvent(LogMask::Info, rec, "PUSH_START", "Starting a push request");
CURL *curl = curl_easy_init();

ManagedCurlHandle curlPtr(curl_easy_init());
auto curl = curlPtr.get();
if (!curl) {
char msg[] = "Failed to initialize internal transfer resources";
rec.status = 500;
Expand All @@ -647,7 +642,6 @@ int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req)
AtomicEnd(m_monid_mutex);
std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, file_monid));
if (!fh.get()) {
curl_easy_cleanup(curl);
rec.status = 500;
logTransferEvent(LogMask::Error, rec, "OPEN_FAIL",
"Failed to initialize internal transfer file handle");
Expand All @@ -662,10 +656,8 @@ int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req)
req.GetSecEntity(), authz);
if (SFS_REDIRECT == open_results) {
int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
curl_easy_cleanup(curl);
return result;
} else if (SFS_OK != open_results) {
curl_easy_cleanup(curl);
int code;
char msg_generic[] = "Failed to open local resource";
const char *msg = fh->error.getErrText(code);
Expand Down Expand Up @@ -701,7 +693,9 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req)
char *name = req.GetSecEntity().name;
if (name) rec.name = name;
logTransferEvent(LogMask::Info, rec, "PULL_START", "Starting a push request");
CURL *curl = curl_easy_init();

ManagedCurlHandle curlPtr(curl_easy_init());
auto curl = curlPtr.get();
if (!curl) {
char msg[] = "Failed to initialize internal transfer resources";
rec.status = 500;
Expand All @@ -711,7 +705,6 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req)
curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, m_monid++));
if (!fh.get()) {
curl_easy_cleanup(curl);
char msg[] = "Failed to initialize internal transfer file handle";
rec.status = 500;
logTransferEvent(LogMask::Error, rec, "PULL_FAIL", msg);
Expand All @@ -737,7 +730,6 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req)
} catch (...) { // Handled below
}
if (stream_req < 0 || stream_req > 100) {
curl_easy_cleanup(curl);
char msg[] = "Invalid request for number of streams";
rec.status = 500;
logTransferEvent(LogMask::Info, rec, "INVALID_REQUEST", msg);
Expand All @@ -754,10 +746,8 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req)
req.GetSecEntity(), authz);
if (SFS_REDIRECT == open_result) {
int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
curl_easy_cleanup(curl);
return result;
} else if (SFS_OK != open_result) {
curl_easy_cleanup(curl);
int code;
char msg_generic[] = "Failed to open local resource";
const char *msg = fh->error.getErrText(code);
Expand Down
8 changes: 8 additions & 0 deletions src/XrdTpc/XrdTpcTPC.hh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ enum LogMask {
All = 0xff
};


struct CurlDeleter {
void operator()(CURL *curl);
};
using ManagedCurlHandle = std::unique_ptr<CURL, CurlDeleter>;


class TPCHandler : public XrdHttpExtHandler {
public:
TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv);
Expand Down Expand Up @@ -95,6 +102,7 @@ private:
size_t streams, TPCLogRecord &rec);
int RunCurlWithStreamsImpl(XrdHttpExtReq &req, TPC::State &state,
size_t streams, std::vector<TPC::State*> &streams_handles,
std::vector<ManagedCurlHandle> &curl_handles,
TPCLogRecord &rec);
#else
int RunCurlBasic(CURL *curl, XrdHttpExtReq &req, TPC::State &state,
Expand Down