Skip to content

Commit

Permalink
[XrdCl] Implement checkpointed WriteV.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Jun 2, 2021
1 parent e668d16 commit 6fdc6a5
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 11 deletions.
62 changes: 61 additions & 1 deletion src/XrdCl/XrdClCheckpointOperation.hh
Expand Up @@ -100,7 +100,7 @@ namespace XrdCl
//------------------------------------------------------------------------
std::string ToString()
{
return "Checkpoint";
return "ChkptWrt";
}

protected:
Expand Down Expand Up @@ -133,6 +133,66 @@ namespace XrdCl
return ChkptWrtImpl<false>( std::move( file ), std::move( offset ),
std::move( size ), std::move( buffer ) ).Timeout( timeout );
}


//----------------------------------------------------------------------------
//! Checkpointed WriteV operation (@see FileOperation)
//----------------------------------------------------------------------------
template<bool HasHndl>
class ChkptWrtVImpl: public FileOperation<ChkptWrtVImpl, HasHndl, Resp<void>,
Arg<uint64_t>, Arg<const struct iovec*>, Arg<int>>
{
public:

//------------------------------------------------------------------------
//! Inherit constructors from FileOperation (@see FileOperation)
//------------------------------------------------------------------------
using FileOperation<ChkptWrtVImpl, HasHndl, Resp<void>,
Arg<uint64_t>, Arg<const struct iovec*>, Arg<int>>::FileOperation;

//------------------------------------------------------------------------
//! Argument indexes in the args tuple
//------------------------------------------------------------------------
enum { OffArg, IovecArg, IovcntArg };

//------------------------------------------------------------------------
//! @return : name of the operation (@see Operation)
//------------------------------------------------------------------------
std::string ToString()
{
return "ChkptWrtV";
}

protected:

//------------------------------------------------------------------------
//! RunImpl operation (@see Operation)
//!
//! @param params : container with parameters forwarded from
//! previous operation
//! @return : status of the operation
//------------------------------------------------------------------------
XRootDStatus RunImpl( PipelineHandler *handler, uint16_t pipelineTimeout )
{
uint64_t off = std::get<OffArg>( this->args ).Get();
const struct iovec* iov = std::get<IovecArg>( this->args ).Get();
int iovcnt = std::get<IovcntArg>( this->args ).Get();
uint16_t timeout = pipelineTimeout < this->timeout ?
pipelineTimeout : this->timeout;
return this->file->ChkptWrtV( off, iov, iovcnt, handler, timeout );
}
};

//----------------------------------------------------------------------------
//! Factory for creating ReadImpl objects
//----------------------------------------------------------------------------
inline ChkptWrtVImpl<false> ChkptWrtV( Ctx<File> file, Arg<uint64_t> offset,
Arg<const struct iovec*> iov, Arg<int> iovcnt,
uint16_t timeout = 0 )
{
return ChkptWrtVImpl<false>( std::move( file ), std::move( offset ),
std::move( iov ), std::move( iovcnt ) ).Timeout( timeout );
}
}

#endif /* SRC_XRDCL_XRDCLCHECKPOINTOPERATION_HH_ */
23 changes: 15 additions & 8 deletions src/XrdCl/XrdClFile.cc
Expand Up @@ -774,14 +774,6 @@ namespace XrdCl

//------------------------------------------------------------------------
//! Checkpointed write - async
//!
//! @param offset offset from the beginning of the file
//! @param size number of bytes to be written
//! @param buffer a pointer to the buffer holding the data to be written
//! @param handler handler to be notified when the response arrives
//! @param timeout timeout value, if 0 the environment default will be
//! used
//! @return status of the operation
//------------------------------------------------------------------------
XRootDStatus File::ChkptWrt( uint64_t offset,
uint32_t size,
Expand All @@ -795,6 +787,21 @@ namespace XrdCl
return pStateHandler->ChkptWrt( offset, size, buffer, handler, timeout );
}

//------------------------------------------------------------------------
//! Checkpointed WriteV - async
//------------------------------------------------------------------------
XRootDStatus File::ChkptWrtV( uint64_t offset,
const struct iovec *iov,
int iovcnt,
ResponseHandler *handler,
uint16_t timeout )
{
if( pPlugIn )
return XRootDStatus( stError, errNotSupported );

return pStateHandler->ChkptWrtV( offset, iov, iovcnt, handler, timeout );
}

//------------------------------------------------------------------------
// Try different data server
//------------------------------------------------------------------------
Expand Down
20 changes: 20 additions & 0 deletions src/XrdCl/XrdClFile.hh
Expand Up @@ -788,6 +788,9 @@ namespace XrdCl
template<bool HasHndl>
friend class ChkptWrtImpl;

template <bool HasHndl>
friend class ChkptWrtVImpl;

