Skip to content

Commit

Permalink
Extract channel pool from CatchUpClient.
Browse files Browse the repository at this point in the history
ALso fix a leak where the pool could only store one channel per address.
  • Loading branch information
apcj authored and Max Sumrall committed Sep 16, 2016
1 parent 70ebef8 commit caf8577
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 39 deletions.
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.catchup;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

import org.neo4j.coreedge.discovery.NoKnownAddressesException;
import org.neo4j.helpers.AdvertisedSocketAddress;

class CatchUpChannelPool<CHANNEL extends CatchUpChannelPool.Channel>
{
private final Map<AdvertisedSocketAddress, LinkedList<CHANNEL>> idleChannels = new HashMap<>();
private final Set<CHANNEL> activeChannels = new HashSet<>();
private final Function<AdvertisedSocketAddress, CHANNEL> factory;

CatchUpChannelPool( Function<AdvertisedSocketAddress, CHANNEL> factory )
{
this.factory = factory;
}

synchronized CHANNEL acquire( AdvertisedSocketAddress catchUpAddress ) throws NoKnownAddressesException
{
CHANNEL channel;
LinkedList<CHANNEL> channels = idleChannels.get( catchUpAddress );
if ( channels == null )
{
channel = factory.apply( catchUpAddress );
}
else
{
channel = channels.poll();
if ( channels.isEmpty() )
{
idleChannels.remove( catchUpAddress );
}
}

activeChannels.add( channel );
return channel;
}

synchronized void dispose( CHANNEL channel )
{
activeChannels.remove( channel );
channel.close();
}

synchronized void release( CHANNEL channel )
{
activeChannels.remove( channel );
idleChannels.computeIfAbsent( channel.destination(), (address) -> new LinkedList<>() ).add( channel );
}

synchronized void close()
{
idleChannels.values().stream().flatMap( Collection::stream )
.forEach( Channel::close );
activeChannels.forEach( Channel::close );
}

interface Channel
{
AdvertisedSocketAddress destination();

void close();
}
}
Expand Up @@ -28,10 +28,6 @@
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;


import java.time.Clock; import java.time.Clock;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


Expand All @@ -50,14 +46,12 @@


