Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds decompressor call to read path #9

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions src/compressor/Compressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class Compressor {
virtual ~Compressor() {}
virtual int compress(const bufferlist &in, bufferlist &out) = 0;
virtual int decompress(const bufferlist &in, bufferlist &out) = 0;
virtual int decompress(bufferlist::iterator &p, bufferlist &out) = 0; //that's a bit weird but we need non-const iterator to be in alignment with decode methods

static CompressorRef create(CephContext *cct, const string &type);
};
Expand Down
1 change: 1 addition & 0 deletions src/compressor/snappy/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ noinst_HEADERS += \
compressor/snappy/SnappyCompressor.h

snappy_sources = \
common/buffer.cc \
compressor/Compressor.cc \
compressor/snappy/CompressionPluginSnappy.cc

Expand Down
46 changes: 24 additions & 22 deletions src/compressor/snappy/SnappyCompressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,57 +20,59 @@
#include "include/buffer.h"
#include "compressor/Compressor.h"

class BufferlistSource : public snappy::Source {
list<bufferptr>::const_iterator pb;
size_t pb_off;
size_t left;
class CEPH_BUFFER_DETAILS BufferlistSource : public snappy::Source {

bufferlist::iterator pb;

public:
explicit BufferlistSource(const bufferlist &data): pb(data.buffers().begin()), pb_off(0), left(data.length()) {}
explicit BufferlistSource(bufferlist::iterator _pb): pb(_pb) {}
virtual ~BufferlistSource() {}
virtual size_t Available() const { return left; }
virtual size_t Available() const { return pb.get_remaining(); }
virtual const char* Peek(size_t* len) {
if (left) {
*len = pb->length() - pb_off;
return pb->c_str() + pb_off;
} else {
*len = 0;
return NULL;
const char* data = NULL;
*len = 0;
if(pb.get_remaining()) {
auto ptmp = pb;
*len = ptmp.get_ptr_and_advance(pb.get_remaining(), &data);
}
return data;
}
virtual void Skip(size_t n) {
if (n + pb_off == pb->length()) {
++pb;
pb_off = 0;
} else {
pb_off += n;
}
left -= n;
pb.advance(n);
}
bufferlist::iterator get_pos() const { return pb; }
};

class SnappyCompressor : public Compressor {


public:
virtual ~SnappyCompressor() {}
virtual const char* get_method_name() { return "snappy"; }
virtual int compress(const bufferlist &src, bufferlist &dst) {
BufferlistSource source(src);
BufferlistSource source(const_cast<bufferlist&>(src).begin());
bufferptr ptr(snappy::MaxCompressedLength(src.length()));
snappy::UncheckedByteArraySink sink(ptr.c_str());
snappy::Compress(&source, &sink);
dst.append(ptr, 0, sink.CurrentDestination()-ptr.c_str());
return 0;
}
virtual int decompress(const bufferlist &src, bufferlist &dst) {
bufferlist::iterator i = const_cast<bufferlist&>(src).begin();
return decompress(i, dst);
}
virtual int decompress(bufferlist::iterator &p, bufferlist &dst) {
size_t res_len = 0;
// Trick, decompress only need first 32bits buffer
bufferlist::const_iterator ptmp = p;
bufferlist tmp;
tmp.substr_of( src, 0, 4 );
ptmp.copy(4, tmp);
if (!snappy::GetUncompressedLength(tmp.c_str(), tmp.length(), &res_len))
return -1;
BufferlistSource source(src);
BufferlistSource source(p);
bufferptr ptr(res_len);
if (snappy::RawUncompress(&source, ptr.c_str())) {
p = source.get_pos();
dst.append(ptr);
return 0;
}
Expand Down
18 changes: 11 additions & 7 deletions src/compressor/zlib/CompressionZlib.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ int CompressionZlib::compress(const bufferlist &in, bufferlist &out)
return 0;
}

int CompressionZlib::decompress(const bufferlist &in, bufferlist &out)
int CompressionZlib::decompress(bufferlist::iterator &p, bufferlist &out)
{
int ret;
unsigned have;
z_stream strm;
unsigned char* c_in;
const char* c_in;

/* allocate inflate state */
strm.zalloc = Z_NULL;
Expand All @@ -121,14 +121,12 @@ int CompressionZlib::decompress(const bufferlist &in, bufferlist &out)

unsigned char c_out[max_len];

for (std::list<buffer::ptr>::const_iterator i = in.buffers().begin();
i != in.buffers().end(); ++i) {
while(!p.end()) {

c_in = (unsigned char*) (*i).c_str();
long unsigned int len = (*i).length();
long unsigned int len = p.get_ptr_and_advance(p.get_remaining(), &c_in);

strm.avail_in = len;
strm.next_in = c_in;
strm.next_in = (unsigned char*)c_in;

do {
strm.avail_out = max_len;
Expand All @@ -150,3 +148,9 @@ int CompressionZlib::decompress(const bufferlist &in, bufferlist &out)
(void)inflateEnd(&strm);
return 0;
}

int CompressionZlib::decompress(const bufferlist &in, bufferlist &out)
{
bufferlist::iterator i = const_cast<bufferlist&>(in).begin();
return decompress(i, out);
}
1 change: 1 addition & 0 deletions src/compressor/zlib/CompressionZlib.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class CompressionZlib : public Compressor {

virtual int compress(const bufferlist &in, bufferlist &out);
virtual int decompress(const bufferlist &in, bufferlist &out);
virtual int decompress(bufferlist::iterator &p, bufferlist &out);
virtual const char* get_method_name();

};
Expand Down
1 change: 1 addition & 0 deletions src/compressor/zlib/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ noinst_HEADERS += \
compressor/zlib/CompressionZlib.h

zlib_sources = \
common/buffer.cc \
compressor/Compressor.cc \
compressor/zlib/CompressionPluginZlib.cc \
compressor/zlib/CompressionZlib.cc
Expand Down
22 changes: 19 additions & 3 deletions src/os/bluestore/BlueStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "BlueFS.h"
#include "BlueRocksEnv.h"
#include "Checksummer.h"
#include "compressor/Compressor.h"

#define dout_subsys ceph_subsys_bluestore

Expand Down Expand Up @@ -2974,11 +2975,24 @@ int BlueStore::_verify_csum(const bluestore_blob_t* blob, uint64_t blob_xoffset,
return 0;
}

int BlueStore::_decompress(const bufferlist& source, bufferlist* result)
int BlueStore::_decompress(bufferlist& source, bufferlist* result)
{
int r = 0;
//FIXME: just a stub, need to be implemented!
result->append(source);
bufferlist::iterator i = source.begin();
bluestore_compression_header_t chdr;
::decode(chdr, i);
CompressorRef compressor = Compressor::create(cct, chdr.type);
if (!compressor.get()) {
// if compressor isn't available - error, because cannot return decompressed data?
derr << __func__ << " can't load decompressor " << chdr.type << dendl;
r = -EIO;
} else {
r = compressor->decompress(i, *result);
if (r < 0) {
derr << __func__ << " decompression failed with exit code " << r << dendl;
r = -EIO;
}
}
return r;
}

Expand Down Expand Up @@ -5212,6 +5226,8 @@ void BlueStore::_do_write_small(
b->length = MAX(b->length, b_off + b_len - tail_pad);
int64_t blob = ep->second.blob;
o->onode.punch_hole(offset, length, &wctx->lex_old);
dout(20) << __func__ << " lexold 0x" << std::hex << offset << std::dec
<< ": " << ep->second << dendl;
bluestore_lextent_t& lex = o->onode.extent_map[offset] =
bluestore_lextent_t(blob, b_off + head_pad, length, 0);
b->ref_map.get(lex.offset, lex.length);
Expand Down
14 changes: 13 additions & 1 deletion src/os/bluestore/BlueStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,19 @@ class BlueStore : public ObjectStore,
extents2read_t* result);

int _verify_csum(const bluestore_blob_t* blob, uint64_t blob_xoffset, const bufferlist& bl) const;
int _decompress(const bufferlist& source, bufferlist* result);
int _decompress(bufferlist& source, bufferlist* result);

// --------------------------------------------------------
// compressed block header
struct CompressionHeader {
std::string type;
CompressionHeader(const std::string& _type)
: type(_type) {}

void encode(bufferlist& bl) const;
void decode(bufferlist::iterator& p);
};
WRITE_CLASS_ENCODER(bluestore_bdev_label_t)


// --------------------------------------------------------
Expand Down
14 changes: 14 additions & 0 deletions src/os/bluestore/bluestore_types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -822,3 +822,17 @@ void bluestore_wal_transaction_t::generate_test_instances(list<bluestore_wal_tra
o.back()->ops.back().extents.push_back(bluestore_pextent_t(1,7));
o.back()->ops.back().data.append("foodata");
}

void bluestore_compression_header_t::encode(bufferlist& bl) const
{
ENCODE_START(1, 1, bl);
::encode(type, bl);
ENCODE_FINISH(bl);
}

void bluestore_compression_header_t::decode(bufferlist::iterator& p)
{
DECODE_START(1, p);
::decode(type, p);
DECODE_FINISH(p);
}
12 changes: 12 additions & 0 deletions src/os/bluestore/bluestore_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -621,4 +621,16 @@ struct bluestore_wal_transaction_t {
};
WRITE_CLASS_ENCODER(bluestore_wal_transaction_t)

struct bluestore_compression_header_t {
std::string type;
bluestore_compression_header_t() {}
bluestore_compression_header_t(const std::string& _type)
: type(_type) {}

void encode(bufferlist& bl) const;
void decode(bufferlist::iterator& p);
};
WRITE_CLASS_ENCODER(bluestore_compression_header_t)


#endif
1 change: 1 addition & 0 deletions src/test/compressor/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ if WITH_OSD


libceph_example_la_SOURCES = \
common/buffer.cc \
compressor/Compressor.cc \
test/compressor/compressor_plugin_example.cc
noinst_HEADERS += test/compressor/compressor_example.h
Expand Down
6 changes: 5 additions & 1 deletion src/test/compressor/compressor_example.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ class CompressorExample : public Compressor {
out = in;
return 0;
}

virtual int decompress(bufferlist::iterator &p, bufferlist &out)
{
p.copy(p.get_remaining(), out);
return 0;
}
virtual const char* get_method_name()
{
return "example";
Expand Down
1 change: 1 addition & 0 deletions src/test/encoding/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ TYPE(SequencerPosition)

#include "os/bluestore/bluestore_types.h"
TYPE(bluestore_cnode_t)
TYPE(bluestore_compression_header_t)
TYPE(bluestore_extent_t)
TYPE(bluestore_extent_ref_map_t)
TYPE(bluestore_overlay_t)
Expand Down