From fe603c7e063e4355d932f49eca77a6617820a46f Mon Sep 17 00:00:00 2001 From: Mattias Persson Date: Thu, 14 Sep 2017 21:56:04 +0200 Subject: [PATCH] Revert "Changes how MarshlandPool works with inner transactions" This reverts commit a966220998c8b8b5ba870e923aadd4090cc1a3ba. --- .../neo4j/collection/pool/MarshlandPool.java | 115 ++++-------------- .../collection/pool/MarshlandPoolTest.java | 53 +------- 2 files changed, 28 insertions(+), 140 deletions(-) diff --git a/community/primitive-collections/src/main/java/org/neo4j/collection/pool/MarshlandPool.java b/community/primitive-collections/src/main/java/org/neo4j/collection/pool/MarshlandPool.java index 5024d66ecc826..9ba8c8c331cc9 100644 --- a/community/primitive-collections/src/main/java/org/neo4j/collection/pool/MarshlandPool.java +++ b/community/primitive-collections/src/main/java/org/neo4j/collection/pool/MarshlandPool.java @@ -63,8 +63,8 @@ protected LocalSlot initialValue() }; // Used to reclaim objects from dead threads - private final Set> slotReferences = - newSetFromMap( new ConcurrentHashMap, Boolean>() ); + private final Set slotReferences = + newSetFromMap( new ConcurrentHashMap() ); private final ReferenceQueue> objectsFromDeadThreads = new ReferenceQueue<>(); public MarshlandPool( Factory objectFactory ) @@ -84,22 +84,22 @@ public T acquire() LocalSlot localSlot = puddle.get(); T object = localSlot.object; - if ( object != null && localSlot.acquire() ) + if ( object != null ) { + localSlot.set( null ); return object; } // Try the reference queue, containing objects from dead threads - @SuppressWarnings( "unchecked" ) - LocalSlotReference slotReference = (LocalSlotReference) objectsFromDeadThreads.poll(); + LocalSlotReference slotReference = (LocalSlotReference) objectsFromDeadThreads.poll(); if ( slotReference != null && slotReference.object != null ) { slotReferences.remove( slotReference ); - return localSlot.assignIfNotAssigned( slotReference.object ); + return slotReference.object; } // Fall back to the delegate pool - return localSlot.assignIfNotAssigned( pool.acquire() ); + return pool.acquire(); } @Override @@ -108,35 +108,40 @@ public void release( T obj ) // Return it locally if possible LocalSlot localSlot = puddle.get(); - if ( !localSlot.release( obj ) ) - { // Fall back to the delegate pool + if ( localSlot.object == null ) + { + localSlot.set( obj ); + } + + // Fall back to the delegate pool + else + { pool.release( obj ); } - // else it was released back into the slot } /** * Dispose of all objects in this pool, releasing them back to the delegate pool */ - @SuppressWarnings( "unchecked" ) public void disposeAll() { - for ( LocalSlotReference slotReference : slotReferences ) + for ( LocalSlotReference slotReference : slotReferences ) { - LocalSlot slot = slotReference.get(); + LocalSlot slot = (LocalSlot) slotReference.get(); if ( slot != null ) { - T obj = slot.clear(); + T obj = slot.object; if ( obj != null ) { + slot.set( null ); pool.release( obj ); } } } - for ( LocalSlotReference reference = (LocalSlotReference) objectsFromDeadThreads.poll(); + for ( LocalSlotReference reference = (LocalSlotReference) objectsFromDeadThreads.poll(); reference != null; - reference = (LocalSlotReference) objectsFromDeadThreads.poll() ) + reference = (LocalSlotReference) objectsFromDeadThreads.poll() ) { T instance = reference.object; if ( instance != null ) @@ -154,11 +159,11 @@ public void close() /** * This is used to trigger the GC to notify us whenever the thread local has been garbage collected. */ - private static class LocalSlotReference extends WeakReference> + private static class LocalSlotReference extends WeakReference { private T object; - private LocalSlotReference( LocalSlot referent, ReferenceQueue> q ) + private LocalSlotReference( LocalSlot referent, ReferenceQueue q ) { super( referent, q ); } @@ -170,82 +175,14 @@ private LocalSlotReference( LocalSlot referent, ReferenceQueue { private T object; - private final LocalSlotReference phantomReference; - private boolean acquired; + private final LocalSlotReference phantomReference; LocalSlot( ReferenceQueue> referenceQueue ) { - phantomReference = new LocalSlotReference<>( this, referenceQueue ); - } - - T clear() - { - T result = acquired ? null : object; - set( null ); - acquired = false; - return result; - } - - /** - * Will assign {@code obj} to this slot if not already assigned. - * When calling this method this slot may be in different states: - *
    - *
  • object = null, acquired = false: initial assignment
  • - *
  • object != null, acquired = true: already assigned, but someone has it already
  • - *
