diff --git a/src/XrdTpc/XrdTpcMultistream.cc b/src/XrdTpc/XrdTpcMultistream.cc index 1d1ac7a4593..84b7fc8703b 100644 --- a/src/XrdTpc/XrdTpcMultistream.cc +++ b/src/XrdTpc/XrdTpcMultistream.cc @@ -12,6 +12,7 @@ #include +#include #include #include @@ -366,6 +367,7 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state, } state.Flush(); + state.Finalize(); rec.bytes_transferred = state.BytesTransferred(); rec.tpc_status = state.GetStatusCode(); @@ -374,9 +376,23 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state, std::stringstream ss; success = false; if (state.GetStatusCode() >= 400) { - ss << "failure: Remote side failed with status code " << state.GetStatusCode(); - rec.status = state.GetStatusCode(); + std::string err = state.GetErrorMessage(); + std::stringstream ss2; + ss2 << "Remote side failed with status code " << state.GetStatusCode(); + if (!err.empty()) { + std::replace(err.begin(), err.end(), '\n', ' '); + ss2 << "; error message: \"" << err << "\""; + } logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss.str()); + ss << "failure: " << ss2.str(); + } else if (state.GetErrorCode()) { + std::string err = state.GetErrorMessage(); + if (err.empty()) {err = "(no error message provided)";} + else {std::replace(err.begin(), err.end(), '\n', ' ');} + std::stringstream ss2; + ss2 << "Error when interacting with local filesystem: " << err; + logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss2.str()); + ss << "failure: " << ss2.str(); } else if (res != CURLE_OK) { std::stringstream ss2; ss2 << "Request failed when processing: " << curl_easy_strerror(res); diff --git a/src/XrdTpc/XrdTpcState.cc b/src/XrdTpc/XrdTpcState.cc index ae8bbd3bcb1..39707dfbe43 100644 --- a/src/XrdTpc/XrdTpcState.cc +++ b/src/XrdTpc/XrdTpcState.cc @@ -192,6 +192,7 @@ int State::Write(char *buffer, size_t size) { int retval = m_stream->Write(m_start_offset + m_offset, buffer, size, false); if (retval == SFS_ERROR) { m_error_buf = m_stream->GetErrorMessage(); + m_error_code = 1; return -1; } m_offset += retval; @@ -202,6 +203,7 @@ int State::Flush() { int retval = m_stream->Write(m_start_offset + m_offset, nullptr, 0, true); if (retval == SFS_ERROR) { m_error_buf = m_stream->GetErrorMessage(); + m_error_code = 2; return -1; } m_offset += retval; @@ -269,7 +271,12 @@ void State::DumpBuffers() const bool State::Finalize() { - return m_stream->Finalize(); + if (!m_stream->Finalize()) { + m_error_buf = m_stream->GetErrorMessage(); + m_error_code = 3; + return false; + } + return true; } std::string State::GetConnectionDescription() diff --git a/src/XrdTpc/XrdTpcState.hh b/src/XrdTpc/XrdTpcState.hh index 6cade1483c4..302cb7faa5a 100644 --- a/src/XrdTpc/XrdTpcState.hh +++ b/src/XrdTpc/XrdTpcState.hh @@ -26,6 +26,7 @@ public: m_offset(0), m_start_offset(0), m_status_code(-1), + m_error_code(0), m_content_length(-1), m_stream(NULL), m_curl(NULL), @@ -42,6 +43,7 @@ public: m_offset(0), m_start_offset(start_offset), m_status_code(-1), + m_error_code(0), m_content_length(-1), m_stream(&stream), m_curl(curl), @@ -60,6 +62,8 @@ public: off_t GetContentLength() const {return m_content_length;} + int GetErrorCode() const {return m_error_code;} + int GetStatusCode() const {return m_status_code;} std::string GetErrorMessage() const {return m_error_buf;} @@ -126,6 +130,7 @@ private: off_t m_offset; // number of bytes we have received. off_t m_start_offset; // offset where we started in the file. int m_status_code; // status code from HTTP response. + int m_error_code; // error code from underlying stream operations. off_t m_content_length; // value of Content-Length header, if we received one. Stream *m_stream; // stream corresponding to this transfer. CURL *m_curl; // libcurl handle diff --git a/src/XrdTpc/XrdTpcStream.cc b/src/XrdTpc/XrdTpcStream.cc index ba78d9cdfb9..29f27b40ea3 100644 --- a/src/XrdTpc/XrdTpcStream.cc +++ b/src/XrdTpc/XrdTpcStream.cc @@ -27,14 +27,24 @@ Stream::Finalize() if (!m_open_for_write) { return false; } + m_open_for_write = false; + for (std::vector::iterator buffer_iter = m_buffers.begin(); buffer_iter != m_buffers.end(); buffer_iter++) { delete *buffer_iter; *buffer_iter = NULL; } - m_fh->close(); - m_open_for_write = false; + + if (m_fh->close() == SFS_ERROR) { + std::stringstream ss; + const char *msg = m_fh->error.getErrText(); + if (!msg || (*msg == '\0')) {msg = "(no error message provided)";} + ss << "Failure when closing file handle: " << msg << " (code=" << m_fh->error.getErrInfo() << ")"; + m_error_buf = ss.str(); + return false; + } + // If there are outstanding buffers to reorder, finalization failed return m_avail_count == m_buffers.size(); } @@ -75,7 +85,7 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force) std::stringstream ss; const char *msg = m_fh->error.getErrText(); if (!msg || (*msg == '\0')) {msg = "(no error message provided)";} - ss << m_fh->error.getErrText() << " (code=" << m_fh->error.getErrInfo() << ")"; + ss << msg << " (code=" << m_fh->error.getErrInfo() << ")"; m_error_buf = ss.str(); } // If there are no in-use buffers, then we don't need to @@ -113,7 +123,7 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force) } while ((avail_count != m_buffers.size()) && buffer_was_written); m_avail_count = avail_count; - if (!buffer_accepted) { // No place for this data in allocated buffers + if (!buffer_accepted && size) { // No place for this data in allocated buffers if (!avail_entry) { // No available buffers to allocate. return SFS_ERROR; } diff --git a/src/XrdTpc/XrdTpcTPC.cc b/src/XrdTpc/XrdTpcTPC.cc index fcaf17387b3..0fcf9e262d5 100644 --- a/src/XrdTpc/XrdTpcTPC.cc +++ b/src/XrdTpc/XrdTpcTPC.cc @@ -493,6 +493,12 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, rec.bytes_transferred = state.BytesTransferred(); rec.tpc_status = state.GetStatusCode(); + // Explicitly finalize the stream (which will close the underlying file + // handle) before the response is sent. In some cases, subsequent HTTP + // requests can occur before the filesystem is done closing the handle - + // and those requests may occur against partial data. + state.Finalize(); + // Generate the final response back to the client. std::stringstream ss; bool success = false; @@ -506,6 +512,14 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, } logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str()); ss << "failure: " << ss2.str(); + } else if (state.GetErrorCode()) { + std::string err = state.GetErrorMessage(); + if (err.empty()) {err = "(no error message provided)";} + else {std::replace(err.begin(), err.end(), '\n', ' ');} + std::stringstream ss2; + ss2 << "Error when interacting with local filesystem: " << err; + logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str()); + ss << "failure: " << ss2.str(); } else if (res != CURLE_OK) { std::stringstream ss2; ss2 << "HTTP library failure: " << curl_easy_strerror(res); @@ -532,7 +546,16 @@ int TPCHandler::RunCurlBasic(CURL *curl, XrdHttpExtReq &req, State &state, res = curl_easy_perform(curl); curl_easy_cleanup(curl); state.Flush(); - if (res == CURLE_HTTP_RETURNED_ERROR) { + state.Finalize(); + if (state.GetErrorCode()) { + std::string err = state.GetErrorMessage(); + if (err.empty()) {err = "(no error message provided)";} + else {std::replace(err.begin(), err.end(), '\n', ' ');} + std::stringstream ss2; + ss2 << "Error when interacting with local filesystem: " << err; + logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str()); + ss << "failure: " << ss2.str(); + } else if (res == CURLE_HTTP_RETURNED_ERROR) { m_log.Emsg(log_prefix, "Remote server failed request", curl_easy_strerror(res)); return req.SendSimpleResp(500, NULL, NULL, const_cast(curl_easy_strerror(res)), 0);