Skip to content

Commit

Permalink
Use LocalSlotReferences to close all hanging transaction on pool close.
Browse files Browse the repository at this point in the history
Before we were using weak references and reference queue when pool was
closed to cleanup all of the objects that were allocated but not yes disposed.
From time to time that was not possible since we
were able to observe state where WeakReference was already cleaned
but not yet visible in reference queue. That would cause some of the
objects (in our case transactions) to not release their resources properly.
For example batch of allocated ids will never be returned and will be lost
until id generator file rebuild.
This PR changes that behaviour and use map of slotReferences to reliably
close all entries that available in the pool.
PR also makes pools AutoClosable.
  • Loading branch information
MishaDemianenko committed Mar 12, 2018
1 parent f0c8dc0 commit 81ef239
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 69 deletions.
Expand Up @@ -172,11 +172,6 @@ public KernelTransactions( StatementLocksFactory statementLocksFactory,
this.collectionsFactorySupplier = collectionsFactorySupplier; this.collectionsFactorySupplier = collectionsFactorySupplier;
} }


public Supplier<ExplicitIndexTransactionState> explicitIndexTxStateSupplier()
{
return explicitIndexTxStateSupplier;
}

public KernelTransaction newInstance( KernelTransaction.Type type, LoginContext loginContext, long timeout ) public KernelTransaction newInstance( KernelTransaction.Type type, LoginContext loginContext, long timeout )
{ {
assertCurrentThreadIsNotBlockingNewTransactions(); assertCurrentThreadIsNotBlockingNewTransactions();
Expand Down Expand Up @@ -232,8 +227,8 @@ public Set<KernelTransactionHandle> activeTransactions()
public void disposeAll() public void disposeAll()
{ {
terminateTransactions(); terminateTransactions();
localTxPool.disposeAll(); localTxPool.close();
globalTxPool.disposeAll(); globalTxPool.close();
} }


public void terminateTransactions() public void terminateTransactions()
Expand Down
Expand Up @@ -81,7 +81,7 @@ public void close()
this.idBatches.close(); this.idBatches.close();
} }


