Skip to content

Commit

Permalink
[XrdEc] Small refactoring.
Browse files Browse the repository at this point in the history
- call user handler in the thread-pool
- call pending reads for recovered stripes
  • Loading branch information
simonmichal committed Jan 7, 2021
1 parent 66bbe62 commit 679fe7d
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 26 deletions.
47 changes: 23 additions & 24 deletions src/XrdEc/XrdEcReader.hh
Expand Up @@ -9,6 +9,7 @@
#define SRC_XRDEC_XRDECREADER_HH_

#include "XrdEc/XrdEcObjCfg.hh"
#include "XrdEc/XrdEcScheduleHandler.hh"

#include "XrdCl/XrdClFileOperations.hh"

Expand Down Expand Up @@ -210,6 +211,7 @@ namespace XrdEc
case Valid: ++validcnt; break;
case Loading: ++loadingcnt; break;
case Recovering: ++recoveringcnt; break;
default: ;
}
} );
//---------------------------------------------------------------------
Expand Down Expand Up @@ -238,9 +240,14 @@ namespace XrdEc
}
//-------------------------------------------------------------------
// Now when we recovered the data we need to mark every stripe as
// valid.
// valid and execute the pending reads
//-------------------------------------------------------------------
std::for_each( state.begin(), state.end(), []( state_t &s ){ s = Valid; } );
for( size_t strpid = 0; strpid < objcfg.nbchunks; ++strpid )
{
if( state[strpid] != Recovering ) continue;
state[strpid] = Valid;
carryout( pending[strpid], stripes[strpid] );
}
return true;
}
//---------------------------------------------------------------------
Expand Down Expand Up @@ -317,18 +324,18 @@ namespace XrdEc
const buffer_t &stripe,
const XrdCl::XRootDStatus &st = XrdCl::XRootDStatus() )
{
auto itr = pending.begin();
//---------------------------------------------------------------
//---------------------------------------------------------------------
// Iterate over all pending read operations for given stripe
//---------------------------------------------------------------
//---------------------------------------------------------------------
auto itr = pending.begin();
for( ; itr != pending.end() ; ++itr )
{
auto &args = *itr;
callback_t &callback = std::get<3>( args );
uint32_t nbrd = 0; // number of bytes read
//-------------------------------------------------------------
//-------------------------------------------------------------------
// If the read was successful, copy the data to user buffer
//-------------------------------------------------------------
//-------------------------------------------------------------------
if( st.IsOK() )
{
uint64_t offset = std::get<0>( args );
Expand All @@ -339,11 +346,15 @@ namespace XrdEc
memcpy( usrbuff, stripe.data() + offset, size );
nbrd = size;
}
//-------------------------------------------------------------
//-------------------------------------------------------------------
// Call the user callback
//-------------------------------------------------------------
//-------------------------------------------------------------------
callback( st, nbrd );
}
//---------------------------------------------------------------------
// Now we can clear the pending reads
//---------------------------------------------------------------------
pending.clear();
}

Reader &reader;
Expand Down Expand Up @@ -432,9 +443,7 @@ namespace XrdEc
//-----------------------------------------------------------------
if( !st.IsOK() )
{
// TODO first check if we can recover using EC
XrdCl::ResponseHandler *h = std::get<4>( *rdctx );
if( h ) h->HandleResponse( new XrdCl::XRootDStatus( st ), nullptr );
ScheduleHandler( std::get<4>( *rdctx ), XrdCl::XRootDStatus( st ) );
return;
}
//-----------------------------------------------------------------
Expand All @@ -443,18 +452,8 @@ namespace XrdEc
std::get<1>( *rdctx ) += nbrd; // number of bytes read
std::get<3>( *rdctx ) -= rdsize; // number of bytes requested
if( std::get<3>( *rdctx ) == 0 )
{
XrdCl::ResponseHandler *handler = std::get<4>( *rdctx );
if( handler )
{
XrdCl::ChunkInfo *ch = new XrdCl::ChunkInfo( std::get<0>( *rdctx ),
std::get<1>( *rdctx ),
std::get<2>( *rdctx ) );
XrdCl::AnyObject *rsp = new XrdCl::AnyObject();
rsp->Set( ch );
handler->HandleResponse( new XrdCl::XRootDStatus(), rsp );
}
}
ScheduleHandler( std::get<0>( *rdctx ), std::get<1>( *rdctx ),
std::get<2>( *rdctx ), std::get<4>( *rdctx ) );
};
//-------------------------------------------------------------------
// Read data from a stripe
Expand Down
2 changes: 1 addition & 1 deletion src/XrdEc/XrdEcScheduleHandler.cc
Expand Up @@ -35,7 +35,7 @@ namespace XrdEc
XrdCl::AnyObject *pResponse;
};

void ScheduleHandler( uint64_t offset, uint32_t size, char *buffer, XrdCl::ResponseHandler *handler )
void ScheduleHandler( uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler )
{
if( !handler ) return;

Expand Down
2 changes: 1 addition & 1 deletion src/XrdEc/XrdEcScheduleHandler.hh
Expand Up @@ -14,7 +14,7 @@

namespace XrdEc
{
void ScheduleHandler( uint64_t offset, uint32_t size, char *buffer, XrdCl::ResponseHandler *handler );
void ScheduleHandler( uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler );

void ScheduleHandler( XrdCl::ResponseHandler *handler, const XrdCl::XRootDStatus &st = XrdCl::XRootDStatus() );
}
Expand Down

0 comments on commit 679fe7d

Please sign in to comment.