Skip to content

Commit

Permalink
Counts store rotation post state changes and applied transactions tra…
Browse files Browse the repository at this point in the history
…cking

Rotation post state should apply all transactions that updating count store
but only transaction with ids that greater than rotation version should be treated
as applied in post state.
Otherwise during next count store rotation those transactions will be incorectly interpreted
as already applied during current count store rotation cycle.
  • Loading branch information
MishaDemianenko committed Aug 21, 2015
1 parent 4fb21b3 commit 9d4c027
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 199 deletions.
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;

Expand All @@ -35,6 +36,7 @@ class ConcurrentMapState<Key> extends ActiveState<Key>
private final File file;
private final AtomicLong highestAppliedVersion;
private final AtomicLong appliedChanges;
private final AtomicBoolean hasTrackedChanges;
private final long previousVersion;

ConcurrentMapState( ReadableState<Key> store, File file )
Expand All @@ -45,13 +47,15 @@ class ConcurrentMapState<Key> extends ActiveState<Key>
this.highestAppliedVersion = new AtomicLong( previousVersion );
this.changes = new ConcurrentHashMap<>();
this.appliedChanges = new AtomicLong();
hasTrackedChanges = new AtomicBoolean();
}

private ConcurrentMapState( Prototype<Key> prototype, ReadableState<Key> store, File file )
{
super( store );
this.previousVersion = store.version();
this.file = file;
this.hasTrackedChanges = prototype.hasTrackedChanges;
this.changes = prototype.changes;
this.highestAppliedVersion = prototype.highestAppliedVersion;
this.appliedChanges = prototype.appliedChanges;
Expand All @@ -71,12 +75,14 @@ public EntryUpdater<Key> updater( long version, Lock lock )
return EntryUpdater.noUpdates();
}
update( highestAppliedVersion, version );
hasTrackedChanges.set( true );
return new Updater<>( lock, store, changes, appliedChanges );
}

@Override
public EntryUpdater<Key> unsafeUpdater( Lock lock )
{
hasTrackedChanges.set( true );
return new Updater<>( lock, store, changes, null );
}

