Skip to content

Commit

Permalink
Alternative Pool Strategies (#5218)
Browse files Browse the repository at this point in the history
* Speculative idea to make a pluggable Pool strategy

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Speculative idea to make a pluggable Pool strategy

 + javadoc

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Speculative idea to make a pluggable Pool strategy

 + Added a ThreadLocalStrategy for a single cached item
 + Tell strategies about newly reserved entries
 + Fixed multiplexing test that was dependent on the impl of the cache

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Speculative idea to make a pluggable Pool strategy

 + added tests

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Feedback from review

 + Don't have a fallback iteration, instead make a SearchStrategy and DualStrategy

* Feedback from review

 + split strategies into Cache and Strategies

* Feedback from review

 + Added reserve and release

* Improved Pool Strategies:

+ reverted to post notifications for removed, reserved and released.
+ Added a few more strategies that need to be benchmarked, that use the list iterator.

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Testing all the different strategies

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* More simplifications and made LRU work (ish)

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* javadoc

* More javadoc

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* JMH Test

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* one strategy

Signed-off-by: gregw <gregw@webtide.com>

* test

Signed-off-by: gregw <gregw@webtide.com>

* Split implementations:

 + pluggable strategies
 + hard coded

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* More benchmarks

* Built in strategy

* removed strategies version and simplified to single configurable solution.

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* updates from review

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* better javadoc

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Updated ConnectionPool classes to use Pool strategies

* Small javadocs fixes.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>

* Updates from review

* javadoc

Co-authored-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
gregw and sbordet committed Sep 16, 2020
1 parent a37ab38 commit ba22c08
Show file tree
Hide file tree
Showing 11 changed files with 523 additions and 315 deletions.
Expand Up @@ -33,6 +33,7 @@
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
Expand All @@ -41,7 +42,7 @@
import static java.util.stream.Collectors.toCollection;

@ManagedObject
public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable, Sweeper.Sweepable
public abstract class AbstractConnectionPool extends ContainerLifeCycle implements ConnectionPool, Dumpable, Sweeper.Sweepable
{
private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class);

Expand All @@ -56,21 +57,26 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
@Deprecated
protected AbstractConnectionPool(Destination destination, int maxConnections, Callback requester)
{
this((HttpDestination)destination, maxConnections, true, requester);
this((HttpDestination)destination, maxConnections, false, requester);
}

protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
{
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
}

protected AbstractConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
{
this.destination = destination;
this.requester = requester;
@SuppressWarnings("unchecked")
Pool<Connection> pool = destination.getBean(Pool.class);
if (pool == null)
{
pool = new Pool<>(maxConnections, cache ? 1 : 0);
destination.addBean(pool);
}
this.pool = pool;
addBean(pool);
}

@Override
protected void doStop() throws Exception
{
pool.close();
}

@Override
Expand Down
Expand Up @@ -18,7 +18,9 @@

package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;

Expand All @@ -27,12 +29,17 @@ public class DuplexConnectionPool extends AbstractConnectionPool
{
public DuplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
this(destination, maxConnections, true, requester);
this(destination, maxConnections, false, requester);
}

public DuplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
{
super(destination, maxConnections, cache, requester);
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
}

public DuplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
{
super(destination, pool, requester);
}

@Override
Expand Down

This file was deleted.

Expand Up @@ -22,6 +22,7 @@
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.LeakDetector;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

Expand All @@ -41,40 +42,16 @@ protected void leaked(LeakInfo leakInfo)
public LeakTrackingConnectionPool(Destination destination, int maxConnections, Callback requester)
{
super((HttpDestination)destination, maxConnections, requester);
start();
}

private void start()
{
try
{
leakDetector.start();
}
catch (Exception x)
{
throw new RuntimeException(x);
}
addBean(leakDetector);
}

@Override
public void close()
{
stop();
LifeCycle.stop(this);
super.close();
}

private void stop()
{
try
{
leakDetector.stop();
}
catch (Exception x)
{
throw new RuntimeException(x);
}
}

@Override
protected void acquired(Connection connection)
{
Expand Down
Expand Up @@ -18,7 +18,9 @@

package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;

Expand All @@ -27,12 +29,17 @@ public class MultiplexConnectionPool extends AbstractConnectionPool implements C
{
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
this(destination, maxConnections, true, requester, maxMultiplex);
this(destination, maxConnections, false, requester, maxMultiplex);
}

public MultiplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, cache, requester);
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester, maxMultiplex);
}

public MultiplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester, int maxMultiplex)
{
super(destination, pool, requester);
setMaxMultiplex(maxMultiplex);
}

Expand Down
Expand Up @@ -18,26 +18,19 @@

package org.eclipse.jetty.client;

import java.util.concurrent.ThreadLocalRandom;

import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;

/**
* <p>An indexed {@link ConnectionPool} that provides connections
* <p>A {@link ConnectionPool} that provides connections
* randomly among the ones that are available.</p>
*/
@ManagedObject
public class RandomConnectionPool extends IndexedConnectionPool
public class RandomConnectionPool extends MultiplexConnectionPool
{
public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, requester, maxMultiplex);
}

@Override
protected int getIndex(int maxConnections)
{
return ThreadLocalRandom.current().nextInt(maxConnections);
super(destination, new Pool<>(Pool.StrategyType.RANDOM, maxConnections, false), requester, maxMultiplex);
}
}
Expand Up @@ -18,9 +18,8 @@

package org.eclipse.jetty.client;

import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;

/**
Expand All @@ -30,7 +29,7 @@
* <li>the server takes different times to serve different requests; if a request takes a long
* time to be processed by the server, it would be a performance penalty to stall sending requests
* waiting for that connection to be available - better skip it and try another connection</li>
* <li>connections may be closed by the client or by the server, so it should be a performance
* <li>connections may be closed by the client or by the server, so it would be a performance
* penalty to stall sending requests waiting for a new connection to be opened</li>
* <li>thread scheduling on both client and server may temporarily penalize a connection</li>
* </ul>
Expand All @@ -48,28 +47,20 @@
* @see RandomConnectionPool
*/
@ManagedObject
public class RoundRobinConnectionPool extends IndexedConnectionPool
public class RoundRobinConnectionPool extends MultiplexConnectionPool
{
private final AtomicInteger offset = new AtomicInteger();

public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
this(destination, maxConnections, requester, 1);
}

public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, requester, maxMultiplex);
super(destination, new Pool<>(Pool.StrategyType.ROUND_ROBIN, maxConnections, false), requester, maxMultiplex);
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace
// those that were closed to process queued requests.
setMaximizeConnections(true);
}

@Override
protected int getIndex(int maxConnections)
{
return offset.getAndUpdate(v -> ++v == maxConnections ? 0 : v);
}
}

0 comments on commit ba22c08

Please sign in to comment.