Skip to content

Commit

Permalink
Fixing Voron race condition that can happen when a read transaction i…
Browse files Browse the repository at this point in the history
…s started _while_ the flush is in progress.

This can cause the read transaction to read data that started after it opened, so we need to prevent that.
  • Loading branch information
ppekrol authored and ayende committed Oct 26, 2016
1 parent 59b3199 commit 637b7f8
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ public void AllScratchPagesShouldBeReleased()

env.FlushLogToDataFile(); // non read nor write transactions, so it should flush and release everything from scratch

Assert.Equal(0, env.ScratchBufferPool.GetNumberOfAllocations(0));
// we keep track of the pages in scratch for one additional transaction, to avoid race
// condition with FlushLogToDataFile concurrently with new read transactions
Assert.Equal(2, env.ScratchBufferPool.GetNumberOfAllocations(0));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void CanAddContinuallyGrowingValue()
// also in the meanwhile the free space handling is doing its job so it needs some pages too
// and we allocate not the exact size but the nearest power of two e.g. we write 257 pages but in scratch we request 512 ones
int i = 0;
while (size < 256 * AbstractPager.PageSize)
while (size < 128 * AbstractPager.PageSize)
{
using (Env.NewTransaction(TransactionFlags.Read))
{
Expand Down
2 changes: 1 addition & 1 deletion Raven.Voron/Voron/Impl/Compaction/StorageCompaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public static void Execute(StorageEnvironmentOptions srcOptions, StorageEnvironm

compactedEnv.FlushLogToDataFile(allowToFlushOverwrittenPages: true);

compactedEnv.Journal.Applicator.SyncDataFile(compactedEnv.OldestTransaction);
compactedEnv.Journal.Applicator.SyncDataFile();
compactedEnv.Journal.Applicator.DeleteCurrentAlreadyFlushedJournal();

minimalCompactedDataFileSize = compactedEnv.NextPageNumber*AbstractPager.PageSize;
Expand Down
25 changes: 16 additions & 9 deletions Raven.Voron/Voron/Impl/Journal/WriteAheadJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ private IDisposable ForceFlushingPagesOlderThan(long oldestActiveTransaction)
});
}