Expand Down Expand Up @@ -214,10 +220,14 @@ private static class Prototype<Key> extends PrototypeState<Key>
{
final ConcurrentMap<Key, byte[]> changes = new ConcurrentHashMap<>();
final AtomicLong highestAppliedVersion, appliedChanges = new AtomicLong();
final AtomicBoolean hasTrackedChanges;
private final long threshold;

Prototype( ConcurrentMapState<Key> state, long version )
{
super( state );
threshold = version;
hasTrackedChanges = new AtomicBoolean();
this.highestAppliedVersion = new AtomicLong( version );
}

Expand All @@ -231,19 +241,28 @@ protected ActiveState<Key> create( ReadableState<Key> sub, File file )
protected EntryUpdater<Key> updater( long version, Lock lock )
{
update( highestAppliedVersion, version );
return new Updater<>( lock, store, changes, appliedChanges );
if ( version > threshold )
{
hasTrackedChanges.set( true );
return new Updater<>( lock, store, changes, appliedChanges );
}
else
{
return new Updater<>( lock, store, changes, null );
}
}

@Override
protected EntryUpdater<Key> unsafeUpdater( Lock lock )
{
hasTrackedChanges.set( true );
return new Updater<>( lock, store, changes, null );
}

@Override
protected boolean hasChanges()
{
return !changes.isEmpty();
return hasTrackedChanges.get() && !changes.isEmpty();
}

@Override
Expand Down Expand Up @@ -296,7 +315,7 @@ protected long applied()
@Override
protected boolean hasChanges()
{
return !changes.isEmpty();
return hasTrackedChanges.get() && !changes.isEmpty();
}

@Override
Expand Down
Expand Up @@ -23,7 +23,6 @@
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

Expand All @@ -34,11 +33,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import org.neo4j.function.Functions;
Expand Down Expand Up @@ -149,7 +148,6 @@ public void setUp()
@After
public void tearDown()
{
lifeRule.shutdown();
neoStore.close();
}

Expand Down Expand Up @@ -357,7 +355,7 @@ public Long call()

private void randomSleep() throws InterruptedException
{
Thread.sleep( new Random( System.currentTimeMillis() ).nextInt( 50 ) );
Thread.sleep( ThreadLocalRandom.current().nextInt( 50 ) );
}

private PhysicalTransactionRepresentation createPhysicalTransactionRepresentation()
Expand Down
Expand Up @@ -55,26 +55,28 @@

public class CountsTrackerTest
{
public final @Rule Resources the = new Resources( FILE_IN_EXISTING_DIRECTORY );
public final @Rule ThreadingRule threading = new ThreadingRule();
@Rule
public final Resources resourceManager = new Resources( FILE_IN_EXISTING_DIRECTORY );
@Rule
public final ThreadingRule threading = new ThreadingRule();

@Test
public void shouldBeAbleToStartAndStopTheStore() throws Exception
{
// given
the.managed( newTracker() );
resourceManager.managed( newTracker() );

// when
the.lifeStarts();
the.lifeShutsDown();
resourceManager.lifeStarts();
resourceManager.lifeShutsDown();
}

@Test
@Resources.Life(STARTED)
public void shouldBeAbleToWriteDataToCountsTracker() throws Exception
{
// given
CountsTracker tracker = the.managed( newTracker() );
CountsTracker tracker = resourceManager.managed( newTracker() );
CountsOracle oracle = new CountsOracle();
{
CountsOracle.Node a = oracle.node( 1 );
Expand Down Expand Up @@ -200,7 +202,8 @@ public void shouldBeAbleToReadUpToDateValueWhileAnotherThreadIsPerformingRotatio
{
final Barrier.Control barrier = new Barrier.Control();
CountsTracker tracker = life.add( new CountsTracker(
the.logProvider(), the.fileSystem(), the.pageCache(), new Config(), the.testPath() )
resourceManager.logProvider(), resourceManager.fileSystem(), resourceManager.pageCache(),
new Config(), resourceManager.testPath() )
{
@Override
protected boolean include( CountsKey countsKey, ReadableBuffer value )
Expand Down Expand Up @@ -255,7 +258,7 @@ public void shouldOrderStoreByTxIdInHeaderThenMinorVersion() throws Exception
public void shouldNotRotateIfNoDataChanges() throws Exception
{
// given
CountsTracker tracker = the.managed( newTracker() );
CountsTracker tracker = resourceManager.managed( newTracker() );
File before = tracker.currentFile();

// when
Expand All @@ -270,7 +273,7 @@ public void shouldNotRotateIfNoDataChanges() throws Exception
public void shouldRotateOnDataChangesEvenIfTransactionIsUnchanged() throws Exception
{
// given
CountsTracker tracker = the.managed( newTracker() );
CountsTracker tracker = resourceManager.managed( newTracker() );
File before = tracker.currentFile();
try ( CountsAccessor.IndexStatsUpdater updater = tracker.updateIndexCounts() )
{
Expand All @@ -289,7 +292,7 @@ public void shouldRotateOnDataChangesEvenIfTransactionIsUnchanged() throws Excep
public void shouldSupportTransactionsAppliedOutOfOrderOnRotation() throws Exception
{
// given
final CountsTracker tracker = the.managed( newTracker() );
final CountsTracker tracker = resourceManager.managed( newTracker() );
try ( CountsAccessor.Updater tx = tracker.apply( 2 ).get() )
{
tx.incrementNodeCount( 1, 1 );
Expand Down Expand Up @@ -344,7 +347,8 @@ public boolean test( Thread thread )

private CountsTracker newTracker()
{
return new CountsTracker( the.logProvider(), the.fileSystem(), the.pageCache(), new Config(), the.testPath() )
return new CountsTracker( resourceManager.logProvider(), resourceManager.fileSystem(),
resourceManager.pageCache(), new Config(), resourceManager.testPath() )
.setInitializer( new DataInitializer<CountsAccessor.Updater>()
{
@Override
Expand Down

0 comments on commit 9d4c027

Please sign in to comment.