Skip to content

Commit

Permalink
Merge pull request #1391 from bbockelm/xrdtpc_stream_write_refactor
Browse files Browse the repository at this point in the history
XrdTPC: Refactor stream writes
  • Loading branch information
abh3 committed Feb 1, 2021
2 parents a31d667 + 3b40aa1 commit defdc08
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 43 deletions.
73 changes: 54 additions & 19 deletions src/XrdTpc/XrdTpcStream.cc
Expand Up @@ -67,28 +67,25 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force)
m_log.Emsg("Stream::Write", ss.str().c_str());
DumpBuffers();
*/
if (!m_open_for_write) return SFS_ERROR;
bool buffer_accepted = false;
if (!m_open_for_write) {
if (!m_error_buf.size()) {m_error_buf = "Logic error: writing to a buffer not opened for write";}
return SFS_ERROR;
}
size_t bytes_accepted = 0;
int retval = size;
if (offset < m_offset) {
if (!m_error_buf.size()) {m_error_buf = "Logic error: writing to a prior offset";}
return SFS_ERROR;
}
// If this write is appending to the stream and
// MB-aligned, then we write it to disk; otherwise, the
// data will be buffered.
if (offset == m_offset && (force || (size && !(size % (1024*1024))))) {
retval = m_fh->write(offset, buf, size);
buffer_accepted = true;
if (retval != SFS_ERROR) {
m_offset += retval;
} else {
std::stringstream ss;
const char *msg = m_fh->error.getErrText();
if (!msg || (*msg == '\0')) {msg = "(no error message provided)";}
ss << msg << " (code=" << m_fh->error.getErrInfo() << ")";
m_error_buf = ss.str();
// Write failed; we don't care if there's any more buffer we
// can dump to disk later.
retval = WriteImpl(offset, buf, size);
bytes_accepted = retval;
// On failure, we don't care about flushing buffers from memory --
// the stream is now invalid.
if (retval < 0) {
return retval;
}
// If there are no in-use buffers, then we don't need to
Expand All @@ -114,25 +111,40 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force)
// going to force a flush even if things are not MB-aligned.
int retval2 = (*entry_iter)->Write(*this, size == 0);
if (retval2 == SFS_ERROR) {
if (!m_error_buf.size()) {m_error_buf = "Unknown filesystem write failure.";}
return retval2;
}
buffer_was_written |= retval2 > 0;
if ((*entry_iter)->Available()) { // Empty buffer
if (!avail_entry) {avail_entry = *entry_iter;}
avail_count ++;
}
else if (!buffer_accepted && (*entry_iter)->Accept(offset, buf, size)) {
buffer_accepted = true;
else if (bytes_accepted != size && size) {
size_t new_accept = (*entry_iter)->Accept(offset + bytes_accepted, buf, size - bytes_accepted);
// Partial accept; buffer should be writable which means we should free it up
// for next iteration
if (new_accept && new_accept != size - bytes_accepted) {
int retval3 = (*entry_iter)->Write(*this, false);
if (retval3 == SFS_ERROR) {
if (!m_error_buf.size()) {m_error_buf = "Unknown filesystem write failure.";}
return SFS_ERROR;
}
buffer_was_written = true;
}
bytes_accepted += new_accept;
}
}
} while ((avail_count != m_buffers.size()) && buffer_was_written);
m_avail_count = avail_count;

