journal compression

commit 50902acb9acc68b3215d1cf312a3e9c9406e517c (parent: b6200ff)
Dwight Merriman (dwight) authored
34 db/dur.cpp
@@ -62,11 +62,11 @@
#include "dur_journal.h"
#include "dur_commitjob.h"
#include "dur_recover.h"
+#include "dur_stats.h"
#include "../util/concurrency/race.h"
#include "../util/mongoutils/hash.h"
#include "../util/mongoutils/str.h"
#include "../util/timer.h"
-#include "dur_stats.h"
using namespace mongoutils;
@@ -74,8 +74,9 @@ namespace mongo {
namespace dur {
- void WRITETODATAFILES();
- void PREPLOGBUFFER();
+ void PREPLOGBUFFER(JSectHeader& outParm);
+ void WRITETOJOURNAL(JSectHeader h, AlignedBuilder& uncompressed);
+ void WRITETODATAFILES(const JSectHeader& h, AlignedBuilder& uncompressed);
/** declared later in this file
only used in this file -- use DurableInterface::commitNow() outside
@@ -326,15 +327,6 @@ namespace mongo {
}
#endif
- /** write the buffer we have built to the journal and fsync it.
- outside of lock as that could be slow.
- */
- static void WRITETOJOURNAL(AlignedBuilder& ab) {
- Timer t;
- journal(ab);
- stats.curr->_writeToJournalMicros += t.micros();
- }
-
// Functor to be called over all MongoFiles
class validateSingleMapMatches {
@@ -504,11 +496,12 @@ namespace mongo {
commitJob.notifyCommitted();
return true;
}
- PREPLOGBUFFER();
+ JSectHeader h;
+ PREPLOGBUFFER(h);
RWLockRecursive::Shared lk3(MongoFile::mmmutex);
- unsigned abLen = commitJob._ab.len() + sizeof(JSectFooter);
+ unsigned abLen = commitJob._ab.len();
commitJob.reset(); // must be reset before allowing anyone to write
DEV assert( !commitJob.hasWritten() );
@@ -516,15 +509,15 @@ namespace mongo {
lk1.reset();
// ****** now other threads can do writes ******
- WRITETOJOURNAL(commitJob._ab);
+ WRITETOJOURNAL(h, commitJob._ab);
assert( abLen == commitJob._ab.len() ); // a check that no one touched the builder while we were doing work. if so, our locking is wrong.
// data is now in the journal, which is sufficient for acknowledging getLastError.
// (ok to crash after that)
commitJob.notifyCommitted();
- WRITETODATAFILES();
- assert( abLen == commitJob._ab.len() ); // WRITETODATAFILES uses _ab also
+ WRITETODATAFILES(h, commitJob._ab);
+ assert( abLen == commitJob._ab.len() ); // check again wasn't modded
commitJob._ab.reset();
// can't : dbMutex._remapPrivateViewRequested = true;
@@ -570,18 +563,19 @@ namespace mongo {
// (and we are only read locked in the dbMutex, so it could happen)
scoped_lock lk(groupCommitMutex);
- PREPLOGBUFFER();
+ JSectHeader h;
+ PREPLOGBUFFER(h);
// todo : write to the journal outside locks, as this write can be slow.
// however, be careful then about remapprivateview as that cannot be done
// if new writes are then pending in the private maps.
- WRITETOJOURNAL(commitJob._ab);
+ WRITETOJOURNAL(h, commitJob._ab);
// data is now in the journal, which is sufficient for acknowledging getLastError.
// (ok to crash after that)
commitJob.notifyCommitted();
- WRITETODATAFILES();
+ WRITETODATAFILES(h, commitJob._ab);
debugValidateAllMapsMatch();
commitJob.reset();
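
The net effect of the dur.cpp changes: PREPLOGBUFFER now produces the JSectHeader as an out-parameter instead of embedding it at the front of the commit buffer, and the header is threaded through both write paths. A minimal sketch of the resulting group-commit sequence, condensed from the hunks above (locking and asserts elided):

    JSectHeader h;
    PREPLOGBUFFER(h);                    // fills h; commitJob._ab holds the uncompressed ops
    WRITETOJOURNAL(h, commitJob._ab);    // compresses the ops and fsyncs them to the journal
    commitJob.notifyCommitted();         // journal is durable; safe to ack getLastError
    WRITETODATAFILES(h, commitJob._ab);  // replays the same uncompressed ops into the data files
    commitJob._ab.reset();
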
65 db/dur_journal.cpp
@@ -111,6 +111,10 @@ namespace mongo {
}
bool JSectFooter::checkHash(const void* begin, int len) const {
+ if( !magicOk() ) {
+ log() << "journal footer not valid" << endl;
+ return false;
+ }
Checksum c;
c.gen(begin, len);
DEV log() << "checkHash len:" << len << " hash:" << toHex(hash, 16) << " current:" << toHex(c.bytes, 16) << endl;
@@ -634,51 +638,56 @@ namespace mongo {
}
}
- /** write to journal (called by WRITETOJOURNAL)
+ /** write (append) the buffer we have built to the journal and fsync it.
+ outside of lock as that could be slow.
+ @param uncompressed - a buffer that will be written to the journal after compression
+ will not return until on disk
*/
- void journal(AlignedBuilder& b) {
- j.journal(b);
+ void WRITETOJOURNAL(JSectHeader h, AlignedBuilder& uncompressed) {
+ Timer t;
+ j.journal(h, uncompressed);
+ stats.curr->_writeToJournalMicros += t.micros();
}
- void Journal::journal(AlignedBuilder& _b) {
+ void Journal::journal(const JSectHeader& h, const AlignedBuilder& uncompressed) {
RACECHECK
-#if 0
- unsigned w = _b.len();
- const AlignedBuilder& b = _b;
-#endif
- static AlignedBuilder cb(32*1024*1024);
+ static AlignedBuilder b(32*1024*1024);
+ /* buffer to journal will be
+ JSectHeader
+ compressed operations
+ JSectFooter
+ */
const unsigned headTailSize = sizeof(JSectHeader) + sizeof(JSectFooter);
- cb.reset( maxCompressedLength(_b.len()) + headTailSize );
+ const unsigned max = maxCompressedLength(uncompressed.len()) + headTailSize;
+ b.reset(max);
{
- JSectHeader *h = (JSectHeader *) _b.buf();
- dassert( h->sectionLen() == (unsigned) 0xffffffff );
- cb.appendStruct(*h);
+ dassert( h.sectionLen() == (unsigned) 0xffffffff ); // we will backfill later
+ b.appendStruct(h);
}
size_t compressedLength = 0;
- rawCompress(_b.buf(), _b.len() - headTailSize, cb.cur(), &compressedLength);
+ rawCompress(uncompressed.buf(), uncompressed.len(), b.cur(), &compressedLength);
assert( compressedLength < 0xffffffff );
- cb.skip(compressedLength);
+ assert( compressedLength < max );
+ b.skip(compressedLength);
// footer
unsigned L = 0xffffffff;
{
// pad to alignment, and set the total section length in the JSectHeader
assert( 0xffffe000 == (~(Alignment-1)) );
- unsigned lenUnpadded = cb.len() + sizeof(JSectFooter);
+ unsigned lenUnpadded = b.len() + sizeof(JSectFooter);
L = (lenUnpadded + Alignment-1) & (~(Alignment-1));
dassert( L >= lenUnpadded );
- ((JSectHeader*)cb.atOfs(0))->setSectionLen(lenUnpadded);
+ ((JSectHeader*)b.atOfs(0))->setSectionLen(lenUnpadded);
- JSectFooter f(cb.buf(), cb.len()); // computes checksum
- cb.appendStruct(f);
+ JSectFooter f(b.buf(), b.len()); // computes checksum
+ b.appendStruct(f);
+ dassert( b.len() == lenUnpadded );
- // need a footer for uncompressed buffer too so that WRITETODATAFILES is happy.
- // done this way as we do not need a checksum for that.
- JSectFooter uncompressedFooter;
- _b.appendStruct(uncompressedFooter);
- ((JSectHeader*)_b.atOfs(0))->setSectionLen(_b.len());
+ b.skip(L - lenUnpadded);
+ dassert( b.len() % Alignment == 0 );
}
try {
@@ -687,16 +696,16 @@ namespace mongo {
// must already be open -- so that _curFileId is correct for previous buffer building
assert( _curLogFile );
- stats.curr->_uncompressedBytes += _b.len();
- unsigned w = cb.len();
+ stats.curr->_uncompressedBytes += b.len();
+ unsigned w = b.len();
_written += w;
assert( w <= L );
stats.curr->_journaledBytes += L;
- _curLogFile->synchronousAppend((void *) cb.buf(), L);
+ _curLogFile->synchronousAppend((const void *) b.buf(), L);
_rotate();
}
catch(std::exception& e) {
- log() << "warning exception in dur::journal " << e.what() << endl;
+ log() << "error exception in dur::journal " << e.what() << endl;
throw;
}
}
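
The padding arithmetic in the footer block rounds each section up to the journal's 8 KB alignment; the assert pins Alignment at 0x2000. A worked check of the formula with a hypothetical length:

    #include <cassert>

    int main() {
        const unsigned Alignment = 0x2000;   // 8192; matches assert( 0xffffe000 == (~(Alignment-1)) )
        unsigned lenUnpadded = 10000;        // hypothetical: header + compressed ops + footer
        unsigned L = (lenUnpadded + Alignment - 1) & (~(Alignment - 1));
        assert( L == 16384 );                // rounded up to the next 8 KB multiple
        assert( L % Alignment == 0 && L >= lenUnpadded );
        return 0;
    }

The builder then skips L - lenUnpadded pad bytes so that synchronousAppend always writes a multiple of Alignment.
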
11 db/dur_journal.h
@@ -37,11 +37,12 @@ namespace mongo {
/** assure journal/ dir exists. throws */
void journalMakeDir();
- /** write/append to journal file *
- @param buf - a buffer that will be written to the journal.
- will not return until on disk
- */
- void journal(AlignedBuilder& buf);
+ /** check if time to rotate files; assure a file is open.
+ done separately from the journal() call as we can do this part
+ outside of lock.
+ only called by durThread.
+ */
+ void journalRotate();
/** flag that something has gone wrong during writing to the journal
(not for recovery mode)
2 db/dur_journalformat.h
@@ -145,7 +145,7 @@ namespace mongo {
*/
bool checkHash(const void* begin, int len) const;
- void checkMagic() const { assert( *((unsigned*)magic) == 0x0a0a0a0a ); }
+ bool magicOk() const { return *((unsigned*)magic) == 0x0a0a0a0a; }
};
/** declares "the next entry(s) are for this database / file path prefix" */
12 db/dur_journalimpl.h
@@ -18,6 +18,7 @@
#pragma once
+#include "dur_journalformat.h"
#include "../util/logfile.h"
namespace mongo {
@@ -33,9 +34,16 @@ namespace mongo {
/** call during startup by journalMakeDir() */
void init();
- /** write to journal
+ /** check if time to rotate files. assure a file is open.
+ done separately from the journal() call as we can do this part
+ outside of lock.
+ thread: durThread()
+ */
+ void rotate();
+
+ /** append to the journal file
*/
- void journal(AlignedBuilder& b);
+ void journal(const JSectHeader& h, const AlignedBuilder& b);
boost::filesystem::path getFilePathFor(int filenumber) const;
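
Separating rotate() from journal() lets the durThread check for file rotation without holding the group-commit lock. The call site is not part of this diff, so the following is only an assumed usage pattern based on the doc comments above:

    // Hypothetical durThread sketch; h and uncompressed come from PREPLOGBUFFER.
    j.rotate();                          // outside the lock: open the next file if due
    {
        scoped_lock lk(groupCommitMutex);
        // ... build h and the commit buffer ...
        j.journal(h, uncompressed);      // append-only while the lock is held
    }
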
15 db/dur_preplogbuffer.cpp
@@ -125,23 +125,20 @@ namespace mongo {
}
}
- void resetLogBuffer(AlignedBuilder& bb) {
+ void resetLogBuffer(/*out*/JSectHeader& h, AlignedBuilder& bb) {
bb.reset();
- // JSectHeader
- JSectHeader h;
h.setSectionLen(0xffffffff); // total length, will fill in later
h.seqNumber = getLastDataFileFlushTime();
h.fileId = j.curFileId();
-
- bb.appendStruct(h);
}
/** we will build an output buffer ourself and then use O_DIRECT
we could be in read lock for this
caller handles locking
+ @return partially populated sectheader and _ab set
*/
- void _PREPLOGBUFFER() {
+ void _PREPLOGBUFFER(JSectHeader& h) {
assert( cmdLine.dur );
{
@@ -153,7 +150,7 @@ namespace mongo {
}
AlignedBuilder& bb = commitJob._ab;
- resetLogBuffer(bb); // adds JSectHeader
+ resetLogBuffer(h, bb); // adds JSectHeader
// ops other than basic writes (DurOp's)
{
@@ -166,10 +163,10 @@ namespace mongo {
return;
}
- void PREPLOGBUFFER() {
+ void PREPLOGBUFFER(/*out*/ JSectHeader& h) {
Timer t;
j.assureLogFileOpen(); // so fileId is set
- _PREPLOGBUFFER();
+ _PREPLOGBUFFER(h);
stats.curr->_prepLogBufferMicros += t.micros();
}
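
resetLogBuffer no longer appends the JSectHeader to the builder; it only fills the header fields and clears the buffer, since Journal::journal() now writes header, compressed body, and footer as separate pieces. The new contract, using the names from the diff:

    JSectHeader h;
    AlignedBuilder& bb = commitJob._ab;
    resetLogBuffer(h, bb);
    // bb is empty and will receive only the op stream;
    // h.seqNumber and h.fileId are set, while h.sectionLen() stays
    // 0xffffffff until Journal::journal() backfills the real length.
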
150 db/dur_recover.cpp
@@ -94,89 +94,72 @@ namespace mongo {
throws
*/
class JournalSectionIterator : boost::noncopyable {
- unique_ptr<BufReader> _br;
- const JSectHeader* _sectHead;
+ unique_ptr<BufReader> _entries;
+ const JSectHeader _h;
const char *_lastDbName; // pointer into mmaped journal file
const bool _doDurOps;
string _uncompressed;
public:
- JournalSectionIterator(const void *p, unsigned len, bool doDurOpsRecovering)
- :
- _sectHead(static_cast<const JSectHeader*>(p))
- , _lastDbName(0)
+ JournalSectionIterator(const JSectHeader& h, const void *compressed, unsigned compressedLen, bool doDurOpsRecovering) :
+ _h(h),
+ _lastDbName(0)
, _doDurOps(doDurOpsRecovering)
{
- if( doDurOpsRecovering ) {
- // compressed case
- assert( sizeof(JSectHeader) + sizeof(JSectFooter) >= len );
- // this could be done in a streaming manner which would be better in terms of memory use - probably not worth the extra code complexity though
- bool ok = uncompress((const char *)p, len - sizeof(JSectHeader) - sizeof(JSectFooter), &_uncompressed);
- if( !ok ) {
- // it should always be ok (i think?) as there is a previous check to see that the JSectFooter is ok
- log() << "fatal error couldn't uncompress section during journal recovery" << endl;
- mongoAbort("journal recovery uncompress failure");
- }
- _br = unique_ptr<BufReader>( new BufReader(_uncompressed.c_str(), _uncompressed.size()) );
- }
- else {
- // we work with the uncompressed buffer when doing a WRITETODATAFILES (for speed)
- _br = unique_ptr<BufReader>( new BufReader(p, len) );
- _br->skip(sizeof(JSectHeader));
+ assert( doDurOpsRecovering );
+ bool ok = uncompress((const char *)compressed, compressedLen, &_uncompressed);
+ if( !ok ) {
+ // it should always be ok (i think?) as there is a previous check to see that the JSectFooter is ok
+ log() << "couldn't uncompress journal section" << endl;
+ msgasserted(0, "couldn't uncompress journal section");
}
+ const char *p = _uncompressed.c_str();
+ assert( compressedLen == _h.sectionLen() - sizeof(JSectFooter) - sizeof(JSectHeader) );
+ _entries = unique_ptr<BufReader>( new BufReader(p, _uncompressed.size()) );
}
- bool atEof() const { return _br->atEof(); }
+ // we work with the uncompressed buffer when doing a WRITETODATAFILES (for speed)
+ JournalSectionIterator(const JSectHeader &h, const void *p, unsigned len) :
+ _h(h),
+ _lastDbName(0)
+ , _doDurOps(false),
+ _entries( new BufReader((const char *) p, len) )
+ { }
- unsigned long long seqNumber() const { return _sectHead->seqNumber; }
+ bool atEof() const { return _entries->atEof(); }
+
+ unsigned long long seqNumber() const { return _h.seqNumber; }
/** get the next entry from the log. this function parses and combines JDbContext and JEntry's.
- * @return true if got an entry. false at successful end of section (and no entry returned).
* throws on premature end of section.
*/
- bool next(ParsedJournalEntry& e) {
+ void next(ParsedJournalEntry& e) {
unsigned lenOrOpCode;
- _br->read(lenOrOpCode);
+ _entries->read(lenOrOpCode);
if (lenOrOpCode > JEntry::OpCode_Min) {
switch( lenOrOpCode ) {
case JEntry::OpCode_Footer: {
- if (_doDurOps) {
- const char* pos = (const char*) _br->pos();
- pos -= sizeof(lenOrOpCode); // rewind to include OpCode
- const JSectFooter& footer = *(const JSectFooter*)pos;
- int len = pos - (char*)_sectHead;
- if( _doDurOps ) {
- if (!footer.checkHash(_sectHead, len)) {
- massert(13594, "journal checksum doesn't match", false);
- }
- }
- else {
- // no need to check on WRITEDODATAFILES. we check a little to self test
- RARELY assert( footer.checkHash(_sectHead, len) );
- }
- footer.checkMagic();
- }
- return false; // false return value denotes end of section
+ assert( false );
}
case JEntry::OpCode_FileCreated:
case JEntry::OpCode_DropDb: {
e.dbName = 0;
- boost::shared_ptr<DurOp> op = DurOp::read(lenOrOpCode, *_br);
+ boost::shared_ptr<DurOp> op = DurOp::read(lenOrOpCode, *_entries);
if (_doDurOps) {
e.op = op;
}
- return true;
+ return;
}
case JEntry::OpCode_DbContext: {
- _lastDbName = (const char*) _br->pos();
- const unsigned limit = std::min((unsigned)Namespace::MaxNsLen, _br->remaining());
+ _lastDbName = (const char*) _entries->pos();
+ const unsigned limit = std::min((unsigned)Namespace::MaxNsLen, _entries->remaining());
const unsigned len = strnlen(_lastDbName, limit);
massert(13533, "problem processing journal file during recovery", _lastDbName[len] == '\0');
- _br->skip(len+1); // skip '\0' too
- _br->read(lenOrOpCode);
+ _entries->skip(len+1); // skip '\0' too
+ _entries->read(lenOrOpCode); // read this for the fall through
}
// fall through as a basic operation always follows jdbcontext, and we don't have anything to return yet
@@ -188,13 +171,13 @@ namespace mongo {
// JEntry - a basic write
assert( lenOrOpCode && lenOrOpCode < JEntry::OpCode_Min );
- _br->rewind(4);
- e.e = (JEntry *) _br->skip(sizeof(JEntry));
+ _entries->rewind(4);
+ e.e = (JEntry *) _entries->skip(sizeof(JEntry));
e.dbName = e.e->isLocalDbContext() ? "local" : _lastDbName;
assert( e.e->len == lenOrOpCode );
- _br->skip(e.e->len);
- return true;
+ _entries->skip(e.e->len);
}
+
};
static string fileName(const char* dbName, int fileNo) {
@@ -313,12 +296,36 @@ namespace mongo {
log() << "END section" << endl;
}
- void RecoveryJob::processSection(const void *p, unsigned len) {
- JSectHeader *h = (JSectHeader *) p;
-
+ void RecoveryJob::processSection(const JSectHeader *h, const void *p, unsigned len, const JSectFooter *f) {
scoped_lock lk(_mx);
RACECHECK
+ /** todo: we should really verify the checksum to see that seqNumber is ok?
+ that is expensive maybe there is some sort of checksum of just the header
+ within the header itself
+ */
+ if( _recovering && _lastDataSyncedFromLastRun > h->seqNumber + ExtraKeepTimeMs ) {
+ if( h->seqNumber != _lastSeqMentionedInConsoleLog ) {
+ static int n;
+ if( ++n < 10 ) {
+ log() << "recover skipping application of section seq:" << h->seqNumber << " < lsn:" << _lastDataSyncedFromLastRun << endl;
+ }
+ else if( n == 10 ) {
+ log() << "recover skipping application of section more..." << endl;
+ }
+ _lastSeqMentionedInConsoleLog = h->seqNumber;
+ }
+ return;
+ }
+
+ unique_ptr<JournalSectionIterator> i;
+ if( _recovering ) {
+ i = unique_ptr<JournalSectionIterator>(new JournalSectionIterator(*h, p, len, _recovering));
+ }
+ else {
+ i = unique_ptr<JournalSectionIterator>(new JournalSectionIterator(*h, /*after header*/p, /*w/out header*/len));
+ }
+
// we use a static so that we don't have to reallocate every time through. occasionally we
// go back to a small allocation so that if there were a spiky growth it won't stick forever.
static vector<ParsedJournalEntry> entries;
@@ -330,23 +337,21 @@ namespace mongo {
}
}
- JournalSectionIterator i(p, len, _recovering);
-
- //DEV log() << "recovery processSection seq:" << i.seqNumber() << endl;
- if( _recovering && _lastDataSyncedFromLastRun > i.seqNumber() + ExtraKeepTimeMs ) {
- if( i.seqNumber() != _lastSeqMentionedInConsoleLog ) {
- log() << "recover skipping application of section seq:" << i.seqNumber() << " < lsn:" << _lastDataSyncedFromLastRun << endl;
- _lastSeqMentionedInConsoleLog = i.seqNumber();
- }
- return;
- }
-
// first read all entries to make sure this section is valid
ParsedJournalEntry e;
- while( i.next(e) ) {
+ while( !i->atEof() ) {
+ i->next(e);
entries.push_back(e);
}
+ // after the entries check the footer checksum
+ if( _recovering ) {
+ assert( ((const char *)h) + sizeof(JSectHeader) == p );
+ if( !f->checkHash(h, len + sizeof(JSectHeader)) ) {
+ msgasserted(13594, "journal checksum doesn't match");
+ }
+ }
+
// got all the entries for one group commit. apply them:
applyEntries(entries);
}
@@ -386,7 +391,12 @@ namespace mongo {
}
return true;
}
- processSection(br.skip(h.sectionLenWithPadding()), h.sectionLen());
+ unsigned slen = h.sectionLen();
+ unsigned dataLen = slen - sizeof(JSectHeader) - sizeof(JSectFooter);
+ const char *hdr = (const char *) br.skip(h.sectionLenWithPadding());
+ const char *data = hdr + sizeof(JSectHeader);
+ const char *footer = data + dataLen;
+ processSection((const JSectHeader*) hdr, data, dataLen, (const JSectFooter*) footer);
// ctrl c check
killCurrentOp.checkForInterrupt(false);
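
Putting the recovery hunks together, the on-disk section layout that the iterator and checksum check assume is the following (a sketch assembled from the diffs, not a separate spec):

    // [ JSectHeader ]           sectionLen() = unpadded length of all three parts
    // [ compressed op stream ]  dataLen = sectionLen() - sizeof(JSectHeader) - sizeof(JSectFooter)
    // [ JSectFooter ]           magic 0x0a0a0a0a plus a hash over header + compressed data
    // [ pad bytes up to sectionLenWithPadding() ]
    //
    // Hence the post-parse verification:
    //   f->checkHash(h, dataLen + sizeof(JSectHeader))

Checking the footer after parsing all entries, rather than per entry, keeps the hot loop simple and still rejects a corrupt section before any of it is applied.
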
8 db/dur_recover.h
@@ -2,6 +2,7 @@
#pragma once
+#include "dur_journalformat.h"
#include "../util/concurrency/mutex.h"
#include "../util/file.h"
@@ -15,12 +16,13 @@ namespace mongo {
*/
class RecoveryJob : boost::noncopyable {
public:
- RecoveryJob() :_lastDataSyncedFromLastRun(0), _mx("recovery"), _recovering(false) { _lastSeqMentionedInConsoleLog = 1; }
+ RecoveryJob() : _lastDataSyncedFromLastRun(0),
+ _mx("recovery"), _recovering(false) { _lastSeqMentionedInConsoleLog = 1; }
void go(vector<path>& files);
~RecoveryJob();
- /** @param recovering indicates we are doing recovery and not a WRITETODATAFILES */
- void processSection(const void *, unsigned len);
+ /** @param data data between header and footer. compressed if recovering. */
+ void processSection(const JSectHeader *h, const void *data, unsigned len, const JSectFooter *f);
void close(); // locks and calls _close()
10 db/dur_writetodatafiles.cpp
@@ -47,9 +47,9 @@ namespace mongo {
@see https://docs.google.com/drawings/edit?id=1TklsmZzm7ohIZkwgeK6rMvsdaR13KjtJYMsfLr175Zc&hl=en
*/
- void WRITETODATAFILES_Impl1() {
+ void WRITETODATAFILES_Impl1(const JSectHeader& h, AlignedBuilder& uncompressed) {
RWLockRecursive::Shared lk(MongoFile::mmmutex);
- RecoveryJob::get().processSection(commitJob._ab.buf(), commitJob._ab.len());
+ RecoveryJob::get().processSection(&h, uncompressed.buf(), uncompressed.len(), 0);
}
#if 0
@@ -81,16 +81,14 @@ namespace mongo {
#endif
// concurrency: in mmmutex, not necessarily in dbMutex
- void WRITETODATAFILES() {
+ void WRITETODATAFILES(const JSectHeader& h, AlignedBuilder& uncompressed) {
Timer t;
#if defined(_EXPERIMENTAL)
WRITETODATAFILES_Impl3();
#else
- WRITETODATAFILES_Impl1();
+ WRITETODATAFILES_Impl1(h, uncompressed);
#endif
stats.curr->_writeToDataFilesMicros += t.micros();
-
-
}
}
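
One parser now serves both paths: recovery decompresses sections read from disk, while WRITETODATAFILES hands processSection the in-memory uncompressed buffer with a null footer. Per the dur_recover.cpp hunks, the footer is only consulted when _recovering is true, so skipping the checksum here is safe; the buffer was built moments ago and never left memory. The two invocations from the diffs, side by side:

    // Recovery: compressed data; footer verified after parsing.
    processSection((const JSectHeader*) hdr, data, dataLen, (const JSectFooter*) footer);
    // Live WRITETODATAFILES: uncompressed buffer; footer omitted (0), no checksum.
    RecoveryJob::get().processSection(&h, uncompressed.buf(), uncompressed.len(), 0);
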
2 util/bufreader.h
@@ -28,6 +28,7 @@ namespace mongo {
public:
class eof : public std::exception {
public:
+ eof() { }
virtual const char * what() { return "BufReader eof"; }
};
@@ -88,6 +89,7 @@ namespace mongo {
}
const void* pos() { return _pos; }
+ const void* start() { return _start; }
private:
const void *_start;