Skip to content

Commit

Permalink
[XrdCl] Refactor ZipCache so it supports reading chunks in parallel.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Feb 9, 2021
1 parent 45a6f14 commit 928f0d1
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 113 deletions.
108 changes: 41 additions & 67 deletions src/XrdCl/XrdClZipArchive.cc
Original file line number Diff line number Diff line change
Expand Up @@ -526,82 +526,56 @@ namespace XrdCl
// if the entry does not exist, it will be created using
// default constructor
ZipCache &cache = zipcache[fn];

if( relativeOffset > cdfh->uncompressedSize )
{
// we are reading past the end of file,
// we can serve the request right away!
ChunkInfo *ch = new ChunkInfo( relativeOffset, 0, usrbuff );
AnyObject *rsp = new AnyObject();
rsp->Set( ch );
usrHandler->HandleResponse( new XRootDStatus(), rsp );
return XRootDStatus();
}

uint32_t sizereq = size;
if( relativeOffset + size > cdfh->uncompressedSize )
sizereq = cdfh->uncompressedSize - relativeOffset;
cache.QueueReq( relativeOffset, sizereq, usrbuff, usrHandler );

// if we have the whole ZIP archive we can populate the cache
// straight away
if( empty && buffer)
{
XRootDStatus st = cache.Input( buffer.get() + offset, filesize - fileoff, relativeOffset );
if( !st.IsOK() ) return st;
auto begin = buffer.get();
auto end = begin + filesize ;
buffer_t buff( begin, end );
cache.QueueRsp( XRootDStatus(), 0, std::move( buff ) );
return XRootDStatus();
}

XRootDStatus st = cache.Output( usrbuff, size, relativeOffset );

// read from cache
if( !empty || buffer )
// if we don't have the data we need to issue a remote read
if( !buffer )
{
uint32_t bytesRead = 0;
st = cache.Read( bytesRead );
// propagate errors to the end-user
if( !st.IsOK() ) return st;
log->Dump( ZipMsg, "[0x%x] Read %d bytes from ZipCache.", this, bytesRead );
// we have all the data ...
if( st.code == suDone )
{
if( usrHandler )
{
XRootDStatus *st = make_status();
ChunkInfo *ch = new ChunkInfo( relativeOffset, size, usrbuff );
Schedule( usrHandler, st, ch );
}
return XRootDStatus();
}
if( relativeOffset > cdfh->compressedSize ) return XRootDStatus(); // there's nothing to do,
// we already have all the data locally
uint32_t rdsize = size;
if( relativeOffset + size > cdfh->compressedSize )
rdsize = cdfh->compressedSize - relativeOffset;

// now read the data ...
auto rdbuff = std::make_shared<ZipCache::buffer_t>( rdsize );
Pipeline p = XrdCl::Read( archive, offset, rdbuff->size(), rdbuff->data() ) >>
[relativeOffset, rdbuff, &cache, this]( XRootDStatus &st, ChunkInfo &ch )
{
Log *log = DefaultEnv::GetLog();
log->Dump( ZipMsg, "[0x%x] Read %d bytes of remote data at offset %d.",
this, ch.length, ch.offset );
cache.QueueRsp( st, relativeOffset, std::move( *rdbuff ) );
};
Async( std::move( p ), timeout );
}

