Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TPC] Ensure file is closed prior to sending response to client #891

Merged
merged 2 commits into from
Jan 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/XrdTpc/XrdTpcMultistream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,12 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
ss << "failure: Remote side failed with status code " << state.GetStatusCode();
m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str());
} else {
ss << "success: Created";
if (!handles[0]->Finalize()) {
ss << "failure: Failed to finalize and close file handle.";
m_log.Emsg(log_prefix, "Failed to finalize file handle");
} else {
ss << "success: Created";
}
}

if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
Expand Down
6 changes: 6 additions & 0 deletions src/XrdTpc/XrdTpcState.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,9 @@ void State::DumpBuffers() const
{
m_stream->DumpBuffers();
}

bool State::Finalize()
{
return m_stream->Finalize();
}

9 changes: 9 additions & 0 deletions src/XrdTpc/XrdTpcState.hh
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ public:
// constructor once C++11 is allowed in XRootD.
void Move (State &other);

// Flush and finalize a transfer state. Eventually calls close() on the underlying
// file handle, which should hopefully synchronize the file metadata across
// all readers (even other load-balanced servers on the same distributed file
// system).
//
// Returns true on success; false otherwise. Failures can happen, for example, if
// not all buffers have been reordered by the underlying stream.
bool Finalize();

private:
bool InstallHandlers(CURL *curl);

Expand Down
21 changes: 21 additions & 0 deletions src/XrdTpc/XrdTpcStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,26 @@ Stream::~Stream()
}


bool
Stream::Finalize()
{
// Do not close twice
if (!m_open_for_write) {
return false;
}
for (std::vector<Entry*>::iterator buffer_iter = m_buffers.begin();
buffer_iter != m_buffers.end();
buffer_iter++) {
delete *buffer_iter;
*buffer_iter = NULL;
}
m_fh->close();
m_open_for_write = false;
// If there are outstanding buffers to reorder, finalization failed
return m_avail_count == m_buffers.size();
}


int
Stream::Stat(struct stat* buf)
{
Expand All @@ -29,6 +49,7 @@ Stream::Stat(struct stat* buf)
int
Stream::Write(off_t offset, const char *buf, size_t size)
{
if (!m_open_for_write) return SFS_ERROR;
bool buffer_accepted = false;
int retval = size;
if (offset < m_offset) {
Expand Down
14 changes: 13 additions & 1 deletion src/XrdTpc/XrdTpcStream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ namespace TPC {
class Stream {
public:
Stream(std::unique_ptr<XrdSfsFile> fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
: m_avail_count(max_blocks),
: m_open_for_write(false),
m_avail_count(max_blocks),
m_fh(std::move(fh)),
m_offset(0),
m_log(log)
Expand All @@ -30,6 +31,7 @@ public:
for (size_t idx=0; idx < max_blocks; idx++) {
m_buffers.push_back(new Entry(buffer_size));
}
m_open_for_write = true;
}

~Stream();
Expand All @@ -44,6 +46,15 @@ public:

void DumpBuffers() const;

// Flush and finalize the stream. If all data has been sent to the underlying
// file handle, close() will be invoked on the file handle.
//
// Further write operations on this stream will result in an error.
// If any memory buffers remain, an error occurs.
//
// Returns true on success; false otherwise.
bool Finalize();

private:

class Entry {
Expand Down Expand Up @@ -124,6 +135,7 @@ private:
std::vector<char> m_buffer;
};

bool m_open_for_write;
size_t m_avail_count;
std::unique_ptr<XrdSfsFile> m_fh;
off_t m_offset;
Expand Down