Skip to content

Commit

Permalink
journal compression. the write half of the work.
Browse files Browse the repository at this point in the history
  • Loading branch information
dwight committed Aug 2, 2011
1 parent c72fb70 commit c716fa5
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 10 deletions.
1 change: 1 addition & 0 deletions db/dur.cpp
Expand Up @@ -129,6 +129,7 @@ namespace mongo {
"commits" << _commits <<
"journaledMB" << _journaledBytes / 1000000.0 <<
"writeToDataFilesMB" << _writeToDataFilesBytes / 1000000.0 <<
"compression" << _journaledBytes / (_uncompressedBytes+1.0) <<
"commitsInWriteLock" << _commitsInWriteLock <<
"earlyCommits" << _earlyCommits <<
"timeMs" <<
Expand Down
24 changes: 19 additions & 5 deletions db/dur_journal.cpp
Expand Up @@ -34,6 +34,7 @@
#include "dur_journalimpl.h"
#include "../util/file.h"
#include "../util/checksum.h"
#include "../util/compress.h"

using namespace mongoutils;

Expand Down Expand Up @@ -633,17 +634,30 @@ namespace mongo {
void journal(const AlignedBuilder& b) {
j.journal(b);
}
void Journal::journal(const AlignedBuilder& b) {
void Journal::journal(const AlignedBuilder& _b) {
#if defined(_NOCOMPRESS)
unsigned w = _b.len();
const AlignedBuilder& b = _b;
#else
static AlignedBuilder compressed(32*1024*1024);
compressed.reset( maxCompressedLength(_b.len()) );
size_t compressedLength = 0;
rawCompress(_b.buf(), _b.len(), compressed.atOfs(0), &compressedLength);

unsigned w = (compressedLength+8191)&(~8191);
const AlignedBuilder& b = compressed;
#endif

try {
mutex::scoped_lock lk(_curLogFileMutex);

// must already be open -- so that _curFileId is correct for previous buffer building
assert( _curLogFile );

stats.curr->_journaledBytes += b.len();
_written += b.len();
_curLogFile->synchronousAppend((void *) b.buf(), b.len());

stats.curr->_uncompressedBytes += _b.len();
stats.curr->_journaledBytes += w;
_written += w;
_curLogFile->synchronousAppend((void *) b.buf(), w);
_rotate();
}
catch(std::exception& e) {
Expand Down
4 changes: 4 additions & 0 deletions db/dur_journalformat.h
Expand Up @@ -34,7 +34,11 @@ namespace mongo {

// x4142 is asci--readable if you look at the file with head/less -- thus the starting values were near
// that. simply incrementing the version # is safe on a fwd basis.
#if defined(_NOCOMPRESS)
enum { CurrentVersion = 0x4148 };
#else
enum { CurrentVersion = 0x4149 };
#endif
unsigned short _version;

// these are just for diagnostic ease (make header more useful as plain text)
Expand Down
1 change: 1 addition & 0 deletions db/dur_stats.h
Expand Up @@ -20,6 +20,7 @@ namespace mongo {
unsigned _commits;
unsigned _earlyCommits; // count of early commits from commitIfNeeded() or from getDur().commitNow()
unsigned long long _journaledBytes;
unsigned long long _uncompressedBytes;
unsigned long long _writeToDataFilesBytes;

unsigned long long _prepLogBufferMicros;
Expand Down
33 changes: 29 additions & 4 deletions util/alignedbuilder.cpp
Expand Up @@ -32,9 +32,28 @@ namespace mongo {
/** reset for a re-use. shrinks if > 128MB */
void AlignedBuilder::reset() {
_len = 0;
const unsigned sizeCap = 128*1024*1024;
if (_p._size > sizeCap)
_realloc(sizeCap, _len);
RARELY {
const unsigned sizeCap = 128*1024*1024;
if (_p._size > sizeCap)
_realloc(sizeCap, _len);
}
}

/** reset with a hint as to the upcoming needed size specified */
void AlignedBuilder::reset(unsigned sz) {
_len = 0;
unsigned Q = 64 * 1024 * 1024 - 1;
unsigned want = (sz+Q) & (~Q);
if( _p._size == want ) {
return;
}
if( _p._size > want ) {
bool downsize = false;
RARELY { downsize = true; }
if( !downsize )
return;
}
_realloc(want, _len);
}

void AlignedBuilder::mallocSelfAligned(unsigned sz) {
Expand All @@ -52,10 +71,16 @@ namespace mongo {

/* "slow"/infrequent portion of 'grow()' */
void NOINLINE_DECL AlignedBuilder::growReallocate(unsigned oldLen) {
dassert( _len > _p._size );
unsigned a = _p._size;
assert( a );
while( 1 ) {
a *= 2;
if( a < 128 * 1024 * 1024 )
a *= 2;
else if( sizeof(int*) == 4 )
a += 32 * 1024 * 1024;
else
a += 64 * 1024 * 1024;
DEV if( a > 256*1024*1024 ) {
log() << "dur AlignedBuilder too big, aborting in _DEBUG build" << endl;
abort();
Expand Down
5 changes: 4 additions & 1 deletion util/alignedbuilder.h
Expand Up @@ -28,6 +28,9 @@ namespace mongo {
AlignedBuilder(unsigned init_size);
~AlignedBuilder() { kill(); }

/** reset with a hint as to the upcoming needed size specified */
void reset(unsigned sz);

/** reset for a re-use. shrinks if > 128MB */
void reset();

Expand Down Expand Up @@ -94,7 +97,7 @@ namespace mongo {
inline char* grow(unsigned by) {
unsigned oldlen = _len;
_len += by;
if ( _len > _p._size ) {
MONGO_IF ( _len > _p._size ) {
growReallocate(oldlen);
}
return _p._data + oldlen;
Expand Down
12 changes: 12 additions & 0 deletions util/compress.cpp
Expand Up @@ -7,6 +7,18 @@

namespace mongo {

void rawCompress(const char* input,
size_t input_length,
char* compressed,
size_t* compressed_length)
{
snappy::RawCompress(input, input_length, compressed, compressed_length);
}

size_t maxCompressedLength(size_t source_len) {
return snappy::MaxCompressedLength(source_len);
}

size_t compress(const char* input, size_t input_length, std::string* output) {
return snappy::Compress(input, input_length, output);
}
Expand Down
6 changes: 6 additions & 0 deletions util/compress.h
Expand Up @@ -10,6 +10,12 @@ namespace mongo {

bool uncompress(const char* compressed, size_t compressed_length, std::string* uncompressed);

size_t maxCompressedLength(size_t source_len);
void rawCompress(const char* input,
size_t input_length,
char* compressed,
size_t* compressed_length);

}


0 comments on commit c716fa5

Please sign in to comment.