Skip to content

Commit

Permalink
Downgrade the XrdTpc code from C++11 to C++03.
Browse files Browse the repository at this point in the history
  • Loading branch information
bbockelm committed Jun 7, 2018
1 parent 1001a81 commit f93b5a1
Show file tree
Hide file tree
Showing 9 changed files with 299 additions and 176 deletions.
2 changes: 1 addition & 1 deletion src/XrdTpc/CMakeLists.txt
Expand Up @@ -43,7 +43,7 @@ pkg_check_modules(CURL REQUIRED libcurl)

include_directories(${XROOTD_INCLUDES} ${XROOTD_PRIVATE_INCLUDES} ${CURL_INCLUDE_DIRS})

add_library(XrdHttpTPC SHARED src/tpc.cpp src/state.cpp src/configure.cpp src/stream.cpp src/multistream.cpp)
add_library(XrdHttpTPC SHARED src/tpc.cpp src/state.cpp src/configure.cpp src/stream.cpp src/multistream.cpp src/XrdTpcCurlMulti.cpp)
if ( XRD_CHUNK_RESP )
set_target_properties(XrdHttpTPC PROPERTIES COMPILE_DEFINITIONS "XRD_CHUNK_RESP" )
endif ()
Expand Down
30 changes: 15 additions & 15 deletions src/XrdTpc/configure.cpp
Expand Up @@ -18,27 +18,27 @@ using namespace TPC;


