Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overhaul logging approach for TPC #1156

Merged
merged 2 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
54 changes: 54 additions & 0 deletions src/XrdTpc/XrdTpcConfigure.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ bool TPCHandler::Configure(const char *configfn, XrdOucEnv *myEnv)
{
XrdOucStream Config(&m_log, getenv("XRDINSTANCE"), myEnv, "=====> ");

m_log.setMsgMask(LogMask::Info | LogMask::Warning | LogMask::Error);

std::string authLib;
std::string authLibParms;
int cfgFD = open(configfn, O_RDONLY, 0);
Expand Down Expand Up @@ -51,6 +53,11 @@ bool TPCHandler::Configure(const char *configfn, XrdOucEnv *myEnv)
return false;
}
m_cadir = val;
} else if (!strcmp("tpc.trace", val)) {
if (!ConfigureLogger(Config)) {
Config.Close();
return false;
}
}
}
Config.Close();
Expand All @@ -66,3 +73,50 @@ bool TPCHandler::Configure(const char *configfn, XrdOucEnv *myEnv)
}
return true;
}

bool TPCHandler::ConfigureLogger(XrdOucStream &config_obj)
{
char *val = config_obj.GetWord();
if (!val || !val[0])
{
m_log.Emsg("Config", "tpc.trace requires at least one directive [all | error | warning | info | debug | none]");
return false;
}
// If the config option is given, reset the log mask.
m_log.setMsgMask(0);

do {
if (!strcasecmp(val, "all"))
{
m_log.setMsgMask(m_log.getMsgMask() | LogMask::All);
}
else if (!strcasecmp(val, "error"))
{
m_log.setMsgMask(m_log.getMsgMask() | LogMask::Error);
}
else if (!strcasecmp(val, "warning"))
{
m_log.setMsgMask(m_log.getMsgMask() | LogMask::Warning);
}
else if (!strcasecmp(val, "info"))
{
m_log.setMsgMask(m_log.getMsgMask() | LogMask::Info);
}
else if (!strcasecmp(val, "debug"))
{
m_log.setMsgMask(m_log.getMsgMask() | LogMask::Debug);
}
else if (!strcasecmp(val, "none"))
{
m_log.setMsgMask(0);
}
else
{
m_log.Emsg("Config", "tpc.trace encountered an unknown directive (valid values: [all | error | warning | info | debug | none]):", val);
return false;
}
val = config_obj.GetWord();
} while (val);

return true;
}
70 changes: 45 additions & 25 deletions src/XrdTpc/XrdTpcMultistream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,23 +208,17 @@ class MultiCurlHandler {


int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
const char *log_prefix, size_t streams,
std::vector<State*> handles)
size_t streams, std::vector<State*> handles, TPCLogRecord &rec)
{
int result;
bool success;
CURL *curl = state.GetHandle();
if ((result = DetermineXferSize(curl, req, state, success)) || !success) {
if ((result = DetermineXferSize(curl, req, state, success, rec)) || !success) {
return result;
}
off_t content_size = state.GetContentLength();
off_t current_offset = 0;

{
std::stringstream ss;
ss << "Successfully determined remote size for pull request: " << content_size;
m_log.Emsg("ProcessPullReq", ss.str().c_str());
}
state.ResetAfterRequest();

size_t concurrency = streams * m_pipelining_multiplier;
Expand All @@ -248,7 +242,12 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
// Start response to client prior to the first call to curl_multi_perform
int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain");
if (retval) {
logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
"Failed to send the initial response to the TPC client");
return retval;
} else {
logTransferEvent(LogMask::Debug, rec, "RESPONSE_START",
"Initial transfer response sent to the TPC client");
}

// Start assigning transfers
Expand All @@ -264,7 +263,9 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
time_t now = time(NULL);
time_t next_marker = last_marker + m_marker_period;
if (now >= next_marker) {
if (SendPerfMarker(req, handles, current_offset)) {
if (SendPerfMarker(req, rec, handles, current_offset)) {
logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL",
"Failed to send a perf marker to the TPC client");
return -1;
}
last_marker = now;
Expand Down Expand Up @@ -295,7 +296,8 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
}
} while (msg);
if (res != static_cast<CURLcode>(-1) && res != CURLE_OK) {
m_log.Emsg(log_prefix, "Breaking loop due to failed curl transfer.");
logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_CURL_FAILURE",
"Breaking loop due to failed curl transfer");
break;
}

