Skip to content

Commit

Permalink
[XrdCl] xrdcp: add --zip-append functionality.
Browse files Browse the repository at this point in the history
The crc32 checksum in the LFH and CDFH is still missing.
  • Loading branch information
simonmichal committed Jun 14, 2021
1 parent 02b63a5 commit 133df56
Show file tree
Hide file tree
Showing 7 changed files with 357 additions and 13 deletions.
3 changes: 3 additions & 0 deletions src/XrdApps/XrdCpConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ struct option XrdCpConfig::opVec[] = // For getopt_long()
{OPT_TYPE "continue", 0, 0, XrdCpConfig::OpContinue},
{OPT_TYPE "xrate-threshold",1, 0, XrdCpConfig::OpXrateThreashold},
{OPT_TYPE "retry-policy", 1, 0, XrdCpConfig::OpRetryPolicy},
{OPT_TYPE "zip-append", 0, 0, XrdCpConfig::OpZipAppend},
{0, 0, 0, 0}
};

Expand Down Expand Up @@ -272,6 +273,8 @@ do{while(optind < Argc && Legacy(optind)) {}
RetryPolicy = optarg;
if( RetryPolicy != "force" && RetryPolicy != "continue" ) Usage(22);
break;
case OpZipAppend: OpSpec |= DoZipAppend;
break;
case OpServer: OpSpec |= DoServer|DoSilent|DoNoPbar|DoForce;
break;
case OpSilent: OpSpec |= DoSilent|DoNoPbar;
Expand Down
3 changes: 3 additions & 0 deletions src/XrdApps/XrdCpConfig.hh
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ static const uint64_t DoXrateThreashold = 0x0000000200000000LL; // --xrate-th
static const uint64_t OpRetryPolicy = 0x12;
static const uint64_t DoRetryPolicy = 0x0000000400000000LL; // --retry-policy

static const uint64_t OpZipAppend = 0x13;
static const uint64_t DoZipAppend = 0x0000000800000000LL; // --zip-append

// Flag to allow the use of HTTP (and HTTPS) as source and destination
// protocols. If specified, the XrdClHttp client plugin must be available
// for the transfer operations to succeed.
Expand Down
329 changes: 328 additions & 1 deletion src/XrdCl/XrdClClassicCopyJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1948,6 +1948,319 @@ namespace
std::string pWrtRecoveryRedir;
std::string pLastURL;
};

