Skip to content

Commit

Permalink
Merge pull request #1387 from bbockelm/xrdtpc_stream_write_errors
Browse files Browse the repository at this point in the history
Fix error handling on stream write errors
  • Loading branch information
abh3 committed Jan 29, 2021
2 parents c6e3d96 + 1f4a2db commit 9e7d834
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 10 deletions.
6 changes: 3 additions & 3 deletions src/XrdTpc/XrdTpcState.cc
Expand Up @@ -188,8 +188,8 @@ size_t State::WriteCB(void *buffer, size_t size, size_t nitems, void *userdata)
return obj->Write(static_cast<char*>(buffer), size*nitems);
}

int State::Write(char *buffer, size_t size) {
int retval = m_stream->Write(m_start_offset + m_offset, buffer, size, false);
ssize_t State::Write(char *buffer, size_t size) {
ssize_t retval = m_stream->Write(m_start_offset + m_offset, buffer, size, false);
if (retval == SFS_ERROR) {
m_error_buf = m_stream->GetErrorMessage();
m_error_code = 1;
Expand All @@ -204,7 +204,7 @@ int State::Flush() {
return 0;
}

int retval = m_stream->Write(m_start_offset + m_offset, 0, 0, true);
ssize_t retval = m_stream->Write(m_start_offset + m_offset, 0, 0, true);
if (retval == SFS_ERROR) {
m_error_buf = m_stream->GetErrorMessage();
m_error_code = 2;
Expand Down
2 changes: 1 addition & 1 deletion src/XrdTpc/XrdTpcState.hh
Expand Up @@ -120,7 +120,7 @@ private:
void *userdata);
int Header(const std::string &header);
static size_t WriteCB(void *buffer, size_t size, size_t nitems, void *userdata);
int Write(char *buffer, size_t size);
ssize_t Write(char *buffer, size_t size);
static size_t ReadCB(void *buffer, size_t size, size_t nitems, void *userdata);
int Read(char *buffer, size_t size);

Expand Down
11 changes: 8 additions & 3 deletions src/XrdTpc/XrdTpcStream.cc
Expand Up @@ -56,7 +56,7 @@ Stream::Stat(struct stat* buf)
return m_fh->stat(buf);
}

int
ssize_t
Stream::Write(off_t offset, const char *buf, size_t size, bool force)
{
/*
Expand Down Expand Up @@ -87,6 +87,9 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force)
if (!msg || (*msg == '\0')) {msg = "(no error message provided)";}
ss << msg << " (code=" << m_fh->error.getErrInfo() << ")";
m_error_buf = ss.str();
// Write failed; we don't care if there's any more buffer we
// can dump to disk later.
return retval;
}
// If there are no in-use buffers, then we don't need to
// do any accounting.
Expand All @@ -109,9 +112,11 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force)
entry_iter++) {
// Always try to dump from memory; when size == 0, then we are
// going to force a flush even if things are not MB-aligned.
if ((*entry_iter)->Write(*this, size == 0) > 0) {
buffer_was_written = true;
int retval2 = (*entry_iter)->Write(*this, size == 0);
if (retval2 == SFS_ERROR) {
return retval2;
}
buffer_was_written |= retval2 > 0;
if ((*entry_iter)->Available()) { // Empty buffer
if (!avail_entry) {avail_entry = *entry_iter;}
avail_count ++;
Expand Down
18 changes: 15 additions & 3 deletions src/XrdTpc/XrdTpcStream.hh
Expand Up @@ -41,7 +41,17 @@ public:

int Read(off_t offset, char *buffer, size_t size);

int Write(off_t offset, const char *buffer, size_t size, bool force);
// Writes a buffer of a given size to an offset.
// This will often keep the buffer in memory in to present the underlying
// filesystem with a single stream of data (required for HDFS); further,
// it will also buffer to align the writes on a 1MB boundary (required
// for some RADOS configurations). When force is set to true, it will
// skip the buffering and always write (this should only be done at the
// end of a stream!).
//
// Returns the number of bytes written; on error, returns -1 and sets
// the error code and error message for the stream
ssize_t Write(off_t offset, const char *buffer, size_t size, bool force);

size_t AvailableBuffers() const {return m_avail_count;}

Expand Down Expand Up @@ -79,8 +89,10 @@ private:
size_desired -= (size_desired % (1024*1024));
if (!size_desired) {return 0;}
}
int retval = stream.Write(m_offset, &m_buffer[0], size_desired, force);
if (retval < 0 && (static_cast<size_t>(retval) != size_desired)) {
ssize_t retval = stream.Write(m_offset, &m_buffer[0], size_desired, force);
// Currently the only valid negative value is SFS_ERROR (-1); checking for
// all negative values to future-proof the code.
if ((retval < 0) || (static_cast<size_t>(retval) != size_desired)) {
return -1;
}
// If partial data remains, copy it to the beginning of the buffer.
Expand Down

0 comments on commit 9e7d834

Please sign in to comment.