Skip to content

Commit

Permalink
[XrdCl] Extreme copy implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Oct 14, 2016
1 parent 768b872 commit 8022290
Show file tree
Hide file tree
Showing 2 changed files with 682 additions and 0 deletions.
313 changes: 313 additions & 0 deletions src/XrdCl/XrdClMultiSrcCpCtx.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
/*
* XrdClSegemntedFileTransferCtx.cc
*
* Created on: Sep 15, 2016
* Author: simonm
*/

#include "XrdClMultiSrcCpCtx.hh"

namespace XrdCl
{

void MultiSrcCpCtx::ChunkHandler::HandleResponse( XRootDStatus *status, AnyObject *response )
{
ChunkInfo *chunk = 0;
if( response ) // get the response
{
response->Get( chunk );
response->Set( ( int* )0 );
delete response;
}

if( !chunk && status->IsOK() ) // if the response is not there make sure the status is error
{
*status = XRootDStatus( stError, errInternal );
}

if( !chunk ) // make sure there is a chunk (so we now which one failed)
{
chunk = new ChunkInfo( pOffset, pSize, pBuffer );
}

pCtx->Produce( status, chunk );

delete this;
}

XRootDStatus MultiSrcCpCtx::SourceCtx::Initialize()
{
Log *log = DefaultEnv::GetLog();
log->Debug( UtilityMsg, "Opening %s for reading", pUrl->GetURL().c_str() );

std::string value;
DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
pFile->SetProperty( "ReadRecovery", value );

XRootDStatus st = pFile->Open( pUrl->GetURL(), OpenFlags::Read );
if( !st.IsOK() )
return st;

StatInfo *statInfo;
st = pFile->Stat( false, statInfo );
if( !st.IsOK() )
return st;

pFileSize = statInfo->GetSize();
delete statInfo;

return XRootDStatus();
}

XRootDStatus MultiSrcCpCtx::SourceCtx::ReadChunk( MultiSrcCpCtx *ctx )
{
XrdSysMutexHelper scoped( pSrcMutex );

if( !pFile->IsOpen() )
pStatus = XRootDStatus( stError, errUninitialized );

if( !pStatus.IsOK() ) return pStatus;

// first replay the failures that we
// inherited from another source
while( pOngoing.size() < pParallel && !pStolenFailed.empty() )
{
// pick the first item
std::pair<uint64_t, uint64_t> p = *pStolenFailed.begin();
pStolenFailed.erase( pStolenFailed.begin() );
// and replay it
ReadChunk( ctx, p.first, p.second );
}

Blocks::value_type &blk = pBlocks.front();

// than handle new offsets
while( pOngoing.size() < pParallel && blk.first < blk.second )
{
uint64_t chunkSize = pChunkSize;
if( blk.first + chunkSize > blk.second )
chunkSize = blk.second - blk.first;

ReadChunk( ctx, blk.first, chunkSize );
blk.first += chunkSize;
}

if( blk.first >= blk.second ) pBlocks.pop_front();

if( !pBlocks.empty() ) // there is still work to be done
pStatus.code = suContinue;
else if( !pOngoing.empty() || !pStolenOngoing.empty() ) // there are outstanding read requests
pStatus.code = suPartial;
else // we are done
pStatus.code = suDone;

return pStatus;
}

void MultiSrcCpCtx::SourceCtx::ReadChunk( MultiSrcCpCtx *ctx, uint64_t offset, uint64_t size )
{
pOngoing[offset] = size;
char *buffer = new char[size];
ChunkHandler *handler = new ChunkHandler( ctx, offset, size, buffer );
pStatus = pFile->Read( offset, size, buffer, handler );
if( !pStatus.IsOK() )
{
delete[] buffer;
delete handler;
ReportResult( pStatus, offset, size );
}
}

void MultiSrcCpCtx::SourceCtx::ReportResult( XRootDStatus &status, uint64_t offset, uint64_t size )
{
XrdSysMutexHelper scoped( pSrcMutex );

// check if this chunk was mine or we
// inherited it from a source that failed
bool ongoing = pOngoing.erase( offset );
if( !ongoing ) pStolenOngoing.erase( offset );

if( !status.IsOK() )
{
if( ongoing )
{
pStatus = status;
pFailed[offset] = size;
}
else
pStolenFailed[offset] = size;
}
else
pBytesRead += size;
}

XRootDStatus MultiSrcCpCtx::ReplaceWithSpare( std::vector<SourceCtx*>::iterator itr )
{
while( pSpareUrl < pUrls.size() )
{
SourceCtx *src = new SourceCtx( pUrls[pSpareUrl], pChunkSize, pParallelChunks );
++pSpareUrl;
if( src->Initialize().IsOK() )
{
SourceCtx *old = *itr;
src->StealBlocks( old );
*itr = src;
pFailedSources.push_back( old );
return XRootDStatus();
}
else delete src;
}

return XRootDStatus( stError, errNotFound );
}

XRootDStatus MultiSrcCpCtx::ReadChunk( std::vector<SourceCtx*>::iterator itr )
{
XRootDStatus ret;

// try to issue the read
while( !( ret = (*itr)->ReadChunk( this ) ).IsOK() )
{
XRootDStatus st = ReplaceWithSpare( itr );
if( !st.IsOK() ) return ret;
}
// if everything went OK we return
return ret;
}

void MultiSrcCpCtx::Produce( XRootDStatus *status, ChunkInfo *chunk )
{
// first report the result to the source
// that is responsible for given offset
std::vector<SourceCtx*>::iterator itr = pSources.begin();
for( ; itr != pSources.end(); ++itr )
{
SourceCtx *src = *itr;
if( src->IsMine( chunk->offset ) )
{
src->ReportResult( *status, chunk->offset, chunk->length );
break;
}
}

XrdSysMutexHelper scoped( pCtxMutex );
// always report an error, consumer will decide what to do with it
pChunks.push( std::make_pair( status, chunk ) );
pSemaphore.Post();
}

void MultiSrcCpCtx::Delete( XRootDStatus *status, ChunkInfo *chunk )
{
if( chunk )
delete[] reinterpret_cast<char*>( chunk->buffer );
delete chunk;
delete status;
}

std::pair<XRootDStatus*, ChunkInfo*> MultiSrcCpCtx::Consume()
{
pSemaphore.Wait();
XrdSysMutexHelper scoped( pCtxMutex );
ChunkQueue::value_type p = pChunks.front();
pChunks.pop();
return p;
}

MultiSrcCpCtx::~MultiSrcCpCtx()
{
std::vector<SourceCtx*>::iterator srcItr = pFailedSources.begin();
for( ; srcItr != pFailedSources.end(); ++srcItr )
{
delete *srcItr;
}

srcItr = pSources.begin();
for( ; srcItr != pSources.end(); ++srcItr )
{
delete *srcItr;
}

std::vector<URL*>::iterator urlItr = pUrls.begin();
for( ; urlItr != pUrls.end(); ++urlItr )
{
delete *urlItr;
}
}

XRootDStatus MultiSrcCpCtx::GetChunk( ChunkInfo &ci )
{
uint8_t failed = 0, done = 0;

XRootDStatus *status = 0;
ChunkInfo *chunk = 0;

do
{
if( status || chunk )
Delete( status, chunk );

std::vector<SourceCtx*>::iterator itr = pSources.begin();
for( ; itr != pSources.end(); ++itr )
{
XRootDStatus st = ReadChunk( itr );
if( !st.IsOK() ) ++failed;
else if( st.code == suPartial )
{
AssignBlock( *itr );
}
else if( st.code == suDone )
{
if( AssignBlock( *itr ).code == suAlreadyDone ) ++done;
}
}

if( pParallelSrc == done + failed )
return XRootDStatus();

if( pParallelSrc == failed ) // all sources are broken
return XRootDStatus( stError /*TODO figure out something more specific*/ );

std::pair<XRootDStatus*, ChunkInfo*> p = Consume();
XRootDStatus *status = p.first;
ChunkInfo *chunk = p.second;
}
while( !status->IsOK() );

if( !status->IsOK() )
{
XRootDStatus ret = *status;
Delete( status, chunk );
return ret;
}

ci = *chunk;
delete chunk;
delete status;

return XRootDStatus( stOK, suContinue );
}

XRootDStatus MultiSrcCpCtx::AssignBlock( SourceCtx *src )
{
// there are still some blocks that need downloading
if( pCurrentOffset < pFileSize )
{
uint64_t size = pBlockSize;
if( pCurrentOffset + size > pFileSize )
size = pFileSize - pCurrentOffset;
src->AddBlock( pCurrentOffset, size );
pCurrentOffset += size;
}
// reshuffle the already allocated blocks
else
{
BlockAllocator allocator( pSources );
allocator.Realloc();
}

// TODO: if there is nothing to reshuffle
// we can only reissue outstanding
// read request on other sources
}

} /* namespace XrdCl */

0 comments on commit 8022290

Please sign in to comment.