Skip to content

Commit

Permalink
[XrdCl] Extreme copy implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Jan 9, 2017
1 parent 768b872 commit 817ee97
Show file tree
Hide file tree
Showing 8 changed files with 1,106 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/XrdCl/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ add_library(
XrdClMetalinkRedirector.cc XrdClMetalinkRedirector.hh
XrdClRedirectorRegistry.cc XrdClRedirectorRegistry.hh
XrdClZipArchiveReader.cc XrdClZipArchiveReader.hh
XrdClXCpCtx.cc XrdClXCpCtx.hh
XrdClXCpSrc.cc XrdClXCpSrc.hh
)

target_link_libraries(
Expand Down
76 changes: 76 additions & 0 deletions src/XrdCl/XrdClClassicCopyJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "XrdCl/XrdClUglyHacks.hh"
#include "XrdCl/XrdClRedirectorRegistry.hh"
#include "XrdCl/XrdClZipArchiveReader.hh"
#include "XrdCl/XrdClXCpCtx.hh"

#include <memory>
#include <iostream>
Expand Down Expand Up @@ -1078,6 +1079,72 @@ namespace
bool pDone;
};

//----------------------------------------------------------------------------
//! XRootDSourceDynamic
//----------------------------------------------------------------------------
class XRootDSourceXCp: public Source
{
public:
//------------------------------------------------------------------------
//! Constructor
//------------------------------------------------------------------------
XRootDSourceXCp( const std::vector<XrdCl::URL*> &pUrls, uint64_t blockSize,
uint64_t parallelSrc, uint64_t chunkSize, uint64_t parallelChunks ):
pXCpCtx( pUrls, blockSize, parallelSrc, chunkSize, parallelChunks )
{
}

//------------------------------------------------------------------------
//! Initialize the source
//------------------------------------------------------------------------
virtual XrdCl::XRootDStatus Initialize()
{
return pXCpCtx.Initialize();
}

//------------------------------------------------------------------------
//! Get size
//------------------------------------------------------------------------
virtual int64_t GetSize()
{
return pXCpCtx.GetSize();
}

//------------------------------------------------------------------------
//! Get a data chunk from the source
//!
//! @param buffer buffer for the data
//! @param ci chunk information
//! @return status of the operation
//! suContinue - there are some chunks left
//! suDone - no chunks left
//------------------------------------------------------------------------
virtual XrdCl::XRootDStatus GetChunk( XrdCl::ChunkInfo &ci )
{
XrdCl::XRootDStatus st;
do
{
st = pXCpCtx.GetChunk( ci );
}
while( st.IsOK() && st.code == XrdCl::suRetry );
return st;
}

//------------------------------------------------------------------------
// Get check sum
//------------------------------------------------------------------------
virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
std::string &checkSumType )
{
// TODO
return XrdCl::XRootDStatus();
}

private:

XrdCl::XCpCtx pXCpCtx;
};

//----------------------------------------------------------------------------
//! Local destination
//----------------------------------------------------------------------------
Expand Down Expand Up @@ -1648,6 +1715,15 @@ namespace XrdCl
src.reset( new XRootDSource( &GetSource(), chunkSize, parallelChunks ) );
}

// TODO just testing begin
std::vector<URL*> urls;
urls.push_back( new URL( "root://slc6-test//tmp/file.txt" ) );
urls.push_back( new URL( "root://radosfs-test//tmp/file.txt" ) );
uint64_t blockSize = 1024 * 1024 * 1024;
uint64_t parallelSrc = 2;
src.reset( new XRootDSourceXCp( urls, blockSize, parallelSrc, chunkSize, parallelChunks ) );
// TODO just testing end

XRootDStatus st = src->Initialize();
if( !st.IsOK() ) return st;

Expand Down
70 changes: 70 additions & 0 deletions src/XrdCl/XrdClSharedQueue.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* XrdClSharedQueue.hh
*
* Created on: Nov 22, 2016
* Author: simonm
*/

#ifndef SRC_XRDCL_XRDCLSHAREDQUEUE_HH_
#define SRC_XRDCL_XRDCLSHAREDQUEUE_HH_

#include "XrdCl/XrdClSyncQueue.hh"

namespace XrdCl
{

template <typename Item>
class SharedQueue : public SyncQueue<Item*>
{
public:

typedef void (*Dealloc_t)( Item* );

SharedQueue( Dealloc_t dealloc = 0 ) : Dealloc( dealloc ), pRefCount( 1 ) { }

void Delete()
{
XrdSysMutexHelper lck( this->pMutex );
--pRefCount;
if( !pRefCount )
delete this;
}

SharedQueue* Self()
{
XrdSysMutexHelper lck( this->pMutex );
++pRefCount;
return this;
}

bool Empty()
{
XrdSysMutexHelper lck( this->pMutex );
return this->pQueue.empty();
}

private:

~SharedQueue()
{
if( !Dealloc ) return;

while( this->pQueue.empty() )
{
Item *item = this->pQueue.front();
this->pQueue.pop();
Dealloc( item );
}
}

Dealloc_t Dealloc;

// mutable XrdSysMutex pMutex;
uint8_t pRefCount;
};

}



#endif /* SRC_XRDCL_XRDCLSHAREDQUEUE_HH_ */
2 changes: 1 addition & 1 deletion src/XrdCl/XrdClSyncQueue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ namespace XrdCl
pSem = new Semaphore(0);
}

private:
protected:
std::queue<Item> pQueue;
XrdSysMutex pMutex;
Semaphore *pSem;
Expand Down
Loading

0 comments on commit 817ee97

Please sign in to comment.