public TransactionRecordState createTransactionRecordState( IntegrityValidator integrityValidator, long lastTransactionIdWhenStarted, TransactionRecordState createTransactionRecordState( IntegrityValidator integrityValidator, long lastTransactionIdWhenStarted,
ResourceLocker locks ) ResourceLocker locks )
{ {
RecordChangeSet recordChangeSet = new RecordChangeSet( loaders ); RecordChangeSet recordChangeSet = new RecordChangeSet( loaders );
Expand Down
Expand Up @@ -37,7 +37,7 @@ public class RenewableBatchIdSequence implements IdSequence, Resource
private IdSequence currentBatch; private IdSequence currentBatch;
private boolean closed; private boolean closed;


public RenewableBatchIdSequence( IdSequence source, int batchSize, LongConsumer excessIdConsumer ) RenewableBatchIdSequence( IdSequence source, int batchSize, LongConsumer excessIdConsumer )
{ {
this.source = source; this.source = source;
this.batchSize = batchSize; this.batchSize = batchSize;
Expand Down
Expand Up @@ -166,7 +166,7 @@ public void shouldDisposeTransactionsWhenAsked() throws Throwable
assertThat( postDispose, not( equalTo( first ) ) ); assertThat( postDispose, not( equalTo( first ) ) );
assertThat( postDispose, not( equalTo( second ) ) ); assertThat( postDispose, not( equalTo( second ) ) );


assertTrue( leftOpen.getReasonIfTerminated() != null ); assertNotNull( leftOpen.getReasonIfTerminated() );
} }


@Test @Test
Expand Down
Expand Up @@ -24,11 +24,11 @@


import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicIntegerArray;


import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.impl.util.collection.SimpleBitSet;
import org.neo4j.test.rule.DatabaseRule; import org.neo4j.test.rule.DatabaseRule;
import org.neo4j.test.rule.EmbeddedDatabaseRule; import org.neo4j.test.rule.EmbeddedDatabaseRule;


Expand Down Expand Up @@ -76,7 +76,7 @@ public void shouldBeAbleToReuseAllIdsInConcurrentCommitsWithRestart() throws Exc
int threads = Runtime.getRuntime().availableProcessors(); int threads = Runtime.getRuntime().availableProcessors();
int batchSize = Integer.parseInt( GraphDatabaseSettings.record_id_batch_size.getDefaultValue() ); int batchSize = Integer.parseInt( GraphDatabaseSettings.record_id_batch_size.getDefaultValue() );
ExecutorService executor = Executors.newFixedThreadPool( threads ); ExecutorService executor = Executors.newFixedThreadPool( threads );
AtomicIntegerArray createdIds = new AtomicIntegerArray( threads * batchSize ); SimpleBitSet usedIds = new SimpleBitSet( threads * batchSize );
for ( int i = 0; i < threads; i++ ) for ( int i = 0; i < threads; i++ )
{ {
executor.submit( () -> executor.submit( () ->
Expand All @@ -86,7 +86,7 @@ public void shouldBeAbleToReuseAllIdsInConcurrentCommitsWithRestart() throws Exc
for ( int j = 0; j < batchSize / 2; j++ ) for ( int j = 0; j < batchSize / 2; j++ )
{ {
int index = toIntExact( db.createNode().getId() ); int index = toIntExact( db.createNode().getId() );
createdIds.set( index, 1 ); usedIds.put( index );
} }
tx.success(); tx.success();
} }
Expand All @@ -96,27 +96,27 @@ public void shouldBeAbleToReuseAllIdsInConcurrentCommitsWithRestart() throws Exc
while ( !executor.awaitTermination( 1, SECONDS ) ) while ( !executor.awaitTermination( 1, SECONDS ) )
{ // Just wait longer { // Just wait longer
} }
assertFalse( allSet( createdIds ) ); assertFalse( allSet( usedIds ) );


// when/then // when/then
db.restartDatabase(); db.restartDatabase();
try ( Transaction tx = db.beginTx() ) try ( Transaction tx = db.beginTx() )
{ {
while ( !allSet( createdIds ) ) while ( !allSet( usedIds ) )
{ {
int index = toIntExact( db.createNode().getId() ); int index = toIntExact( db.createNode().getId() );
assert createdIds.get( index ) != 1; assert !usedIds.contains( index );
createdIds.set( index, 1 ); usedIds.put( index );
} }
tx.success(); tx.success();
} }
} }


private static boolean allSet( AtomicIntegerArray values ) private static boolean allSet( SimpleBitSet bitSet )
{ {
for ( int i = 0; i < values.length(); i++ ) for ( int i = 0; i < bitSet.size(); i++ )
{ {
if ( values.get( i ) == 0 ) if ( !bitSet.contains( i ) )
{ {
return false; return false;
} }
Expand Down
Expand Up @@ -189,7 +189,8 @@ public void release( R toRelease )
/** /**
* Dispose of all pooled objects. * Dispose of all pooled objects.
*/ */
public void disposeAll() @Override
public void close()
{ {
for ( R resource = unused.poll(); resource != null; resource = unused.poll() ) for ( R resource = unused.poll(); resource != null; resource = unused.poll() )
{ {
Expand Down
Expand Up @@ -24,8 +24,6 @@
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;


import org.neo4j.function.Factory;

import static java.util.Collections.newSetFromMap; import static java.util.Collections.newSetFromMap;


/** /**
Expand Down Expand Up @@ -57,21 +55,15 @@ public class MarshlandPool<T> implements Pool<T>
protected LocalSlot<T> initialValue() protected LocalSlot<T> initialValue()
{ {
LocalSlot<T> localSlot = new LocalSlot<>( objectsFromDeadThreads ); LocalSlot<T> localSlot = new LocalSlot<>( objectsFromDeadThreads );
slotReferences.add( localSlot.phantomReference ); slotReferences.add( localSlot.slotWeakReference );
return localSlot; return localSlot;
} }
}; };


// Used to reclaim objects from dead threads // Used to reclaim objects from dead threads
private final Set<LocalSlotReference> slotReferences = private final Set<LocalSlotReference> slotReferences = newSetFromMap( new ConcurrentHashMap<>() );
newSetFromMap( new ConcurrentHashMap<LocalSlotReference, Boolean>() );
private final ReferenceQueue<LocalSlot<T>> objectsFromDeadThreads = new ReferenceQueue<>(); private final ReferenceQueue<LocalSlot<T>> objectsFromDeadThreads = new ReferenceQueue<>();


public MarshlandPool( Factory<T> objectFactory )
{
this( new LinkedQueuePool<>( 4, objectFactory ) );
}

public MarshlandPool( Pool<T> delegatePool ) public MarshlandPool( Pool<T> delegatePool )
{ {
this.pool = delegatePool; this.pool = delegatePool;
Expand Down Expand Up @@ -123,69 +115,58 @@ public void release( T obj )
/** /**
* Dispose of all objects in this pool, releasing them back to the delegate pool * Dispose of all objects in this pool, releasing them back to the delegate pool
*/ */
public void disposeAll() @Override
public void close()
{ {
for ( LocalSlotReference slotReference : slotReferences ) for ( LocalSlotReference slotReference : slotReferences )
{ {
LocalSlot<T> slot = (LocalSlot) slotReference.get(); T object = (T) slotReference.object;
if ( slot != null ) if ( object != null )
{ {
T obj = slot.object; slotReference.object = null;
if ( obj != null ) LocalSlot<T> slot = (LocalSlot<T>) slotReference.get();
if ( slot != null )
{ {
slot.set( null ); slot.set( null );
pool.release( obj );
} }
slotReference.clear();
pool.release( object );
} }
} }

slotReferences.clear();
for ( LocalSlotReference<T> reference = (LocalSlotReference) objectsFromDeadThreads.poll();
reference != null;
reference = (LocalSlotReference) objectsFromDeadThreads.poll() )
{
T instance = reference.object;
if ( instance != null )
{
pool.release( instance );
}
}
}

public void close()
{
disposeAll();
} }


/** /**
* This is used to trigger the GC to notify us whenever the thread local has been garbage collected. * Container for the "puddle", the small local pool each thread keeps.
*/ */
private static class LocalSlotReference<T> extends WeakReference<LocalSlot> private static class LocalSlot<T>
{ {

private T object; private T object;
private final LocalSlotReference slotWeakReference;


private LocalSlotReference( LocalSlot referent, ReferenceQueue<? super LocalSlot> q ) LocalSlot( ReferenceQueue<LocalSlot<T>> referenceQueue )
{ {
super( referent, q ); slotWeakReference = new LocalSlotReference( this, referenceQueue );
}

public void set( T obj )
{
slotWeakReference.object = obj;
this.object = obj;
} }
} }


/** /**
* Container for the "puddle", the small local pool each thread keeps. * This is used to trigger the GC to notify us whenever the thread local has been garbage collected.
*/ */
private static class LocalSlot<T> private static class LocalSlotReference<T> extends WeakReference<LocalSlot>
{ {
private T object; private T object;
private final LocalSlotReference phantomReference;

LocalSlot( ReferenceQueue<LocalSlot<T>> referenceQueue )
{
phantomReference = new LocalSlotReference( this, referenceQueue );
}


public void set( T obj ) private LocalSlotReference( LocalSlot referent, ReferenceQueue<? super LocalSlot> q )
{ {
phantomReference.object = obj; super( referent, q );
this.object = obj;
} }
} }
} }
Expand Up @@ -19,7 +19,7 @@
*/ */
package org.neo4j.collection.pool; package org.neo4j.collection.pool;


public interface Pool<T> public interface Pool<T> extends AutoCloseable
{ {
T acquire(); T acquire();


Expand Down

0 comments on commit 81ef239

Please sign in to comment.