//----------------------------------------------------------------------------
//! XRootD destination
//----------------------------------------------------------------------------
class XRootDZipDestination: public Destination
{
public:
//------------------------------------------------------------------------
//! Constructor
//------------------------------------------------------------------------
XRootDZipDestination( const XrdCl::URL &url, const std::string &fn, int64_t size, uint8_t parallelChunks,
const std::string &ckSumType ):
Destination( ckSumType ),
pUrl( url ), pFilename( fn ), pZip( new XrdCl::ZipArchive() ),
pParallel( parallelChunks ), pSize( size )
{
}

//------------------------------------------------------------------------
//! Destructor
//------------------------------------------------------------------------
virtual ~XRootDZipDestination()
{
CleanUpChunks();
delete pZip;

//----------------------------------------------------------------------
// If the copy failed and user requested posc and we are dealing with
// a local destination, remove the file
//----------------------------------------------------------------------
if( pUrl.IsLocalFile() && pPosc && !Result::Get().IsOK() )
{
XrdCl::FileSystem fs( pUrl );
XrdCl::XRootDStatus st = fs.Rm( pUrl.GetPath() );
if( !st.IsOK() )
{
XrdCl::Log *log = XrdCl::DefaultEnv::GetLog();
log->Error( XrdCl::UtilityMsg, "Failed to remove local destination"
" on failure: %s", st.ToString().c_str() );
}
}
}

//------------------------------------------------------------------------
//! Initialize the destination
//------------------------------------------------------------------------
virtual XrdCl::XRootDStatus Initialize()
{
using namespace XrdCl;
Log *log = DefaultEnv::GetLog();
log->Debug( UtilityMsg, "Opening %s for writing",
pUrl.GetURL().c_str() );

std::string value;
DefaultEnv::GetEnv()->GetString( "WriteRecovery", value );
pZip->SetProperty( "WriteRecovery", value );

OpenFlags::Flags flags = OpenFlags::Update;

FileSystem fs( pUrl );
StatInfo *info = nullptr;
auto st = fs.Stat( pUrl.GetPath(), info );
if( !st.IsOK() && st.code == errErrorResponse && st.errNo == kXR_NotFound )
flags |= OpenFlags::New;

if( pPosc )
flags |= OpenFlags::POSC;

if( pCoerce )
flags |= OpenFlags::Force;

if( pMakeDir)
flags |= OpenFlags::MakePath;

st = XrdCl::WaitFor( XrdCl::OpenArchive( pZip, pUrl.GetURL(), flags ) );
if( !st.IsOK() )
return st;

std::string cptarget = XrdCl::DefaultCpTarget;
XrdCl::DefaultEnv::GetEnv()->GetString( "CpTarget", cptarget );
if( !cptarget.empty() )
{
std::string targeturl;
pZip->GetProperty( "LastURL", targeturl );
if( symlink( targeturl.c_str(), cptarget.c_str() ) == -1 )
log->Warning( UtilityMsg, "Could not create cp-target symlink: %s",
XrdSysE2T( errno ) );
}

st = pZip->OpenFile( pFilename, XrdCl::OpenFlags::New | XrdCl::OpenFlags::Write, pSize, 0/*file checksum*/ ); // TODO
if( !st.IsOK() )
return st;

if( pUrl.IsLocalFile() && pCkSumHelper && !pContinue )
return pCkSumHelper->Initialize();

return XRootDStatus();
}

//------------------------------------------------------------------------
//! Finalize the destination
//------------------------------------------------------------------------
virtual XrdCl::XRootDStatus Finalize()
{
pZip->CloseFile();
return XrdCl::WaitFor( XrdCl::CloseArchive( pZip ) );
}

//------------------------------------------------------------------------
//! Put a data chunk at a destination
//!
//! @param ci chunk information
//! @return status of the operation
//------------------------------------------------------------------------
virtual XrdCl::XRootDStatus PutChunk( XrdCl::ChunkInfo &ci )
{
using namespace XrdCl;

//----------------------------------------------------------------------
// If there is still place for this chunk to be sent send it
//----------------------------------------------------------------------
if( pChunks.size() < pParallel )
return QueueChunk( ci );

//----------------------------------------------------------------------
// We wait for a chunk to be sent so that we have space for the current
// one
//----------------------------------------------------------------------
std::unique_ptr<ChunkHandler> ch( pChunks.front() );
pChunks.pop();
ch->sem->Wait();
delete [] (char*)ch->chunk.buffer;
if( !ch->status.IsOK() )
{
Log *log = DefaultEnv::GetLog();
log->Debug( UtilityMsg, "Unable write %d bytes at %ld from %s: %s",
ch->chunk.length, ch->chunk.offset,
pUrl.GetURL().c_str(), ch->status.ToStr().c_str() );
CleanUpChunks();

//--------------------------------------------------------------------
// Check if we should re-try the transfer from scratch at a different
// data server
//--------------------------------------------------------------------
return CheckIfRetriable( ch->status );
}

return QueueChunk( ci );
}

//------------------------------------------------------------------------
//! Get size
//------------------------------------------------------------------------
virtual int64_t GetSize()
{
return -1;
}

//------------------------------------------------------------------------
//! Clean up the chunks that are flying
//------------------------------------------------------------------------
void CleanUpChunks()
{
while( !pChunks.empty() )
{
ChunkHandler *ch = pChunks.front();
pChunks.pop();
ch->sem->Wait();
delete [] (char *)ch->chunk.buffer;
delete ch;
}
}

//------------------------------------------------------------------------
//! Queue a chunk
//------------------------------------------------------------------------
XrdCl::XRootDStatus QueueChunk( XrdCl::ChunkInfo &ci )
{
// we are writing chunks in order so we can calc the checksum
// in case of local files
if( pUrl.IsLocalFile() && pCkSumHelper && !pContinue )
pCkSumHelper->Update( ci.buffer, ci.length );

ChunkHandler *ch = new ChunkHandler(ci);
XrdCl::XRootDStatus st;

st = pZip->Write( ci.length, ci.buffer, ch );
if( !st.IsOK() )
{
CleanUpChunks();
delete [] (char*)ci.buffer;
ci.buffer = 0;
delete ch;
return st;
}
pChunks.push( ch );
return XrdCl::XRootDStatus();
}

//------------------------------------------------------------------------
//! Flush chunks that might have been queues
//------------------------------------------------------------------------
virtual XrdCl::XRootDStatus Flush()
{
XrdCl::XRootDStatus st;
while( !pChunks.empty() )
{
ChunkHandler *ch = pChunks.front();
pChunks.pop();
ch->sem->Wait();
if( !ch->status.IsOK() )
{
//--------------------------------------------------------------------
// Check if we should re-try the transfer from scratch at a different
// data server
//--------------------------------------------------------------------
st = CheckIfRetriable( ch->status );
}
delete [] (char *)ch->chunk.buffer;
delete ch;
}
return st;
}

//------------------------------------------------------------------------
//! Get check sum
//------------------------------------------------------------------------
virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
std::string &checkSumType )
{
return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errNotSupported );
}