public void ApplyLogsToDataFile(long oldestActiveTransaction, CancellationToken token, Transaction transaction = null, bool allowToFlushOverwrittenPages = false)
public void ApplyLogsToDataFile(CancellationToken token, Transaction transaction = null, bool allowToFlushOverwrittenPages = false)
{
if (token.IsCancellationRequested)
return;
Expand Down Expand Up @@ -451,6 +451,8 @@ public void ApplyLogsToDataFile(long oldestActiveTransaction, CancellationToken

long lastFlushedTransactionId = -1;

long oldestActiveTransaction = _waj._env.OldestTransaction;

foreach (var journalFile in jrnls.Where(x => x.Number >= _lastSyncedJournal))
{
var currentJournalMaxTransactionId = -1L;
Expand Down Expand Up @@ -529,7 +531,7 @@ public void ApplyLogsToDataFile(long oldestActiveTransaction, CancellationToken

using (ForceFlushingPagesOlderThan(oldestActiveTransaction))
{
ApplyLogsToDataFile(oldestActiveTransaction, token, transaction, false);
ApplyLogsToDataFile(token, transaction, false);
}
}

Expand Down Expand Up @@ -578,7 +580,7 @@ public void ApplyLogsToDataFile(long oldestActiveTransaction, CancellationToken
if (_totalWrittenButUnsyncedBytes > DelayedDataFileSynchronizationBytesLimit ||
DateTime.UtcNow - _lastDataFileSyncTime > _delayedDataFileSynchronizationTimeLimit)
{
SyncDataFile(oldestActiveTransaction);
SyncDataFile();
}

}
Expand All @@ -589,11 +591,11 @@ public void ApplyLogsToDataFile(long oldestActiveTransaction, CancellationToken
}
}

internal void SyncDataFile(long oldestActiveTransaction)
internal void SyncDataFile()
{
_waj._dataPager.Sync();

UpdateFileHeaderAfterDataFileSync(_lastFlushedJournal, oldestActiveTransaction);
UpdateFileHeaderAfterDataFileSync(_lastFlushedJournal, _lastSyncedTransactionId);

foreach (var toDelete in _journalsToDelete.Values)
{
Expand Down Expand Up @@ -684,17 +686,22 @@ private void EnsureDataPagerSpacing(Transaction transaction, Page last, int numb

private void FreeScratchPages(IEnumerable<JournalFile> unusedJournalFiles, Transaction txw)
{
// we release up to the last read transaction, because there might be new read transactions that are currently
// running, that started after the flush
var lastSyncedTransactionId = Math.Min(_lastSyncedTransactionId, _waj._env.CurrentReadTransactionId - 1);

// we have to free pages of the unused journals before the remaining ones that are still in use
// to prevent reading from them by any read transaction (read transactions search journals from the newest
// to read the most updated version)

foreach (var journalFile in unusedJournalFiles.OrderBy(x => x.Number))
{
journalFile.FreeScratchPagesOlderThan(txw, _lastSyncedTransactionId, forceToFreeAllPages: forcedFlushOfOldPages);
journalFile.FreeScratchPagesOlderThan(txw, lastSyncedTransactionId, forceToFreeAllPages: forcedFlushOfOldPages);
}

foreach (var jrnl in _waj._files.OrderBy(x => x.Number))
{
jrnl.FreeScratchPagesOlderThan(txw, _lastSyncedTransactionId, forceToFreeAllPages: forcedFlushOfOldPages);
jrnl.FreeScratchPagesOlderThan(txw, lastSyncedTransactionId, forceToFreeAllPages: forcedFlushOfOldPages);
}
}

Expand All @@ -716,7 +723,7 @@ private List<JournalFile> GetUnusedJournalFiles(IEnumerable<JournalSnapshot> jrn
return unusedJournalFiles;
}

public void UpdateFileHeaderAfterDataFileSync(JournalFile file, long oldestActiveTransaction)
public void UpdateFileHeaderAfterDataFileSync(JournalFile file, long maxTransactionId)
{
var txHeaders = stackalloc TransactionHeader[2];
var readTxHeader = &txHeaders[0];
Expand All @@ -729,7 +736,7 @@ public void UpdateFileHeaderAfterDataFileSync(JournalFile file, long oldestActiv
break;
if (readTxHeader->HeaderMarker != Constants.TransactionHeaderMarker)
break;
if (readTxHeader->TransactionId + 1 == oldestActiveTransaction)
if (readTxHeader->TransactionId > maxTransactionId)
break;

lastReadTxHeader = *readTxHeader;
Expand Down
44 changes: 33 additions & 11 deletions Raven.Voron/Voron/StorageEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -432,13 +432,14 @@ public Transaction NewTransaction(TransactionFlags flags, TimeSpan? timeout = nu
RecordTransactionState(tx, DebugActionType.TransactionStart);
tx.RecordTransactionState = RecordTransactionState;
}

_activeTransactions.Add(tx);
}
finally
{
_txCommit.ExitReadLock();
}

_activeTransactions.Add(tx);

tx.EnsurePagerStateReference(_dataPager.PagerState);

if (flags == TransactionFlags.ReadWrite)
Expand All @@ -461,6 +462,32 @@ private void RecordTransactionState(Transaction tx, DebugActionType state)
DebugJournal.RecordTransactionAction(tx, state);
}

public long CurrentReadTransactionId
{
get { return Thread.VolatileRead(ref _transactionsCounter); }
}

internal ExitWriteLock PreventNewReadTransactions()
{
_txCommit.EnterWriteLock();
return new ExitWriteLock(_txCommit);
}

public struct ExitWriteLock : IDisposable
{
readonly ReaderWriterLockSlim _rwls;

public ExitWriteLock(ReaderWriterLockSlim rwls)
{
_rwls = rwls;
}

public void Dispose()
{
_rwls.ExitWriteLock();
}
}

public long NextWriteTransactionId
{
get { return Thread.VolatileRead(ref _transactionsCounter) + 1; }
Expand All @@ -470,19 +497,14 @@ private void TransactionAfterCommit(Transaction tx)
{
if (_activeTransactions.Contains(tx) == false)
return;

_txCommit.EnterWriteLock();
try

using (PreventNewReadTransactions())
{
if (tx.Committed && tx.FlushedToJournal)
_transactionsCounter = tx.Id;

State = tx.State;
}
finally
{
_txCommit.ExitWriteLock();
}

if (tx.FlushedToJournal == false)
return;
Expand Down Expand Up @@ -623,7 +645,7 @@ private Task FlushWritesToDataFileAsync()
try
{
_journal.Applicator.ApplyLogsToDataFile(OldestTransaction, _cancellationTokenSource.Token);
_journal.Applicator.ApplyLogsToDataFile(_cancellationTokenSource.Token);
}
catch (TimeoutException)
{
Expand Down Expand Up @@ -657,7 +679,7 @@ internal void ForceLogFlushToDataFile(Transaction tx, bool allowToFlushOverwritt
_debugJournal.RecordFlushAction(DebugActionType.FlushStart, tx);
}

_journal.Applicator.ApplyLogsToDataFile(OldestTransaction, _cancellationTokenSource.Token, tx, allowToFlushOverwrittenPages);
_journal.Applicator.ApplyLogsToDataFile(_cancellationTokenSource.Token, tx, allowToFlushOverwrittenPages);

if (IsDebugRecording)
{
Expand Down

0 comments on commit 637b7f8

Please sign in to comment.