diff --git a/src/XrdTpc/XrdTpcMultistream.cc b/src/XrdTpc/XrdTpcMultistream.cc index ba659d2757b..dd66a211d68 100644 --- a/src/XrdTpc/XrdTpcMultistream.cc +++ b/src/XrdTpc/XrdTpcMultistream.cc @@ -58,12 +58,6 @@ class MultiCurlHandler { it != m_active_handles.end(); it++) { curl_multi_remove_handle(m_handle, *it); - curl_easy_cleanup(*it); - } - for (std::vector::const_iterator it = m_avail_handles.begin(); - it != m_avail_handles.end(); - it++) { - curl_easy_cleanup(*it); } curl_multi_cleanup(m_handle); } @@ -263,7 +257,8 @@ class MultiCurlHandler { int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state, - size_t streams, std::vector &handles, TPCLogRecord &rec) + size_t streams, std::vector &handles, + std::vector &curl_handles, TPCLogRecord &rec) { int result; bool success; @@ -283,6 +278,7 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state, handles[0]->Move(state); for (size_t idx = 1; idx < concurrency; idx++) { handles.push_back(handles[0]->Duplicate()); + curl_handles.emplace_back(handles.back()->GetHandle()); } // Create the multi-handle and add in the current transfer to it. @@ -496,9 +492,10 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state, int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state, size_t streams, TPCLogRecord &rec) { + std::vector curl_handles; std::vector handles; try { - int retval = RunCurlWithStreamsImpl(req, state, streams, handles, rec); + int retval = RunCurlWithStreamsImpl(req, state, streams, handles, curl_handles, rec); for (std::vector::iterator state_iter = handles.begin(); state_iter != handles.end(); state_iter++) { diff --git a/src/XrdTpc/XrdTpcState.hh b/src/XrdTpc/XrdTpcState.hh index 9dacd982007..394279b69fd 100644 --- a/src/XrdTpc/XrdTpcState.hh +++ b/src/XrdTpc/XrdTpcState.hh @@ -3,6 +3,7 @@ * * Helper class for managing the state of a single TPC request. */ +#pragma once #include #include diff --git a/src/XrdTpc/XrdTpcTPC.cc b/src/XrdTpc/XrdTpcTPC.cc index 48062a089da..d800dfab9cd 100644 --- a/src/XrdTpc/XrdTpcTPC.cc +++ b/src/XrdTpc/XrdTpcTPC.cc @@ -32,6 +32,12 @@ XrdSysMutex TPCHandler::m_monid_mutex; XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC); +void CurlDeleter::operator()(CURL *curl) +{ + if (curl) curl_easy_cleanup(curl); +} + + // We need to utilize the full URL (including the query string), not just the // resource name. The query portion is hidden in the `xrd-http-query` header; // we take this out and combine it with the resource name. @@ -272,14 +278,12 @@ int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state, ss << "Remote server failed request: " << curl_easy_strerror(res); rec.status = 500; logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str()); - curl_easy_cleanup(curl); return req.SendSimpleResp(rec.status, NULL, NULL, const_cast(curl_easy_strerror(res)), 0); } else if (state.GetStatusCode() >= 400) { std::stringstream ss; ss << "Remote side failed with status code " << state.GetStatusCode(); rec.status = 500; logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str()); - curl_easy_cleanup(curl); return req.SendSimpleResp(rec.status, NULL, NULL, const_cast(ss.str().c_str()), 0); } else if (res) { std::stringstream ss; @@ -287,7 +291,6 @@ int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state, rec.status = 500; logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str()); char msg[] = "Unknown internal transfer failure"; - curl_easy_cleanup(curl); return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0); } std::stringstream ss; @@ -373,7 +376,6 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL", "Failed to initialize a libcurl multi-handle"); char msg[] = "Failed to initialize internal server memory"; - curl_easy_cleanup(curl); return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0); } @@ -387,7 +389,6 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, ss << "Failed to add transfer to libcurl multi-handle: " << curl_multi_strerror(mres); logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL", ss.str()); char msg[] = "Failed to initialize internal server handle"; - curl_easy_cleanup(curl); curl_multi_cleanup(multi_handle); return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0); } @@ -395,7 +396,6 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, // Start response to client prior to the first call to curl_multi_perform int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain"); if (retval) { - curl_easy_cleanup(curl); curl_multi_cleanup(multi_handle); logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL", "Failed to send the initial response to the TPC client"); @@ -425,7 +425,6 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, } if (SendPerfMarker(req, rec, state)) { curl_multi_remove_handle(multi_handle, curl); - curl_easy_cleanup(curl); curl_multi_cleanup(multi_handle); logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL", "Failed to send a perf marker to the TPC client"); @@ -438,7 +437,6 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, ss << "Transfer failed because no bytes have been received in " << timeout << " seconds."; state.SetErrorMessage(ss.str()); curl_multi_remove_handle(multi_handle, curl); - curl_easy_cleanup(curl); curl_multi_cleanup(multi_handle); break; } @@ -465,7 +463,6 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, CURL *easy_handle = msg->easy_handle; res = msg->data.result; curl_multi_remove_handle(multi_handle, easy_handle); - curl_easy_cleanup(easy_handle); } } while (msg); @@ -491,7 +488,6 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, char msg[] = "Internal server error due to libcurl"; curl_multi_remove_handle(multi_handle, curl); - curl_easy_cleanup(curl); curl_multi_cleanup(multi_handle); if ((retval = req.ChunkResp(msg, 0))) { @@ -511,13 +507,11 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, CURL *easy_handle = msg->easy_handle; res = msg->data.result; curl_multi_remove_handle(multi_handle, easy_handle); - curl_easy_cleanup(easy_handle); } } while (msg); if (!state.GetErrorCode() && res == static_cast(-1)) { // No transfers returned?!? curl_multi_remove_handle(multi_handle, curl); - curl_easy_cleanup(curl); curl_multi_cleanup(multi_handle); char msg[] = "Internal state error in libcurl"; logTransferEvent(LogMask::Error, rec, "TRANSFER_CURL_ERROR", msg); @@ -587,7 +581,6 @@ int TPCHandler::RunCurlBasic(CURL *curl, XrdHttpExtReq &req, State &state, const char *log_prefix) { CURLcode res; res = curl_easy_perform(curl); - curl_easy_cleanup(curl); state.Flush(); state.Finalize(); if (state.GetErrorCode()) { @@ -627,7 +620,9 @@ int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) char *name = req.GetSecEntity().name; if (name) rec.name = name; logTransferEvent(LogMask::Info, rec, "PUSH_START", "Starting a push request"); - CURL *curl = curl_easy_init(); + + ManagedCurlHandle curlPtr(curl_easy_init()); + auto curl = curlPtr.get(); if (!curl) { char msg[] = "Failed to initialize internal transfer resources"; rec.status = 500; @@ -647,7 +642,6 @@ int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) AtomicEnd(m_monid_mutex); std::unique_ptr fh(m_sfs->newFile(name, file_monid)); if (!fh.get()) { - curl_easy_cleanup(curl); rec.status = 500; logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", "Failed to initialize internal transfer file handle"); @@ -662,10 +656,8 @@ int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) req.GetSecEntity(), authz); if (SFS_REDIRECT == open_results) { int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec); - curl_easy_cleanup(curl); return result; } else if (SFS_OK != open_results) { - curl_easy_cleanup(curl); int code; char msg_generic[] = "Failed to open local resource"; const char *msg = fh->error.getErrText(code); @@ -701,7 +693,9 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) char *name = req.GetSecEntity().name; if (name) rec.name = name; logTransferEvent(LogMask::Info, rec, "PULL_START", "Starting a push request"); - CURL *curl = curl_easy_init(); + + ManagedCurlHandle curlPtr(curl_easy_init()); + auto curl = curlPtr.get(); if (!curl) { char msg[] = "Failed to initialize internal transfer resources"; rec.status = 500; @@ -711,7 +705,6 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1); std::unique_ptr fh(m_sfs->newFile(name, m_monid++)); if (!fh.get()) { - curl_easy_cleanup(curl); char msg[] = "Failed to initialize internal transfer file handle"; rec.status = 500; logTransferEvent(LogMask::Error, rec, "PULL_FAIL", msg); @@ -737,7 +730,6 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) } catch (...) { // Handled below } if (stream_req < 0 || stream_req > 100) { - curl_easy_cleanup(curl); char msg[] = "Invalid request for number of streams"; rec.status = 500; logTransferEvent(LogMask::Info, rec, "INVALID_REQUEST", msg); @@ -754,10 +746,8 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) req.GetSecEntity(), authz); if (SFS_REDIRECT == open_result) { int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec); - curl_easy_cleanup(curl); return result; } else if (SFS_OK != open_result) { - curl_easy_cleanup(curl); int code; char msg_generic[] = "Failed to open local resource"; const char *msg = fh->error.getErrText(code); diff --git a/src/XrdTpc/XrdTpcTPC.hh b/src/XrdTpc/XrdTpcTPC.hh index d9b1b8eeb2a..6bb2f972728 100644 --- a/src/XrdTpc/XrdTpcTPC.hh +++ b/src/XrdTpc/XrdTpcTPC.hh @@ -27,6 +27,13 @@ enum LogMask { All = 0xff }; + +struct CurlDeleter { + void operator()(CURL *curl); +}; +using ManagedCurlHandle = std::unique_ptr; + + class TPCHandler : public XrdHttpExtHandler { public: TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv); @@ -95,6 +102,7 @@ private: size_t streams, TPCLogRecord &rec); int RunCurlWithStreamsImpl(XrdHttpExtReq &req, TPC::State &state, size_t streams, std::vector &streams_handles, + std::vector &curl_handles, TPCLogRecord &rec); #else int RunCurlBasic(CURL *curl, XrdHttpExtReq &req, TPC::State &state,