Skip to content

Commit

Permalink
Simplify code
Browse files Browse the repository at this point in the history
  • Loading branch information
davidegrohmann committed May 8, 2017
1 parent 507f311 commit d84082a
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 66 deletions.
Expand Up @@ -96,10 +96,9 @@ public class KernelTransactions extends LifecycleAdapter implements Supplier<Ker
*/
private final Set<KernelTransactionImplementation> allTransactions = newSetFromMap( new ConcurrentHashMap<>() );

// This is the factory that actually builds brand-new instances.
private final Factory<KernelTransactionImplementation> factory = new KernelTransactionImplementationFactory( allTransactions );
// Global pool of transactions, wrapped by the thread-local marshland pool and so is not used directly.
private final LinkedQueuePool<KernelTransactionImplementation> globalTxPool = new GlobalKernelTransactionPool( allTransactions, factory );
private final LinkedQueuePool<KernelTransactionImplementation> globalTxPool =
new GlobalKernelTransactionPool( allTransactions );
// Pool of unused transactions.
private final MarshlandPool<KernelTransactionImplementation> localTxPool = new MarshlandPool<>( globalTxPool );

Expand Down Expand Up @@ -288,17 +287,6 @@ KernelTransactionHandle createHandle( KernelTransactionImplementation tx )
return new KernelTransactionImplementationHandle( tx );
}

/**
* Get all transactions
* * <p>
* <b>Note:</b> this method is package-private for testing <b>only</b>.
* @return set of all kernel transaction
*/
Set<KernelTransactionImplementation> getAllTransactions()
{
return allTransactions;
}

private void assertRunning()
{
if ( availabilityGuard.isShutdown() )
Expand All @@ -320,17 +308,18 @@ private void assertCurrentThreadIsNotBlockingNewTransactions()
}
}