// the raw offset of the next chunk within the file
uint64_t rawOffset = cache.NextChunkOffset();
// if this is the first time we are setting an input chunk
// use the user-specified offset
if( !rawOffset )
rawOffset = relativeOffset;
// size of the next chunk of raw (compressed) data
uint32_t chunkSize = size;
// make sure we are not reading past the end of the file
if( rawOffset + chunkSize > filesize )
chunkSize = filesize - rawOffset;
// allocate the buffer for the compressed data
buffer.reset( new char[chunkSize] );
Fwd<ChunkInfo> chunk;
Pipeline p = XrdCl::Read( archive, fileoff + rawOffset, chunkSize, buffer.get() ) >>
[=, &cache]( XRootDStatus &st, ChunkInfo &ch )
{
if( !st.IsOK() ) return;
log->Dump( ZipMsg, "[0x%x] Read %d bytes of remote data at offset %d.",
this, ch.length, ch.offset );

st = cache.Input( ch.buffer, ch.length, rawOffset );
if( !st.IsOK() ) Pipeline::Stop( st );

// at this point we can be sure that all the needed data are in the cache
// (we requested as much data as the user asked for so in the worst case
// we have exactly as much data as the user needs, most likely we have
// more because the data are compressed)
uint32_t bytesRead = 0;
st = cache.Read( bytesRead );
if( !st.IsOK() ) Pipeline::Stop( st );

// forward server response to the final operation
chunk->buffer = usrbuff;
chunk->length = size;
chunk->offset = relativeOffset;
}
| XrdCl::Final( [=]( const XRootDStatus &st ) mutable
{
buffer.reset();
AnyObject *rsp = nullptr;
if( st.IsOK() ) rsp = PkgRsp( new ChunkInfo( *chunk ) );
if( usrHandler ) usrHandler->HandleResponse( make_status( st ), rsp );
} );
Async( std::move( p ), timeout );
return XRootDStatus();
}

Expand Down
208 changes: 162 additions & 46 deletions src/XrdCl/XrdClZipCache.hh
Original file line number Diff line number Diff line change
@@ -1,9 +1,26 @@
/*
* XrdZipInfltCache.hh
*
* Created on: 10 Nov 2020
* Author: simonm
*/
//------------------------------------------------------------------------------
// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
// Author: Michal Simon <michal.simon@cern.ch>
//------------------------------------------------------------------------------
// This file is part of the XRootD software suite.
//
// XRootD is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// XRootD is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
//
// In applying this licence, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
//------------------------------------------------------------------------------

#ifndef SRC_XRDZIP_XRDZIPINFLCACHE_HH_
#define SRC_XRDZIP_XRDZIPINFLCACHE_HH_
Expand All @@ -12,6 +29,10 @@
#include <zlib.h>
#include <exception>
#include <string>
#include <vector>
#include <mutex>
#include <queue>
#include <tuple>

