Skip to content

Commit

Permalink
Don't add the same connection multiple times
Browse files Browse the repository at this point in the history
The same connections was being added to the queue if close was called more
than once. Eventually the queue will be full and we close the connection, but
since the queue now contains multiple identical copies we will also close these.
  • Loading branch information
pontusmelke committed Oct 6, 2016
1 parent d17b10a commit cdc79ca
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.neo4j.driver.internal.net.pooling;

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.spi.Collector;
Expand Down Expand Up @@ -58,6 +59,7 @@ public class PooledConnection implements Connection
private Runnable onError = null;
private final Clock clock;
private long lastUsed;
private final AtomicBoolean released = new AtomicBoolean( false );

public PooledConnection( Connection delegate, Consumer<PooledConnection> release, Clock clock )
{
Expand All @@ -67,8 +69,9 @@ public PooledConnection( Connection delegate, Consumer<PooledConnection> release
this.lastUsed = clock.millis();
}

public void updateUsageTimestamp()
public void setInUse()
{
released.set(false);
lastUsed = clock.millis();
}

Expand Down Expand Up @@ -197,7 +200,10 @@ public void receiveOne()
*/
public void close()
{
release.accept( this );
if ( released.compareAndSet( false, true ))
{
release.accept( this );
}
// put the full logic of deciding whether to dispose the connection or to put it back to
// the pool into the release object
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public Connection acquire( BoltServerAddress address )
conn = new PooledConnection( connect( address ), new
PooledConnectionReleaseConsumer( connections, stopped, new PooledConnectionValidator( this, poolSettings ) ), clock );
}
conn.updateUsageTimestamp();
conn.setInUse();
return conn;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,40 @@ public void dispose()
assertThat( flags[0], equalTo( false ) );
}

@Test
public void shouldOnlyReturnOnceEventhougCloseIsBeingCalledMultipleTimes() throws Throwable
{
// Given
final BlockingQueue<PooledConnection> pool = new LinkedBlockingQueue<>(2);

final boolean[] flags = {false};

Connection conn = mock( Connection.class );
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool,
new AtomicBoolean( false ), VALID_CONNECTION );

PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM )
{
@Override
public void dispose()
{
flags[0] = true;
}
};

// When
pooledConnection.close();
pooledConnection.close();
pooledConnection.close();
pooledConnection.close();
pooledConnection.close();

// Then
assertThat( pool, hasItem(pooledConnection) );
assertThat( pool.size(), equalTo( 1 ) );
assertThat( flags[0], equalTo( false ) );
}

@Test
public void shouldDisposeConnectionIfValidConnectionAndIdlePoolIsFull() throws Throwable
{
Expand Down

0 comments on commit cdc79ca

Please sign in to comment.