private class KernelTransactionImplementationFactory implements Factory<KernelTransactionImplementation>
private class GlobalKernelTransactionPool extends LinkedQueuePool<KernelTransactionImplementation>
{
private Set<KernelTransactionImplementation> transactions;

KernelTransactionImplementationFactory( Set<KernelTransactionImplementation> transactions )
GlobalKernelTransactionPool( Set<KernelTransactionImplementation> transactions )
{
super( 8 );
this.transactions = transactions;
}

@Override
public KernelTransactionImplementation newInstance()
protected KernelTransactionImplementation create()
{
KernelTransactionImplementation tx =
new KernelTransactionImplementation( statementOperations, schemaWriteGuard, hooks,
Expand All @@ -341,18 +330,6 @@ public KernelTransactionImplementation newInstance()
this.transactions.add( tx );
return tx;
}
}

private class GlobalKernelTransactionPool extends LinkedQueuePool<KernelTransactionImplementation>
{
private Set<KernelTransactionImplementation> transactions;

GlobalKernelTransactionPool( Set<KernelTransactionImplementation> transactions,
Factory<KernelTransactionImplementation> factory )
{
super( 8, factory );
this.transactions = transactions;
}

@Override
protected void dispose( KernelTransactionImplementation tx )
Expand Down
Expand Up @@ -24,9 +24,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

import org.neo4j.function.Factory;

public class LinkedQueuePool<R> implements Pool<R>
public abstract class LinkedQueuePool<R> implements Pool<R>
{
public interface Monitor<R>
{
Expand Down Expand Up @@ -105,39 +103,34 @@ public boolean shouldCheck()
}
}

public static final int DEFAULT_CHECK_INTERVAL = 60 * 1000;
private static final int DEFAULT_CHECK_INTERVAL = 60 * 1000;

private final Queue<R> unused = new ConcurrentLinkedQueue<>();
private final Monitor monitor;
private final Monitor<R> monitor;
private final int minSize;
private final Factory<R> factory;
private final CheckStrategy checkStrategy;
// Guarded by nothing. Those are estimates, losing some values doesn't matter much
private final AtomicInteger allocated = new AtomicInteger( 0 );
private final AtomicInteger queueSize = new AtomicInteger( 0 );
private final AtomicInteger allocated = new AtomicInteger();
private final AtomicInteger queueSize = new AtomicInteger();
private int currentPeakSize;
private int targetSize;

public LinkedQueuePool( int minSize, Factory<R> factory )
public LinkedQueuePool( int minSize )
{
this( minSize, factory, new CheckStrategy.TimeoutCheckStrategy( DEFAULT_CHECK_INTERVAL ),
new Monitor.Adapter() );
this( minSize, new CheckStrategy.TimeoutCheckStrategy( DEFAULT_CHECK_INTERVAL ),
new Monitor.Adapter<>() );
}

public LinkedQueuePool( int minSize, Factory<R> factory, CheckStrategy strategy, Monitor monitor )
LinkedQueuePool( int minSize, CheckStrategy strategy, Monitor<R> monitor )
{
this.minSize = minSize;
this.factory = factory;
this.currentPeakSize = 0;
this.targetSize = minSize;
this.checkStrategy = strategy;
this.monitor = monitor;
}

protected R create()
{
return factory.newInstance();
}
protected abstract R create();

protected void dispose( R resource )
{
Expand Down
Expand Up @@ -24,8 +24,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.neo4j.function.Factory;

import static java.util.Collections.newSetFromMap;

/**
Expand Down Expand Up @@ -67,11 +65,6 @@ protected LocalSlot<T> initialValue()
newSetFromMap( new ConcurrentHashMap<LocalSlotReference, Boolean>() );
private final ReferenceQueue<LocalSlot<T>> objectsFromDeadThreads = new ReferenceQueue<>();

public MarshlandPool( Factory<T> objectFactory )
{
this( new LinkedQueuePool<>( 4, objectFactory ) );
}

public MarshlandPool( Pool<T> delegatePool )
{
this.pool = delegatePool;
Expand All @@ -91,6 +84,7 @@ public T acquire()
}

// Try the reference queue, containing objects from dead threads
@SuppressWarnings( "unchecked" )
LocalSlotReference<T> slotReference = (LocalSlotReference) objectsFromDeadThreads.poll();
if ( slotReference != null && slotReference.object != null )
{
Expand All @@ -112,17 +106,17 @@ public void release( T obj )
{
localSlot.set( obj );
}

// Fall back to the delegate pool
else
{
// Fall back to the delegate pool
pool.release( obj );
}
}

/**
* Dispose of all objects in this pool, releasing them back to the delegate pool
*/
@SuppressWarnings( "unchecked" )
public void disposeAll()
{
for ( LocalSlotReference slotReference : slotReferences )
Expand Down Expand Up @@ -151,11 +145,6 @@ public void disposeAll()
}
}

public void close()
{
disposeAll();
}

/**
* This is used to trigger the GC to notify us whenever the thread local has been garbage collected.
*/
Expand Down
Expand Up @@ -300,8 +300,15 @@ private void buildAPeakOfAcquiredFlyweightsAndTriggerAlarmWithSideEffects( int M

private LinkedQueuePool<Object> getLinkedQueuePool( StatefulMonitor stateMonitor, FakeClock clock, int minSize )
{
return new LinkedQueuePool<>( minSize, Object::new,
new LinkedQueuePool.CheckStrategy.TimeoutCheckStrategy( 100, clock ), stateMonitor );
return new LinkedQueuePool<Object>( minSize,
new LinkedQueuePool.CheckStrategy.TimeoutCheckStrategy( 100, clock ), stateMonitor )
{
@Override
protected Object create()
{
return new Object();
}
};
}

private <R> List<FlyweightHolder<R>> acquireFromPool( final LinkedQueuePool<R> pool, int times )
Expand Down
Expand Up @@ -76,7 +76,7 @@ public void shouldReturnToDelegatePoolIfLocalPoolIsFull() throws Exception
}

@Test
public void shouldReleaseAllSlotsOnClose() throws Exception
public void shouldReleaseAllSlotsOnDisposeAll() throws Exception
{
// Given
Pool<Object> delegatePool = mock( Pool.class );
Expand All @@ -88,7 +88,7 @@ public void shouldReleaseAllSlotsOnClose() throws Exception
pool.release( first );

// When
pool.close();
pool.disposeAll();

// Then
verify( delegatePool, times( 1 ) ).acquire();
Expand Down
Expand Up @@ -254,7 +254,7 @@ public void close()
private static class ForsetiClientFlyweightPool extends LinkedQueuePool<ForsetiClient>
{
/** Client id counter **/
private final AtomicInteger clientIds = new AtomicInteger( 0 );
private final AtomicInteger clientIds = new AtomicInteger();

/** Re-use ids, forseti uses these in arrays, so we want to keep them low and not loose them. */
// TODO we could use a synchronised SimpleBitSet instead, since we know that we only care about reusing a
Expand All @@ -270,7 +270,7 @@ private static class ForsetiClientFlyweightPool extends LinkedQueuePool<ForsetiC
ForsetiClientFlyweightPool( Config config, Clock clock, ConcurrentMap<Long,Lock>[] lockMaps,
WaitStrategy<AcquireLockTimeoutException>[] waitStrategies )
{
super( 128, null );
super( 128 );
this.config = config;
this.clock = clock;
this.lockMaps = lockMaps;
Expand Down

0 comments on commit d84082a

Please sign in to comment.