namespace XrdCl
{
Expand All @@ -34,13 +55,53 @@ namespace XrdCl
{
public:

ZipCache() : rawOffset( 0 ), rawSize( 0 ), totalRead( 0 )
typedef std::vector<char> buffer_t;

private:

typedef std::tuple<uint64_t, uint32_t, void*, ResponseHandler*> read_args_t;
typedef std::tuple<XRootDStatus, uint64_t, buffer_t> read_resp_t;

struct greater_read_resp_t
{
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
strm.avail_in = 0;
strm.next_in = Z_NULL;
// Compares two read responses by their offset (tuple element 1): returns
// true when lhs starts at a higher offset than rhs. Used as the ordering
// of a std::priority_queue this puts the response with the lowest offset
// at the top of the queue.
inline bool operator() ( const read_resp_t &lhs, const read_resp_t &rhs ) const
{
  const uint64_t lhsOffset = std::get<1>( lhs );
  const uint64_t rhsOffset = std::get<1>( rhs );
  return rhsOffset < lhsOffset;
}
};

typedef std::priority_queue<read_resp_t, std::vector<read_resp_t>, greater_read_resp_t> resp_queue_t;

public:

// Self-deleting response handler that carries a buffer of `length` bytes
// together with the offset it belongs to and, once the response arrives,
// hands both over to the cache as a queued read response.
// (presumably the remote read is issued with `buffer` as its destination;
// that call is not part of this class - verify against the caller)
// TODO once we drop ZipArchiveReader this class can be removed
struct ReadHandler : public ResponseHandler
{
  ReadHandler( uint64_t offset, uint32_t length, ZipCache &self ) :
    offset( offset ),
    buffer( length ),
    self( self )
  {
  }

  // Queues the outcome of the read in the cache and then releases the
  // status, the response and finally the handler itself; the handler
  // must not be touched by anyone after this call returns.
  void HandleResponse( XRootDStatus *status, AnyObject *response )
  {
    // the buffer is moved out, the status is only read by reference,
    // hence it has to stay alive until QueueRsp returns
    self.QueueRsp( *status, offset, std::move( buffer ) );

    delete status;
    delete response;
    delete this;
  }

  uint64_t  offset; // offset passed on to ZipCache::QueueRsp
  buffer_t  buffer; // storage handed over (moved) to the cache
  ZipCache &self;   // the cache that receives the response
};

ZipCache() : inabsoff( 0 )
{
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
strm.avail_in = 0;
strm.next_in = Z_NULL;
strm.avail_out = 0;
strm.next_out = Z_NULL;

// make sure zlib doesn't look for gzip headers, in order to do so
// pass negative window bits !!!
Expand All @@ -54,54 +115,107 @@ namespace XrdCl
inflateEnd( &strm );
}

XrdCl::XRootDStatus Input( void *inbuff, size_t insize, uint64_t rawoff )
inline void QueueReq( uint64_t offset, uint32_t length, void *buffer, ResponseHandler *handler )
{
// we only support streaming for compressed files
if( rawoff != rawOffset + rawSize )
return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInternal );

strm.avail_in = insize;
strm.next_in = (Bytef*)inbuff;
rawOffset = rawoff;
rawSize = insize;

return XrdCl::XRootDStatus();
std::unique_lock<std::mutex> lck( mtx );
rdreqs.emplace( offset, length, buffer, handler );
Decompress();
}

XrdCl::XRootDStatus Output( void *outbuff, size_t outsize, uint64_t offset )
inline void QueueRsp( const XRootDStatus &st, uint64_t offset, buffer_t &&buffer )
{
// we only support streaming for compressed files
if( offset != totalRead )
return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInternal );
std::unique_lock<std::mutex> lck( mtx );
rdrsps.emplace( st, offset, std::move( buffer ) );
Decompress();
}

private:

strm.avail_out = outsize;
strm.next_out = (Bytef*)outbuff;
// True if the zlib stream still has input bytes left (avail_in is non-zero).
inline bool HasInput() const
{
  return strm.avail_in > 0;
}

return XrdCl::XRootDStatus();
// True if the zlib stream still has room left in its output buffer
// (avail_out is non-zero).
inline bool HasOutput() const
{
  return strm.avail_out > 0;
}

XrdCl::XRootDStatus Read( uint32_t &bytesRead )
inline void Input( const read_resp_t &rdrsp )
{
// the available space in output buffer before inflating
uInt avail_before = strm.avail_out;
const buffer_t &buffer = std::get<2>( rdrsp );
strm.avail_in = buffer.size();
strm.next_in = (Bytef*)buffer.data();
}

int rc = inflate( &strm, Z_SYNC_FLUSH );
XrdCl::XRootDStatus st = ToXRootDStatus( rc, "inflate" );
if( !st.IsOK() ) return st;
// Points the zlib stream's output at the buffer of the given read request:
// tuple element 2 is the destination buffer, element 1 its length.
inline void Output( const read_args_t &rdreq )
{
  void *const    dst     = std::get<2>( rdreq );
  const uint32_t dstSize = std::get<1>( rdreq );

  strm.next_out  = static_cast<Bytef*>( dst );
  strm.avail_out = dstSize;
}

bytesRead = avail_before - strm.avail_out;
totalRead += bytesRead;
if( strm.avail_out ) return XrdCl::XRootDStatus( XrdCl::stOK, XrdCl::suPartial );
// True if the response starts exactly at the current absolute input
// offset (inabsoff), i.e. at the offset Decompress has advanced to.
inline bool Consecutive( const read_resp_t &resp ) const
{
  const uint64_t respOffset = std::get<1>( resp );
  return inabsoff == respOffset;
}

