Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

XrdTpc: Close file handle before final HTTP response #1316

Merged
merged 2 commits into from
Nov 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 18 additions & 2 deletions src/XrdTpc/XrdTpcMultistream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include <curl/curl.h>

#include <algorithm>
#include <sstream>
#include <stdexcept>

Expand Down Expand Up @@ -366,6 +367,7 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
}

state.Flush();
state.Finalize();

rec.bytes_transferred = state.BytesTransferred();
rec.tpc_status = state.GetStatusCode();
Expand All @@ -374,9 +376,23 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
std::stringstream ss;
success = false;
if (state.GetStatusCode() >= 400) {
ss << "failure: Remote side failed with status code " << state.GetStatusCode();
rec.status = state.GetStatusCode();
std::string err = state.GetErrorMessage();
std::stringstream ss2;
ss2 << "Remote side failed with status code " << state.GetStatusCode();
if (!err.empty()) {
std::replace(err.begin(), err.end(), '\n', ' ');
ss2 << "; error message: \"" << err << "\"";
}
logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss.str());
ss << "failure: " << ss2.str();
} else if (state.GetErrorCode()) {
std::string err = state.GetErrorMessage();
if (err.empty()) {err = "(no error message provided)";}
else {std::replace(err.begin(), err.end(), '\n', ' ');}
std::stringstream ss2;
ss2 << "Error when interacting with local filesystem: " << err;
logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss2.str());
ss << "failure: " << ss2.str();
} else if (res != CURLE_OK) {
std::stringstream ss2;
ss2 << "Request failed when processing: " << curl_easy_strerror(res);
Expand Down
9 changes: 8 additions & 1 deletion src/XrdTpc/XrdTpcState.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ int State::Write(char *buffer, size_t size) {
int retval = m_stream->Write(m_start_offset + m_offset, buffer, size, false);
if (retval == SFS_ERROR) {
m_error_buf = m_stream->GetErrorMessage();
m_error_code = 1;
return -1;
}
m_offset += retval;
Expand All @@ -202,6 +203,7 @@ int State::Flush() {
int retval = m_stream->Write(m_start_offset + m_offset, nullptr, 0, true);
if (retval == SFS_ERROR) {
m_error_buf = m_stream->GetErrorMessage();
m_error_code = 2;
return -1;
}
m_offset += retval;
Expand Down Expand Up @@ -269,7 +271,12 @@ void State::DumpBuffers() const

bool State::Finalize()
{
return m_stream->Finalize();
if (!m_stream->Finalize()) {
m_error_buf = m_stream->GetErrorMessage();
m_error_code = 3;
return false;
}
return true;
}

std::string State::GetConnectionDescription()
Expand Down
5 changes: 5 additions & 0 deletions src/XrdTpc/XrdTpcState.hh
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public:
m_offset(0),
m_start_offset(0),
m_status_code(-1),
m_error_code(0),
m_content_length(-1),
m_stream(NULL),
m_curl(NULL),
Expand All @@ -42,6 +43,7 @@ public:
m_offset(0),
m_start_offset(start_offset),
m_status_code(-1),
m_error_code(0),
m_content_length(-1),
m_stream(&stream),
m_curl(curl),
Expand All @@ -60,6 +62,8 @@ public:

off_t GetContentLength() const {return m_content_length;}

int GetErrorCode() const {return m_error_code;}

int GetStatusCode() const {return m_status_code;}

std::string GetErrorMessage() const {return m_error_buf;}
Expand Down Expand Up @@ -126,6 +130,7 @@ private:
off_t m_offset; // number of bytes we have received.
off_t m_start_offset; // offset where we started in the file.
int m_status_code; // status code from HTTP response.
int m_error_code; // error code from underlying stream operations.
off_t m_content_length; // value of Content-Length header, if we received one.
Stream *m_stream; // stream corresponding to this transfer.
CURL *m_curl; // libcurl handle
Expand Down
18 changes: 14 additions & 4 deletions src/XrdTpc/XrdTpcStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,24 @@ Stream::Finalize()
if (!m_open_for_write) {
return false;
}
m_open_for_write = false;

for (std::vector<Entry*>::iterator buffer_iter = m_buffers.begin();
buffer_iter != m_buffers.end();
buffer_iter++) {
delete *buffer_iter;
*buffer_iter = NULL;
}
m_fh->close();
m_open_for_write = false;

if (m_fh->close() == SFS_ERROR) {
std::stringstream ss;
const char *msg = m_fh->error.getErrText();
if (!msg || (*msg == '\0')) {msg = "(no error message provided)";}
ss << "Failure when closing file handle: " << msg << " (code=" << m_fh->error.getErrInfo() << ")";
m_error_buf = ss.str();
return false;
}

// If there are outstanding buffers to reorder, finalization failed
return m_avail_count == m_buffers.size();
}
Expand Down Expand Up @@ -75,7 +85,7 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force)
std::stringstream ss;
const char *msg = m_fh->error.getErrText();
if (!msg || (*msg == '\0')) {msg = "(no error message provided)";}
ss << m_fh->error.getErrText() << " (code=" << m_fh->error.getErrInfo() << ")";
ss << msg << " (code=" << m_fh->error.getErrInfo() << ")";
m_error_buf = ss.str();
}
// If there are no in-use buffers, then we don't need to
Expand Down Expand Up @@ -113,7 +123,7 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force)
} while ((avail_count != m_buffers.size()) && buffer_was_written);
m_avail_count = avail_count;

