diff --git a/src/XrdApps/XrdCpConfig.cc b/src/XrdApps/XrdCpConfig.cc index 0b4030faae4..dd5e9041e85 100644 --- a/src/XrdApps/XrdCpConfig.cc +++ b/src/XrdApps/XrdCpConfig.cc @@ -119,6 +119,7 @@ struct option XrdCpConfig::opVec[] = // For getopt_long() {OPT_TYPE "continue", 0, 0, XrdCpConfig::OpContinue}, {OPT_TYPE "xrate-threshold",1, 0, XrdCpConfig::OpXrateThreashold}, {OPT_TYPE "retry-policy", 1, 0, XrdCpConfig::OpRetryPolicy}, + {OPT_TYPE "zip-append", 0, 0, XrdCpConfig::OpZipAppend}, {0, 0, 0, 0} }; @@ -272,6 +273,8 @@ do{while(optind < Argc && Legacy(optind)) {} RetryPolicy = optarg; if( RetryPolicy != "force" && RetryPolicy != "continue" ) Usage(22); break; + case OpZipAppend: OpSpec |= DoZipAppend; + break; case OpServer: OpSpec |= DoServer|DoSilent|DoNoPbar|DoForce; break; case OpSilent: OpSpec |= DoSilent|DoNoPbar; diff --git a/src/XrdApps/XrdCpConfig.hh b/src/XrdApps/XrdCpConfig.hh index 6831a832940..f3b0f73070d 100644 --- a/src/XrdApps/XrdCpConfig.hh +++ b/src/XrdApps/XrdCpConfig.hh @@ -197,6 +197,9 @@ static const uint64_t DoXrateThreashold = 0x0000000200000000LL; // --xrate-th static const uint64_t OpRetryPolicy = 0x12; static const uint64_t DoRetryPolicy = 0x0000000400000000LL; // --retry-policy +static const uint64_t OpZipAppend = 0x13; +static const uint64_t DoZipAppend = 0x0000000800000000LL; // --zip-append + // Flag to allow the use of HTTP (and HTTPS) as source and destination // protocols. If specified, the XrdClHttp client plugin must be available // for the transfer operations to succeed. diff --git a/src/XrdCl/XrdClClassicCopyJob.cc b/src/XrdCl/XrdClClassicCopyJob.cc index 186f1aec7b9..ef25ee17e19 100644 --- a/src/XrdCl/XrdClClassicCopyJob.cc +++ b/src/XrdCl/XrdClClassicCopyJob.cc @@ -1948,6 +1948,319 @@ namespace std::string pWrtRecoveryRedir; std::string pLastURL; }; + + //---------------------------------------------------------------------------- + //! XRootD destination + //---------------------------------------------------------------------------- + class XRootDZipDestination: public Destination + { + public: + //------------------------------------------------------------------------ + //! Constructor + //------------------------------------------------------------------------ + XRootDZipDestination( const XrdCl::URL &url, const std::string &fn, int64_t size, uint8_t parallelChunks, + const std::string &ckSumType ): + Destination( ckSumType ), + pUrl( url ), pFilename( fn ), pZip( new XrdCl::ZipArchive() ), + pParallel( parallelChunks ), pSize( size ) + { + } + + //------------------------------------------------------------------------ + //! Destructor + //------------------------------------------------------------------------ + virtual ~XRootDZipDestination() + { + CleanUpChunks(); + delete pZip; + + //---------------------------------------------------------------------- + // If the copy failed and user requested posc and we are dealing with + // a local destination, remove the file + //---------------------------------------------------------------------- + if( pUrl.IsLocalFile() && pPosc && !Result::Get().IsOK() ) + { + XrdCl::FileSystem fs( pUrl ); + XrdCl::XRootDStatus st = fs.Rm( pUrl.GetPath() ); + if( !st.IsOK() ) + { + XrdCl::Log *log = XrdCl::DefaultEnv::GetLog(); + log->Error( XrdCl::UtilityMsg, "Failed to remove local destination" + " on failure: %s", st.ToString().c_str() ); + } + } + } + + //------------------------------------------------------------------------ + //! Initialize the destination + //------------------------------------------------------------------------ + virtual XrdCl::XRootDStatus Initialize() + { + using namespace XrdCl; + Log *log = DefaultEnv::GetLog(); + log->Debug( UtilityMsg, "Opening %s for writing", + pUrl.GetURL().c_str() ); + + std::string value; + DefaultEnv::GetEnv()->GetString( "WriteRecovery", value ); + pZip->SetProperty( "WriteRecovery", value ); + + OpenFlags::Flags flags = OpenFlags::Update; + + FileSystem fs( pUrl ); + StatInfo *info = nullptr; + auto st = fs.Stat( pUrl.GetPath(), info ); + if( !st.IsOK() && st.code == errErrorResponse && st.errNo == kXR_NotFound ) + flags |= OpenFlags::New; + + if( pPosc ) + flags |= OpenFlags::POSC; + + if( pCoerce ) + flags |= OpenFlags::Force; + + if( pMakeDir) + flags |= OpenFlags::MakePath; + + st = XrdCl::WaitFor( XrdCl::OpenArchive( pZip, pUrl.GetURL(), flags ) ); + if( !st.IsOK() ) + return st; + + std::string cptarget = XrdCl::DefaultCpTarget; + XrdCl::DefaultEnv::GetEnv()->GetString( "CpTarget", cptarget ); + if( !cptarget.empty() ) + { + std::string targeturl; + pZip->GetProperty( "LastURL", targeturl ); + if( symlink( targeturl.c_str(), cptarget.c_str() ) == -1 ) + log->Warning( UtilityMsg, "Could not create cp-target symlink: %s", + XrdSysE2T( errno ) ); + } + + st = pZip->OpenFile( pFilename, XrdCl::OpenFlags::New | XrdCl::OpenFlags::Write, pSize, 0/*file checksum*/ ); // TODO + if( !st.IsOK() ) + return st; + + if( pUrl.IsLocalFile() && pCkSumHelper && !pContinue ) + return pCkSumHelper->Initialize(); + + return XRootDStatus(); + } + + //------------------------------------------------------------------------ + //! Finalize the destination + //------------------------------------------------------------------------ + virtual XrdCl::XRootDStatus Finalize() + { + pZip->CloseFile(); + return XrdCl::WaitFor( XrdCl::CloseArchive( pZip ) ); + } + + //------------------------------------------------------------------------ + //! Put a data chunk at a destination + //! + //! @param ci chunk information + //! @return status of the operation + //------------------------------------------------------------------------ + virtual XrdCl::XRootDStatus PutChunk( XrdCl::ChunkInfo &ci ) + { + using namespace XrdCl; + + //---------------------------------------------------------------------- + // If there is still place for this chunk to be sent send it + //---------------------------------------------------------------------- + if( pChunks.size() < pParallel ) + return QueueChunk( ci ); + + //---------------------------------------------------------------------- + // We wait for a chunk to be sent so that we have space for the current + // one + //---------------------------------------------------------------------- + std::unique_ptr ch( pChunks.front() ); + pChunks.pop(); + ch->sem->Wait(); + delete [] (char*)ch->chunk.buffer; + if( !ch->status.IsOK() ) + { + Log *log = DefaultEnv::GetLog(); + log->Debug( UtilityMsg, "Unable write %d bytes at %ld from %s: %s", + ch->chunk.length, ch->chunk.offset, + pUrl.GetURL().c_str(), ch->status.ToStr().c_str() ); + CleanUpChunks(); + + //-------------------------------------------------------------------- + // Check if we should re-try the transfer from scratch at a different + // data server + //-------------------------------------------------------------------- + return CheckIfRetriable( ch->status ); + } + + return QueueChunk( ci ); + } + + //------------------------------------------------------------------------ + //! Get size + //------------------------------------------------------------------------ + virtual int64_t GetSize() + { + return -1; + } + + //------------------------------------------------------------------------ + //! Clean up the chunks that are flying + //------------------------------------------------------------------------ + void CleanUpChunks() + { + while( !pChunks.empty() ) + { + ChunkHandler *ch = pChunks.front(); + pChunks.pop(); + ch->sem->Wait(); + delete [] (char *)ch->chunk.buffer; + delete ch; + } + } + + //------------------------------------------------------------------------ + //! Queue a chunk + //------------------------------------------------------------------------ + XrdCl::XRootDStatus QueueChunk( XrdCl::ChunkInfo &ci ) + { + // we are writing chunks in order so we can calc the checksum + // in case of local files + if( pUrl.IsLocalFile() && pCkSumHelper && !pContinue ) + pCkSumHelper->Update( ci.buffer, ci.length ); + + ChunkHandler *ch = new ChunkHandler(ci); + XrdCl::XRootDStatus st; + + st = pZip->Write( ci.length, ci.buffer, ch ); + if( !st.IsOK() ) + { + CleanUpChunks(); + delete [] (char*)ci.buffer; + ci.buffer = 0; + delete ch; + return st; + } + pChunks.push( ch ); + return XrdCl::XRootDStatus(); + } + + //------------------------------------------------------------------------ + //! Flush chunks that might have been queues + //------------------------------------------------------------------------ + virtual XrdCl::XRootDStatus Flush() + { + XrdCl::XRootDStatus st; + while( !pChunks.empty() ) + { + ChunkHandler *ch = pChunks.front(); + pChunks.pop(); + ch->sem->Wait(); + if( !ch->status.IsOK() ) + { + //-------------------------------------------------------------------- + // Check if we should re-try the transfer from scratch at a different + // data server + //-------------------------------------------------------------------- + st = CheckIfRetriable( ch->status ); + } + delete [] (char *)ch->chunk.buffer; + delete ch; + } + return st; + } + + //------------------------------------------------------------------------ + //! Get check sum + //------------------------------------------------------------------------ + virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum, + std::string &checkSumType ) + { + return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errNotSupported ); + } + + //------------------------------------------------------------------------ + //! Set extended attributes + //------------------------------------------------------------------------ + virtual XrdCl::XRootDStatus SetXAttr( const std::vector &xattrs ) + { + return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errNotSupported ); + } + + //------------------------------------------------------------------------ + //! Get last URL + //------------------------------------------------------------------------ + const std::string& GetLastURL() const + { + return pLastURL; + } + + //------------------------------------------------------------------------ + //! Get write-recovery redirector + //------------------------------------------------------------------------ + const std::string& GetWrtRecoveryRedir() const + { + return pWrtRecoveryRedir; + } + + private: + XRootDZipDestination(const XRootDDestination &other); + XRootDZipDestination &operator = (const XRootDDestination &other); + + //------------------------------------------------------------------------ + // Asynchronous chunk handler + //------------------------------------------------------------------------ + class ChunkHandler: public XrdCl::ResponseHandler + { + public: + ChunkHandler( XrdCl::ChunkInfo ci ): + sem( new XrdSysSemaphore(0) ), + chunk(ci) {} + virtual ~ChunkHandler() { delete sem; } + virtual void HandleResponse( XrdCl::XRootDStatus *statusval, + XrdCl::AnyObject */*response*/ ) + { + this->status = *statusval; + delete statusval; + sem->Post(); + } + + XrdSysSemaphore *sem; + XrdCl::ChunkInfo chunk; + XrdCl::XRootDStatus status; + }; + + inline XrdCl::XRootDStatus CheckIfRetriable( XrdCl::XRootDStatus &status ) + { + if( status.IsOK() ) return status; + + //-------------------------------------------------------------------- + // Check if we should re-try the transfer from scratch at a different + // data server + //-------------------------------------------------------------------- + std::string value; + if( pZip->GetProperty( "WrtRecoveryRedir", value ) ) + { + pWrtRecoveryRedir = value; + if( pZip->GetProperty( "LastURL", value ) ) pLastURL = value; + return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errRetry ); + } + + return status; + } + + const XrdCl::URL pUrl; + std::string pFilename; + XrdCl::ZipArchive *pZip; + uint8_t pParallel; + std::queue pChunks; + int64_t pSize; + + std::string pWrtRecoveryRedir; + std::string pLastURL; + }; } //------------------------------------------------------------------------------ @@ -2014,7 +2327,7 @@ namespace XrdCl uint32_t chunkSize; uint64_t blockSize; bool posc, force, coerce, makeDir, dynamicSource, zip, xcp, preserveXAttr, - rmOnBadCksum, continue_; + rmOnBadCksum, continue_, zipappend; int32_t nbXcpSources; long long xRate; long long xRateThreshold; @@ -2039,6 +2352,7 @@ namespace XrdCl pProperties->Get( "rmOnBadCksum", rmOnBadCksum ); pProperties->Get( "continue", continue_ ); pProperties->Get( "cpTimeout", cpTimeout ); + pProperties->Get( "zipAppend", zipappend ); if( zip ) pProperties->Get( "zipSource", zipSource ); @@ -2050,6 +2364,10 @@ namespace XrdCl return Result( stError, errInvalidArgs, EINVAL, "Invalid argument combination: continue + force." ); + if( zipappend && ( continue_ || force ) ) + return Result( stError, errInvalidArgs, EINVAL, + "Invalid argument combination: ( continue | force ) + zip-append." ); + //-------------------------------------------------------------------------- // Start the cp t/o timer if necessary //-------------------------------------------------------------------------- @@ -2106,6 +2424,15 @@ namespace XrdCl if( GetTarget().GetProtocol() == "stdio" ) dest.reset( new StdOutDestination( checkSumType ) ); + else if( zipappend ) + { + std::string fn = GetSource().GetPath(); + size_t pos = fn.rfind( '/' ); + if( pos != std::string::npos ) + fn = fn.substr( pos + 1 ); + int64_t size = src->GetSize(); + dest.reset( new XRootDZipDestination( newDestUrl, fn, size, parallelChunks, checkSumType ) ); + } //-------------------------------------------------------------------------- // For xrootd destination build the oss.asize hint //-------------------------------------------------------------------------- diff --git a/src/XrdCl/XrdClCopy.cc b/src/XrdCl/XrdClCopy.cc index c2e27754e32..93bb6389e03 100644 --- a/src/XrdCl/XrdClCopy.cc +++ b/src/XrdCl/XrdClCopy.cc @@ -494,13 +494,15 @@ int main( int argc, char **argv ) bool rmOnBadCksum = false; bool continue_ = false; bool recurse = false; + bool zipappend = false; std::string thirdParty = "none"; - if( config.Want( XrdCpConfig::DoPosc ) ) posc = true; - if( config.Want( XrdCpConfig::DoForce ) ) force = true; - if( config.Want( XrdCpConfig::DoCoerce ) ) coerce = true; - if( config.Want( XrdCpConfig::DoTpc ) ) thirdParty = "first"; - if( config.Want( XrdCpConfig::DoTpcOnly ) ) thirdParty = "only"; + if( config.Want( XrdCpConfig::DoPosc ) ) posc = true; + if( config.Want( XrdCpConfig::DoForce ) ) force = true; + if( config.Want( XrdCpConfig::DoCoerce ) ) coerce = true; + if( config.Want( XrdCpConfig::DoTpc ) ) thirdParty = "first"; + if( config.Want( XrdCpConfig::DoTpcOnly ) ) thirdParty = "only"; + if( config.Want( XrdCpConfig::DoZipAppend ) ) zipappend = true; if( config.Want( XrdCpConfig::DoTpcDlgt ) ) { // the env var is being set already here (we are issuing a stat @@ -709,7 +711,7 @@ int main( int argc, char **argv ) delete statInfo; } - if( !targetIsDir && targetExists && !force && !recurse ) + if( !targetIsDir && targetExists && !force && !recurse && !zipappend ) { XRootDStatus st( stError, errInvalidOp, EEXIST ); // Unable to create /tmp/test.txt; file exists @@ -855,6 +857,7 @@ int main( int argc, char **argv ) properties.Set( "xrateThreashold", config.xRateThreashold ); properties.Set( "rmOnBadCksum", rmOnBadCksum ); properties.Set( "continue", continue_ ); + properties.Set( "zipAppend", zipappend ); if( zip ) properties.Set( "zipSource", zipFile ); diff --git a/src/XrdCl/XrdClCopyProcess.cc b/src/XrdCl/XrdClCopyProcess.cc index 904e2415767..00c84a95ef0 100644 --- a/src/XrdCl/XrdClCopyProcess.cc +++ b/src/XrdCl/XrdClCopyProcess.cc @@ -264,7 +264,7 @@ namespace XrdCl PropertyList &p = pImpl->pJobProperties.back(); const char *bools[] = {"target", "force", "posc", "coerce", "makeDir", "zipArchive", - "xcp", "preserveXAttr", "rmOnBadCksum", "continue", 0}; + "xcp", "preserveXAttr", "rmOnBadCksum", "continue", "zipAppend", 0}; for( int i = 0; bools[i]; ++i ) if( !p.HasProperty( bools[i] ) ) p.Set( bools[i], false ); diff --git a/src/XrdCl/XrdClZipArchive.cc b/src/XrdCl/XrdClZipArchive.cc index a6b70b288de..42a667c0263 100644 --- a/src/XrdCl/XrdClZipArchive.cc +++ b/src/XrdCl/XrdClZipArchive.cc @@ -338,7 +338,7 @@ namespace XrdCl log->Dump( ZipMsg, "[0x%x] Open failed: file exists %s, cannot append.", this, fn.c_str() ); - return XRootDStatus( stError, errInvalidOp ); + return XRootDStatus( stError, errInvalidOp, EEXIST, "The file already exists in the ZIP archive." ); } openfn = fn; @@ -653,7 +653,7 @@ namespace XrdCl { if( openstage != Done ) return XRootDStatus( stError, errInvalidOp, - errInvalidOp, "Archive not opened." ); + 0, "Archive not opened." ); std::string value; archive.GetProperty( "LastURL", value ); @@ -762,6 +762,7 @@ namespace XrdCl mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; cdvec.emplace_back( new CDFH( lfh.get(), mode, wrtoff ) ); cdmap[openfn] = cdvec.size() - 1; + lfh.reset(); } Async( std::move( p ), timeout ); return XRootDStatus(); diff --git a/src/XrdCl/XrdClZipArchive.hh b/src/XrdCl/XrdClZipArchive.hh index 87a7e399926..d6e22779dc1 100644 --- a/src/XrdCl/XrdClZipArchive.hh +++ b/src/XrdCl/XrdClZipArchive.hh @@ -158,8 +158,7 @@ namespace XrdCl uint16_t timeout = 0 ) { if( openstage != Done || openfn.empty() ) - return XRootDStatus( stError, errInvalidOp, - errInvalidOp, "Archive not opened." ); + return XRootDStatus( stError, errInvalidOp, 0, "Archive not opened." ); return WriteImpl( size, buffer, handler, timeout ); } @@ -252,7 +251,7 @@ namespace XrdCl { if( openstage != Done || openfn.empty() ) return XRootDStatus( stError, errInvalidOp, - errInvalidOp, "Archive not opened." ); + 0, "Archive not opened." ); openfn.clear(); lfh.reset(); return XRootDStatus(); @@ -280,6 +279,14 @@ namespace XrdCl return archive.SetProperty( name, value ); } + //----------------------------------------------------------------------- + //! Get property on the underlying File object + //----------------------------------------------------------------------- + inline bool GetProperty( const std::string &name, std::string &value ) + { + return archive.GetProperty( name, value ); + } + private: //-----------------------------------------------------------------------