Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix error handling on stream write errors #1387

Merged
merged 5 commits into from
Jan 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/XrdTpc/XrdTpcState.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ size_t State::WriteCB(void *buffer, size_t size, size_t nitems, void *userdata)
return obj->Write(static_cast<char*>(buffer), size*nitems);
}

int State::Write(char *buffer, size_t size) {
int retval = m_stream->Write(m_start_offset + m_offset, buffer, size, false);
ssize_t State::Write(char *buffer, size_t size) {
ssize_t retval = m_stream->Write(m_start_offset + m_offset, buffer, size, false);
if (retval == SFS_ERROR) {
m_error_buf = m_stream->GetErrorMessage();
m_error_code = 1;
Expand All @@ -204,7 +204,7 @@ int State::Flush() {
return 0;
}

int retval = m_stream->Write(m_start_offset + m_offset, 0, 0, true);
ssize_t retval = m_stream->Write(m_start_offset + m_offset, 0, 0, true);
if (retval == SFS_ERROR) {
m_error_buf = m_stream->GetErrorMessage();
m_error_code = 2;
Expand Down
2 changes: 1 addition & 1 deletion src/XrdTpc/XrdTpcState.hh
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private:
void *userdata);
int Header(const std::string &header);
static size_t WriteCB(void *buffer, size_t size, size_t nitems, void *userdata);
int Write(char *buffer, size_t size);
ssize_t Write(char *buffer, size_t size);
static size_t ReadCB(void *buffer, size_t size, size_t nitems, void *userdata);
int Read(char *buffer, size_t size);

Expand Down
11 changes: 8 additions & 3 deletions src/XrdTpc/XrdTpcStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Stream::Stat(struct stat* buf)
return m_fh->stat(buf);
}

int
ssize_t
Stream::Write(off_t offset, const char *buf, size_t size, bool force)
{
/*
Expand Down Expand Up @@ -87,6 +87,9 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force)
if (!msg || (*msg == '\0')) {msg = "(no error message provided)";}
ss << msg << " (code=" << m_fh->error.getErrInfo() << ")";
m_error_buf = ss.str();
// Write failed; we don't care if there's any more buffer we
// can dump to disk later.
return retval;
}
// If there are no in-use buffers, then we don't need to
// do any accounting.
Expand All @@ -109,9 +112,11 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force)
entry_iter++) {
// Always try to dump from memory; when size == 0, then we are
// going to force a flush even if things are not MB-aligned.
if ((*entry_iter)->Write(*this, size == 0) > 0) {
buffer_was_written = true;
int retval2 = (*entry_iter)->Write(*this, size == 0);
if (retval2 == SFS_ERROR) {
return retval2;
}
buffer_was_written |= retval2 > 0;
if ((*entry_iter)->Available()) { // Empty buffer
if (!avail_entry) {avail_entry = *entry_iter;}
avail_count ++;
Expand Down
18 changes: 15 additions & 3 deletions src/XrdTpc/XrdTpcStream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,17 @@ public:

int Read(off_t offset, char *buffer, size_t size);

int Write(off_t offset, const char *buffer, size_t size, bool force);
// Writes a buffer of a given size to an offset.
// This will often keep the buffer in memory in to present the underlying
// filesystem with a single stream of data (required for HDFS); further,
// it will also buffer to align the writes on a 1MB boundary (required
// for some RADOS configurations). When force is set to true, it will
// skip the buffering and always write (this should only be done at the
// end of a stream!).
//
// Returns the number of bytes written; on error, returns -1 and sets
// the error code and error message for the stream
ssize_t Write(off_t offset, const char *buffer, size_t size, bool force);

size_t AvailableBuffers() const {return m_avail_count;}

Expand Down Expand Up @@ -79,8 +89,10 @@ private:
size_desired -= (size_desired % (1024*1024));
if (!size_desired) {return 0;}
}
int retval = stream.Write(m_offset, &m_buffer[0], size_desired, force);
if (retval < 0 && (static_cast<size_t>(retval) != size_desired)) {
ssize_t retval = stream.Write(m_offset, &m_buffer[0], size_desired, force);
// Currently the only valid negative value is SFS_ERROR (-1); checking for
// all negative values to future-proof the code.
if ((retval < 0) || (static_cast<size_t>(retval) != size_desired)) {
return -1;
}
// If partial data remains, copy it to the beginning of the buffer.
Expand Down