if (!buffer_accepted) { // No place for this data in allocated buffers
if (!buffer_accepted && size) { // No place for this data in allocated buffers
if (!avail_entry) { // No available buffers to allocate.
return SFS_ERROR;
}
Expand Down
25 changes: 24 additions & 1 deletion src/XrdTpc/XrdTpcTPC.cc
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,12 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
rec.bytes_transferred = state.BytesTransferred();
rec.tpc_status = state.GetStatusCode();

// Explicitly finalize the stream (which will close the underlying file
// handle) before the response is sent. In some cases, subsequent HTTP
// requests can occur before the filesystem is done closing the handle -
// and those requests may occur against partial data.
state.Finalize();

// Generate the final response back to the client.
std::stringstream ss;
bool success = false;
Expand All @@ -506,6 +512,14 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
}
logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
ss << "failure: " << ss2.str();
} else if (state.GetErrorCode()) {
std::string err = state.GetErrorMessage();
if (err.empty()) {err = "(no error message provided)";}
else {std::replace(err.begin(), err.end(), '\n', ' ');}
std::stringstream ss2;
ss2 << "Error when interacting with local filesystem: " << err;
logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
ss << "failure: " << ss2.str();
} else if (res != CURLE_OK) {
std::stringstream ss2;
ss2 << "HTTP library failure: " << curl_easy_strerror(res);
Expand All @@ -532,7 +546,16 @@ int TPCHandler::RunCurlBasic(CURL *curl, XrdHttpExtReq &req, State &state,
res = curl_easy_perform(curl);
curl_easy_cleanup(curl);
state.Flush();
if (res == CURLE_HTTP_RETURNED_ERROR) {
state.Finalize();
if (state.GetErrorCode()) {
std::string err = state.GetErrorMessage();
if (err.empty()) {err = "(no error message provided)";}
else {std::replace(err.begin(), err.end(), '\n', ' ');}
std::stringstream ss2;
ss2 << "Error when interacting with local filesystem: " << err;
logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
ss << "failure: " << ss2.str();
} else if (res == CURLE_HTTP_RETURNED_ERROR) {
m_log.Emsg(log_prefix, "Remote server failed request", curl_easy_strerror(res));
return req.SendSimpleResp(500, NULL, NULL,
const_cast<char *>(curl_easy_strerror(res)), 0);
Expand Down