diff --git a/src/XrdTpc/XrdTpcStream.cc b/src/XrdTpc/XrdTpcStream.cc index 2c70d8176e6..861e8ebf55e 100644 --- a/src/XrdTpc/XrdTpcStream.cc +++ b/src/XrdTpc/XrdTpcStream.cc @@ -67,28 +67,25 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force) m_log.Emsg("Stream::Write", ss.str().c_str()); DumpBuffers(); */ - if (!m_open_for_write) return SFS_ERROR; - bool buffer_accepted = false; + if (!m_open_for_write) { + if (!m_error_buf.size()) {m_error_buf = "Logic error: writing to a buffer not opened for write";} + return SFS_ERROR; + } + size_t bytes_accepted = 0; int retval = size; if (offset < m_offset) { + if (!m_error_buf.size()) {m_error_buf = "Logic error: writing to a prior offset";} return SFS_ERROR; } // If this is write is appending to the stream and // MB-aligned, then we write it to disk; otherwise, the // data will be buffered. if (offset == m_offset && (force || (size && !(size % (1024*1024))))) { - retval = m_fh->write(offset, buf, size); - buffer_accepted = true; - if (retval != SFS_ERROR) { - m_offset += retval; - } else { - std::stringstream ss; - const char *msg = m_fh->error.getErrText(); - if (!msg || (*msg == '\0')) {msg = "(no error message provided)";} - ss << msg << " (code=" << m_fh->error.getErrInfo() << ")"; - m_error_buf = ss.str(); - // Write failed; we don't care if there's any more buffer we - // can dump to disk later. + retval = WriteImpl(offset, buf, size); + bytes_accepted = retval; + // On failure, we don't care about flushing buffers from memory -- + // the stream is now invalid. + if (retval < 0) { return retval; } // If there are no in-use buffers, then we don't need to @@ -114,6 +111,7 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force) // going to force a flush even if things are not MB-aligned. int retval2 = (*entry_iter)->Write(*this, size == 0); if (retval2 == SFS_ERROR) { + if (!m_error_buf.size()) {m_error_buf = "Unknown filesystem write failure.";} return retval2; } buffer_was_written |= retval2 > 0; @@ -121,18 +119,32 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force) if (!avail_entry) {avail_entry = *entry_iter;} avail_count ++; } - else if (!buffer_accepted && (*entry_iter)->Accept(offset, buf, size)) { - buffer_accepted = true; + else if (bytes_accepted != size && size) { + size_t new_accept = (*entry_iter)->Accept(offset + bytes_accepted, buf, size - bytes_accepted); + // Partial accept; buffer should be writable which means we should free it up + // for next iteration + if (new_accept && new_accept != size - bytes_accepted) { + int retval3 = (*entry_iter)->Write(*this, false); + if (retval3 == SFS_ERROR) { + if (!m_error_buf.size()) {m_error_buf = "Unknown filesystem write failure.";} + return SFS_ERROR; + } + buffer_was_written = true; + } + bytes_accepted += new_accept; } } } while ((avail_count != m_buffers.size()) && buffer_was_written); m_avail_count = avail_count; - if (!buffer_accepted && size) { // No place for this data in allocated buffers - if (!avail_entry) { // No available buffers to allocate. + if (bytes_accepted != size && size) { // No place for this data in allocated buffers + if (!avail_entry) { // No available buffers to allocate; logic error, should not happen. + DumpBuffers(); + m_error_buf = "No empty buffers available to place unordered data."; return SFS_ERROR; } - if (!avail_entry->Accept(offset, buf, size)) { // Empty buffer cannot accept?!? + if (avail_entry->Accept(offset + bytes_accepted, buf, size - bytes_accepted) != size - bytes_accepted) { // Empty buffer cannot accept?!? + m_error_buf = "Empty re-ordering buffer was unable to to accept data; internal logic error."; return SFS_ERROR; } m_avail_count --; @@ -151,10 +163,33 @@ Stream::Write(off_t offset, const char *buf, size_t size, bool force) } +ssize_t Stream::WriteImpl(off_t offset, const char *buf, size_t size) +{ + ssize_t retval; + if (size == 0) {return 0;} + retval = m_fh->write(offset, buf, size); + if (retval != SFS_ERROR) { + m_offset += retval; + } else { + std::stringstream ss; + const char *msg = m_fh->error.getErrText(); + if (!msg || (*msg == '\0')) {msg = "(no error message provided)";} + ss << msg << " (code=" << m_fh->error.getErrInfo() << ")"; + m_error_buf = ss.str(); + } + return retval; +} + + void Stream::DumpBuffers() const { m_log.Emsg("Stream::DumpBuffers", "Beginning dump of stream buffers."); + { + std::stringstream ss; + ss << "Stream offset: " << m_offset; + m_log.Emsg("Stream::DumpBuffers", ss.str().c_str()); + } size_t idx = 0; for (std::vector::const_iterator entry_iter = m_buffers.begin(); entry_iter!= m_buffers.end(); diff --git a/src/XrdTpc/XrdTpcStream.hh b/src/XrdTpc/XrdTpcStream.hh index f30d9ee5d44..0e854334d53 100644 --- a/src/XrdTpc/XrdTpcStream.hh +++ b/src/XrdTpc/XrdTpcStream.hh @@ -82,39 +82,34 @@ private: int Write(Stream &stream, bool force) { if (Available() || !CanWrite(stream)) {return 0;} - // Currently, only full writes are accepted along megabyte boundaries - // unless the stream forces a flush (i.e., we are at EOF). - size_t size_desired = m_size; - if (!force) { - size_desired -= (size_desired % (1024*1024)); - if (!size_desired) {return 0;} + // Only full buffer writes are accepted unless the stream forces a flush + // (i.e., we are at EOF) because the multistream code uses buffer occupancy + // to determine how many streams are currently in-flight. If we do an early + // write, then the buffer will be empty and the multistream code may decide + // to start another request (which we don't have the capacity to serve!). + if (!force && (m_size != m_capacity)) { + return 0; } - ssize_t retval = stream.Write(m_offset, &m_buffer[0], size_desired, force); + ssize_t retval = stream.WriteImpl(m_offset, &m_buffer[0], m_size); // Currently the only valid negative value is SFS_ERROR (-1); checking for // all negative values to future-proof the code. - if ((retval < 0) || (static_cast(retval) != size_desired)) { + if ((retval < 0) || (static_cast(retval) != m_size)) { return -1; } - // If partial data remains, copy it to the beginning of the buffer. - // Otherwise, mark the buffer as available. - if (size_desired < m_size) { - m_offset += size_desired; - m_size -= size_desired; - memcpy(&m_buffer[0], &m_buffer[size_desired], m_size); - } else { - m_offset = -1; - m_size = 0; - } + m_offset = -1; + m_size = 0; return retval; } - bool Accept(off_t offset, const char *buf, size_t size) { + size_t Accept(off_t offset, const char *buf, size_t size) { // Validate acceptance criteria. if ((m_offset != -1) && (offset != m_offset + static_cast(m_size))) { - return false; + return 0; } - if (size > m_capacity - m_size) { - return false; + size_t to_accept = m_capacity - m_size; + if (to_accept == 0) {return 0;} + if (size > to_accept) { + size = to_accept; } // Inflate the underlying buffer if needed. @@ -129,7 +124,7 @@ private: if (m_offset == -1) { m_offset = offset; } - return true; + return size; } void ShrinkIfUnused() { @@ -163,6 +158,8 @@ private: std::vector m_buffer; }; + ssize_t WriteImpl(off_t offset, const char *buffer, size_t size); + bool m_open_for_write; size_t m_avail_count; std::unique_ptr m_fh; diff --git a/src/XrdTpc/XrdTpcTPC.cc b/src/XrdTpc/XrdTpcTPC.cc index 9fd3b9c64fd..7c6095214bc 100644 --- a/src/XrdTpc/XrdTpcTPC.cc +++ b/src/XrdTpc/XrdTpcTPC.cc @@ -26,6 +26,7 @@ using namespace TPC; uint64_t TPCHandler::m_monid{0}; int TPCHandler::m_marker_period = 5; size_t TPCHandler::m_block_size = 16*1024*1024; +size_t TPCHandler::m_small_block_size = 1*1024*1024; XrdSysMutex TPCHandler::m_monid_mutex; XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC); @@ -356,6 +357,8 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0); } + //curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 128*1024); + CURLMcode mres; mres = curl_multi_add_handle(multi_handle, curl); if (mres) { @@ -592,6 +595,7 @@ int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) logTransferEvent(LogMask::Error, rec, "PUSH_FAIL", msg); return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0); } + auto query_header = req.headers.find("xrd-http-fullresource"); std::string redirect_resource = req.resource; if (query_header != req.headers.end()) { @@ -733,7 +737,7 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str()); } curl_easy_setopt(curl, CURLOPT_URL, resource.c_str()); - Stream stream(std::move(fh), streams * m_pipelining_multiplier, m_block_size, m_log); + Stream stream(std::move(fh), streams * m_pipelining_multiplier, streams > 1 ? m_block_size : m_small_block_size, m_log); State state(0, stream, curl, false); state.CopyHeaders(req); diff --git a/src/XrdTpc/XrdTpcTPC.hh b/src/XrdTpc/XrdTpcTPC.hh index ca34d2e6791..e0add7d243b 100644 --- a/src/XrdTpc/XrdTpcTPC.hh +++ b/src/XrdTpc/XrdTpcTPC.hh @@ -109,6 +109,7 @@ private: static int m_marker_period; static size_t m_block_size; + static size_t m_small_block_size; bool m_desthttps; std::string m_cadir; static XrdSysMutex m_monid_mutex;