- * - * @param obj instance to assign - * @return the {@code obj} for convenience for the caller - */ - T assignIfNotAssigned( T obj ) - { - if ( object == null ) - { - boolean wasAcquired = acquire(); - assert wasAcquired; - set( obj ); - } - else - { - assert acquired; - } - return obj; - } - - /** - * Marks this slot as not acquired anymore. This will only succeed if the released object matches the - * object in this slot. - * - * @param obj the object to release and to match with this slot object. - * @return whether or not {@code obj} matches the slot object. - */ - boolean release( T obj ) - { - if ( obj == object ) - { - assert acquired; - acquired = false; - return true; - } - return false; - } - - /** - * Marks this slot as acquired. Object must have been assigned at this point. - * - * @return {@code true} if this slot wasn't acquired when calling this method, otherwise {@code false}. - */ - boolean acquire() - { - if ( acquired ) - { - return false; - } - acquired = true; - return true; + phantomReference = new LocalSlotReference( this, referenceQueue ); } - private void set( T obj ) + public void set( T obj ) { phantomReference.object = obj; this.object = obj; diff --git a/community/primitive-collections/src/test/java/org/neo4j/collection/pool/MarshlandPoolTest.java b/community/primitive-collections/src/test/java/org/neo4j/collection/pool/MarshlandPoolTest.java index 3310eb2060d5f..bd207ff412fb3 100644 --- a/community/primitive-collections/src/test/java/org/neo4j/collection/pool/MarshlandPoolTest.java +++ b/community/primitive-collections/src/test/java/org/neo4j/collection/pool/MarshlandPoolTest.java @@ -23,10 +23,8 @@ import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Mockito.any; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -58,7 +56,7 @@ public void shouldReturnToDelegatePoolIfLocalPoolIsFull() throws Exception { // Given Pool delegatePool = mock( Pool.class ); - when( delegatePool.acquire() ).thenReturn( 1337, 1338L, 1339L ); + when(delegatePool.acquire()).thenReturn( 1337 ); final MarshlandPool pool = new MarshlandPool<>( delegatePool ); @@ -98,53 +96,6 @@ public void shouldReleaseAllSlotsOnClose() throws Exception verifyNoMoreInteractions( delegatePool ); } - /* - * This test is about how the LocalSlot works with nested use, i.e. that acquiring an instance from the local slot - * and while having it (before releasing it) acquiring a "nested" instance which gets released before releasing the - * first one. The test is about how that interacts with the delegate pool and that the nested instance should not - * take precedence over the first instance in the local slot. - */ - @Test - public void shouldHaveNestedUsageFallBackToDelegatePool() throws Exception - { - // given - Pool delegatePool = mock( Pool.class ); - when( delegatePool.acquire() ).thenReturn( 5, 6 ); - MarshlandPool pool = new MarshlandPool<>( delegatePool ); - - // when - Integer top = pool.acquire(); - assertEquals( 5, top.intValue() ); - verify( delegatePool, times( 1 ) ).acquire(); - - // do this a couple of times, just to verify that too - int hoops = 2; - for ( int i = 1; i <= hoops; i++ ) - { - // and when w/o releasing the top one, acquire a nested - Integer nested = pool.acquire(); - assertEquals( 6, nested.intValue() ); - verify( delegatePool, times( 1 + i ) ).acquire(); - - // releasing the nested - pool.release( nested ); - verify( delegatePool, times( i ) ).release( nested ); - } - - // when finally releasing the top one - verify( delegatePool, times( hoops ) ).release( anyInt() ); - pool.release( top ); - verify( delegatePool, times( hoops ) ).release( anyInt() ); - - // then the next acquire should see the top one - verify( delegatePool, times( 1 + hoops ) ).acquire(); - Integer topAgain = pool.acquire(); - verify( delegatePool, times( 1 + hoops ) ).acquire(); - assertEquals( 5, topAgain.intValue() ); - pool.release( topAgain ); - verify( delegatePool, times( hoops ) ).release( anyInt() ); - } - private void assertPoolEventuallyReturns( Pool pool, int expected ) throws InterruptedException { long maxTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis( 10 );