Skip to content

Commit

Permalink
[XrdEc] StrmWriter::Close : make sure user handler is always called.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Jan 14, 2021
1 parent 395c3e7 commit be6906a
Showing 1 changed file with 19 additions and 34 deletions.
53 changes: 19 additions & 34 deletions src/XrdEc/XrdEcStrmWriter.hh
Expand Up @@ -395,27 +395,26 @@ namespace XrdEc
// Find a server where we can append the next data chunk
//-------------------------------------------------------------------
XrdCl::Ctx<XrdCl::ZipArchive> zip;
XrdCl::XRootDStatus st;
do
size_t srvid;
if( !servers->dequeue( srvid ) )
{
size_t srvid;
if( !servers->dequeue( srvid ) )
{
XrdCl::XRootDStatus err( XrdCl::stError, XrdCl::errNoMoreReplicas,
0, "No more data servers to try." );
global_status.report_wrt( err, strpsize );
return;
}

zip = *dataarchs[srvid];
st = zip->OpenFile( fn, XrdCl::OpenFlags::New, strpsize, crc32c );
XrdCl::XRootDStatus err( XrdCl::stError, XrdCl::errNoMoreReplicas,
0, "No more data servers to try." );
//-----------------------------------------------------------------
// calculate the full block size, otherwise the user handler
// will be never called
//-----------------------------------------------------------------
for( size_t i = strpnb + 1; i < objcfg.nbdata; ++i )
blksize += wrtbuff->GetStrpSize( i );
global_status.report_wrt( err, blksize );
return;
}
while( !st.IsOK() );
zip = *dataarchs[srvid];

//-------------------------------------------------------------------
// Create the Write request
//-------------------------------------------------------------------
XrdCl::Pipeline p = XrdCl::Write( zip, strpsize, strpbuff ) >>
XrdCl::Pipeline p = XrdCl::Write( zip, strpsize, strpbuff ) >> // TODO replace Write with WriteTo
[=]( XrdCl::XRootDStatus &st ) mutable
{
//---------------------------------------------
Expand All @@ -424,32 +423,18 @@ namespace XrdEc
if( !st.IsOK() )
{
//-------------------------------------------
// First clean up the ZipArchive object
// Select another server
//-------------------------------------------
zip->CloseFile();
//-------------------------------------------
// Then select another server
//-------------------------------------------
XrdCl::XRootDStatus status;
do
{
size_t srvid;
if( !servers->dequeue( srvid ) ) return; // if there are no more servers we simply fail
zip = *dataarchs[srvid];
st = zip->OpenFile( fn, XrdCl::OpenFlags::New, strpsize, crc32c );
} while( !status.IsOK() );
size_t srvid;
if( !servers->dequeue( srvid ) ) return; // if there are no more servers we simply fail
zip = *dataarchs[srvid];
//-------------------------------------------
// Retry this operation at different server
//-------------------------------------------
XrdCl::Pipeline::Repeat();
}
}
| XrdCl::Final(
[=]( const XrdCl::XRootDStatus &st ) mutable
{
zip->CloseFile();
wrtbuff.reset();
} );
| XrdCl::Final( [wrtbuff]( const XrdCl::XRootDStatus &st ){ } );
writes.emplace_back( std::move( p ) );
}

Expand Down

0 comments on commit be6906a

Please sign in to comment.