if (!buffer_accepted && size) { // No place for this data in allocated buffers
if (!avail_entry) { // No available buffers to allocate.
if (bytes_accepted != size && size) { // No place for this data in allocated buffers
if (!avail_entry) { // No available buffers to allocate; logic error, should not happen.
DumpBuffers();
m_error_buf = "No empty buffers available to place unordered data.";
return SFS_ERROR;
}
if (!avail_entry->Accept(offset, buf, size)) { // Empty buffer cannot accept?!?
if (avail_entry->Accept(offset + bytes_accepted, buf, size - bytes_accepted) != size - bytes_accepted) { // Empty buffer cannot accept?!?
m_error_buf = "Empty re-ordering buffer was unable to to accept data; internal logic error.";
return SFS_ERROR;
}
m_avail_count --;
Expand All @@ -151,10 +163,33 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force)
}


// Write `size` bytes from `buf` directly to the underlying file handle at
// `offset`, without going through the re-ordering buffers.
//
// Returns the value produced by m_fh->write(): the number of bytes written on
// success (m_offset is advanced by that amount), or a negative value on
// failure, in which case m_error_buf is populated from the file handle's
// error object.  A zero-length request returns 0 without touching the file
// handle.
ssize_t Stream::WriteImpl(off_t offset, const char *buf, size_t size)
{
    // Nothing to write; do not call into the file handle at all.
    if (size == 0) {return 0;}

    const ssize_t retval = m_fh->write(offset, buf, size);

    // Treat every negative value as a failure, not just SFS_ERROR.  The
    // callers of this method test `retval < 0`; doing the same here keeps a
    // negative return from being added to m_offset (which would move the
    // stream offset backwards) and guarantees an error message is recorded.
    if (retval < 0) {
        std::stringstream ss;
        const char *msg = m_fh->error.getErrText();
        if (!msg || (*msg == '\0')) {msg = "(no error message provided)";}
        ss << msg << " (code=" << m_fh->error.getErrInfo() << ")";
        m_error_buf = ss.str();
        return retval;
    }

    // Successful write: advance the stream offset by the bytes written.
    m_offset += retval;
    return retval;
}


void
Stream::DumpBuffers() const
{
m_log.Emsg("Stream::DumpBuffers", "Beginning dump of stream buffers.");
{
std::stringstream ss;
ss << "Stream offset: " << m_offset;
m_log.Emsg("Stream::DumpBuffers", ss.str().c_str());
}
size_t idx = 0;
for (std::vector<Entry*>::const_iterator entry_iter = m_buffers.begin();
entry_iter!= m_buffers.end();
Expand Down
43 changes: 20 additions & 23 deletions src/XrdTpc/XrdTpcStream.hh
Expand Up @@ -82,39 +82,34 @@ private:

int Write(Stream &stream, bool force) {
if (Available() || !CanWrite(stream)) {return 0;}
// Currently, only full writes are accepted along megabyte boundaries
// unless the stream forces a flush (i.e., we are at EOF).
size_t size_desired = m_size;
if (!force) {
size_desired -= (size_desired % (1024*1024));
if (!size_desired) {return 0;}
// Only full buffer writes are accepted unless the stream forces a flush
// (i.e., we are at EOF) because the multistream code uses buffer occupancy
// to determine how many streams are currently in-flight. If we do an early
// write, then the buffer will be empty and the multistream code may decide
// to start another request (which we don't have the capacity to serve!).
if (!force && (m_size != m_capacity)) {
return 0;
}
ssize_t retval = stream.Write(m_offset, &m_buffer[0], size_desired, force);
ssize_t retval = stream.WriteImpl(m_offset, &m_buffer[0], m_size);
// Currently the only valid negative value is SFS_ERROR (-1); checking for
// all negative values to future-proof the code.
if ((retval < 0) || (static_cast<size_t>(retval) != size_desired)) {
if ((retval < 0) || (static_cast<size_t>(retval) != m_size)) {
return -1;
}
// If partial data remains, copy it to the beginning of the buffer.
// Otherwise, mark the buffer as available.
if (size_desired < m_size) {
m_offset += size_desired;
m_size -= size_desired;
memcpy(&m_buffer[0], &m_buffer[size_desired], m_size);
} else {
m_offset = -1;
m_size = 0;
}
m_offset = -1;
m_size = 0;
return retval;
}

bool Accept(off_t offset, const char *buf, size_t size) {
size_t Accept(off_t offset, const char *buf, size_t size) {
// Validate acceptance criteria.
if ((m_offset != -1) && (offset != m_offset + static_cast<ssize_t>(m_size))) {
return false;
return 0;
}
if (size > m_capacity - m_size) {
return false;
size_t to_accept = m_capacity - m_size;
if (to_accept == 0) {return 0;}
if (size > to_accept) {
size = to_accept;
}

// Inflate the underlying buffer if needed.
Expand All @@ -129,7 +124,7 @@ private:
if (m_offset == -1) {
m_offset = offset;
}
return true;
return size;
}

void ShrinkIfUnused() {
Expand Down Expand Up @@ -163,6 +158,8 @@ private:
std::vector<char> m_buffer;
};

ssize_t WriteImpl(off_t offset, const char *buffer, size_t size);

bool m_open_for_write;
size_t m_avail_count;
std::unique_ptr<XrdSfsFile> m_fh;
Expand Down
6 changes: 5 additions & 1 deletion src/XrdTpc/XrdTpcTPC.cc
Expand Up @@ -26,6 +26,7 @@ using namespace TPC;
uint64_t TPCHandler::m_monid{0};
int TPCHandler::m_marker_period = 5;
size_t TPCHandler::m_block_size = 16*1024*1024;
size_t TPCHandler::m_small_block_size = 1*1024*1024;
XrdSysMutex TPCHandler::m_monid_mutex;

XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC);
Expand Down Expand Up @@ -356,6 +357,8 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0);
}

//curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 128*1024);

CURLMcode mres;
mres = curl_multi_add_handle(multi_handle, curl);
if (mres) {
Expand Down Expand Up @@ -592,6 +595,7 @@ int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req)
logTransferEvent(LogMask::Error, rec, "PUSH_FAIL", msg);
return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0);
}

auto query_header = req.headers.find("xrd-http-fullresource");
std::string redirect_resource = req.resource;
if (query_header != req.headers.end()) {
Expand Down Expand Up @@ -733,7 +737,7 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req)
curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str());
}
curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
Stream stream(std::move(fh), streams * m_pipelining_multiplier, m_block_size, m_log);
Stream stream(std::move(fh), streams * m_pipelining_multiplier, streams > 1 ? m_block_size : m_small_block_size, m_log);
State state(0, stream, curl, false);
state.CopyHeaders(req);

Expand Down
1 change: 1 addition & 0 deletions src/XrdTpc/XrdTpcTPC.hh
Expand Up @@ -109,6 +109,7 @@ private:

static int m_marker_period;
static size_t m_block_size;
static size_t m_small_block_size;
bool m_desthttps;
std::string m_cadir;
static XrdSysMutex m_monid_mutex;
Expand Down

0 comments on commit defdc08

Please sign in to comment.