public class CatchUpClient extends LifecycleAdapter public class CatchUpClient extends LifecycleAdapter
{ {
private final Map<AdvertisedSocketAddress, CatchUpChannel> idleChannels = new HashMap<>();
private final Set<CatchUpChannel> activeChannels = new HashSet<>();

private final LogProvider logProvider; private final LogProvider logProvider;
private final TopologyService discoveryService; private final TopologyService discoveryService;
private final Log log; private final Log log;
private final Clock clock; private final Clock clock;
private final Monitors monitors; private final Monitors monitors;
private final CatchUpChannelPool<CatchUpChannel> pool = new CatchUpChannelPool<>( CatchUpChannel::new );


private NioEventLoopGroup eventLoopGroup; private NioEventLoopGroup eventLoopGroup;


Expand All @@ -76,16 +70,18 @@ public <T> T makeBlockingRequest( MemberId memberId, CatchUpRequest request,
throws CatchUpClientException, NoKnownAddressesException throws CatchUpClientException, NoKnownAddressesException
{ {
CompletableFuture<T> future = new CompletableFuture<>(); CompletableFuture<T> future = new CompletableFuture<>();
CatchUpChannel channel = acquireChannel( memberId ); AdvertisedSocketAddress catchUpAddress =
discoveryService.coreServers().find( memberId ).getCatchupServer();
CatchUpChannel channel = pool.acquire( catchUpAddress );


future.whenComplete( ( result, e ) -> { future.whenComplete( ( result, e ) -> {
if ( e == null ) if ( e == null )
{ {
release( channel ); pool.release( channel );
} }
else else
{ {
dispose( channel ); pool.dispose( channel );
} }
} ); } );


Expand All @@ -98,32 +94,7 @@ public <T> T makeBlockingRequest( MemberId memberId, CatchUpRequest request,
channel::millisSinceLastResponse, inactivityTimeout, timeUnit ); channel::millisSinceLastResponse, inactivityTimeout, timeUnit );
} }


private synchronized void dispose( CatchUpChannel channel ) private class CatchUpChannel implements CatchUpChannelPool.Channel
{
activeChannels.remove( channel );
channel.close();
}

private synchronized void release( CatchUpChannel channel )
{
activeChannels.remove( channel );
idleChannels.put( channel.destination, channel );
}

private synchronized CatchUpChannel acquireChannel( MemberId memberId ) throws NoKnownAddressesException
{
AdvertisedSocketAddress catchUpAddress =
discoveryService.coreServers().find( memberId ).getCatchupServer();
CatchUpChannel channel = idleChannels.remove( catchUpAddress );
if ( channel == null )
{
channel = new CatchUpChannel( catchUpAddress );
}
activeChannels.add( channel );
return channel;
}

private class CatchUpChannel
{ {
private final TrackingResponseHandler handler; private final TrackingResponseHandler handler;
private final AdvertisedSocketAddress destination; private final AdvertisedSocketAddress destination;
Expand Down Expand Up @@ -166,7 +137,14 @@ long millisSinceLastResponse()
return clock.millis() - handler.lastResponseTime(); return clock.millis() - handler.lastResponseTime();
} }


void close() @Override
public AdvertisedSocketAddress destination()
{
return destination;
}

@Override
public void close()
{ {
nettyChannel.close(); nettyChannel.close();
} }
Expand All @@ -184,8 +162,7 @@ public void stop() throws Throwable
log.info( "CatchUpClient stopping" ); log.info( "CatchUpClient stopping" );
try try
{ {
idleChannels.values().forEach( CatchUpChannel::close ); pool.close();
activeChannels.forEach( CatchUpChannel::close );
eventLoopGroup.shutdownGracefully( 0, 0, MICROSECONDS ).sync(); eventLoopGroup.shutdownGracefully( 0, 0, MICROSECONDS ).sync();
} }
catch ( InterruptedException e ) catch ( InterruptedException e )
Expand Down
@@ -0,0 +1,133 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.catchup;


import org.junit.Test;

import org.neo4j.helpers.AdvertisedSocketAddress;

import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

public class CatchUpChannelPoolTest
{
@Test
public void shouldReUseAChannelThatWasReleased() throws Exception
{
// given
CatchUpChannelPool<TestChannel> pool = new CatchUpChannelPool<>( TestChannel::new );

// when
TestChannel channelA = pool.acquire( localAddress( 1 ) );
pool.release( channelA );
TestChannel channelB = pool.acquire( localAddress( 1 ) );

// then
assertSame( channelA, channelB );
}

@Test
public void shouldCreateANewChannelIfFirstChannelIsDisposed() throws Exception
{
// given
CatchUpChannelPool<TestChannel> pool = new CatchUpChannelPool<>( TestChannel::new );

// when
TestChannel channelA = pool.acquire( localAddress( 1 ) );
pool.dispose( channelA );
TestChannel channelB = pool.acquire( localAddress( 1 ) );

// then
assertNotSame( channelA, channelB );
}

@Test
public void shouldCreateANewChannelIfFirstChannelIsStillActive() throws Exception
{
// given
CatchUpChannelPool<TestChannel> pool = new CatchUpChannelPool<>( TestChannel::new );

// when
TestChannel channelA = pool.acquire( localAddress( 1 ) );
TestChannel channelB = pool.acquire( localAddress( 1 ) );

// then
assertNotSame( channelA, channelB );
}

@Test
public void shouldCleanUpOnClose() throws Exception
{
// given
CatchUpChannelPool<TestChannel> pool = new CatchUpChannelPool<>( TestChannel::new );

TestChannel channelA = pool.acquire( localAddress( 1 ) );
TestChannel channelB = pool.acquire( localAddress( 1 ) );
TestChannel channelC = pool.acquire( localAddress( 1 ) );

pool.release( channelA );
pool.release( channelC );

TestChannel channelD = pool.acquire( localAddress( 2 ) );
TestChannel channelE = pool.acquire( localAddress( 2 ) );
TestChannel channelF = pool.acquire( localAddress( 2 ) );

// when
pool.close();

// then
assertTrue( channelA.closed );
assertTrue( channelB.closed );
assertTrue( channelC.closed );
assertTrue( channelD.closed );
assertTrue( channelE.closed );
assertTrue( channelF.closed );
}

private static class TestChannel implements CatchUpChannelPool.Channel
{
private final AdvertisedSocketAddress address;
private boolean closed;

TestChannel( AdvertisedSocketAddress address )
{
this.address = address;
}

@Override
public AdvertisedSocketAddress destination()
{
return address;
}

@Override
public void close()
{
closed = true;
}
}

private static AdvertisedSocketAddress localAddress( int port )
{
return new AdvertisedSocketAddress( "localhost", port );
}
}

0 comments on commit caf8577

Please sign in to comment.