static XrdSfsFileSystem *load_sfs(void *handle, bool alt, XrdSysError &log, const std::string &libpath, const char *configfn, XrdOucEnv &myEnv, XrdSfsFileSystem *prior_sfs) {
XrdSfsFileSystem *sfs = nullptr;
XrdSfsFileSystem *sfs = NULL;
if (alt) {
auto ep = (XrdSfsFileSystem *(*)(XrdSfsFileSystem *, XrdSysLogger *, const char *, XrdOucEnv *))
XrdSfsFileSystem2_t ep = (XrdSfsFileSystem *(*)(XrdSfsFileSystem *, XrdSysLogger *, const char *, XrdOucEnv *))
(dlsym(handle, "XrdSfsGetFileSystem2"));
if (ep == nullptr) {
if (ep == NULL) {
log.Emsg("Config", "Failed to load XrdSfsGetFileSystem2 from library ", libpath.c_str(), dlerror());
return nullptr;
return NULL;
}
sfs = ep(prior_sfs, log.logger(), configfn, &myEnv);
} else {
auto ep = (XrdSfsFileSystem *(*)(XrdSfsFileSystem *, XrdSysLogger *, const char *))
(dlsym(nullptr, "XrdSfsGetFileSystem"));
if (ep == nullptr) {
XrdSfsFileSystem_t ep = (XrdSfsFileSystem *(*)(XrdSfsFileSystem *, XrdSysLogger *, const char *))
(dlsym(NULL, "XrdSfsGetFileSystem"));
if (ep == NULL) {
log.Emsg("Config", "Failed to load XrdSfsGetFileSystem from library ", libpath.c_str(), dlerror());
return nullptr;
return NULL;
}
sfs = ep(prior_sfs, log.logger(), configfn);
}
if (!sfs) {
log.Emsg("Config", "Failed to initialize filesystem library for TPC handler from ", libpath.c_str());
return nullptr;
return NULL;
}
return sfs;
}
Expand Down Expand Up @@ -138,10 +138,10 @@ bool TPCHandler::Configure(const char *configfn, XrdOucEnv *myEnv)
}
Config.Close();

XrdSfsFileSystem *base_sfs = nullptr;
XrdSfsFileSystem *base_sfs = NULL;
if (path1 == "default") {
m_log.Emsg("Config", "Loading the default filesystem");
base_sfs = XrdSfsGetDefaultFileSystem(nullptr, m_log.logger(), configfn, myEnv);
base_sfs = XrdSfsGetDefaultFileSystem(NULL, m_log.logger(), configfn, myEnv);
m_log.Emsg("Config", "Finished loading the default filesystem");
} else {
char resolvePath[2048];
Expand All @@ -151,17 +151,17 @@ bool TPCHandler::Configure(const char *configfn, XrdOucEnv *myEnv)
return false;
}
m_handle_base = dlopen(resolvePath, RTLD_LOCAL|RTLD_NOW);
if (m_handle_base == nullptr) {
if (m_handle_base == NULL) {
m_log.Emsg("Config", "Failed to base plugin ", resolvePath, dlerror());
return false;
}
base_sfs = load_sfs(m_handle_base, path1_alt, m_log, path1, configfn, *myEnv, nullptr);
base_sfs = load_sfs(m_handle_base, path1_alt, m_log, path1, configfn, *myEnv, NULL);
}
if (!base_sfs) {
m_log.Emsg("Config", "Failed to initialize filesystem library for TPC handler from ", path1.c_str());
return false;
}
XrdSfsFileSystem *chained_sfs = nullptr;
XrdSfsFileSystem *chained_sfs = NULL;
if (!path2.empty()) {
char resolvePath[2048];
bool usedAltPath{true};
Expand All @@ -170,7 +170,7 @@ bool TPCHandler::Configure(const char *configfn, XrdOucEnv *myEnv)
return false;
}
m_handle_chained = dlopen(resolvePath, RTLD_LOCAL|RTLD_NOW);
if (m_handle_chained == nullptr) {
if (m_handle_chained == NULL) {
m_log.Emsg("Config", "Failed to chained plugin ", resolvePath, dlerror());
return false;
}
Expand Down
136 changes: 93 additions & 43 deletions src/XrdTpc/multistream.cpp
Expand Up @@ -6,6 +6,7 @@

#include "tpc.hh"
#include "state.hh"
#include "XrdTpcCurlMulti.hh"

#include "XrdSys/XrdSysError.hh"

Expand All @@ -14,42 +15,50 @@
#include <sstream>
#include <stdexcept>


using namespace TPC;

class CurlHandlerSetupError : public std::runtime_error {
public:
CurlHandlerSetupError(const std::string &msg) :
std::runtime_error(msg)
{}
virtual ~CurlHandlerSetupError() {}

virtual ~CurlHandlerSetupError() throw () {}
};

namespace {
class MultiCurlHandler {
public:
MultiCurlHandler(std::vector<State> &states) :
MultiCurlHandler(std::vector<State*> &states) :
m_handle(curl_multi_init()),
m_states(states)
{
if (m_handle == nullptr) {
if (m_handle == NULL) {
throw CurlHandlerSetupError("Failed to initialize a libcurl multi-handle");
}
m_avail_handles.reserve(states.size());
m_active_handles.reserve(states.size());
for (State &state : states) {
m_avail_handles.push_back(state.GetHandle());
for (std::vector<State*>::const_iterator state_iter = states.begin();
state_iter != states.end();
state_iter++) {
m_avail_handles.push_back((*state_iter)->GetHandle());
}
}

~MultiCurlHandler()
{
if (!m_handle) {return;}
for (CURL * easy_handle : m_active_handles) {
curl_multi_remove_handle(m_handle, easy_handle);
curl_easy_cleanup(easy_handle);
for (std::vector<CURL *>::const_iterator it = m_active_handles.begin();
it != m_active_handles.end();
it++) {
curl_multi_remove_handle(m_handle, *it);
curl_easy_cleanup(*it);
}
for (auto & easy_handle : m_avail_handles) {
curl_easy_cleanup(easy_handle);
for (std::vector<CURL *>::const_iterator it = m_avail_handles.begin();
it != m_avail_handles.end();
it++) {
curl_easy_cleanup(*it);
}
curl_multi_cleanup(m_handle);
}
Expand All @@ -66,13 +75,15 @@ class MultiCurlHandler {
<< curl_multi_strerror(mres);
throw std::runtime_error(ss.str());
}
for (auto &state : m_states) {
if (curl == state.GetHandle()) {
state.ResetAfterRequest();
for (std::vector<State*>::iterator state_iter = m_states.begin();
state_iter != m_states.end();
state_iter++) {
if (curl == (*state_iter)->GetHandle()) {
(*state_iter)->ResetAfterRequest();
break;
}
}
for (auto iter = m_active_handles.begin();
for (std::vector<CURL *>::iterator iter = m_active_handles.begin();
iter != m_active_handles.end();
++iter)
{
Expand Down Expand Up @@ -104,11 +115,15 @@ class MultiCurlHandler {

bool StartTransfer(off_t offset, size_t size) {
if (!CanStartTransfer()) {return false;}
for (auto &handle : m_avail_handles) {
for (auto &state : m_states) {
if (state.GetHandle() == handle) { // This state object represents an idle handle.
state.SetTransferParameters(offset, size);
ActivateHandle(state);
for (std::vector<CURL*>::const_iterator handle_it = m_avail_handles.begin();
handle_it != m_avail_handles.end();
handle_it++) {
for (std::vector<State*>::iterator state_it = m_states.begin();
state_it != m_states.end();
state_it++) {
if ((*state_it)->GetHandle() == *handle_it) { // This state object represents an idle handle.
(*state_it)->SetTransferParameters(offset, size);
ActivateHandle(**state_it);
return true;
}
}
Expand Down Expand Up @@ -141,18 +156,22 @@ class MultiCurlHandler {
bool CanStartTransfer() const {
size_t idle_handles = m_avail_handles.size();
size_t transfer_in_progress = 0;
for (auto &state : m_states) {
for (const auto &handle : m_active_handles) {
if (handle == state.GetHandle()) {
transfer_in_progress += state.BodyTransferInProgress();
for (std::vector<State*>::const_iterator state_iter = m_states.begin();
state_iter != m_states.end();
state_iter++) {
for (std::vector<CURL*>::const_iterator handle_iter = m_active_handles.begin();
handle_iter != m_active_handles.end();
handle_iter++) {
if (*handle_iter == (*state_iter)->GetHandle()) {
transfer_in_progress += (*state_iter)->BodyTransferInProgress();
break;
}
}
}
if (!idle_handles) {
return false;
}
ssize_t available_buffers = m_states[0].AvailableBuffers();
ssize_t available_buffers = m_states[0]->AvailableBuffers();
// To be conservative, set aside buffers for any transfers that have been activated
// but don't have their first responses back yet.
available_buffers -= (m_active_handles.size() - transfer_in_progress);
Expand All @@ -162,14 +181,14 @@ class MultiCurlHandler {
CURLM *m_handle;
std::vector<CURL *> m_avail_handles;
std::vector<CURL *> m_active_handles;
std::vector<State> &m_states;
std::vector<State*> &m_states;
};
}


int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state,
const char *log_prefix, size_t streams)
try
int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
const char *log_prefix, size_t streams,
std::vector<State*> handles)
{
int result;
bool success;
Expand All @@ -187,11 +206,11 @@ try
}
state.ResetAfterRequest();

std::vector<State> handles;
handles.reserve(streams);
handles.emplace_back(std::move(state));
handles.push_back(new State());
handles[0]->Move(state);
for (size_t idx = 1; idx < streams; idx++) {
handles.emplace_back(handles[0].Duplicate()); // Makes a duplicate of the original state
handles.push_back(handles[0]->Duplicate());
}

// Create the multi-handle and add in the current transfer to it.
Expand Down Expand Up @@ -267,7 +286,13 @@ try
continue;
}
int fd_count;
mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000, &fd_count);
#ifdef HAS_CURL_MULTI
mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000,
&fd_count);
#else
mres = curl_multi_wait_impl(multi_handle, max_sleep_time*1000,
&fd_count);
#endif
if (mres != CURLM_OK) {
break;
}
Expand Down Expand Up @@ -314,20 +339,45 @@ try
if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
return retval;
}
return req.ChunkResp(nullptr, 0);
return req.ChunkResp(NULL, 0);
}
catch (CurlHandlerSetupError e) {
m_log.Emsg(log_prefix, e.what());
return req.SendSimpleResp(500, nullptr, nullptr, e.what(), 0);
} catch (std::runtime_error e) {
m_log.Emsg(log_prefix, e.what());
std::stringstream ss;
ss << "failure: " << e.what();
int retval;
if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state,
const char *log_prefix, size_t streams)
{
std::vector<State*> handles;
try {
int retval = RunCurlWithStreamsImpl(req, state, log_prefix, streams, handles);
for (std::vector<State*>::iterator state_iter = handles.begin();
state_iter != handles.end();
state_iter++) {
delete *state_iter;
}
return retval;
} catch (CurlHandlerSetupError e) {
for (std::vector<State*>::iterator state_iter = handles.begin();
state_iter != handles.end();
state_iter++) {
delete *state_iter;
}

m_log.Emsg(log_prefix, e.what());
return req.SendSimpleResp(500, NULL, NULL, e.what(), 0);
} catch (std::runtime_error e) {
for (std::vector<State*>::iterator state_iter = handles.begin();
state_iter != handles.end();
state_iter++) {
delete *state_iter;
}

m_log.Emsg(log_prefix, e.what());
std::stringstream ss;
ss << "failure: " << e.what();
int retval;
if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
return retval;
}
return req.ChunkResp(NULL, 0);
}
return req.ChunkResp(nullptr, 0);
}

#endif // XRD_CHUNK_RESP

0 comments on commit f93b5a1

Please sign in to comment.