diff --git a/src/XrdTpc/XrdTpcMultistream.cc b/src/XrdTpc/XrdTpcMultistream.cc index 6f7542c13e5..942bb3e92e6 100644 --- a/src/XrdTpc/XrdTpcMultistream.cc +++ b/src/XrdTpc/XrdTpcMultistream.cc @@ -372,7 +372,12 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state, ss << "failure: Remote side failed with status code " << state.GetStatusCode(); m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str()); } else { - ss << "success: Created"; + if (!handles[0]->Finalize()) { + ss << "failure: Failed to finalize and close file handle."; + m_log.Emsg(log_prefix, "Failed to finalize file handle"); + } else { + ss << "success: Created"; + } } if ((retval = req.ChunkResp(ss.str().c_str(), 0))) { diff --git a/src/XrdTpc/XrdTpcState.cc b/src/XrdTpc/XrdTpcState.cc index d9b749d9159..7d7ed5cc0c2 100644 --- a/src/XrdTpc/XrdTpcState.cc +++ b/src/XrdTpc/XrdTpcState.cc @@ -243,3 +243,9 @@ void State::DumpBuffers() const { m_stream->DumpBuffers(); } + +bool State::Finalize() +{ + return m_stream->Finalize(); +} + diff --git a/src/XrdTpc/XrdTpcState.hh b/src/XrdTpc/XrdTpcState.hh index fdbab41255d..d710e5150e4 100644 --- a/src/XrdTpc/XrdTpcState.hh +++ b/src/XrdTpc/XrdTpcState.hh @@ -81,6 +81,15 @@ public: // constructor once C++11 is allowed in XRootD. void Move (State &other); + // Flush and finalize a transfer state. Eventually calls close() on the underlying + // file handle, which should hopefully synchronize the file metadata across + // all readers (even other load-balanced servers on the same distributed file + // system). + // + // Returns true on success; false otherwise. Failures can happen, for example, if + // not all buffers have been reordered by the underlying stream. + bool Finalize(); + private: bool InstallHandlers(CURL *curl); diff --git a/src/XrdTpc/XrdTpcStream.cc b/src/XrdTpc/XrdTpcStream.cc index 0bf52656b3c..c16d7071f4c 100644 --- a/src/XrdTpc/XrdTpcStream.cc +++ b/src/XrdTpc/XrdTpcStream.cc @@ -20,6 +20,26 @@ Stream::~Stream() } +bool +Stream::Finalize() +{ + // Do not close twice + if (!m_open_for_write) { + return false; + } + for (std::vector::iterator buffer_iter = m_buffers.begin(); + buffer_iter != m_buffers.end(); + buffer_iter++) { + delete *buffer_iter; + *buffer_iter = NULL; + } + m_fh->close(); + m_open_for_write = false; + // If there are outstanding buffers to reorder, finalization failed + return m_avail_count == m_buffers.size(); +} + + int Stream::Stat(struct stat* buf) { @@ -29,6 +49,7 @@ Stream::Stat(struct stat* buf) int Stream::Write(off_t offset, const char *buf, size_t size) { + if (!m_open_for_write) return SFS_ERROR; bool buffer_accepted = false; int retval = size; if (offset < m_offset) { diff --git a/src/XrdTpc/XrdTpcStream.hh b/src/XrdTpc/XrdTpcStream.hh index f6c559308ce..ef5688a4817 100644 --- a/src/XrdTpc/XrdTpcStream.hh +++ b/src/XrdTpc/XrdTpcStream.hh @@ -21,7 +21,8 @@ namespace TPC { class Stream { public: Stream(std::unique_ptr fh, size_t max_blocks, size_t buffer_size, XrdSysError &log) - : m_avail_count(max_blocks), + : m_open_for_write(false), + m_avail_count(max_blocks), m_fh(std::move(fh)), m_offset(0), m_log(log) @@ -30,6 +31,7 @@ public: for (size_t idx=0; idx < max_blocks; idx++) { m_buffers.push_back(new Entry(buffer_size)); } + m_open_for_write = true; } ~Stream(); @@ -44,6 +46,15 @@ public: void DumpBuffers() const; + // Flush and finalize the stream. If all data has been sent to the underlying + // file handle, close() will be invoked on the file handle. + // + // Further write operations on this stream will result in an error. + // If any memory buffers remain, an error occurs. + // + // Returns true on success; false otherwise. + bool Finalize(); + private: class Entry { @@ -124,6 +135,7 @@ private: std::vector m_buffer; }; + bool m_open_for_write; size_t m_avail_count; std::unique_ptr m_fh; off_t m_offset;