return XrdCl::XRootDStatus();
// Pairs queued read requests (output buffers) with queued read responses
// (compressed input) and inflates as much as currently possible.
//
// Requests are served in FIFO order; responses are only fed to zlib once
// the response at the top of the queue is consecutive to the input offset
// consumed so far. The function returns as soon as either side is missing,
// and is re-entered from QueueReq / QueueRsp when more work arrives (both
// call it with mtx held, so user handlers run under that lock).
//
// On a failed response or an inflate error the request at the front of the
// queue is completed with that error and processing stops.
void Decompress()
{
  while( HasInput() || HasOutput() || !rdreqs.empty() || !rdrsps.empty() )
  {
    // no output buffer in use: take the oldest pending request
    if( !HasOutput() && !rdreqs.empty() )
    {
      Output( rdreqs.front() );

      // a zero-length request provides no room to inflate into, so
      // HasOutput() would stay false forever: the request would never be
      // completed and would block every request queued behind it; there
      // is nothing to produce for it, hence complete it right away
      if( !HasOutput() )
      {
        CallHandler( XRootDStatus() );
        continue;
      }
    }

    // the response might come out of order so we need to check the offset
    if( !HasInput() && !rdrsps.empty() && Consecutive( rdrsps.top() ) )
      Input( rdrsps.top() );

    // we need both compressed input and room for output to make progress
    if( !HasInput() || !HasOutput() ) return;

    // check the response status
    XRootDStatus st = std::get<0>( rdrsps.top() );
    if( !st.IsOK() ) return CallHandler( st );

    // the number of input bytes available before inflating
    uInt avail_before = strm.avail_in;
    // decompress the data
    int rc = inflate( &strm, Z_SYNC_FLUSH );
    st = ToXRootDStatus( rc, "inflate" );
    if( !st.IsOK() ) return CallHandler( st ); // report error to user handler
    // update the absolute input offset by the number of bytes we consumed
    inabsoff += avail_before - strm.avail_in;

    // no room left in the output buffer meaning a request has been fulfilled
    if( !strm.avail_out )
      CallHandler( XRootDStatus() );

    // the input buffer is empty meaning a response has been consumed
    // (we need to check if there are any elements in the responses
    // queue as the input buffer might have been set directly by the user)
    if( !strm.avail_in && !rdrsps.empty() )
      rdrsps.pop();
  }
}

uint64_t NextChunkOffset()
static inline AnyObject* PkgRsp( ChunkInfo *chunk )
{
return rawOffset + rawSize;
if( !chunk ) return nullptr;
AnyObject *rsp = new AnyObject();
rsp->Set( chunk );
return rsp;
}

private:
inline void CallHandler( const XRootDStatus &st )
{
read_args_t args = std::move( rdreqs.front() );
rdreqs.pop();

ChunkInfo *chunk = nullptr;
if( st.IsOK() ) chunk = new ChunkInfo( std::get<0>( args ),
std::get<1>( args ),
std::get<2>( args ) );

ResponseHandler *handler = std::get<3>( args );
handler->HandleResponse( new XRootDStatus( st ), PkgRsp( chunk ) );
}

XrdCl::XRootDStatus ToXRootDStatus( int rc, const std::string &func )
{
Expand All @@ -121,10 +235,12 @@ namespace XrdCl
}
}

uint64_t rawOffset; // offset of the raw data chunk in the compressed file (not archive)
uint32_t rawSize; // size of the raw data chunk
uint64_t totalRead; // total number of bytes read so far
z_stream strm; // the zlib stream we will use for reading

std::mutex mtx;
uint64_t inabsoff; //< the absolute offset in the input file (compressed), ensures the user is actually streaming the data
std::queue<read_args_t> rdreqs; //< pending read requests (we only allow read requests to be submitted in order)
resp_queue_t rdrsps; //< pending read responses (due to multiple-streams the read response may come out of order)
};

}
Expand Down

0 comments on commit 928f0d1

Please sign in to comment.