Skip to content

Commit

Permalink
SERVER-20176: For writeConcern j:1 only sync once at end of operation
Browse files Browse the repository at this point in the history
  • Loading branch information
GeertBosch committed Sep 19, 2015
1 parent 3f710be commit 37b328a
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 63 deletions.
6 changes: 3 additions & 3 deletions src/mongo/db/storage/recovery_unit.h
Expand Up @@ -67,9 +67,9 @@ class RecoveryUnit {
virtual void abortUnitOfWork() = 0;

/**
* Waits until all writes prior to this call are durable. Returns true, unless the storage
* engine cannot guarantee durability, which should never happen when isDurable() returned
* true.
* Waits until all commits that happened before this call are durable. Returns true, unless the
* storage engine cannot guarantee durability, which should never happen when isDurable()
* returned true.
*/
virtual bool waitUntilDurable() = 0;

Expand Down
58 changes: 8 additions & 50 deletions src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
Expand Up @@ -47,43 +47,13 @@

namespace mongo {

namespace {
/**
 * File-local bookkeeping shared by all recovery units: a counter of completed syncs
 * (lastSyncTime), a condition variable that is signalled when the counter advances, and a
 * count of threads currently blocked waiting for such a sync.
 */
struct WaitUntilDurableData {
WaitUntilDurableData() : numWaitingForSync(0), lastSyncTime(0) {}

/**
 * Records that a sync took place: bumps lastSyncTime under the mutex and wakes every
 * thread blocked in waitUntilDurable().
 */
void syncHappend() {
stdx::lock_guard<stdx::mutex> lk(mutex);
lastSyncTime++;
condvar.notify_all();
}

/**
 * Waits on the condition variable for at most 50 milliseconds.
 * Returns true if lastSyncTime advanced while waiting, i.e. syncHappend() was called;
 * returns false otherwise (for example, when the wait timed out first).
 */
bool waitUntilDurable() {
stdx::unique_lock<stdx::mutex> lk(mutex);
// Remember the counter value on entry so a later sync can be detected.
long long start = lastSyncTime;
// Advertise that a waiter exists for the duration of the wait.
numWaitingForSync.fetchAndAdd(1);
// No predicate is passed, so this returns on notification or after the 50 ms timeout.
condvar.wait_for(lk, stdx::chrono::milliseconds(50));
numWaitingForSync.fetchAndAdd(-1);
return lastSyncTime > start;
}

// Number of threads currently inside waitUntilDurable(). It is modified while the mutex is
// held, but is an atomic type - presumably so it can be read without taking the mutex.
AtomicUInt32 numWaitingForSync;

stdx::mutex mutex; // protects lastSyncTime only
stdx::condition_variable condvar; // notified by syncHappend()
long long lastSyncTime; // incremented once per syncHappend() call
} waitUntilDurableData;
}

/**
 * Constructs a recovery unit bound to the session cache 'sc'.
 * No session is obtained here (_session starts out NULL), every boolean state flag starts
 * out false, and _myTransactionCount starts at 1.
 */
WiredTigerRecoveryUnit::WiredTigerRecoveryUnit(WiredTigerSessionCache* sc)
: _sessionCache(sc),
_session(NULL),
_inUnitOfWork(false),
_active(false),
_myTransactionCount(1),
_everStartedWrite(false),
_currentlySquirreled(false),
_syncing(false),
_noTicketNeeded(false) {}

WiredTigerRecoveryUnit::~WiredTigerRecoveryUnit() {
Expand Down Expand Up @@ -156,7 +126,6 @@ void WiredTigerRecoveryUnit::_abort() {
void WiredTigerRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) {
invariant(!_areWriteUnitOfWorksBanned);
invariant(!_inUnitOfWork);
invariant(!_currentlySquirreled);
_inUnitOfWork = true;
_everStartedWrite = true;
_getTicket(opCtx);
Expand All @@ -174,21 +143,15 @@ void WiredTigerRecoveryUnit::abortUnitOfWork() {
_abort();
}

void WiredTigerRecoveryUnit::goingToWaitUntilDurable() {
if (_active) {
// too late, can't change config
return;
void WiredTigerRecoveryUnit::_ensureSession() {
if (!_session) {
_session = _sessionCache->getSession();
}
// yay, we've configured ourselves for sync
_syncing = true;
}

bool WiredTigerRecoveryUnit::waitUntilDurable() {
if (_syncing && _everStartedWrite) {
// we did a sync, so we're good
return true;
}
waitUntilDurableData.waitUntilDurable();
_ensureSession();
_sessionCache->waitUntilDurable(_session);
return true;
}

Expand All @@ -207,9 +170,7 @@ void WiredTigerRecoveryUnit::assertInActiveTxn() const {
}

WiredTigerSession* WiredTigerRecoveryUnit::getSession(OperationContext* opCtx) {
if (!_session) {
_session = _sessionCache->getSession();
}
_ensureSession();

if (!_active) {
_txnOpen(opCtx);
Expand Down Expand Up @@ -304,8 +265,6 @@ void WiredTigerRecoveryUnit::_txnClose(bool commit) {
if (commit) {
invariantWTOK(s->commit_transaction(s, NULL));
LOG(2) << "WT commit_transaction";
if (_syncing)
waitUntilDurableData.syncHappend();
} else {
invariantWTOK(s->rollback_transaction(s, NULL));
LOG(2) << "WT rollback_transaction";
Expand Down Expand Up @@ -373,13 +332,12 @@ void WiredTigerRecoveryUnit::_txnOpen(OperationContext* opCtx) {
_getTicket(opCtx);

WT_SESSION* s = _session->getSession();
_syncing = _syncing || waitUntilDurableData.numWaitingForSync.load() > 0;

if (_readFromMajorityCommittedSnapshot) {
_majorityCommittedSnapshot =
_sessionCache->snapshotManager().beginTransactionOnCommittedSnapshot(s, _syncing);
_sessionCache->snapshotManager().beginTransactionOnCommittedSnapshot(s);
} else {
invariantWTOK(s->begin_transaction(s, _syncing ? "sync=true" : NULL));
invariantWTOK(s->begin_transaction(s, NULL));
}

LOG(2) << "WT begin_transaction";
Expand Down
6 changes: 2 additions & 4 deletions src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
Expand Up @@ -62,9 +62,8 @@ class WiredTigerRecoveryUnit final : public RecoveryUnit {
void abortUnitOfWork() final;

virtual bool waitUntilDurable();
virtual void goingToWaitUntilDurable();

virtual void registerChange(Change*);
virtual void registerChange(Change* change);

virtual void abandonSnapshot();

Expand Down Expand Up @@ -122,6 +121,7 @@ class WiredTigerRecoveryUnit final : public RecoveryUnit {
void _abort();
void _commit();

void _ensureSession();
void _txnClose(bool commit);
void _txnOpen(OperationContext* opCtx);

Expand All @@ -134,8 +134,6 @@ class WiredTigerRecoveryUnit final : public RecoveryUnit {
uint64_t _myTransactionCount;
bool _everStartedWrite;
Timer _timer;
bool _currentlySquirreled;
bool _syncing;
RecordId _oplogReadTill;
bool _readFromMajorityCommittedSnapshot = false;
SnapshotName _majorityCommittedSnapshot = SnapshotName::min();
Expand Down
17 changes: 17 additions & 0 deletions src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
Expand Up @@ -151,6 +151,23 @@ void WiredTigerSessionCache::shuttingDown() {
_snapshotManager.shutdown();
}

/**
 * Flushes the WiredTiger log with "sync=on" through 'session', unless another caller
 * advanced _lastSyncTime between this call's first read of the counter and the moment it
 * acquired _lastSyncMutex, in which case it returns without flushing.
 */
void WiredTigerSessionCache::waitUntilDurable(WiredTigerSession* session) {
    // Sample the sync counter before queueing up on the mutex.
    const uint32_t observed = _lastSyncTime.load();

    // Everything below runs with the mutex held, so at most one thread flushes at a time.
    stdx::unique_lock<stdx::mutex> guard(_lastSyncMutex);

    // Writes to the counter only happen under the mutex, hence the relaxed load.
    const uint32_t latest = _lastSyncTime.loadRelaxed();
    if (latest == observed) {
        // The counter has not moved since we sampled it: claim this round by bumping it,
        // then perform the flush ourselves.
        _lastSyncTime.store(latest + 1);

        WT_SESSION* wtSession = session->getSession();
        invariantWTOK(wtSession->log_flush(wtSession, "sync=on"));
    }
    // Otherwise the counter moved while we waited for the mutex; nothing more to do.
}

void WiredTigerSessionCache::closeAll() {
// Increment the epoch as we are now closing all sessions with this epoch
SessionCache swap;
Expand Down
11 changes: 10 additions & 1 deletion src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
Expand Up @@ -152,6 +152,11 @@ class WiredTigerSessionCache {
*/
void shuttingDown();

/**
* Waits until all commits that happened before this call are durable.
*/
void waitUntilDurable(WiredTigerSession* session);

// Returns the WT_CONNECTION pointer stored in _conn.
WT_CONNECTION* conn() const {
return _conn;
}
Expand Down Expand Up @@ -180,5 +185,9 @@ class WiredTigerSessionCache {

// Bumped when all open sessions need to be closed
AtomicUInt64 _epoch; // atomic so we can check it outside of the lock

// Counter and critical section mutex for waitUntilDurable
AtomicUInt32 _lastSyncTime;
stdx::mutex _lastSyncMutex;
};
}
} // namespace
Expand Up @@ -90,8 +90,8 @@ boost::optional<SnapshotName> WiredTigerSnapshotManager::getMinSnapshotForNextCo
return _committedSnapshot;
}

SnapshotName WiredTigerSnapshotManager::beginTransactionOnCommittedSnapshot(WT_SESSION* session,
bool sync) const {
SnapshotName WiredTigerSnapshotManager::beginTransactionOnCommittedSnapshot(
WT_SESSION* session) const {
stdx::lock_guard<stdx::mutex> lock(_mutex);

uassert(ErrorCodes::ReadConcernMajorityNotAvailableYet,
Expand All @@ -100,8 +100,6 @@ SnapshotName WiredTigerSnapshotManager::beginTransactionOnCommittedSnapshot(WT_S

StringBuilder config;
config << "snapshot=" << _committedSnapshot->asU64();
if (sync)
config << ",sync=true";
invariantWTOK(session->begin_transaction(session, config.str().c_str()));

return *_committedSnapshot;
Expand Down
Expand Up @@ -71,7 +71,7 @@ class WiredTigerSnapshotManager final : public SnapshotManager {
*
* Throws if there is currently no committed snapshot.
*/
SnapshotName beginTransactionOnCommittedSnapshot(WT_SESSION* session, bool sync) const;
SnapshotName beginTransactionOnCommittedSnapshot(WT_SESSION* session) const;

/**
* Returns lowest SnapshotName that could possibly be used by a future call to
Expand Down

0 comments on commit 37b328a

Please sign in to comment.