Expand All @@ -309,10 +311,12 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
std::stringstream ss;
ss << "No handles are able to run. Streams=" << streams << ", concurrency="
<< concurrency;
m_log.Emsg(log_prefix, ss.str().c_str());

logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_IDLE", ss.str());
}
} else if (running_handles == 0) {
m_log.Emsg(log_prefix, "Unable to start new transfers; breaking loop.");
logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_IDLE",
"Unable to start new transfers; breaking loop.");
break;
}
}
Expand All @@ -330,7 +334,6 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
&fd_count);
#endif
if (mres != CURLM_OK) {
m_log.Emsg(log_prefix, "Breaking transfer due to failed curl multi wait.");
break;
}
} while (running_handles);
Expand All @@ -339,6 +342,7 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
std::stringstream ss;
ss << "Internal libcurl multi-handle error: "
<< curl_multi_strerror(mres);
logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", ss.str());
throw std::runtime_error(ss.str());
}

Expand All @@ -356,43 +360,58 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
} while (msg);

if (res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR",
"Internal state error in libcurl");
throw std::runtime_error("Internal state error in libcurl");
}

rec.bytes_transferred = state.BytesTransferred();
rec.tpc_status = state.GetStatusCode();

// Generate the final response back to the client.
std::stringstream ss;
if (res != CURLE_OK) {
m_log.Emsg(log_prefix, "request failed when processing", curl_easy_strerror(res));
success = false;
if (state.GetStatusCode() >= 400) {
ss << "failure: Remote side failed with status code " << state.GetStatusCode();
rec.status = state.GetStatusCode();
logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss.str());
} else if (res != CURLE_OK) {
std::stringstream ss2;
ss2 << "Request failed when processing: " << curl_easy_strerror(res);
logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss.str());
ss << "failure: " << curl_easy_strerror(res);
} else if (current_offset != content_size) {
ss << "failure: Internal logic error led to early abort; current offset is " <<
current_offset << " while full size is " << content_size;
m_log.Emsg(log_prefix, ss.str().c_str());
} else if (state.GetStatusCode() >= 400) {
ss << "failure: Remote side failed with status code " << state.GetStatusCode();
m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str());
logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss.str());
} else {
if (!handles[0]->Finalize()) {
ss << "failure: Failed to finalize and close file handle.";
m_log.Emsg(log_prefix, "Failed to finalize file handle");
logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR",
"Failed to finalize and close file handle");
} else {
ss << "success: Created";
success = true;
}
}

if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
logTransferEvent(LogMask::Error, rec, "TRANSFER_ERROR",
"Failed to send last update to remote client");
return retval;
} else if (success) {
logTransferEvent(LogMask::Info, rec, "TRANSFER_SUCCESS");
}
return req.ChunkResp(NULL, 0);
}


int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state,
const char *log_prefix, size_t streams)
size_t streams, TPCLogRecord &rec)
{
std::vector<State*> handles;
try {
int retval = RunCurlWithStreamsImpl(req, state, log_prefix, streams, handles);
int retval = RunCurlWithStreamsImpl(req, state, streams, handles, rec);
for (std::vector<State*>::iterator state_iter = handles.begin();
state_iter != handles.end();
state_iter++) {
Expand All @@ -406,16 +425,17 @@ int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state,
delete *state_iter;
}

m_log.Emsg(log_prefix, e.what());
return req.SendSimpleResp(500, NULL, NULL, e.what(), 0);
rec.status = 500;
logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", e.what());
return req.SendSimpleResp(rec.status, NULL, NULL, e.what(), 0);
} catch (std::runtime_error &e) {
for (std::vector<State*>::iterator state_iter = handles.begin();
state_iter != handles.end();
state_iter++) {
delete *state_iter;
}

m_log.Emsg(log_prefix, e.what());
logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", e.what());
std::stringstream ss;
ss << "failure: " << e.what();
int retval;
Expand Down