From 1e6efc2013ce0434b3024127b95fd668c6d10a88 Mon Sep 17 00:00:00 2001
From: Michal Simon
Date: Tue, 19 Jan 2021 22:24:53 +0100
Subject: [PATCH] [XrdEc] Split StrmWriter into hh/cc.

---
 src/XrdEc/CMakeLists.txt     |   2 +-
 src/XrdEc/XrdEcStrmWriter.cc | 279 ++++++++++++++++++++++++++++-
 src/XrdEc/XrdEcStrmWriter.hh | 332 +++++++----------------------------
 3 files changed, 340 insertions(+), 273 deletions(-)

diff --git a/src/XrdEc/CMakeLists.txt b/src/XrdEc/CMakeLists.txt
index 318b8175437..4effa607c21 100644
--- a/src/XrdEc/CMakeLists.txt
+++ b/src/XrdEc/CMakeLists.txt
@@ -80,7 +80,7 @@ add_library(
   XrdEcUtilities.hh XrdEcUtilities.cc
   XrdEcWrtBuff.hh
   XrdEcThreadPool.hh
-  XrdEcStrmWriter.hh
+  XrdEcStrmWriter.hh XrdEcStrmWriter.cc
   XrdEcReader.hh XrdEcReader.cc
 )

diff --git a/src/XrdEc/XrdEcStrmWriter.cc b/src/XrdEc/XrdEcStrmWriter.cc
index 1c57fbfbee8..4d4922e8f2f 100644
--- a/src/XrdEc/XrdEcStrmWriter.cc
+++ b/src/XrdEc/XrdEcStrmWriter.cc
@@ -14,13 +14,284 @@
 #include <numeric>
 #include <algorithm>
 
-namespace
+namespace XrdEc
 {
+  //---------------------------------------------------------------------------
+  // Open the data object for writing
+  //---------------------------------------------------------------------------
+  void StrmWriter::Open( XrdCl::ResponseHandler *handler )
+  {
+    const size_t size = objcfg.plgr.size();
-}
 
+    std::vector<XrdCl::Pipeline> opens;
+    opens.reserve( size );
+    // initialize all zip archive objects
+    for( size_t i = 0; i < size; ++i )
+      dataarchs.emplace_back( std::make_shared<XrdCl::ZipArchive>() );
 
-namespace XrdEc
-{
+    for( size_t i = 0; i < size; ++i )
+    {
+      std::string url = objcfg.GetDataUrl( i );
+      XrdCl::Ctx<XrdCl::ZipArchive> zip( *dataarchs[i] );
+      opens.emplace_back( XrdCl::OpenArchive( zip, url, XrdCl::OpenFlags::New | XrdCl::OpenFlags::Write ) );
+    }
+
+    XrdCl::Async( XrdCl::Parallel( opens ).AtLeast( objcfg.nbchunks ) >>
+                  [=]( XrdCl::XRootDStatus &st )
+                  {
+                    if( !st.IsOK() ) global_status.report_open( st );
+                    handler->HandleResponse( new XrdCl::XRootDStatus( st ), nullptr );
+                  } );
+  }
+
+  //---------------------------------------------------------------------------
+  // Write data to the data object
+  //---------------------------------------------------------------------------
+  void StrmWriter::Write( uint32_t size, const void *buff, XrdCl::ResponseHandler *handler )
+  {
+    //-------------------------------------------------------------------------
+    // First check the global status; if we are in an error state just
+    // fail the request.
+    //-------------------------------------------------------------------------
+    XrdCl::XRootDStatus gst = global_status.get();
+    if( !gst.IsOK() ) return ScheduleHandler( handler, gst );
+
+    //-------------------------------------------------------------------------
+    // Update the number of bytes left to be written
+    //-------------------------------------------------------------------------
+    global_status.issue_write( size );
+
+    const char* buffer = reinterpret_cast<const char*>( buff );
+    uint32_t wrtsize = size;
+    while( wrtsize > 0 )
+    {
+      if( !wrtbuff ) wrtbuff.reset( new WrtBuff( objcfg ) );
+      uint64_t written = wrtbuff->Write( wrtsize, buffer );
+      buffer  += written;
+      wrtsize -= written;
+      if( wrtbuff->Complete() ) EnqueueBuff( std::move( wrtbuff ) );
+    }
+
+    //-------------------------------------------------------------------------
+    // We can tell the user it's done as we have the data cached in the
+    // buffer
+    //-------------------------------------------------------------------------
+    ScheduleHandler( handler );
+  }
+
+  //---------------------------------------------------------------------------
+  // Close the data object
+  //---------------------------------------------------------------------------
+  void StrmWriter::Close( XrdCl::ResponseHandler *handler )
+  {
+    //-------------------------------------------------------------------------
+    // First check the global status; if we are in an error state just
+    // fail the request.
+    //-------------------------------------------------------------------------
+    XrdCl::XRootDStatus gst = global_status.get();
+    if( !gst.IsOK() ) return ScheduleHandler( handler, gst );
+    //-------------------------------------------------------------------------
+    // Take care of the left-over data ...
+    //-------------------------------------------------------------------------
+    if( wrtbuff && !wrtbuff->Empty() ) EnqueueBuff( std::move( wrtbuff ) );
+    //-------------------------------------------------------------------------
+    // Let the global status handle the close
+    //-------------------------------------------------------------------------
+    global_status.issue_close( handler );
+  }
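
Open(), Write() and Close() are all asynchronous and complete through XrdCl::ResponseHandler callbacks. A minimal sketch of how a caller might drive the writer end to end; the PromiseHandler helper and the WriteObject wrapper are illustrative assumptions, not part of XrdEc:

    #include "XrdEc/XrdEcStrmWriter.hh"
    #include "XrdEc/XrdEcObjCfg.hh"

    #include <cstdint>
    #include <future>

    // Illustrative helper: turns the asynchronous callback into a waitable future.
    struct PromiseHandler : public XrdCl::ResponseHandler
    {
      void HandleResponse( XrdCl::XRootDStatus *st, XrdCl::AnyObject *rsp ) override
      {
        prms.set_value( *st );
        delete st;
        delete rsp;
      }

      std::promise<XrdCl::XRootDStatus> prms;
    };

    // Writes one buffer as a single erasure-coded object (sketch).
    XrdCl::XRootDStatus WriteObject( XrdEc::ObjCfg &objcfg, const char *data, uint32_t size )
    {
      XrdEc::StrmWriter writer( objcfg );

      PromiseHandler open;
      writer.Open( &open );
      XrdCl::XRootDStatus st = open.prms.get_future().get();
      if( !st.IsOK() ) return st;

      PromiseHandler write;
      writer.Write( size, data, &write );  // completes once the data is buffered
      st = write.prms.get_future().get();
      if( !st.IsOK() ) return st;

      PromiseHandler close;
      writer.Close( &close );              // flushes the last block and the metadata
      return close.prms.get_future().get();
    }

Note that the Write() callback fires as soon as the data is cached in the write buffer; only Close() confirms that the stripes and the metadata actually reached the servers.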
+
+  //---------------------------------------------------------------------------
+  // Issue the write requests for the given write buffer
+  //---------------------------------------------------------------------------
+  void StrmWriter::WriteBuff( std::unique_ptr<WrtBuff> buff )
+  {
+    //-------------------------------------------------------------------------
+    // Our buffer with the data block will be shared between all pipelines
+    // writing to different servers.
+    //-------------------------------------------------------------------------
+    std::shared_ptr<WrtBuff> wrtbuff( std::move( buff ) );
+
+    //-------------------------------------------------------------------------
+    // Shuffle the servers so every block has a different placement
+    //-------------------------------------------------------------------------
+    static std::default_random_engine random_engine( std::chrono::system_clock::now().time_since_epoch().count() );
+    std::shared_ptr<sync_queue<size_t>> servers = std::make_shared<sync_queue<size_t>>();
+    std::vector<size_t> zipid( dataarchs.size() );
+    std::iota( zipid.begin(), zipid.end(), 0 );
+    std::shuffle( zipid.begin(), zipid.end(), random_engine );
+    auto itr = zipid.begin();
+    for( ; itr != zipid.end() ; ++itr ) servers->enqueue( std::move( *itr ) );
+
+    //-------------------------------------------------------------------------
+    // Create the write pipelines for updating stripes
+    //-------------------------------------------------------------------------
+    const size_t nbchunks = objcfg.nbchunks;
+    std::vector<XrdCl::Pipeline> writes;
+    writes.reserve( nbchunks );
+    size_t   blknb   = next_blknb++;
+    uint64_t blksize = 0;
+    for( size_t strpnb = 0; strpnb < nbchunks; ++strpnb )
+    {
+      std::string fn       = objcfg.GetFileName( blknb, strpnb );
+      uint32_t    crc32c   = wrtbuff->GetCrc32c( strpnb );
+      uint64_t    strpsize = wrtbuff->GetStrpSize( strpnb );
+      char*       strpbuff = wrtbuff->GetStrpBuff( strpnb );
+      if( strpnb < objcfg.nbdata ) blksize += strpsize;
+
+      //-----------------------------------------------------------------------
+      // Find a server where we can append the next data chunk
+      //-----------------------------------------------------------------------
+      XrdCl::Ctx<XrdCl::ZipArchive> zip;
+      size_t srvid;
+      if( !servers->dequeue( srvid ) )
+      {
+        XrdCl::XRootDStatus err( XrdCl::stError, XrdCl::errNoMoreReplicas,
+                                 0, "No more data servers to try." );
+        //---------------------------------------------------------------------
+        // Calculate the full block size, otherwise the user handler
+        // will never be called
+        //---------------------------------------------------------------------
+        for( size_t i = strpnb + 1; i < objcfg.nbdata; ++i )
+          blksize += wrtbuff->GetStrpSize( i );
+        global_status.report_wrt( err, blksize );
+        return;
+      }
+      zip = *dataarchs[srvid];
+
+      //-----------------------------------------------------------------------
+      // Create the Write request
+      //-----------------------------------------------------------------------
+      XrdCl::Pipeline p = XrdCl::AppendFile( zip, fn, crc32c, strpsize, strpbuff ) >>
+                          [=]( XrdCl::XRootDStatus &st ) mutable
+                          {
+                            //------------------------------------------------
+                            // Try to recover from error
+                            //------------------------------------------------
+                            if( !st.IsOK() )
+                            {
+                              //----------------------------------------------
+                              // Select another server
+                              //----------------------------------------------
+                              size_t srvid;
+                              if( !servers->dequeue( srvid ) ) return; // if there are no more servers we simply fail
+                              zip = *dataarchs[srvid];
+                              //----------------------------------------------
+                              // Retry this operation at a different server
+                              //----------------------------------------------
+                              XrdCl::Pipeline::Repeat();
+                            }
+                            //------------------------------------------------
+                            // Make sure the buffer is only deallocated
+                            // after the handler is called
+                            //------------------------------------------------
+                            wrtbuff.reset();
+                          };
+      writes.emplace_back( std::move( p ) );
+    }
+
+    XrdCl::Async( XrdCl::Parallel( writes ) >> [=]( XrdCl::XRootDStatus &st ){ global_status.report_wrt( st, blksize ); } );
+  }
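
The random engine is seeded once and shared, so consecutive blocks see different shuffles: every block gets an independent placement, a block's stripes land on pairwise distinct servers (each id is dequeued at most once), and whatever remains in the queue serves as spares for the retry handler above. A standalone illustration of the placement policy with hypothetical sizes (8 servers, 4 data + 2 parity stripes):

    #include <algorithm>
    #include <cstddef>
    #include <iostream>
    #include <numeric>
    #include <random>
    #include <vector>

    int main()
    {
      const size_t nbservers = 8;  // hypothetical objcfg.plgr.size()
      const size_t nbchunks  = 6;  // e.g. nbdata = 4 plus nbparity = 2

      std::default_random_engine random_engine( 42 );  // fixed seed for the demo
      for( size_t blknb = 0; blknb < 3; ++blknb )      // three consecutive blocks
      {
        std::vector<size_t> zipid( nbservers );
        std::iota( zipid.begin(), zipid.end(), 0 );
        std::shuffle( zipid.begin(), zipid.end(), random_engine );

        // the first nbchunks ids receive a stripe, the rest are spares
        std::cout << "block " << blknb << " -> stripes on:";
        for( size_t strpnb = 0; strpnb < nbchunks; ++strpnb )
          std::cout << ' ' << zipid[strpnb];
        std::cout << " | spares:";
        for( size_t i = nbchunks; i < nbservers; ++i )
          std::cout << ' ' << zipid[i];
        std::cout << '\n';
      }
    }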
+
+  //---------------------------------------------------------------------------
+  // Get a buffer with metadata (CDFH and EOCD records)
+  //---------------------------------------------------------------------------
+  XrdZip::buffer_t StrmWriter::GetMetadataBuffer()
+  {
+    using namespace XrdZip;
+
+    const size_t cdcnt = objcfg.plgr.size();
+    std::vector<buffer_t> buffs; buffs.reserve( cdcnt ); // buffers with raw data
+    std::vector<LFH>      lfhs;  lfhs.reserve( cdcnt );  // LFH records
+    std::vector<CDFH>     cdfhs; cdfhs.reserve( cdcnt ); // CDFH records
+
+    //-------------------------------------------------------------------------
+    // prepare data structures (LFH and CDFH records)
+    //-------------------------------------------------------------------------
+    uint64_t offset = 0;
+    uint64_t cdsize = 0;
+    mode_t   mode   = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
+    for( size_t i = 0; i < cdcnt; ++i )
+    {
+      std::string fn    = objcfg.GetDataUrl( i );                // file name (URL of the data archive)
+      buffer_t    buff( dataarchs[i]->GetCD() );                 // raw data buffer (central directory of the data archive)
+      uint32_t    cksum = crc32c( 0, buff.data(), buff.size() ); // crc32c of the buffer
+      lfhs.emplace_back( fn, cksum, buff.size(), time( 0 ) );    // LFH record for the buffer
+      LFH &lfh = lfhs.back();
+      cdfhs.emplace_back( &lfh, mode, offset );                  // CDFH record for the buffer
+      offset += LFH::lfhBaseSize + fn.size() + buff.size();      // shift the offset
+      cdsize += cdfhs.back().cdfhSize;                           // update central directory size
+      buffs.emplace_back( std::move( buff ) );                   // keep the buffer for later
+    }
+
+    uint64_t zipsize = offset + cdsize + EOCD::eocdBaseSize;
+    buffer_t zipbuff; zipbuff.reserve( zipsize );
+
+    //-------------------------------------------------------------------------
+    // write into the final buffer LFH records + raw data
+    //-------------------------------------------------------------------------
+    for( size_t i = 0; i < cdcnt; ++i )
+    {
+      lfhs[i].Serialize( zipbuff );
+      std::copy( buffs[i].begin(), buffs[i].end(), std::back_inserter( zipbuff ) );
+    }
+    //-------------------------------------------------------------------------
+    // write into the final buffer CDFH records
+    //-------------------------------------------------------------------------
+    for( size_t i = 0; i < cdcnt; ++i )
+      cdfhs[i].Serialize( zipbuff );
+    //-------------------------------------------------------------------------
+    // prepare and write into the final buffer the EOCD record
+    //-------------------------------------------------------------------------
+    EOCD eocd( offset, cdcnt, cdsize );
+    eocd.Serialize( zipbuff );
+
+    return zipbuff;
+  }
+
+  //---------------------------------------------------------------------------
+  // Close the data object (implementation)
+  //---------------------------------------------------------------------------
+  void StrmWriter::CloseImpl( XrdCl::ResponseHandler *handler )
+  {
+    const size_t size = objcfg.plgr.size();
+    //-------------------------------------------------------------------------
+    // prepare the metadata (the Central Directory of each data ZIP)
+    //-------------------------------------------------------------------------
+    auto zipbuff = std::make_shared<XrdZip::buffer_t>( GetMetadataBuffer() );
+    //-------------------------------------------------------------------------
+    // prepare the pipelines ...
+    //-------------------------------------------------------------------------
+    std::vector<XrdCl::Pipeline> closes;
+    std::vector<XrdCl::Pipeline> save_metadata;
+    closes.reserve( size );
+    for( size_t i = 0; i < size; ++i )
+    {
+      //-----------------------------------------------------------------------
+      // close ZIP archives with data
+      //-----------------------------------------------------------------------
+      closes.emplace_back( XrdCl::CloseArchive( *dataarchs[i] ) );
+      //-----------------------------------------------------------------------
+      // replicate the metadata
+      //-----------------------------------------------------------------------
+      std::string url = objcfg.GetMetadataUrl( i );
+      metadataarchs.emplace_back( std::make_shared<XrdCl::File>() );
+      XrdCl::Pipeline p = XrdCl::Open( *metadataarchs[i], url, XrdCl::OpenFlags::New | XrdCl::OpenFlags::Write )
+                        | XrdCl::Write( *metadataarchs[i], 0, zipbuff->size(), zipbuff->data() )
+                        | XrdCl::Close( *metadataarchs[i] )
+                        | XrdCl::Final( [zipbuff]( const XrdCl::XRootDStatus& ){ } );
+      save_metadata.emplace_back( std::move( p ) );
+    }
+    //-------------------------------------------------------------------------
+    // compose closes & save_metadata:
+    //  - closes must be successful at least for #data + #parity
+    //  - save_metadata must be successful at least for #parity + 1
+    //-------------------------------------------------------------------------
+    XrdCl::Pipeline p = XrdCl::Parallel(
+        XrdCl::Parallel( closes ).AtLeast( objcfg.nbchunks ),
+        XrdCl::Parallel( save_metadata ).AtLeast( objcfg.nbparity + 1 )
+      ) >> handler;
+    XrdCl::Async( std::move( p ) );
+  }
 }
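
The metadata buffer produced by GetMetadataBuffer is itself a well-formed ZIP archive: each entry is named with a data-archive URL and its payload is that archive's central directory, so one small object records where every stripe lives inside every data archive. With n = objcfg.plgr.size() entries, the three serialization loops lay the buffer out as follows (names as in the code above):

    offset = 0      : LFH(0) + fn(0) + CD-of-archive(0)
                      LFH(1) + fn(1) + CD-of-archive(1)
                      ...
                      LFH(n-1) + fn(n-1) + CD-of-archive(n-1)
    offset          : CDFH(0) ... CDFH(n-1)                 (cdsize bytes)
    offset + cdsize : EOCD( offset, n, cdsize )
    zipsize         = offset + cdsize + EOCD::eocdBaseSize

where offset accumulates LFH::lfhBaseSize + fn(i).size() + CD(i).size() per entry, exactly as in the first loop.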
diff --git a/src/XrdEc/XrdEcStrmWriter.hh b/src/XrdEc/XrdEcStrmWriter.hh
index 26be70a61a9..a4009e003c8 100644
--- a/src/XrdEc/XrdEcStrmWriter.hh
+++ b/src/XrdEc/XrdEcStrmWriter.hh
@@ -67,80 +67,28 @@ namespace XrdEc
       writer_thread.join();
     }
 
-    void Open( XrdCl::ResponseHandler *handler )
-    {
-      const size_t size = objcfg.plgr.size();
-
-      std::vector<XrdCl::Pipeline> opens;
-      opens.reserve( size );
-      // initialize all zip archive objects
-      for( size_t i = 0; i < size; ++i )
-        dataarchs.emplace_back( std::make_shared<XrdCl::ZipArchive>() );
-
-      for( size_t i = 0; i < size; ++i )
-      {
-        std::string url = objcfg.GetDataUrl( i );
-        XrdCl::Ctx<XrdCl::ZipArchive> zip( *dataarchs[i] );
-        opens.emplace_back( XrdCl::OpenArchive( zip, url, XrdCl::OpenFlags::New | XrdCl::OpenFlags::Write ) );
-      }
-
-      XrdCl::Async( XrdCl::Parallel( opens ).AtLeast( objcfg.nbchunks ) >>
-                    [=]( XrdCl::XRootDStatus &st )
-                    {
-                      if( !st.IsOK() ) global_status.report_open( st );
-                      handler->HandleResponse( new XrdCl::XRootDStatus( st ), nullptr );
-                    } );
-    }
-
-    void Write( uint32_t size, const void *buff, XrdCl::ResponseHandler *handler )
-    {
-      //---------------------------------------------------------------------
-      // First check the global status; if we are in an error state just
-      // fail the request.
-      //---------------------------------------------------------------------
-      XrdCl::XRootDStatus gst = global_status.get();
-      if( !gst.IsOK() ) return ScheduleHandler( handler, gst );
-
-      //---------------------------------------------------------------------
-      // Update the number of bytes left to be written
-      //---------------------------------------------------------------------
-      global_status.issue_write( size );
-
-      const char* buffer = reinterpret_cast<const char*>( buff );
-      uint32_t wrtsize = size;
-      while( wrtsize > 0 )
-      {
-        if( !wrtbuff ) wrtbuff.reset( new WrtBuff( objcfg ) );
-        uint64_t written = wrtbuff->Write( wrtsize, buffer );
-        buffer += written;
-        wrtsize -= written;
-        if( wrtbuff->Complete() ) EnqueueBuff( std::move( wrtbuff ) );
-      }
+    //-----------------------------------------------------------------------
+    //! Open the data object for writing
+    //!
+    //! @param handler : user callback
+    //-----------------------------------------------------------------------
+    void Open( XrdCl::ResponseHandler *handler );
 
-      //---------------------------------------------------------------------
-      // We can tell the user it's done as we have the data cached in the
-      // buffer
-      //---------------------------------------------------------------------
-      ScheduleHandler( handler );
-    }
+    //-----------------------------------------------------------------------
+    //! Write data to the data object
+    //!
+    //! @param size    : number of bytes to be written
+    //! @param buff    : buffer with data to be written
+    //! @param handler : user callback
+    //-----------------------------------------------------------------------
+    void Write( uint32_t size, const void *buff, XrdCl::ResponseHandler *handler );
 
-    void Close( XrdCl::ResponseHandler *handler )
-    {
-      //---------------------------------------------------------------------
-      // First check the global status; if we are in an error state just
-      // fail the request.
-      //---------------------------------------------------------------------
-      XrdCl::XRootDStatus gst = global_status.get();
-      if( !gst.IsOK() ) return ScheduleHandler( handler, gst );
-      //---------------------------------------------------------------------
-      // Take care of the left-over data ...
-      //---------------------------------------------------------------------
-      if( wrtbuff && !wrtbuff->Empty() ) EnqueueBuff( std::move( wrtbuff ) );
-      //---------------------------------------------------------------------
-      // Let the global status handle the close
-      //---------------------------------------------------------------------
-      global_status.issue_close( handler );
-    }
+    //-----------------------------------------------------------------------
+    //! Close the data object
+    //!
+    //! @param handler : user callback
+    //-----------------------------------------------------------------------
+    void Close( XrdCl::ResponseHandler *handler );
 
     private:
 
@@ -234,6 +182,11 @@ namespace XrdEc
       XrdCl::ResponseHandler *closeHandler; //< user close handler
     };
 
+    //-----------------------------------------------------------------------
+    //! Enqueue the write buffer for calculating parity and crc32c
+    //!
+    //! @param wrtbuff : the write buffer
+    //-----------------------------------------------------------------------
     inline void EnqueueBuff( std::unique_ptr<WrtBuff> wrtbuff )
     {
       // the routine to be called in the thread-pool
@@ -248,6 +201,11 @@ namespace XrdEc
       buffers.enqueue( ThreadPool::Instance().Execute( prepare_buff, wrtbuff.release() ) );
     }
 
+    //-----------------------------------------------------------------------
+    //! Dequeue a write buffer after it has been erasure coded and checksummed
+    //!
+    //! @return : the write buffer, ready for writing
+    //-----------------------------------------------------------------------
     inline std::unique_ptr<WrtBuff> DequeueBuff()
     {
       std::future<WrtBuff*> ftr = buffers.dequeue();
@@ -255,6 +213,11 @@ namespace XrdEc
       return std::move( result );
     }
 
+    //-----------------------------------------------------------------------
+    //! The writing routine running in a dedicated thread.
+    //!
+    //! @param me : the StrmWriter object
+    //-----------------------------------------------------------------------
     static void writer_routine( StrmWriter *me )
     {
       try
       {
         while( true )
         {
           std::unique_ptr<WrtBuff> buff( me->DequeueBuff() );
           if( !buff ) continue;
           me->WriteBuff( std::move( buff ) );
         }
       }
       catch( const buff_queue::wait_interrupted& ){ }
     }
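
EnqueueBuff, DequeueBuff and writer_routine form a small producer/consumer pipeline: Write() produces complete blocks, the thread pool erasure-codes and checksums them, and the single writer thread drains the futures strictly in submission order, so blocks are issued sequentially even if encoding finishes out of order. A generic sketch of the same pattern; the blocking_queue below is a hypothetical stand-in for the sync_queue used above, not the XrdEc implementation:

    #include <condition_variable>
    #include <future>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <thread>

    // Hypothetical stand-in for sync_queue: a minimal blocking FIFO.
    template<typename T>
    class blocking_queue
    {
      public:
        void enqueue( T item )
        {
          {
            std::lock_guard<std::mutex> lk( mtx );
            q.push( std::move( item ) );
          }
          cv.notify_one();
        }

        T dequeue()
        {
          std::unique_lock<std::mutex> lk( mtx );
          cv.wait( lk, [this]{ return !q.empty(); } );
          T item = std::move( q.front() );
          q.pop();
          return item;
        }

      private:
        std::mutex              mtx;
        std::condition_variable cv;
        std::queue<T>           q;
    };

    int main()
    {
      blocking_queue<std::future<int>> buffers;

      // consumer: plays the role of writer_routine, draining blocks in order
      std::thread writer( [&buffers]
      {
        for( int i = 0; i < 4; ++i )
          std::cout << "write block " << buffers.dequeue().get() << '\n';
      } );

      // producer: plays the role of EnqueueBuff, encoding runs asynchronously
      for( int blk = 0; blk < 4; ++blk )
        buffers.enqueue( std::async( std::launch::async, [blk]{ return blk; } ) );

      writer.join();
    }

Because the queue holds futures in submission order, the consumer implicitly waits for each block's encoding to finish before writing it, which keeps block numbers monotonic without extra synchronization.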
 
-    void WriteBuff( std::unique_ptr<WrtBuff> buff )
-    {
-      //---------------------------------------------------------------------
-      // Our buffer with the data block will be shared between all pipelines
-      // writing to different servers.
-      //---------------------------------------------------------------------
-      std::shared_ptr<WrtBuff> wrtbuff( std::move( buff ) );
-
-      //---------------------------------------------------------------------
-      // Shuffle the servers so every block has a different placement
-      //---------------------------------------------------------------------
-      static std::default_random_engine random_engine( std::chrono::system_clock::now().time_since_epoch().count() );
-      std::shared_ptr<sync_queue<size_t>> servers = std::make_shared<sync_queue<size_t>>();
-      std::vector<size_t> zipid( dataarchs.size() );
-      std::iota( zipid.begin(), zipid.end(), 0 );
-      std::shuffle( zipid.begin(), zipid.end(), random_engine );
-      auto itr = zipid.begin();
-      for( ; itr != zipid.end() ; ++itr ) servers->enqueue( std::move( *itr ) );
-
-      //---------------------------------------------------------------------
-      // Create the write pipelines for updating stripes
-      //---------------------------------------------------------------------
-      const size_t nbchunks = objcfg.nbchunks;
-      std::vector<XrdCl::Pipeline> writes;
-      writes.reserve( nbchunks );
-      size_t   blknb   = next_blknb++;
-      uint64_t blksize = 0;
-      for( size_t strpnb = 0; strpnb < nbchunks; ++strpnb )
-      {
-        std::string fn       = objcfg.GetFileName( blknb, strpnb );
-        uint32_t    crc32c   = wrtbuff->GetCrc32c( strpnb );
-        uint64_t    strpsize = wrtbuff->GetStrpSize( strpnb );
-        char*       strpbuff = wrtbuff->GetStrpBuff( strpnb );
-        if( strpnb < objcfg.nbdata ) blksize += strpsize;
-
-        //-------------------------------------------------------------------
-        // Find a server where we can append the next data chunk
-        //-------------------------------------------------------------------
-        XrdCl::Ctx<XrdCl::ZipArchive> zip;
-        size_t srvid;
-        if( !servers->dequeue( srvid ) )
-        {
-          XrdCl::XRootDStatus err( XrdCl::stError, XrdCl::errNoMoreReplicas,
-                                   0, "No more data servers to try." );
-          //-----------------------------------------------------------------
-          // Calculate the full block size, otherwise the user handler
-          // will never be called
-          //-----------------------------------------------------------------
-          for( size_t i = strpnb + 1; i < objcfg.nbdata; ++i )
-            blksize += wrtbuff->GetStrpSize( i );
-          global_status.report_wrt( err, blksize );
-          return;
-        }
-        zip = *dataarchs[srvid];
-
-        //-------------------------------------------------------------------
-        // Create the Write request
-        //-------------------------------------------------------------------
-        XrdCl::Pipeline p = XrdCl::AppendFile( zip, fn, crc32c, strpsize, strpbuff ) >>
-                            [=]( XrdCl::XRootDStatus &st ) mutable
-                            {
-                              //---------------------------------------------
-                              // Try to recover from error
-                              //---------------------------------------------
-                              if( !st.IsOK() )
-                              {
-                                //-------------------------------------------
-                                // Select another server
-                                //-------------------------------------------
-                                size_t srvid;
-                                if( !servers->dequeue( srvid ) ) return; // if there are no more servers we simply fail
-                                zip = *dataarchs[srvid];
-                                //-------------------------------------------
-                                // Retry this operation at a different server
-                                //-------------------------------------------
-                                XrdCl::Pipeline::Repeat();
-                              }
-                              //---------------------------------------------
-                              // Make sure the buffer is only deallocated
-                              // after the handler is called
-                              //---------------------------------------------
-                              wrtbuff.reset();
-                            };
-        writes.emplace_back( std::move( p ) );
-      }
-
-      XrdCl::Async( XrdCl::Parallel( writes ) >> [=]( XrdCl::XRootDStatus &st ){ global_status.report_wrt( st, blksize ); } );
-    }
-
-    XrdZip::buffer_t GetMetadataBuffer()
-    {
-      using namespace XrdZip;
-
-      const size_t cdcnt = objcfg.plgr.size();
-      std::vector<buffer_t> buffs; buffs.reserve( cdcnt ); // buffers with raw data
-      std::vector<LFH>      lfhs;  lfhs.reserve( cdcnt );  // LFH records
-      std::vector<CDFH>     cdfhs; cdfhs.reserve( cdcnt ); // CDFH records
-
-      //---------------------------------------------------------------------
-      // prepare data structures (LFH and CDFH records)
-      //---------------------------------------------------------------------
-      uint64_t offset = 0;
-      uint64_t cdsize = 0;
-      mode_t   mode   = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
-      for( size_t i = 0; i < cdcnt; ++i )
-      {
-        std::string fn    = objcfg.GetDataUrl( i );                // file name (URL of the data archive)
-        buffer_t    buff( dataarchs[i]->GetCD() );                 // raw data buffer (central directory of the data archive)
-        uint32_t    cksum = crc32c( 0, buff.data(), buff.size() ); // crc32c of the buffer
-        lfhs.emplace_back( fn, cksum, buff.size(), time( 0 ) );    // LFH record for the buffer
-        LFH &lfh = lfhs.back();
-        cdfhs.emplace_back( &lfh, mode, offset );                  // CDFH record for the buffer
-        offset += LFH::lfhBaseSize + fn.size() + buff.size();      // shift the offset
-        cdsize += cdfhs.back().cdfhSize;                           // update central directory size
-        buffs.emplace_back( std::move( buff ) );                   // keep the buffer for later
-      }
-
-      uint64_t zipsize = offset + cdsize + EOCD::eocdBaseSize;
-      buffer_t zipbuff; zipbuff.reserve( zipsize );
-
-      //---------------------------------------------------------------------
-      // write into the final buffer LFH records + raw data
-      //---------------------------------------------------------------------
-      for( size_t i = 0; i < cdcnt; ++i )
-      {
-        lfhs[i].Serialize( zipbuff );
-        std::copy( buffs[i].begin(), buffs[i].end(), std::back_inserter( zipbuff ) );
-      }
-      //---------------------------------------------------------------------
-      // write into the final buffer CDFH records
-      //---------------------------------------------------------------------
-      for( size_t i = 0; i < cdcnt; ++i )
-        cdfhs[i].Serialize( zipbuff );
-      //---------------------------------------------------------------------
-      // prepare and write into the final buffer the EOCD record
-      //---------------------------------------------------------------------
-      EOCD eocd( offset, cdcnt, cdsize );
-      eocd.Serialize( zipbuff );
-
-      return zipbuff;
-    }
-
-    void CloseImpl( XrdCl::ResponseHandler *handler )
-    {
-      const size_t size = objcfg.plgr.size();
-      //---------------------------------------------------------------------
-      // prepare the metadata (the Central Directory of each data ZIP)
-      //---------------------------------------------------------------------
-      auto zipbuff = std::make_shared<XrdZip::buffer_t>( GetMetadataBuffer() );
-      //---------------------------------------------------------------------
-      // prepare the pipelines ...
-      //---------------------------------------------------------------------
-      std::vector<XrdCl::Pipeline> closes;
-      std::vector<XrdCl::Pipeline> save_metadata;
-      closes.reserve( size );
-      for( size_t i = 0; i < size; ++i )
-      {
-        //-------------------------------------------------------------------
-        // close ZIP archives with data
-        //-------------------------------------------------------------------
-        closes.emplace_back( XrdCl::CloseArchive( *dataarchs[i] ) );
-        //-------------------------------------------------------------------
-        // replicate the metadata
-        //-------------------------------------------------------------------
-        std::string url = objcfg.GetMetadataUrl( i );
-        metadataarchs.emplace_back( std::make_shared<XrdCl::File>() );
-        XrdCl::Pipeline p = XrdCl::Open( *metadataarchs[i], url, XrdCl::OpenFlags::New | XrdCl::OpenFlags::Write )
-                          | XrdCl::Write( *metadataarchs[i], 0, zipbuff->size(), zipbuff->data() )
-                          | XrdCl::Close( *metadataarchs[i] )
-                          | XrdCl::Final( [zipbuff]( const XrdCl::XRootDStatus& ){ } );
-
-        save_metadata.emplace_back( std::move( p ) );
-      }
+    //-----------------------------------------------------------------------
+    //! Issue the write requests for the given write buffer
+    //!
+    //! @param buff : the buffer to be written
+    //-----------------------------------------------------------------------
+    void WriteBuff( std::unique_ptr<WrtBuff> buff );
 
-      //---------------------------------------------------------------------
-      // compose closes & save_metadata:
-      //  - closes must be successful at least for #data + #parity
-      //  - save_metadata must be successful at least for #parity + 1
-      //---------------------------------------------------------------------
-      XrdCl::Pipeline p = XrdCl::Parallel(
-          XrdCl::Parallel( closes ).AtLeast( objcfg.nbchunks ),
-          XrdCl::Parallel( save_metadata ).AtLeast( objcfg.nbparity + 1 )
-        ) >> handler;
-      XrdCl::Async( std::move( p ) );
-    }
+    //-----------------------------------------------------------------------
+    //! Get a buffer with metadata (CDFH and EOCD records)
+    //!
+    //! @return : the buffer with metadata
+    //-----------------------------------------------------------------------
+    XrdZip::buffer_t GetMetadataBuffer();
 
+    //-----------------------------------------------------------------------
+    //! Close the data object (implementation)
+    //!
+    //! @param handler : user callback
+    //-----------------------------------------------------------------------
+    void CloseImpl( XrdCl::ResponseHandler *handler );
 
     const ObjCfg &objcfg;
 
-    std::unique_ptr<WrtBuff> wrtbuff;
-    std::vector<std::shared_ptr<XrdCl::ZipArchive>> dataarchs;
-    std::vector<std::shared_ptr<XrdCl::File>> metadataarchs;
-    std::vector<XrdZip::buffer_t> cdbuffs;
-
-    // queue of buffers being prepared (erasure encoded and checksummed) for write
-    buff_queue buffers;
-
-    std::atomic<bool> writer_thread_stop;
-    std::thread writer_thread;
-    size_t next_blknb;
-
-    global_status_t global_status;
+    std::unique_ptr<WrtBuff> wrtbuff;                           //< current write buffer
+    std::vector<std::shared_ptr<XrdCl::ZipArchive>> dataarchs;  //< ZIP archives with data
+    std::vector<std::shared_ptr<XrdCl::File>> metadataarchs;    //< ZIP archives with metadata
+    std::vector<XrdZip::buffer_t> cdbuffs;                      //< buffers with CDs
+    buff_queue buffers;                                         //< queue of buffers for writing
+                                                                //< (waiting to be erasure coded)
+    std::atomic<bool> writer_thread_stop;                       //< true if the writer thread should be stopped,
+                                                                //< false otherwise
+    std::thread writer_thread;                                  //< handle to the writer thread
+    size_t next_blknb;                                          //< number of the next block to be created
+    global_status_t global_status;                              //< global status of the writer
   };
 }
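
To make the two AtLeast thresholds concrete with a hypothetical 4+2 layout striped over 8 servers: all 8 data archives are opened and closed, but only nbchunks = nbdata + nbparity of those operations must succeed, whereas of the 8 metadata replicas only nbparity + 1 must be written, presumably because any single surviving copy of the metadata is usable:

    #include <cstddef>
    #include <iostream>

    int main()
    {
      // hypothetical 4+2 erasure-coding configuration
      const size_t nbdata   = 4;
      const size_t nbparity = 2;
      const size_t nbchunks = nbdata + nbparity;  // "#data + #parity" (cf. CloseImpl)
      const size_t nbplgr   = 8;                  // hypothetical objcfg.plgr.size()

      std::cout << "data archives opened/closed : " << nbplgr       << '\n'
                << "closes required (AtLeast)   : " << nbchunks     << '\n'
                << "metadata replicas attempted : " << nbplgr       << '\n'
                << "writes required (AtLeast)   : " << nbparity + 1 << '\n';
    }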