//------------------------------------------------------------------------
//! Create a checkpoint - async
//!
Expand Down Expand Up @@ -820,6 +823,23 @@ namespace XrdCl
ResponseHandler *handler,
uint16_t timeout = 0 );

//------------------------------------------------------------------------
//! Checkpointed WriteV - async
//!
//! @param offset offset from the beginning of the file
//! @param iov list of the buffers to be written
//! @param iovcnt number of buffers
//! @param handler handler to be notified when the response arrives
//! @param timeout timeout value, if 0 then the environment default
//! will be used
//! @return status of the operation
//------------------------------------------------------------------------
XRootDStatus ChkptWrtV( uint64_t offset,
const struct iovec *iov,
int iovcnt,
ResponseHandler *handler,
uint16_t timeout = 0 );

FileStateHandler *pStateHandler;
FilePlugIn *pPlugIn;
bool pEnablePlugIns;
Expand Down
70 changes: 68 additions & 2 deletions src/XrdCl/XrdClFileStateHandler.cc
Expand Up @@ -1624,8 +1624,6 @@ namespace XrdCl
req->dlen = size;
memcpy( req->fhandle, pFileHandle, 4 );



MessageSendParams params;
params.timeout = timeout;
params.followRedirects = false;
Expand Down Expand Up @@ -1987,6 +1985,74 @@ namespace XrdCl
return SendOrQueue( *pDataServer, msg, stHandler, params );
}

//------------------------------------------------------------------------
//! Write scattered buffers in one operation - async
//!
//! @param offset offset from the beginning of the file
//! @param iov list of the buffers to be written
//! @param iovcnt number of buffers
//! @param handler handler to be notified when the response arrives
//! @param timeout timeout value, if 0 then the environment default
//! will be used
//! @return status of the operation
//------------------------------------------------------------------------
XRootDStatus FileStateHandler::ChkptWrtV( uint64_t offset,
const struct iovec *iov,
int iovcnt,
ResponseHandler *handler,
uint16_t timeout )
{
XrdSysMutexHelper scopedLock( pMutex );

if( pFileState == Error ) return pStatus;

if( pFileState != Opened && pFileState != Recovering )
return XRootDStatus( stError, errInvalidOp );

Log *log = DefaultEnv::GetLog();
log->Debug( FileMsg, "[0x%x@%s] Sending a write command for handle 0x%x to "
"%s", this, pFileUrl->GetURL().c_str(),
*((uint32_t*)pFileHandle), pDataServer->GetHostId().c_str() );

Message *msg;
ClientChkPointRequest *req;
MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );

req->requestid = kXR_chkpoint;
req->opcode = kXR_ckpXeq;
req->dlen = 24; // as specified in the protocol specification
memcpy( req->fhandle, pFileHandle, 4 );

ChunkList *list = new ChunkList();
uint32_t size = 0;
for( int i = 0; i < iovcnt; ++i )
{
if( iov[i].iov_len == 0 ) continue;
size += iov[i].iov_len;
list->push_back( ChunkInfo( 0, iov[i].iov_len,
(char*)iov[i].iov_base ) );
}

ClientWriteRequest *wrtreq = (ClientWriteRequest*)msg->GetBuffer( sizeof(ClientChkPointRequest) );
wrtreq->requestid = kXR_write;
wrtreq->offset = offset;
wrtreq->dlen = size;
memcpy( wrtreq->fhandle, pFileHandle, 4 );

MessageSendParams params;
params.timeout = timeout;
params.followRedirects = false;
params.stateful = true;
params.chunkList = list;

MessageUtils::ProcessSendParams( params );

XRootDTransport::SetDescription( msg );
StatefulHandler *stHandler = new StatefulHandler( this, handler, msg, params );

return SendOrQueue( *pDataServer, msg, stHandler, params );
}

//----------------------------------------------------------------------------
// Check if the file is open
//----------------------------------------------------------------------------
Expand Down
17 changes: 17 additions & 0 deletions src/XrdCl/XrdClFileStateHandler.hh
Expand Up @@ -506,6 +506,23 @@ namespace XrdCl
ResponseHandler *handler,
uint16_t timeout = 0 );

//------------------------------------------------------------------------
//! Checkpointed WriteV - async
//!
//! @param offset offset from the beginning of the file
//! @param iov list of the buffers to be written
//! @param iovcnt number of buffers
//! @param handler handler to be notified when the response arrives
//! @param timeout timeout value, if 0 then the environment default
//! will be used
//! @return status of the operation
//------------------------------------------------------------------------
XRootDStatus ChkptWrtV( uint64_t offset,
const struct iovec *iov,
int iovcnt,
ResponseHandler *handler,
uint16_t timeout = 0 );

//------------------------------------------------------------------------
//! Process the results of the opening operation
//------------------------------------------------------------------------
Expand Down

0 comments on commit 6fdc6a5

Please sign in to comment.