//------------------------------------------------------------------------
//! Set extended attributes
//------------------------------------------------------------------------
virtual XrdCl::XRootDStatus SetXAttr( const std::vector<XrdCl::xattr_t> &xattrs )
{
return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errNotSupported );
}

//------------------------------------------------------------------------
//! Get last URL
//------------------------------------------------------------------------
const std::string& GetLastURL() const
{
return pLastURL;
}

//------------------------------------------------------------------------
//! Get write-recovery redirector
//------------------------------------------------------------------------
const std::string& GetWrtRecoveryRedir() const
{
return pWrtRecoveryRedir;
}

private:
XRootDZipDestination(const XRootDDestination &other);
XRootDZipDestination &operator = (const XRootDDestination &other);

//------------------------------------------------------------------------
// Asynchronous chunk handler
//------------------------------------------------------------------------
class ChunkHandler: public XrdCl::ResponseHandler
{
public:
ChunkHandler( XrdCl::ChunkInfo ci ):
sem( new XrdSysSemaphore(0) ),
chunk(ci) {}
virtual ~ChunkHandler() { delete sem; }
virtual void HandleResponse( XrdCl::XRootDStatus *statusval,
XrdCl::AnyObject */*response*/ )
{
this->status = *statusval;
delete statusval;
sem->Post();
}

XrdSysSemaphore *sem;
XrdCl::ChunkInfo chunk;
XrdCl::XRootDStatus status;
};

inline XrdCl::XRootDStatus CheckIfRetriable( XrdCl::XRootDStatus &status )
{
if( status.IsOK() ) return status;

//--------------------------------------------------------------------
// Check if we should re-try the transfer from scratch at a different
// data server
//--------------------------------------------------------------------
std::string value;
if( pZip->GetProperty( "WrtRecoveryRedir", value ) )
{
pWrtRecoveryRedir = value;
if( pZip->GetProperty( "LastURL", value ) ) pLastURL = value;
return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errRetry );
}

return status;
}

const XrdCl::URL pUrl;
std::string pFilename;
XrdCl::ZipArchive *pZip;
uint8_t pParallel;
std::queue<ChunkHandler *> pChunks;
int64_t pSize;

std::string pWrtRecoveryRedir;
std::string pLastURL;
};
}

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -2014,7 +2327,7 @@ namespace XrdCl
uint32_t chunkSize;
uint64_t blockSize;
bool posc, force, coerce, makeDir, dynamicSource, zip, xcp, preserveXAttr,
rmOnBadCksum, continue_;
rmOnBadCksum, continue_, zipappend;
int32_t nbXcpSources;
long long xRate;
long long xRateThreshold;
Expand All @@ -2039,6 +2352,7 @@ namespace XrdCl
pProperties->Get( "rmOnBadCksum", rmOnBadCksum );
pProperties->Get( "continue", continue_ );
pProperties->Get( "cpTimeout", cpTimeout );
pProperties->Get( "zipAppend", zipappend );

if( zip )
pProperties->Get( "zipSource", zipSource );
Expand All @@ -2050,6 +2364,10 @@ namespace XrdCl
return Result( stError, errInvalidArgs, EINVAL,
"Invalid argument combination: continue + force." );

if( zipappend && ( continue_ || force ) )
return Result( stError, errInvalidArgs, EINVAL,
"Invalid argument combination: ( continue | force ) + zip-append." );

//--------------------------------------------------------------------------
// Start the cp t/o timer if necessary
//--------------------------------------------------------------------------
Expand Down Expand Up @@ -2106,6 +2424,15 @@ namespace XrdCl

if( GetTarget().GetProtocol() == "stdio" )
dest.reset( new StdOutDestination( checkSumType ) );
else if( zipappend )
{
std::string fn = GetSource().GetPath();
size_t pos = fn.rfind( '/' );
if( pos != std::string::npos )
fn = fn.substr( pos + 1 );
int64_t size = src->GetSize();
dest.reset( new XRootDZipDestination( newDestUrl, fn, size, parallelChunks, checkSumType ) );
}
//--------------------------------------------------------------------------
// For xrootd destination build the oss.asize hint
//--------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 133df56

Please sign in to comment.