diff --git a/src/XrdEc/XrdEcReader.hh b/src/XrdEc/XrdEcReader.hh index 335068ca66b..091112f186a 100644 --- a/src/XrdEc/XrdEcReader.hh +++ b/src/XrdEc/XrdEcReader.hh @@ -9,6 +9,7 @@ #define SRC_XRDEC_XRDECREADER_HH_ #include "XrdEc/XrdEcObjCfg.hh" +#include "XrdEc/XrdEcScheduleHandler.hh" #include "XrdCl/XrdClFileOperations.hh" @@ -210,6 +211,7 @@ namespace XrdEc case Valid: ++validcnt; break; case Loading: ++loadingcnt; break; case Recovering: ++recoveringcnt; break; + default: ; } } ); //--------------------------------------------------------------------- @@ -238,9 +240,14 @@ namespace XrdEc } //------------------------------------------------------------------- // Now when we recovered the data we need to mark every stripe as - // valid. + // valid and execute the pending reads //------------------------------------------------------------------- - std::for_each( state.begin(), state.end(), []( state_t &s ){ s = Valid; } ); + for( size_t strpid = 0; strpid < objcfg.nbchunks; ++strpid ) + { + if( state[strpid] != Recovering ) continue; + state[strpid] = Valid; + carryout( pending[strpid], stripes[strpid] ); + } return true; } //--------------------------------------------------------------------- @@ -317,18 +324,18 @@ namespace XrdEc const buffer_t &stripe, const XrdCl::XRootDStatus &st = XrdCl::XRootDStatus() ) { - auto itr = pending.begin(); - //--------------------------------------------------------------- + //--------------------------------------------------------------------- // Iterate over all pending read operations for given stripe - //--------------------------------------------------------------- + //--------------------------------------------------------------------- + auto itr = pending.begin(); for( ; itr != pending.end() ; ++itr ) { auto &args = *itr; callback_t &callback = std::get<3>( args ); uint32_t nbrd = 0; // number of bytes read - //------------------------------------------------------------- + //------------------------------------------------------------------- // If the read was successful, copy the data to user buffer - //------------------------------------------------------------- + //------------------------------------------------------------------- if( st.IsOK() ) { uint64_t offset = std::get<0>( args ); @@ -339,11 +346,15 @@ namespace XrdEc memcpy( usrbuff, stripe.data() + offset, size ); nbrd = size; } - //------------------------------------------------------------- + //------------------------------------------------------------------- // Call the user callback - //------------------------------------------------------------- + //------------------------------------------------------------------- callback( st, nbrd ); } + //--------------------------------------------------------------------- + // Now we can clear the pending reads + //--------------------------------------------------------------------- + pending.clear(); } Reader &reader; @@ -432,9 +443,7 @@ namespace XrdEc //----------------------------------------------------------------- if( !st.IsOK() ) { - // TODO first check if we can recover using EC - XrdCl::ResponseHandler *h = std::get<4>( *rdctx ); - if( h ) h->HandleResponse( new XrdCl::XRootDStatus( st ), nullptr ); + ScheduleHandler( std::get<4>( *rdctx ), XrdCl::XRootDStatus( st ) ); return; } //----------------------------------------------------------------- @@ -443,18 +452,8 @@ namespace XrdEc std::get<1>( *rdctx ) += nbrd; // number of bytes read std::get<3>( *rdctx ) -= rdsize; // number of bytes requested if( std::get<3>( *rdctx ) == 0 ) - { - XrdCl::ResponseHandler *handler = std::get<4>( *rdctx ); - if( handler ) - { - XrdCl::ChunkInfo *ch = new XrdCl::ChunkInfo( std::get<0>( *rdctx ), - std::get<1>( *rdctx ), - std::get<2>( *rdctx ) ); - XrdCl::AnyObject *rsp = new XrdCl::AnyObject(); - rsp->Set( ch ); - handler->HandleResponse( new XrdCl::XRootDStatus(), rsp ); - } - } + ScheduleHandler( std::get<0>( *rdctx ), std::get<1>( *rdctx ), + std::get<2>( *rdctx ), std::get<4>( *rdctx ) ); }; //------------------------------------------------------------------- // Read data from a stripe diff --git a/src/XrdEc/XrdEcScheduleHandler.cc b/src/XrdEc/XrdEcScheduleHandler.cc index 9b53a3d118f..42e605b9bc4 100644 --- a/src/XrdEc/XrdEcScheduleHandler.cc +++ b/src/XrdEc/XrdEcScheduleHandler.cc @@ -35,7 +35,7 @@ namespace XrdEc XrdCl::AnyObject *pResponse; }; - void ScheduleHandler( uint64_t offset, uint32_t size, char *buffer, XrdCl::ResponseHandler *handler ) + void ScheduleHandler( uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler ) { if( !handler ) return; diff --git a/src/XrdEc/XrdEcScheduleHandler.hh b/src/XrdEc/XrdEcScheduleHandler.hh index da903f72e04..24eeaadeec6 100644 --- a/src/XrdEc/XrdEcScheduleHandler.hh +++ b/src/XrdEc/XrdEcScheduleHandler.hh @@ -14,7 +14,7 @@ namespace XrdEc { - void ScheduleHandler( uint64_t offset, uint32_t size, char *buffer, XrdCl::ResponseHandler *handler ); + void ScheduleHandler( uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler ); void ScheduleHandler( XrdCl::ResponseHandler *handler, const XrdCl::XRootDStatus &st = XrdCl::XRootDStatus() ); }