Skip to content

Commit

Permalink
Improve logging of internal state on failures.
Browse files Browse the repository at this point in the history
Helps determine the cause of failure, particularly internal ones
where we run out of buffers (should only happen in the case where
there is faulty logic).
  • Loading branch information
bbockelm authored and simonmichal committed Dec 10, 2018
1 parent 1f0633f commit 0fb38b9
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 9 deletions.
37 changes: 31 additions & 6 deletions src/XrdTpc/XrdTpcMultistream.cc
Expand Up @@ -30,9 +30,10 @@ class CurlHandlerSetupError : public std::runtime_error {
namespace {
class MultiCurlHandler {
public:
MultiCurlHandler(std::vector<State*> &states) :
MultiCurlHandler(std::vector<State*> &states, XrdSysError &log) :
m_handle(curl_multi_init()),
m_states(states)
m_states(states),
m_log(log)
{
if (m_handle == NULL) {
throw CurlHandlerSetupError("Failed to initialize a libcurl multi-handle");
Expand Down Expand Up @@ -102,6 +103,12 @@ class MultiCurlHandler {
size_t xfer_size = std::min(content_length - current_offset, static_cast<off_t>(block_size));
if (xfer_size == 0) {return current_offset;}
if (!(started_new_xfer = StartTransfer(current_offset, xfer_size))) {
// In this case, we need to start new transfers but weren't able to.
if (running_handles == 0) {
if (!CanStartTransfer(true)) {
m_log.Emsg("StartTransfers", "Unable to start transfers.");
}
}
break;
} else {
running_handles += 1;
Expand All @@ -114,7 +121,7 @@ class MultiCurlHandler {
private:

bool StartTransfer(off_t offset, size_t size) {
if (!CanStartTransfer()) {return false;}
if (!CanStartTransfer(false)) {return false;}
for (std::vector<CURL*>::const_iterator handle_it = m_avail_handles.begin();
handle_it != m_avail_handles.end();
handle_it++) {
Expand Down Expand Up @@ -153,7 +160,7 @@ class MultiCurlHandler {
}
}

bool CanStartTransfer() const {
bool CanStartTransfer(bool log_reason) const {
size_t idle_handles = m_avail_handles.size();
size_t transfer_in_progress = 0;
for (std::vector<State*>::const_iterator state_iter = m_states.begin();
Expand All @@ -169,19 +176,33 @@ class MultiCurlHandler {
}
}
if (!idle_handles) {
if (log_reason) {
m_log.Emsg("CanStartTransfer", "Unable to start transfers as no idle CURL handles are available.");
}
return false;
}
ssize_t available_buffers = m_states[0]->AvailableBuffers();
// To be conservative, set aside buffers for any transfers that have been activated
// but don't have their first responses back yet.
available_buffers -= (m_active_handles.size() - transfer_in_progress);
if (log_reason && (available_buffers == 0)) {
std::stringstream ss;
ss << "Unable to start transfers as no buffers are available. Available buffers: " <<
m_states[0]->AvailableBuffers() << ", Active curl handles: " << m_active_handles.size()
<< ", Transfers in progress: " << transfer_in_progress;
m_log.Emsg("CanStartTransfer", ss.str().c_str());
if (m_states[0]->AvailableBuffers() == 0) {
m_states[0]->DumpBuffers();
}
}
return available_buffers > 0;
}

CURLM *m_handle;
std::vector<CURL *> m_avail_handles;
std::vector<CURL *> m_active_handles;
std::vector<State*> &m_states;
XrdSysError &m_log;
};
}

Expand Down Expand Up @@ -294,6 +315,7 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
&fd_count);
#endif
if (mres != CURLM_OK) {
m_log.Emsg(log_prefix, "Breaking transfer due to failed curl multi wait.");
break;
}
} while (running_handles);
Expand Down Expand Up @@ -327,8 +349,9 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
m_log.Emsg(log_prefix, "request failed when processing", curl_easy_strerror(res));
ss << "failure: " << curl_easy_strerror(res);
} else if (current_offset != content_size) {
ss << "failure: Internal logic error led to early abort";
m_log.Emsg(log_prefix, "Internal logic error led to early abort");
ss << "failure: Internal logic error led to early abort; current offset is " <<
current_offset << " while full size is " << content_size;
m_log.Emsg(log_prefix, ss.str().c_str());
} else if (state.GetStatusCode() >= 400) {
ss << "failure: Remote side failed with status code " << state.GetStatusCode();
m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str());
Expand All @@ -341,6 +364,8 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
}
return req.ChunkResp(NULL, 0);
}


int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state,
const char *log_prefix, size_t streams)
{
Expand Down
5 changes: 5 additions & 0 deletions src/XrdTpc/XrdTpcState.cc
Expand Up @@ -238,3 +238,8 @@ int State::AvailableBuffers() const
{
return m_stream->AvailableBuffers();
}

void State::DumpBuffers() const
{
m_stream->DumpBuffers();
}
2 changes: 2 additions & 0 deletions src/XrdTpc/XrdTpcState.hh
Expand Up @@ -67,6 +67,8 @@ public:

int AvailableBuffers() const;

void DumpBuffers() const;

// Returns true if at least one byte of the response has been received,
// but not the entire contents of the response.
bool BodyTransferInProgress() const {return m_offset && (m_offset != m_content_length);}
Expand Down
22 changes: 22 additions & 0 deletions src/XrdTpc/XrdTpcStream.cc
@@ -1,7 +1,10 @@

#include <sstream>

#include "XrdTpcStream.hh"

#include "XrdSfs/XrdSfsInterface.hh"
#include "XrdSys/XrdSysError.hh"

using namespace TPC;

Expand Down Expand Up @@ -93,6 +96,25 @@ Stream::Write(off_t offset, const char *buf, size_t size)
return retval;
}


void
Stream::DumpBuffers() const
{
m_log.Emsg("Stream::DumpBuffers", "Beginning dump of stream buffers.");
size_t idx = 0;
for (std::vector<Entry*>::const_iterator entry_iter = m_buffers.begin();
entry_iter!= m_buffers.end();
entry_iter++) {
std::stringstream ss;
ss << "Buffer " << idx << ": Offset=" << (*entry_iter)->GetOffset() << ", Size="
<< (*entry_iter)->GetSize() << ", Capacity=" << (*entry_iter)->GetCapacity();
m_log.Emsg("Stream::DumpBuffers", ss.str().c_str());
idx ++;
}
m_log.Emsg("Stream::DumpBuffers", "Finish dump of stream buffers.");
}


int
Stream::Read(off_t offset, char *buf, size_t size)
{
Expand Down
13 changes: 11 additions & 2 deletions src/XrdTpc/XrdTpcStream.hh
Expand Up @@ -15,14 +15,16 @@
struct stat;

class XrdSfsFile;
class XrdSysError;

namespace TPC {
class Stream {
public:
Stream(std::unique_ptr<XrdSfsFile> fh, size_t max_blocks, size_t buffer_size)
Stream(std::unique_ptr<XrdSfsFile> fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
: m_avail_count(max_blocks),
m_fh(std::move(fh)),
m_offset(0)
m_offset(0),
m_log(log)
{
m_buffers.reserve(max_blocks);
for (size_t idx=0; idx < max_blocks; idx++) {
Expand All @@ -40,6 +42,8 @@ public:

size_t AvailableBuffers() const {return m_avail_count;}

void DumpBuffers() const;

private:

class Entry {
Expand Down Expand Up @@ -102,6 +106,10 @@ private:
m_size = other.m_size;
}

off_t GetOffset() const {return m_offset;}
size_t GetCapacity() const {return m_capacity;}
size_t GetSize() const {return m_size;}

private:

Entry(const Entry&) = delete;
Expand All @@ -120,5 +128,6 @@ private:
std::unique_ptr<XrdSfsFile> m_fh;
off_t m_offset;
std::vector<Entry*> m_buffers;
XrdSysError &m_log;
};
}
2 changes: 1 addition & 1 deletion src/XrdTpc/XrdTpcTPC.cc
Expand Up @@ -430,7 +430,7 @@ int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req)
}
curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());

Stream stream(std::move(fh), 0, 0);
Stream stream(std::move(fh), 0, 0, m_log);
State state(0, stream, curl, true);
state.CopyHeaders(req);

Expand Down

0 comments on commit 0fb38b9

Please sign in to comment.