diff --git a/src/XrdTpc/XrdTpcConfigure.cc b/src/XrdTpc/XrdTpcConfigure.cc index a06af36cceb..fe54574ab5b 100644 --- a/src/XrdTpc/XrdTpcConfigure.cc +++ b/src/XrdTpc/XrdTpcConfigure.cc @@ -17,6 +17,8 @@ bool TPCHandler::Configure(const char *configfn, XrdOucEnv *myEnv) { XrdOucStream Config(&m_log, getenv("XRDINSTANCE"), myEnv, "=====> "); + m_log.setMsgMask(LogMask::Info | LogMask::Warning | LogMask::Error); + std::string authLib; std::string authLibParms; int cfgFD = open(configfn, O_RDONLY, 0); @@ -51,6 +53,11 @@ bool TPCHandler::Configure(const char *configfn, XrdOucEnv *myEnv) return false; } m_cadir = val; + } else if (!strcmp("tpc.trace", val)) { + if (!ConfigureLogger(Config)) { + Config.Close(); + return false; + } } } Config.Close(); @@ -66,3 +73,50 @@ bool TPCHandler::Configure(const char *configfn, XrdOucEnv *myEnv) } return true; } + +bool TPCHandler::ConfigureLogger(XrdOucStream &config_obj) +{ + char *val = config_obj.GetWord(); + if (!val || !val[0]) + { + m_log.Emsg("Config", "tpc.trace requires at least one directive [all | error | warning | info | debug | none]"); + return false; + } + // If the config option is given, reset the log mask. + m_log.setMsgMask(0); + + do { + if (!strcasecmp(val, "all")) + { + m_log.setMsgMask(m_log.getMsgMask() | LogMask::All); + } + else if (!strcasecmp(val, "error")) + { + m_log.setMsgMask(m_log.getMsgMask() | LogMask::Error); + } + else if (!strcasecmp(val, "warning")) + { + m_log.setMsgMask(m_log.getMsgMask() | LogMask::Warning); + } + else if (!strcasecmp(val, "info")) + { + m_log.setMsgMask(m_log.getMsgMask() | LogMask::Info); + } + else if (!strcasecmp(val, "debug")) + { + m_log.setMsgMask(m_log.getMsgMask() | LogMask::Debug); + } + else if (!strcasecmp(val, "none")) + { + m_log.setMsgMask(0); + } + else + { + m_log.Emsg("Config", "tpc.trace encountered an unknown directive (valid values: [all | error | warning | info | debug | none]):", val); + return false; + } + val = config_obj.GetWord(); + } while (val); + + return true; +} diff --git a/src/XrdTpc/XrdTpcMultistream.cc b/src/XrdTpc/XrdTpcMultistream.cc index d4387293ac0..900fd3198fe 100644 --- a/src/XrdTpc/XrdTpcMultistream.cc +++ b/src/XrdTpc/XrdTpcMultistream.cc @@ -208,23 +208,17 @@ class MultiCurlHandler { int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state, - const char *log_prefix, size_t streams, - std::vector handles) + size_t streams, std::vector handles, TPCLogRecord &rec) { int result; bool success; CURL *curl = state.GetHandle(); - if ((result = DetermineXferSize(curl, req, state, success)) || !success) { + if ((result = DetermineXferSize(curl, req, state, success, rec)) || !success) { return result; } off_t content_size = state.GetContentLength(); off_t current_offset = 0; - { - std::stringstream ss; - ss << "Successfully determined remote size for pull request: " << content_size; - m_log.Emsg("ProcessPullReq", ss.str().c_str()); - } state.ResetAfterRequest(); size_t concurrency = streams * m_pipelining_multiplier; @@ -248,7 +242,12 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state, // Start response to client prior to the first call to curl_multi_perform int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain"); if (retval) { + logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL", + "Failed to send the initial response to the TPC client"); return retval; + } else { + logTransferEvent(LogMask::Debug, rec, "RESPONSE_START", + "Initial transfer response sent to the TPC client"); } // Start assigning transfers @@ -264,7 +263,9 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state, time_t now = time(NULL); time_t next_marker = last_marker + m_marker_period; if (now >= next_marker) { - if (SendPerfMarker(req, handles, current_offset)) { + if (SendPerfMarker(req, rec, handles, current_offset)) { + logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL", + "Failed to send a perf marker to the TPC client"); return -1; } last_marker = now; @@ -295,7 +296,8 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state, } } while (msg); if (res != static_cast(-1) && res != CURLE_OK) { - m_log.Emsg(log_prefix, "Breaking loop due to failed curl transfer."); + logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_CURL_FAILURE", + "Breaking loop due to failed curl transfer"); break; } @@ -309,10 +311,12 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state, std::stringstream ss; ss << "No handles are able to run. Streams=" << streams << ", concurrency=" << concurrency; - m_log.Emsg(log_prefix, ss.str().c_str()); + + logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_IDLE", ss.str()); } } else if (running_handles == 0) { - m_log.Emsg(log_prefix, "Unable to start new transfers; breaking loop."); + logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_IDLE", + "Unable to start new transfers; breaking loop."); break; } } @@ -330,7 +334,6 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state, &fd_count); #endif if (mres != CURLM_OK) { - m_log.Emsg(log_prefix, "Breaking transfer due to failed curl multi wait."); break; } } while (running_handles); @@ -339,6 +342,7 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state, std::stringstream ss; ss << "Internal libcurl multi-handle error: " << curl_multi_strerror(mres); + logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", ss.str()); throw std::runtime_error(ss.str()); } @@ -356,43 +360,58 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state, } while (msg); if (res == static_cast(-1)) { // No transfers returned?!? + logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", + "Internal state error in libcurl"); throw std::runtime_error("Internal state error in libcurl"); } + rec.bytes_transferred = state.BytesTransferred(); + rec.tpc_status = state.GetStatusCode(); + // Generate the final response back to the client. std::stringstream ss; - if (res != CURLE_OK) { - m_log.Emsg(log_prefix, "request failed when processing", curl_easy_strerror(res)); + success = false; + if (state.GetStatusCode() >= 400) { + ss << "failure: Remote side failed with status code " << state.GetStatusCode(); + rec.status = state.GetStatusCode(); + logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss.str()); + } else if (res != CURLE_OK) { + std::stringstream ss2; + ss2 << "Request failed when processing: " << curl_easy_strerror(res); + logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss.str()); ss << "failure: " << curl_easy_strerror(res); } else if (current_offset != content_size) { ss << "failure: Internal logic error led to early abort; current offset is " << current_offset << " while full size is " << content_size; - m_log.Emsg(log_prefix, ss.str().c_str()); - } else if (state.GetStatusCode() >= 400) { - ss << "failure: Remote side failed with status code " << state.GetStatusCode(); - m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str()); + logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss.str()); } else { if (!handles[0]->Finalize()) { ss << "failure: Failed to finalize and close file handle."; - m_log.Emsg(log_prefix, "Failed to finalize file handle"); + logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", + "Failed to finalize and close file handle"); } else { ss << "success: Created"; + success = true; } } if ((retval = req.ChunkResp(ss.str().c_str(), 0))) { + logTransferEvent(LogMask::Error, rec, "TRANSFER_ERROR", + "Failed to send last update to remote client"); return retval; + } else if (success) { + logTransferEvent(LogMask::Info, rec, "TRANSFER_SUCCESS"); } return req.ChunkResp(NULL, 0); } int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state, - const char *log_prefix, size_t streams) + size_t streams, TPCLogRecord &rec) { std::vector handles; try { - int retval = RunCurlWithStreamsImpl(req, state, log_prefix, streams, handles); + int retval = RunCurlWithStreamsImpl(req, state, streams, handles, rec); for (std::vector::iterator state_iter = handles.begin(); state_iter != handles.end(); state_iter++) { @@ -406,8 +425,9 @@ int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state, delete *state_iter; } - m_log.Emsg(log_prefix, e.what()); - return req.SendSimpleResp(500, NULL, NULL, e.what(), 0); + rec.status = 500; + logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", e.what()); + return req.SendSimpleResp(rec.status, NULL, NULL, e.what(), 0); } catch (std::runtime_error &e) { for (std::vector::iterator state_iter = handles.begin(); state_iter != handles.end(); @@ -415,7 +435,7 @@ int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state, delete *state_iter; } - m_log.Emsg(log_prefix, e.what()); + logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", e.what()); std::stringstream ss; ss << "failure: " << e.what(); int retval; diff --git a/src/XrdTpc/XrdTpcTPC.cc b/src/XrdTpc/XrdTpcTPC.cc index 0d5358848ed..b5ac507ac30 100644 --- a/src/XrdTpc/XrdTpcTPC.cc +++ b/src/XrdTpc/XrdTpcTPC.cc @@ -93,7 +93,6 @@ int TPCHandler::ProcessReq(XrdHttpExtReq &req) { header = req.headers.find("Source"); if (header != req.headers.end()) { std::string src = PrepareURL(header->second); - m_log.Emsg("ProcessReq", "Pull request from", src.c_str()); return ProcessPullReq(src, req); } header = req.headers.find("Destination"); @@ -110,7 +109,7 @@ TPCHandler::~TPCHandler() { TPCHandler::TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv) : m_desthttps(false), - m_log(*log), + m_log(log->logger(), "TPC_"), m_sfs(NULL) { if (!Configure(config, myEnv)) { @@ -138,12 +137,16 @@ std::string TPCHandler::GetAuthz(XrdHttpExtReq &req) { return authz; } -int TPCHandler::RedirectTransfer(const std::string &redirect_resource, XrdHttpExtReq &req, XrdOucErrInfo &error) { +int TPCHandler::RedirectTransfer(const std::string &redirect_resource, + XrdHttpExtReq &req, XrdOucErrInfo &error, TPCLogRecord &rec) +{ int port; const char *ptr = error.getErrText(port); if ((ptr == NULL) || (*ptr == '\0') || (port == 0)) { + rec.status = 500; char msg[] = "Internal error: redirect without hostname"; - return req.SendSimpleResp(500, NULL, NULL, msg, 0); + logTransferEvent(LogMask::Error, rec, "REDIRECT_INTERNAL_ERROR", msg); + return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0); } // Construct redirection URL taking into consideration any opaque info @@ -163,7 +166,10 @@ int TPCHandler::RedirectTransfer(const std::string &redirect_resource, XrdHttpEx ss << "?" << opaque; } - return req.SendSimpleResp(307, NULL, const_cast(ss.str().c_str()), NULL, 0); + rec.status = 307; + logTransferEvent(LogMask::Info, rec, "REDIRECT", ss.str()); + return req.SendSimpleResp(rec.status, NULL, const_cast(ss.str().c_str()), + NULL, 0); } int TPCHandler::OpenWaitStall(XrdSfsFile &fh, const std::string &resource, @@ -203,33 +209,44 @@ int TPCHandler::OpenWaitStall(XrdSfsFile &fh, const std::string &resource, * Determine size at remote end. */ int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state, - bool &success) { + bool &success, TPCLogRecord &rec) { success = false; curl_easy_setopt(curl, CURLOPT_NOBODY, 1); CURLcode res; res = curl_easy_perform(curl); if (res == CURLE_HTTP_RETURNED_ERROR) { - m_log.Emsg("DetermineXferSize", "Remote server failed request", curl_easy_strerror(res)); + std::stringstream ss; + ss << "Remote server failed request: " << curl_easy_strerror(res); + rec.status = 500; + logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str()); curl_easy_cleanup(curl); - return req.SendSimpleResp(500, NULL, NULL, const_cast(curl_easy_strerror(res)), 0); + return req.SendSimpleResp(rec.status, NULL, NULL, const_cast(curl_easy_strerror(res)), 0); } else if (state.GetStatusCode() >= 400) { std::stringstream ss; ss << "Remote side failed with status code " << state.GetStatusCode(); - m_log.Emsg("DetermineXferSize", "Remote server failed request", ss.str().c_str()); + rec.status = 500; + logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str()); curl_easy_cleanup(curl); - return req.SendSimpleResp(500, NULL, NULL, const_cast(ss.str().c_str()), 0); + return req.SendSimpleResp(rec.status, NULL, NULL, const_cast(ss.str().c_str()), 0); } else if (res) { - m_log.Emsg("DetermineXferSize", "Curl failed", curl_easy_strerror(res)); + std::stringstream ss; + ss << "HTTP library failed: " << curl_easy_strerror(res); + rec.status = 500; + logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str()); char msg[] = "Unknown internal transfer failure"; curl_easy_cleanup(curl); - return req.SendSimpleResp(500, NULL, NULL, msg, 0); + return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0); } + std::stringstream ss; + ss << "Successfully determined remote size for pull request: " + << state.GetContentLength(); + logTransferEvent(LogMask::Debug, rec, "SIZE_SUCCESS", ss.str()); curl_easy_setopt(curl, CURLOPT_NOBODY, 0); success = true; return 0; } -int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPC::State &state) { +int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, TPC::State &state) { std::stringstream ss; const std::string crlf = "\n"; ss << "Perf Marker" << crlf; @@ -243,11 +260,13 @@ int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPC::State &state) { if (!desc.empty()) ss << "RemoteConnections: " << desc << crlf; ss << "End" << crlf; + rec.bytes_transferred = state.BytesTransferred(); + logTransferEvent(LogMask::Debug, rec, "PERF_MARKER"); return req.ChunkResp(ss.str().c_str(), 0); } -int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, std::vector &state, +int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, std::vector &state, off_t bytes_transferred) { // The 'performance marker' format is largely derived from how GridFTP works @@ -285,31 +304,37 @@ int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, std::vector &state, if (!first) ss << "RemoteConnections: " << ss2.str() << crlf; ss << "End" << crlf; + rec.bytes_transferred = bytes_transferred; + logTransferEvent(LogMask::Debug, rec, "PERF_MARKER"); return req.ChunkResp(ss.str().c_str(), 0); } int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, - const char *log_prefix) + TPCLogRecord &rec) { // Create the multi-handle and add in the current transfer to it. CURLM *multi_handle = curl_multi_init(); if (!multi_handle) { - m_log.Emsg(log_prefix, "Failed to initialize a libcurl multi-handle"); + rec.status = 500; + logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL", + "Failed to initialize a libcurl multi-handle"); char msg[] = "Failed to initialize internal server memory"; curl_easy_cleanup(curl); - return req.SendSimpleResp(500, NULL, NULL, msg, 0); + return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0); } CURLMcode mres; mres = curl_multi_add_handle(multi_handle, curl); if (mres) { - m_log.Emsg(log_prefix, "Failed to add transfer to libcurl multi-handle", - curl_multi_strerror(mres)); + rec.status = 500; + std::stringstream ss; + ss << "Failed to add transfer to libcurl multi-handle: " << curl_multi_strerror(mres); + logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL", ss.str()); char msg[] = "Failed to initialize internal server handle"; curl_easy_cleanup(curl); curl_multi_cleanup(multi_handle); - return req.SendSimpleResp(500, NULL, NULL, msg, 0); + return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0); } // Start response to client prior to the first call to curl_multi_perform @@ -317,7 +342,12 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, if (retval) { curl_easy_cleanup(curl); curl_multi_cleanup(multi_handle); + logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL", + "Failed to send the initial response to the TPC client"); return retval; + } else { + logTransferEvent(LogMask::Debug, rec, "RESPONSE_START", + "Initial transfer response sent to the TPC client"); } // Transfer loop: use curl to actually run the transfer, but periodically @@ -329,10 +359,12 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, time_t now = time(NULL); time_t next_marker = last_marker + m_marker_period; if (now >= next_marker) { - if (SendPerfMarker(req, state)) { + if (SendPerfMarker(req, rec, state)) { curl_multi_remove_handle(multi_handle, curl); curl_easy_cleanup(curl); curl_multi_cleanup(multi_handle); + logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL", + "Failed to send a perf marker to the TPC client"); return -1; } last_marker = now; @@ -378,14 +410,18 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, } while (running_handles); if (mres != CURLM_OK) { - m_log.Emsg(log_prefix, "Internal libcurl multi-handle error", - curl_multi_strerror(mres)); + std::stringstream ss; + ss << "Internal libcurl multi-handle error: " << curl_multi_strerror(mres); + logTransferEvent(LogMask::Error, rec, "TRANSFER_CURL_ERROR", ss.str()); + char msg[] = "Internal server error due to libcurl"; curl_multi_remove_handle(multi_handle, curl); curl_easy_cleanup(curl); curl_multi_cleanup(multi_handle); if ((retval = req.ChunkResp(msg, 0))) { + logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL", + "Failed to send error message to the TPC client"); return retval; } return req.ChunkResp(NULL, 0); @@ -409,34 +445,49 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, curl_easy_cleanup(curl); curl_multi_cleanup(multi_handle); char msg[] = "Internal state error in libcurl"; - m_log.Emsg(log_prefix, msg); + logTransferEvent(LogMask::Error, rec, "TRANSFER_CURL_ERROR", msg); if ((retval = req.ChunkResp(msg, 0))) { + logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL", + "Failed to send error message to the TPC client"); return retval; } return req.ChunkResp(NULL, 0); } curl_multi_cleanup(multi_handle); + rec.bytes_transferred = state.BytesTransferred(); + rec.tpc_status = state.GetStatusCode(); + // Generate the final response back to the client. std::stringstream ss; + bool success = false; if (state.GetStatusCode() >= 400) { std::string err = state.GetErrorMessage(); - ss << "failure: Remote side failed with status code " << state.GetStatusCode(); + std::stringstream ss2; + ss2 << "Remote side failed with status code " << state.GetStatusCode(); if (!err.empty()) { std::replace(err.begin(), err.end(), '\n', ' '); - ss << "; error message: \"" << err << "\""; + ss2 << "; error message: \"" << err << "\""; } - m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str()); + logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str()); + ss << "failure: " << ss2.str(); } else if (res != CURLE_OK) { - m_log.Emsg(log_prefix, "Remote server failed request", curl_easy_strerror(res)); + std::stringstream ss2; + ss2 << "HTTP library failure: " << curl_easy_strerror(res); + logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str()); ss << "failure: " << curl_easy_strerror(res); } else { ss << "success: Created"; + success = true; } if ((retval = req.ChunkResp(ss.str().c_str(), 0))) { + logTransferEvent(LogMask::Error, rec, "TRANSFER_ERROR", + "Failed to send last update to remote client"); return retval; + } else if (success) { + logTransferEvent(LogMask::Info, rec, "TRANSFER_SUCCESS"); } return req.ChunkResp(NULL, 0); } @@ -468,12 +519,21 @@ int TPCHandler::RunCurlBasic(CURL *curl, XrdHttpExtReq &req, State &state, #endif int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) { - m_log.Emsg("ProcessPushReq", "Starting a push request for resource", resource.c_str()); + TPCLogRecord rec; + rec.log_prefix = "PushRequest"; + rec.local = req.resource; + rec.remote = resource; + rec.name = req.GetSecEntity().name; + char *name = req.GetSecEntity().name; + if (name) rec.name = name; + logTransferEvent(LogMask::Info, rec, "PUSH_START", "Starting a push request"); CURL *curl = curl_easy_init(); curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1); if (!curl) { char msg[] = "Failed to initialize internal transfer resources"; - return req.SendSimpleResp(500, NULL, NULL, msg, 0); + rec.status = 500; + logTransferEvent(LogMask::Error, rec, "PUSH_FAIL", msg); + return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0); } auto query_header = req.headers.find("xrd-http-fullresource"); std::string redirect_resource = req.resource; @@ -481,15 +541,17 @@ int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) redirect_resource = query_header->second; } - char *name = req.GetSecEntity().name; AtomicBeg(m_monid_mutex); uint64_t file_monid = AtomicInc(m_monid); AtomicEnd(m_monid_mutex); std::unique_ptr fh(m_sfs->newFile(name, file_monid)); if (!fh.get()) { curl_easy_cleanup(curl); + rec.status = 500; + logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", + "Failed to initialize internal transfer file handle"); char msg[] = "Failed to initialize internal transfer file handle"; - return req.SendSimpleResp(500, NULL, NULL, msg, 0); + return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0); } std::string full_url = prepareURL(req); @@ -499,16 +561,18 @@ int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) req.GetSecEntity(), authz); if (SFS_REDIRECT == open_results) { curl_easy_cleanup(curl); - return RedirectTransfer(redirect_resource, req, fh->error); + return RedirectTransfer(redirect_resource, req, fh->error, rec); } else if (SFS_OK != open_results) { curl_easy_cleanup(curl); int code; char msg_generic[] = "Failed to open local resource"; const char *msg = fh->error.getErrText(code); if (msg == NULL) msg = msg_generic; - int status_code = 400; - if (code == EACCES) status_code = 401; - int resp_result = req.SendSimpleResp(status_code, NULL, NULL, + rec.status = 400; + if (code == EACCES) rec.status = 401; + else if (code == EEXIST) rec.status = 412; + logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", msg); + int resp_result = req.SendSimpleResp(rec.status, NULL, NULL, const_cast(msg), 0); fh->close(); return resp_result; @@ -523,25 +587,35 @@ int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) state.CopyHeaders(req); #ifdef XRD_CHUNK_RESP - return RunCurlWithUpdates(curl, req, state, "ProcessPushReq"); + return RunCurlWithUpdates(curl, req, state, rec); #else return RunCurlBasic(curl, req, state, "ProcessPushReq"); #endif } int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) { + TPCLogRecord rec; + rec.log_prefix = "PullRequest"; + rec.local = req.resource; + rec.remote = resource; + char *name = req.GetSecEntity().name; + if (name) rec.name = name; + logTransferEvent(LogMask::Info, rec, "PULL_START", "Starting a push request"); CURL *curl = curl_easy_init(); curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1); if (!curl) { char msg[] = "Failed to initialize internal transfer resources"; - return req.SendSimpleResp(500, NULL, NULL, msg, 0); + rec.status = 500; + logTransferEvent(LogMask::Error, rec, "PULL_FAIL", msg); + return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0); } - char *name = req.GetSecEntity().name; std::unique_ptr fh(m_sfs->newFile(name, m_monid++)); if (!fh.get()) { curl_easy_cleanup(curl); char msg[] = "Failed to initialize internal transfer file handle"; - return req.SendSimpleResp(500, NULL, NULL, msg, 0); + rec.status = 500; + logTransferEvent(LogMask::Error, rec, "PULL_FAIL", msg); + return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0); } auto query_header = req.headers.find("xrd-http-fullresource"); std::string redirect_resource = req.resource; @@ -565,12 +639,14 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) if (stream_req < 0 || stream_req > 100) { curl_easy_cleanup(curl); char msg[] = "Invalid request for number of streams"; - m_log.Emsg("ProcessPullReq", msg); - return req.SendSimpleResp(500, NULL, NULL, msg, 0); + rec.status = 500; + logTransferEvent(LogMask::Info, rec, "INVALID_REQUEST", msg); + return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0); } streams = streams == 0 ? 1 : stream_req; } } + rec.streams = streams; std::string full_url = prepareURL(req); std::string authz = GetAuthz(req); @@ -578,17 +654,18 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) req.GetSecEntity(), authz); if (SFS_REDIRECT == open_result) { curl_easy_cleanup(curl); - return RedirectTransfer(redirect_resource, req, fh->error); + return RedirectTransfer(redirect_resource, req, fh->error, rec); } else if (SFS_OK != open_result) { curl_easy_cleanup(curl); int code; char msg_generic[] = "Failed to open local resource"; const char *msg = fh->error.getErrText(code); if ((msg == NULL) || (*msg == '\0')) msg = msg_generic; - int status_code = 400; - if (code == EACCES) status_code = 401; - if (code == EEXIST) status_code = 412; - int resp_result = req.SendSimpleResp(status_code, NULL, NULL, + rec.status = 400; + if (code == EACCES) rec.status = 401; + else if (code == EEXIST) rec.status = 412; + logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", msg); + int resp_result = req.SendSimpleResp(rec.status, NULL, NULL, const_cast(msg), 0); fh->close(); return resp_result; @@ -603,9 +680,9 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) #ifdef XRD_CHUNK_RESP if (streams > 1) { - return RunCurlWithStreams(req, state, "ProcessPullReq", streams); + return RunCurlWithStreams(req, state, streams, rec); } else { - return RunCurlWithUpdates(curl, req, state, "ProcessPullReq"); + return RunCurlWithUpdates(curl, req, state, rec); } #else return RunCurlBasic(curl, req, state, "ProcessPullReq"); @@ -613,24 +690,49 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) } +void TPCHandler::logTransferEvent(LogMask mask, const TPCLogRecord &rec, + const std::string &event, const std::string &message) +{ + if (!(m_log.getMsgMask() & mask)) {return;} + + std::stringstream ss; + ss << "event=" << event << ", local=" << rec.local << ", remote=" << rec.remote; + if (rec.name.empty()) + ss << ", user=(anonymous)"; + else + ss << ", user=" << rec.name; + if (rec.streams != 1) + ss << ", streams=" << rec.streams; + if (rec.bytes_transferred >= 0) + ss << ", bytes_transferred=" << rec.bytes_transferred; + if (rec.status >= 0) + ss << ", status=" << rec.status; + if (rec.tpc_status >= 0) + ss << ", tpc_status=" << rec.tpc_status; + if (!message.empty()) + ss << "; " << message; + m_log.Log(mask, rec.log_prefix.c_str(), ss.str().c_str()); +} + + extern "C" { XrdHttpExtHandler *XrdHttpGetExtHandler(XrdSysError *log, const char * config, const char * /*parms*/, XrdOucEnv *myEnv) { if (curl_global_init(CURL_GLOBAL_DEFAULT)) { - log->Emsg("Initialize", "libcurl failed to initialize"); + log->Emsg("TPCInitialize", "libcurl failed to initialize"); return NULL; } TPCHandler *retval{NULL}; if (!config) { - log->Emsg("Initialize", "TPC handler requires a config filename in order to load"); + log->Emsg("TPCInitialize", "TPC handler requires a config filename in order to load"); return NULL; } try { - log->Emsg("Initialize", "Will load configuration for the TPC handler from", config); + log->Emsg("TPCInitialize", "Will load configuration for the TPC handler from", config); retval = new TPCHandler(log, config, myEnv); } catch (std::runtime_error &re) { - log->Emsg("Initialize", "Encountered a runtime failure when loading ", re.what()); + log->Emsg("TPCInitialize", "Encountered a runtime failure when loading ", re.what()); //printf("Provided env vars: %p, XrdInet*: %p\n", myEnv, myEnv->GetPtr("XrdInet*")); } return retval; diff --git a/src/XrdTpc/XrdTpcTPC.hh b/src/XrdTpc/XrdTpcTPC.hh index f3484290467..5df7568be45 100644 --- a/src/XrdTpc/XrdTpcTPC.hh +++ b/src/XrdTpc/XrdTpcTPC.hh @@ -17,6 +17,14 @@ typedef void CURL; namespace TPC { class State; +enum LogMask { + Debug = 0x01, + Info = 0x02, + Warning = 0x04, + Error = 0x08, + All = 0xff +}; + class TPCHandler : public XrdHttpExtHandler { public: TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv); @@ -28,11 +36,25 @@ public: virtual int Init(const char *cfgfile) {return 0;} private: + + struct TPCLogRecord { + std::string log_prefix; + std::string local; + std::string remote; + std::string name; + int status{-1}; + int tpc_status{-1}; + unsigned streams{1}; + off_t bytes_transferred{-1}; + }; + int ProcessOptionsReq(XrdHttpExtReq &req); static std::string GetAuthz(XrdHttpExtReq &req); - int RedirectTransfer(const std::string &redirect_resource, XrdHttpExtReq &req, XrdOucErrInfo &error); + // Redirect the transfer according to the contents of an XrdOucErrInfo object. + int RedirectTransfer(const std::string &redirect_resource, XrdHttpExtReq &req, + XrdOucErrInfo &error, TPCLogRecord &); int OpenWaitStall(XrdSfsFile &fh, const std::string &resource, int mode, int openMode, const XrdSecEntity &sec, @@ -40,26 +62,26 @@ private: #ifdef XRD_CHUNK_RESP int DetermineXferSize(CURL *curl, XrdHttpExtReq &req, TPC::State &state, - bool &success); + bool &success, TPCLogRecord &); // Send a 'performance marker' back to the TPC client, informing it of our // progress. The TPC client will use this information to determine whether // the transfer is making sufficient progress and/or other monitoring info // (such as whether the transfer is happening over IPv4, IPv6, or both). - int SendPerfMarker(XrdHttpExtReq &req, TPC::State &state); - int SendPerfMarker(XrdHttpExtReq &req, std::vector &state, + int SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, TPC::State &state); + int SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, std::vector &state, off_t bytes_transferred); // Perform the libcurl transfer, periodically sending back chunked updates. int RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, TPC::State &state, - const char *log_prefix); + TPCLogRecord &rec); // Experimental multi-stream version of RunCurlWithUpdates int RunCurlWithStreams(XrdHttpExtReq &req, TPC::State &state, - const char *log_prefix, size_t streams); + size_t streams, TPCLogRecord &rec); int RunCurlWithStreamsImpl(XrdHttpExtReq &req, TPC::State &state, - const char *log_prefix, size_t streams, - std::vector streams_handles); + size_t streams, std::vector streams_handles, + TPCLogRecord &rec); #else int RunCurlBasic(CURL *curl, XrdHttpExtReq &req, TPC::State &state, const char *log_prefix); @@ -71,6 +93,11 @@ private: bool ConfigureFSLib(XrdOucStream &Config, std::string &path1, bool &path1_alt, std::string &path2, bool &path2_alt); bool Configure(const char *configfn, XrdOucEnv *myEnv); + bool ConfigureLogger(XrdOucStream &Config); + + // Generate a consistently-formatted log message. + void logTransferEvent(LogMask lvl, const TPCLogRecord &record, + const std::string &event, const std::string &message=""); static int m_marker_period; static size_t m_block_size; @@ -78,7 +105,7 @@ private: std::string m_cadir; static XrdSysMutex m_monid_mutex; static uint64_t m_monid; - XrdSysError &m_log; + XrdSysError m_log; XrdSfsFileSystem *m_sfs; // 16 blocks in flight at 16 MB each, meaning that there will be up to 256MB