Skip to content

Commit

Permalink
XrdTpc: Factor out recursive writes.
Browse files Browse the repository at this point in the history
This separates the writing methods of the stream to avoid potential
recursion when dumping buffered data.  This also allows the buffer
to accept partial data so the beginning offset of the buffer is
always a multiple of the buffer size (i.e., the buffer offsets
always stay MB-aligned).

With this, a single incoming buffer from curl may be split across
two entries in order to keep alignment.
  • Loading branch information
bbockelm committed Jan 30, 2021
1 parent 9e7d834 commit 8e84fe4
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 23 deletions.
56 changes: 39 additions & 17 deletions src/XrdTpc/XrdTpcStream.cc
Expand Up @@ -68,7 +68,7 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force)
DumpBuffers();
*/
if (!m_open_for_write) return SFS_ERROR;
bool buffer_accepted = false;
size_t bytes_accepted = 0;
int retval = size;
if (offset < m_offset) {
return SFS_ERROR;
Expand All @@ -77,18 +77,11 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force)
// MB-aligned, then we write it to disk; otherwise, the
// data will be buffered.
if (offset == m_offset && (force || (size && !(size % (1024*1024))))) {
retval = m_fh->write(offset, buf, size);
buffer_accepted = true;
if (retval != SFS_ERROR) {
m_offset += retval;
} else {
std::stringstream ss;
const char *msg = m_fh->error.getErrText();
if (!msg || (*msg == '\0')) {msg = "(no error message provided)";}
ss << msg << " (code=" << m_fh->error.getErrInfo() << ")";
m_error_buf = ss.str();
// Write failed; we don't care if there's any more buffer we
// can dump to disk later.
retval = WriteImpl(offset, buf, size);
bytes_accepted = retval;
// On failure, we don't care about flushing buffers from memory --
// the stream is now invalid.
if (retval < 0) {
return retval;
}
// If there are no in-use buffers, then we don't need to
Expand Down Expand Up @@ -121,18 +114,29 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force)
if (!avail_entry) {avail_entry = *entry_iter;}
avail_count ++;
}
else if (!buffer_accepted && (*entry_iter)->Accept(offset, buf, size)) {
buffer_accepted = true;
else if (bytes_accepted != size && size) {
size_t new_accept = (*entry_iter)->Accept(offset + bytes_accepted, buf, size - bytes_accepted);
// Partial accept; buffer should be writable which means we should free it up
// for next iteration
if (new_accept && new_accept != size - bytes_accepted) {
int retval3 = (*entry_iter)->Write(*this, false);
if (retval3 == SFS_ERROR) {
if (!m_error_buf.size()) {m_error_buf = "Unknown filesystem write failure.";}
return SFS_ERROR;
}
buffer_was_written = true;
}
bytes_accepted += new_accept;
}
}
} while ((avail_count != m_buffers.size()) && buffer_was_written);
m_avail_count = avail_count;

if (!buffer_accepted && size) { // No place for this data in allocated buffers
if (bytes_accepted != size && size) { // No place for this data in allocated buffers
if (!avail_entry) { // No available buffers to allocate.
return SFS_ERROR;
}
if (!avail_entry->Accept(offset, buf, size)) { // Empty buffer cannot accept?!?
if (avail_entry->Accept(offset + bytes_accepted, buf, size - bytes_accepted) != size - bytes_accepted) { // Empty buffer cannot accept?!?
return SFS_ERROR;
}
m_avail_count --;
Expand All @@ -151,6 +155,24 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force)
}


ssize_t Stream::WriteImpl(off_t offset, const char *buf, size_t size)
{
ssize_t retval;
if (size == 0) {return 0;}
retval = m_fh->write(offset, buf, size);
if (retval != SFS_ERROR) {
m_offset += retval;
} else {
std::stringstream ss;
const char *msg = m_fh->error.getErrText();
if (!msg || (*msg == '\0')) {msg = "(no error message provided)";}
ss << msg << " (code=" << m_fh->error.getErrInfo() << ")";
m_error_buf = ss.str();
}
return retval;
}


void
Stream::DumpBuffers() const
{
Expand Down
16 changes: 10 additions & 6 deletions src/XrdTpc/XrdTpcStream.hh
Expand Up @@ -89,7 +89,7 @@ private:
size_desired -= (size_desired % (1024*1024));
if (!size_desired) {return 0;}
}
ssize_t retval = stream.Write(m_offset, &m_buffer[0], size_desired, force);
ssize_t retval = stream.WriteImpl(m_offset, &m_buffer[0], m_size);
// Currently the only valid negative value is SFS_ERROR (-1); checking for
// all negative values to future-proof the code.
if ((retval < 0) || (static_cast<size_t>(retval) != size_desired)) {
Expand All @@ -108,13 +108,15 @@ private:
return retval;
}

bool Accept(off_t offset, const char *buf, size_t size) {
size_t Accept(off_t offset, const char *buf, size_t size) {
// Validate acceptance criteria.
if ((m_offset != -1) && (offset != m_offset + static_cast<ssize_t>(m_size))) {
return false;
return 0;
}
if (size > m_capacity - m_size) {
return false;
size_t to_accept = m_capacity - m_size;
if (to_accept == 0) {return 0;}
if (size > to_accept) {
size = to_accept;
}

// Inflate the underlying buffer if needed.
Expand All @@ -129,7 +131,7 @@ private:
if (m_offset == -1) {
m_offset = offset;
}
return true;
return size;
}

void ShrinkIfUnused() {
Expand Down Expand Up @@ -163,6 +165,8 @@ private:
std::vector<char> m_buffer;
};

ssize_t WriteImpl(off_t offset, const char *buffer, size_t size);

bool m_open_for_write;
size_t m_avail_count;
std::unique_ptr<XrdSfsFile> m_fh;
Expand Down

0 comments on commit 8e84fe4

Please sign in to comment.