From 8e84fe4106b777b3351878d8c4af74a431cac2d6 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sat, 30 Jan 2021 09:33:59 -0600 Subject: [PATCH 1/4] XrdTpc: Factor out recursive writes. This separates the writing methods of the stream to avoid potential recursion when dumping buffered data. This also allows the buffer to accept partial data so the beginning offset of the buffer is always a multiple of the buffer size (i.e., the buffer offsets always stay MB-aligned). With this, a single incoming buffer from curl may be split across two entries in order to keep alignment. --- src/XrdTpc/XrdTpcStream.cc | 56 ++++++++++++++++++++++++++------------ src/XrdTpc/XrdTpcStream.hh | 16 +++++++---- 2 files changed, 49 insertions(+), 23 deletions(-) diff --git a/src/XrdTpc/XrdTpcStream.cc b/src/XrdTpc/XrdTpcStream.cc index 2c70d8176e6..fa4ba4395f7 100644 --- a/src/XrdTpc/XrdTpcStream.cc +++ b/src/XrdTpc/XrdTpcStream.cc @@ -68,7 +68,7 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force) DumpBuffers(); */ if (!m_open_for_write) return SFS_ERROR; - bool buffer_accepted = false; + size_t bytes_accepted = 0; int retval = size; if (offset < m_offset) { return SFS_ERROR; @@ -77,18 +77,11 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force) // MB-aligned, then we write it to disk; otherwise, the // data will be buffered. if (offset == m_offset && (force || (size && !(size % (1024*1024))))) { - retval = m_fh->write(offset, buf, size); - buffer_accepted = true; - if (retval != SFS_ERROR) { - m_offset += retval; - } else { - std::stringstream ss; - const char *msg = m_fh->error.getErrText(); - if (!msg || (*msg == '\0')) {msg = "(no error message provided)";} - ss << msg << " (code=" << m_fh->error.getErrInfo() << ")"; - m_error_buf = ss.str(); - // Write failed; we don't care if there's any more buffer we - // can dump to disk later. + retval = WriteImpl(offset, buf, size); + bytes_accepted = retval; + // On failure, we don't care about flushing buffers from memory -- + // the stream is now invalid. + if (retval < 0) { return retval; } // If there are no in-use buffers, then we don't need to @@ -121,18 +114,29 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force) if (!avail_entry) {avail_entry = *entry_iter;} avail_count ++; } - else if (!buffer_accepted && (*entry_iter)->Accept(offset, buf, size)) { - buffer_accepted = true; + else if (bytes_accepted != size && size) { + size_t new_accept = (*entry_iter)->Accept(offset + bytes_accepted, buf, size - bytes_accepted); + // Partial accept; buffer should be writable which means we should free it up + // for next iteration + if (new_accept && new_accept != size - bytes_accepted) { + int retval3 = (*entry_iter)->Write(*this, false); + if (retval3 == SFS_ERROR) { + if (!m_error_buf.size()) {m_error_buf = "Unknown filesystem write failure.";} + return SFS_ERROR; + } + buffer_was_written = true; + } + bytes_accepted += new_accept; } } } while ((avail_count != m_buffers.size()) && buffer_was_written); m_avail_count = avail_count; - if (!buffer_accepted && size) { // No place for this data in allocated buffers + if (bytes_accepted != size && size) { // No place for this data in allocated buffers if (!avail_entry) { // No available buffers to allocate. return SFS_ERROR; } - if (!avail_entry->Accept(offset, buf, size)) { // Empty buffer cannot accept?!? + if (avail_entry->Accept(offset + bytes_accepted, buf, size - bytes_accepted) != size - bytes_accepted) { // Empty buffer cannot accept?!? return SFS_ERROR; } m_avail_count --; @@ -151,6 +155,24 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force) } +ssize_t Stream::WriteImpl(off_t offset, const char *buf, size_t size) +{ + ssize_t retval; + if (size == 0) {return 0;} + retval = m_fh->write(offset, buf, size); + if (retval != SFS_ERROR) { + m_offset += retval; + } else { + std::stringstream ss; + const char *msg = m_fh->error.getErrText(); + if (!msg || (*msg == '\0')) {msg = "(no error message provided)";} + ss << msg << " (code=" << m_fh->error.getErrInfo() << ")"; + m_error_buf = ss.str(); + } + return retval; +} + + void Stream::DumpBuffers() const { diff --git a/src/XrdTpc/XrdTpcStream.hh b/src/XrdTpc/XrdTpcStream.hh index f30d9ee5d44..29851a99efc 100644 --- a/src/XrdTpc/XrdTpcStream.hh +++ b/src/XrdTpc/XrdTpcStream.hh @@ -89,7 +89,7 @@ private: size_desired -= (size_desired % (1024*1024)); if (!size_desired) {return 0;} } - ssize_t retval = stream.Write(m_offset, &m_buffer[0], size_desired, force); + ssize_t retval = stream.WriteImpl(m_offset, &m_buffer[0], m_size); // Currently the only valid negative value is SFS_ERROR (-1); checking for // all negative values to future-proof the code. if ((retval < 0) || (static_cast(retval) != size_desired)) { @@ -108,13 +108,15 @@ private: return retval; } - bool Accept(off_t offset, const char *buf, size_t size) { + size_t Accept(off_t offset, const char *buf, size_t size) { // Validate acceptance criteria. if ((m_offset != -1) && (offset != m_offset + static_cast(m_size))) { - return false; + return 0; } - if (size > m_capacity - m_size) { - return false; + size_t to_accept = m_capacity - m_size; + if (to_accept == 0) {return 0;} + if (size > to_accept) { + size = to_accept; } // Inflate the underlying buffer if needed. @@ -129,7 +131,7 @@ private: if (m_offset == -1) { m_offset = offset; } - return true; + return size; } void ShrinkIfUnused() { @@ -163,6 +165,8 @@ private: std::vector m_buffer; }; + ssize_t WriteImpl(off_t offset, const char *buffer, size_t size); + bool m_open_for_write; size_t m_avail_count; std::unique_ptr m_fh; From f8970471e06da3e1dee53b1f62df54ca001deed5 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sat, 30 Jan 2021 09:40:01 -0600 Subject: [PATCH 2/4] XrdTpc: Do not allow partial buffer writes The multistream code uses an occupied buffer as an indication that a request is ongoing; for now, we can't do partial buffer writes as this causes the back-off code to think the buffer is free and start too many requests. --- src/XrdTpc/XrdTpcStream.hh | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/src/XrdTpc/XrdTpcStream.hh b/src/XrdTpc/XrdTpcStream.hh index 29851a99efc..0e854334d53 100644 --- a/src/XrdTpc/XrdTpcStream.hh +++ b/src/XrdTpc/XrdTpcStream.hh @@ -82,29 +82,22 @@ private: int Write(Stream &stream, bool force) { if (Available() || !CanWrite(stream)) {return 0;} - // Currently, only full writes are accepted along megabyte boundaries - // unless the stream forces a flush (i.e., we are at EOF). - size_t size_desired = m_size; - if (!force) { - size_desired -= (size_desired % (1024*1024)); - if (!size_desired) {return 0;} + // Only full buffer writes are accepted unless the stream forces a flush + // (i.e., we are at EOF) because the multistream code uses buffer occupancy + // to determine how many streams are currently in-flight. If we do an early + // write, then the buffer will be empty and the multistream code may decide + // to start another request (which we don't have the capacity to serve!). + if (!force && (m_size != m_capacity)) { + return 0; } ssize_t retval = stream.WriteImpl(m_offset, &m_buffer[0], m_size); // Currently the only valid negative value is SFS_ERROR (-1); checking for // all negative values to future-proof the code. - if ((retval < 0) || (static_cast(retval) != size_desired)) { + if ((retval < 0) || (static_cast(retval) != m_size)) { return -1; } - // If partial data remains, copy it to the beginning of the buffer. - // Otherwise, mark the buffer as available. - if (size_desired < m_size) { - m_offset += size_desired; - m_size -= size_desired; - memcpy(&m_buffer[0], &m_buffer[size_desired], m_size); - } else { - m_offset = -1; - m_size = 0; - } + m_offset = -1; + m_size = 0; return retval; } From 3d28dd50dc1e467d3fbb83d77a2144b22fa4c74a Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sat, 30 Jan 2021 09:42:38 -0600 Subject: [PATCH 3/4] XrdTpc: Adjust buffer size based on number of streams. As we only write when buffers are full and we don't need a full 16MB buffer when we are doing single-streams (for single stream, we're only waiting to accumulate a "large-enough" MB-aligned buffer), we can simply decrease the buffer size. Saves some allocated-but-unused space. --- src/XrdTpc/XrdTpcTPC.cc | 6 +++++- src/XrdTpc/XrdTpcTPC.hh | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/XrdTpc/XrdTpcTPC.cc b/src/XrdTpc/XrdTpcTPC.cc index 9fd3b9c64fd..7c6095214bc 100644 --- a/src/XrdTpc/XrdTpcTPC.cc +++ b/src/XrdTpc/XrdTpcTPC.cc @@ -26,6 +26,7 @@ using namespace TPC; uint64_t TPCHandler::m_monid{0}; int TPCHandler::m_marker_period = 5; size_t TPCHandler::m_block_size = 16*1024*1024; +size_t TPCHandler::m_small_block_size = 1*1024*1024; XrdSysMutex TPCHandler::m_monid_mutex; XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC); @@ -356,6 +357,8 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0); } + //curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 128*1024); + CURLMcode mres; mres = curl_multi_add_handle(multi_handle, curl); if (mres) { @@ -592,6 +595,7 @@ int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) logTransferEvent(LogMask::Error, rec, "PUSH_FAIL", msg); return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0); } + auto query_header = req.headers.find("xrd-http-fullresource"); std::string redirect_resource = req.resource; if (query_header != req.headers.end()) { @@ -733,7 +737,7 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str()); } curl_easy_setopt(curl, CURLOPT_URL, resource.c_str()); - Stream stream(std::move(fh), streams * m_pipelining_multiplier, m_block_size, m_log); + Stream stream(std::move(fh), streams * m_pipelining_multiplier, streams > 1 ? m_block_size : m_small_block_size, m_log); State state(0, stream, curl, false); state.CopyHeaders(req); diff --git a/src/XrdTpc/XrdTpcTPC.hh b/src/XrdTpc/XrdTpcTPC.hh index ca34d2e6791..e0add7d243b 100644 --- a/src/XrdTpc/XrdTpcTPC.hh +++ b/src/XrdTpc/XrdTpcTPC.hh @@ -109,6 +109,7 @@ private: static int m_marker_period; static size_t m_block_size; + static size_t m_small_block_size; bool m_desthttps; std::string m_cadir; static XrdSysMutex m_monid_mutex; From 3b40aa1163db5044be7b38f5a76dac03610e557f Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sat, 30 Jan 2021 09:48:48 -0600 Subject: [PATCH 4/4] XrdTpc: Always populate error buffer with messages. Even when there's an internal logic error, make sure the Write method populates the error message - if nothing else, will be useful for debugging. --- src/XrdTpc/XrdTpcStream.cc | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/XrdTpc/XrdTpcStream.cc b/src/XrdTpc/XrdTpcStream.cc index fa4ba4395f7..861e8ebf55e 100644 --- a/src/XrdTpc/XrdTpcStream.cc +++ b/src/XrdTpc/XrdTpcStream.cc @@ -67,10 +67,14 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force) m_log.Emsg("Stream::Write", ss.str().c_str()); DumpBuffers(); */ - if (!m_open_for_write) return SFS_ERROR; + if (!m_open_for_write) { + if (!m_error_buf.size()) {m_error_buf = "Logic error: writing to a buffer not opened for write";} + return SFS_ERROR; + } size_t bytes_accepted = 0; int retval = size; if (offset < m_offset) { + if (!m_error_buf.size()) {m_error_buf = "Logic error: writing to a prior offset";} return SFS_ERROR; } // If this is write is appending to the stream and @@ -107,6 +111,7 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force) // going to force a flush even if things are not MB-aligned. int retval2 = (*entry_iter)->Write(*this, size == 0); if (retval2 == SFS_ERROR) { + if (!m_error_buf.size()) {m_error_buf = "Unknown filesystem write failure.";} return retval2; } buffer_was_written |= retval2 > 0; @@ -133,10 +138,13 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force) m_avail_count = avail_count; if (bytes_accepted != size && size) { // No place for this data in allocated buffers - if (!avail_entry) { // No available buffers to allocate. + if (!avail_entry) { // No available buffers to allocate; logic error, should not happen. + DumpBuffers(); + m_error_buf = "No empty buffers available to place unordered data."; return SFS_ERROR; } if (avail_entry->Accept(offset + bytes_accepted, buf, size - bytes_accepted) != size - bytes_accepted) { // Empty buffer cannot accept?!? + m_error_buf = "Empty re-ordering buffer was unable to to accept data; internal logic error."; return SFS_ERROR; } m_avail_count --; @@ -177,6 +185,11 @@ void Stream::DumpBuffers() const { m_log.Emsg("Stream::DumpBuffers", "Beginning dump of stream buffers."); + { + std::stringstream ss; + ss << "Stream offset: " << m_offset; + m_log.Emsg("Stream::DumpBuffers", ss.str().c_str()); + } size_t idx = 0; for (std::vector::const_iterator entry_iter = m_buffers.begin(); entry_iter!= m_buffers.end();