From ba22c08fde5362fe2e89e9c417d2a90658c74283 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 16 Sep 2020 18:27:26 +0200 Subject: [PATCH] Alternative Pool Strategies (#5218) * Speculative idea to make a pluggable Pool strategy Signed-off-by: Greg Wilkins * Speculative idea to make a pluggable Pool strategy + javadoc Signed-off-by: Greg Wilkins * Speculative idea to make a pluggable Pool strategy + Added a ThreadLocalStrategy for a single cached item + Tell strategies about newly reserved entries + Fixed multiplexing test that was dependent on the impl of the cache Signed-off-by: Greg Wilkins * Speculative idea to make a pluggable Pool strategy + added tests Signed-off-by: Greg Wilkins * Feedback from review + Don't have a fallback iteration, instead make a SearchStrategy and DualStrategy * Feedback from review + split strategies into Cache and Strategies * Feedback from review + Added reserve and release * Improved Pool Strategies: + reverted to post notifications for removed, reserved and released. + Added a few more strategies that need to be benchmarked, that use the list iterator. Signed-off-by: Greg Wilkins * Testing all the different strategies Signed-off-by: Greg Wilkins * More simplifications and made LRU work (ish) Signed-off-by: Greg Wilkins * javadoc * More javadoc Signed-off-by: Greg Wilkins * JMH Test Signed-off-by: Greg Wilkins * one strategy Signed-off-by: gregw * test Signed-off-by: gregw * Split implementations: + pluggable strategies + hard coded Signed-off-by: Greg Wilkins * More benchmarks * Built in strategy * removed strategies version and simplified to single configurable solution. Signed-off-by: Greg Wilkins * updates from review Signed-off-by: Greg Wilkins * better javadoc Signed-off-by: Greg Wilkins * Updated ConnectionPool classes to use Pool strategies * Small javadocs fixes. Signed-off-by: Simone Bordet * Updates from review * javadoc Co-authored-by: Simone Bordet --- .../jetty/client/AbstractConnectionPool.java | 24 +- .../jetty/client/DuplexConnectionPool.java | 11 +- .../jetty/client/IndexedConnectionPool.java | 79 ----- .../client/LeakTrackingConnectionPool.java | 29 +- .../jetty/client/MultiplexConnectionPool.java | 11 +- .../jetty/client/RandomConnectionPool.java | 15 +- .../client/RoundRobinConnectionPool.java | 17 +- .../jetty/util/PoolStrategyBenchmark.java | 139 ++++++++ .../java/org/eclipse/jetty/util/Pool.java | 215 ++++++++----- .../java/org/eclipse/jetty/util/PoolTest.java | 296 ++++++++++++------ .../eclipse/jetty/xml/XmlConfiguration.java | 2 +- 11 files changed, 523 insertions(+), 315 deletions(-) delete mode 100644 jetty-client/src/main/java/org/eclipse/jetty/client/IndexedConnectionPool.java create mode 100644 jetty-jmh/src/main/java/org/eclipse/jetty/util/PoolStrategyBenchmark.java diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index 4832590f7cb2..61a725adc895 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -33,6 +33,7 @@ import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -41,7 +42,7 @@ import static java.util.stream.Collectors.toCollection; @ManagedObject -public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable, Sweeper.Sweepable +public abstract class AbstractConnectionPool extends ContainerLifeCycle implements ConnectionPool, Dumpable, Sweeper.Sweepable { private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class); @@ -56,21 +57,26 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable @Deprecated protected AbstractConnectionPool(Destination destination, int maxConnections, Callback requester) { - this((HttpDestination)destination, maxConnections, true, requester); + this((HttpDestination)destination, maxConnections, false, requester); } protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester) + { + this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester); + } + + protected AbstractConnectionPool(HttpDestination destination, Pool pool, Callback requester) { this.destination = destination; this.requester = requester; - @SuppressWarnings("unchecked") - Pool pool = destination.getBean(Pool.class); - if (pool == null) - { - pool = new Pool<>(maxConnections, cache ? 1 : 0); - destination.addBean(pool); - } this.pool = pool; + addBean(pool); + } + + @Override + protected void doStop() throws Exception + { + pool.close(); } @Override diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java index 7139a96b9511..f2e7e02645be 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java @@ -18,7 +18,9 @@ package org.eclipse.jetty.client; +import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Pool; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; @@ -27,12 +29,17 @@ public class DuplexConnectionPool extends AbstractConnectionPool { public DuplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester) { - this(destination, maxConnections, true, requester); + this(destination, maxConnections, false, requester); } public DuplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester) { - super(destination, maxConnections, cache, requester); + this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester); + } + + public DuplexConnectionPool(HttpDestination destination, Pool pool, Callback requester) + { + super(destination, pool, requester); } @Override diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/IndexedConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/IndexedConnectionPool.java deleted file mode 100644 index 587a79345856..000000000000 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/IndexedConnectionPool.java +++ /dev/null @@ -1,79 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.client; - -import org.eclipse.jetty.client.api.Connection; -import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.Pool; -import org.eclipse.jetty.util.annotation.ManagedObject; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; - -/** - *

A {@link MultiplexConnectionPool} that picks connections at a particular - * index between {@code 0} and {@link #getMaxConnectionCount()}.

- *

The algorithm that decides the index value is decided by subclasses.

- *

To acquire a connection, this class obtains the index value and attempts - * to activate the pool entry at that index. - * If this activation fails, another attempt to activate an alternative pool - * entry is performed, to avoid stalling connection acquisition if there is - * an available entry at a different index.

- */ -@ManagedObject -public abstract class IndexedConnectionPool extends MultiplexConnectionPool -{ - private static final Logger LOG = Log.getLogger(IndexedConnectionPool.class); - - private final Pool pool; - - public IndexedConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) - { - super(destination, maxConnections, false, requester, maxMultiplex); - pool = destination.getBean(Pool.class); - } - - /** - *

Must return an index between 0 (inclusive) and {@code maxConnections} (exclusive) - * used to attempt to acquire the connection at that index in the pool.

- * - * @param maxConnections the upper bound of the index (exclusive) - * @return an index between 0 (inclusive) and {@code maxConnections} (exclusive) - */ - protected abstract int getIndex(int maxConnections); - - @Override - protected Connection activate() - { - int index = getIndex(getMaxConnectionCount()); - Pool.Entry entry = pool.acquireAt(index); - if (LOG.isDebugEnabled()) - LOG.debug("activating at index={} entry={}", index, entry); - if (entry == null) - { - entry = pool.acquire(); - if (LOG.isDebugEnabled()) - LOG.debug("activating alternative entry={}", entry); - } - if (entry == null) - return null; - Connection connection = entry.getPooled(); - acquired(connection); - return connection; - } -} diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java index 48150a9b6434..f8ffebab707d 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java @@ -22,6 +22,7 @@ import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.LeakDetector; +import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -41,40 +42,16 @@ protected void leaked(LeakInfo leakInfo) public LeakTrackingConnectionPool(Destination destination, int maxConnections, Callback requester) { super((HttpDestination)destination, maxConnections, requester); - start(); - } - - private void start() - { - try - { - leakDetector.start(); - } - catch (Exception x) - { - throw new RuntimeException(x); - } + addBean(leakDetector); } @Override public void close() { - stop(); + LifeCycle.stop(this); super.close(); } - private void stop() - { - try - { - leakDetector.stop(); - } - catch (Exception x) - { - throw new RuntimeException(x); - } - } - @Override protected void acquired(Connection connection) { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java index e78b94504071..f4c76d1e87dc 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java @@ -18,7 +18,9 @@ package org.eclipse.jetty.client; +import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Pool; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; @@ -27,12 +29,17 @@ public class MultiplexConnectionPool extends AbstractConnectionPool implements C { public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) { - this(destination, maxConnections, true, requester, maxMultiplex); + this(destination, maxConnections, false, requester, maxMultiplex); } public MultiplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex) { - super(destination, maxConnections, cache, requester); + this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester, maxMultiplex); + } + + public MultiplexConnectionPool(HttpDestination destination, Pool pool, Callback requester, int maxMultiplex) + { + super(destination, pool, requester); setMaxMultiplex(maxMultiplex); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java index 6eb44042a900..54413bf79865 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java @@ -18,26 +18,19 @@ package org.eclipse.jetty.client; -import java.util.concurrent.ThreadLocalRandom; - import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Pool; import org.eclipse.jetty.util.annotation.ManagedObject; /** - *

An indexed {@link ConnectionPool} that provides connections + *

A {@link ConnectionPool} that provides connections * randomly among the ones that are available.

*/ @ManagedObject -public class RandomConnectionPool extends IndexedConnectionPool +public class RandomConnectionPool extends MultiplexConnectionPool { public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) { - super(destination, maxConnections, requester, maxMultiplex); - } - - @Override - protected int getIndex(int maxConnections) - { - return ThreadLocalRandom.current().nextInt(maxConnections); + super(destination, new Pool<>(Pool.StrategyType.RANDOM, maxConnections, false), requester, maxMultiplex); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java index d9cb2c590e57..88cc476e43fa 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java @@ -18,9 +18,8 @@ package org.eclipse.jetty.client; -import java.util.concurrent.atomic.AtomicInteger; - import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Pool; import org.eclipse.jetty.util.annotation.ManagedObject; /** @@ -30,7 +29,7 @@ *
  • the server takes different times to serve different requests; if a request takes a long * time to be processed by the server, it would be a performance penalty to stall sending requests * waiting for that connection to be available - better skip it and try another connection
  • - *
  • connections may be closed by the client or by the server, so it should be a performance + *
  • connections may be closed by the client or by the server, so it would be a performance * penalty to stall sending requests waiting for a new connection to be opened
  • *
  • thread scheduling on both client and server may temporarily penalize a connection
  • * @@ -48,10 +47,8 @@ * @see RandomConnectionPool */ @ManagedObject -public class RoundRobinConnectionPool extends IndexedConnectionPool +public class RoundRobinConnectionPool extends MultiplexConnectionPool { - private final AtomicInteger offset = new AtomicInteger(); - public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester) { this(destination, maxConnections, requester, 1); @@ -59,17 +56,11 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) { - super(destination, maxConnections, requester, maxMultiplex); + super(destination, new Pool<>(Pool.StrategyType.ROUND_ROBIN, maxConnections, false), requester, maxMultiplex); // If there are queued requests and connections get // closed due to idle timeout or overuse, we want to // aggressively try to open new connections to replace // those that were closed to process queued requests. setMaximizeConnections(true); } - - @Override - protected int getIndex(int maxConnections) - { - return offset.getAndUpdate(v -> ++v == maxConnections ? 0 : v); - } } diff --git a/jetty-jmh/src/main/java/org/eclipse/jetty/util/PoolStrategyBenchmark.java b/jetty-jmh/src/main/java/org/eclipse/jetty/util/PoolStrategyBenchmark.java new file mode 100644 index 000000000000..450840603209 --- /dev/null +++ b/jetty-jmh/src/main/java/org/eclipse/jetty/util/PoolStrategyBenchmark.java @@ -0,0 +1,139 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util; + +import java.util.concurrent.atomic.LongAdder; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.results.format.ResultFormatType; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +@State(Scope.Benchmark) +public class PoolStrategyBenchmark +{ + private Pool pool; + + @Param({ + "Pool.Linear", + "Pool.Random", + "Pool.RoundRobin", + "Pool.ThreadId", + }) + public static String POOL_TYPE; + + @Param({ + "false", + "true", + }) + public static boolean CACHE; + + @Param({ + "4", + "16" + }) + public static int SIZE; + + private static final LongAdder misses = new LongAdder(); + private static final LongAdder hits = new LongAdder(); + private static final LongAdder total = new LongAdder(); + + @Setup + public void setUp() throws Exception + { + misses.reset(); + + switch (POOL_TYPE) + { + case "Pool.Linear" : + pool = new Pool<>(Pool.StrategyType.FIRST, SIZE, CACHE); + break; + case "Pool.Random" : + pool = new Pool<>(Pool.StrategyType.RANDOM, SIZE, CACHE); + break; + case "Pool.ThreadId" : + pool = new Pool<>(Pool.StrategyType.THREAD_ID, SIZE, CACHE); + break; + case "Pool.RoundRobin" : + pool = new Pool<>(Pool.StrategyType.ROUND_ROBIN, SIZE, CACHE); + break; + + default: + throw new IllegalStateException(); + } + + for (int i = 0; i < SIZE; i++) + { + pool.reserve(1).enable(Integer.toString(i), false); + } + } + + @TearDown + public void tearDown() + { + System.err.printf("%nMISSES = %d (%d%%)%n", misses.longValue(), 100 * misses.longValue() / (hits.longValue() + misses.longValue())); + System.err.printf("AVERAGE = %d%n", total.longValue() / hits.longValue()); + pool.close(); + pool = null; + } + + @Benchmark + public void testAcquireReleasePoolWithStrategy() + { + // Now really benchmark the strategy we are interested in + Pool.Entry entry = pool.acquire(); + if (entry == null || entry.isIdle()) + { + misses.increment(); + Blackhole.consumeCPU(20); + return; + } + // do some work + hits.increment(); + total.add(Long.parseLong(entry.getPooled())); + Blackhole.consumeCPU(entry.getPooled().hashCode() % 20); + + // release the entry + entry.release(); + } + + public static void main(String[] args) throws RunnerException + { + Options opt = new OptionsBuilder() + .include(PoolStrategyBenchmark.class.getSimpleName()) + .warmupIterations(3) + .measurementIterations(3) + .forks(1) + .threads(8) + .resultFormat(ResultFormatType.JSON) + .result("/tmp/poolStrategy-" + System.currentTimeMillis() + ".json") + // .addProfiler(GCProfiler.class) + .build(); + + new Runner(opt).run(); + } +} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java index b00981f08313..fc0b63368af9 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java @@ -26,26 +26,20 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.eclipse.jetty.util.component.Dumpable; +import org.eclipse.jetty.util.component.DumpableCollection; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Locker; /** * A fast pool of objects, with optional support for - * multiplexing, max usage count and thread-local caching. - *

    - * The thread-local caching mechanism is about remembering up to N previously - * used entries into a thread-local single-threaded collection. - * When that collection is not empty, its entries are removed one by one - * during acquisition until an entry that can be acquired is found. - * This can greatly speed up acquisition when both the acquisition and the - * release of the entries is done on the same thread as this avoids iterating - * the global, thread-safe collection of entries. - *

    + * multiplexing, max usage count and several optimized strategies plus + * an optional {@link ThreadLocal} cache of the last release entry. *

    * When the method {@link #close()} is called, all {@link Closeable}s in the pool * are also closed. @@ -56,38 +50,88 @@ public class Pool implements AutoCloseable, Dumpable { private static final Logger LOGGER = Log.getLogger(Pool.class); - private final List sharedList = new CopyOnWriteArrayList<>(); + private final List entries = new CopyOnWriteArrayList<>(); + + private final int maxEntries; + private final AtomicInteger pending = new AtomicInteger(); + private final StrategyType strategyType; + /* * The cache is used to avoid hammering on the first index of the entry list. * Caches can become poisoned (i.e.: containing entries that are in use) when * the release isn't done by the acquiring thread or when the entry pool is * undersized compared to the load applied on it. * When an entry can't be found in the cache, the global list is iterated - * normally so the cache has no visible effect besides performance. + * with the configured strategy so the cache has no visible effect besides performance. */ - private final ThreadLocal> cache; private final Locker locker = new Locker(); - private final int maxEntries; - private final int cacheSize; - private final AtomicInteger pending = new AtomicInteger(); + private final ThreadLocal cache; + private final AtomicInteger nextIndex; private volatile boolean closed; private volatile int maxMultiplex = 1; private volatile int maxUsageCount = -1; /** - * Construct a Pool with the specified thread-local cache size. + * The type of the strategy to use for the pool. + * The strategy primarily determines where iteration over the pool entries begins. + */ + public enum StrategyType + { + /** + * A strategy that looks for an entry always starting from the first entry. + * It will favour the early entries in the pool, but may contend on them more. + */ + FIRST, + + /** + * A strategy that looks for an entry by iterating from a random starting + * index. No entries are favoured and contention is reduced. + */ + RANDOM, + + /** + * A strategy that uses the {@link Thread#getId()} of the current thread + * to select a starting point for an entry search. Whilst not as performant as + * using the {@link ThreadLocal} cache, it may be suitable when the pool is substantially smaller + * than the number of available threads. + * No entries are favoured and contention is reduced. + */ + THREAD_ID, + + /** + * A strategy that looks for an entry by iterating from a starting point + * that is incremented on every search. This gives similar results to the + * random strategy but with more predictable behaviour. + * No entries are favoured and contention is reduced. + */ + ROUND_ROBIN, + } + + /** + * Construct a Pool with a specified lookup strategy and no + * {@link ThreadLocal} cache. * + * @param strategyType The strategy to used for looking up entries. + * @param maxEntries the maximum amount of entries that the pool will accept. + */ + public Pool(StrategyType strategyType, int maxEntries) + { + this(strategyType, maxEntries, false); + } + + /** + * Construct a Pool with the specified thread-local cache size and + * an optional {@link ThreadLocal} cache. + * @param strategyType The strategy to used for looking up entries. * @param maxEntries the maximum amount of entries that the pool will accept. - * @param cacheSize the thread-local cache size. A value less than 1 means the cache is disabled. + * @param cache True if a {@link ThreadLocal} cache should be used to try the most recently released entry. */ - public Pool(int maxEntries, int cacheSize) + public Pool(StrategyType strategyType, int maxEntries, boolean cache) { this.maxEntries = maxEntries; - this.cacheSize = cacheSize; - if (cacheSize > 0) - this.cache = ThreadLocal.withInitial(() -> new ArrayList(cacheSize)); - else - this.cache = null; + this.strategyType = strategyType; + this.cache = cache ? new ThreadLocal<>() : null; + nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null; } public int getReservedCount() @@ -97,12 +141,12 @@ public int getReservedCount() public int getIdleCount() { - return (int)sharedList.stream().filter(Entry::isIdle).count(); + return (int)entries.stream().filter(Entry::isIdle).count(); } public int getInUseCount() { - return (int)sharedList.stream().filter(Entry::isInUse).count(); + return (int)entries.stream().filter(Entry::isInUse).count(); } public int getMaxEntries() @@ -153,7 +197,7 @@ public Entry reserve(int allotment) if (closed) return null; - int space = maxEntries - sharedList.size(); + int space = maxEntries - entries.size(); if (space <= 0) return null; @@ -165,17 +209,18 @@ public Entry reserve(int allotment) pending.incrementAndGet(); Entry entry = new Entry(); - sharedList.add(entry); + entries.add(entry); return entry; } } /** * Acquire the entry from the pool at the specified index. This method bypasses the thread-local mechanism. - * + * @deprecated No longer supported. Instead use a {@link StrategyType} to configure the pool. * @param idx the index of the entry to acquire. * @return the specified entry or null if there is none at the specified index or if it is not available. */ + @Deprecated public Entry acquireAt(int idx) { if (closed) @@ -183,7 +228,7 @@ public Entry acquireAt(int idx) try { - Entry entry = sharedList.get(idx); + Entry entry = entries.get(idx); if (entry.tryAcquire()) return entry; } @@ -204,25 +249,52 @@ public Entry acquire() if (closed) return null; - // first check the thread-local cache + int size = entries.size(); + if (size == 0) + return null; + if (cache != null) { - List cachedList = cache.get(); - while (!cachedList.isEmpty()) + Pool.Entry entry = cache.get(); + if (entry != null && entry.tryAcquire()) + return entry; + } + + int index = startIndex(size); + + for (int tries = size; tries-- > 0;) + { + try + { + Pool.Entry entry = entries.get(index); + if (entry != null && entry.tryAcquire()) + return entry; + } + catch (IndexOutOfBoundsException e) { - Entry cachedEntry = cachedList.remove(cachedList.size() - 1); - if (cachedEntry.tryAcquire()) - return cachedEntry; + LOGGER.ignore(e); + size = entries.size(); } + index = (index + 1) % size; } + return null; + } - // then iterate the shared list - for (Entry entry : sharedList) + private int startIndex(int size) + { + switch (strategyType) { - if (entry.tryAcquire()) - return entry; + case FIRST: + return 0; + case RANDOM: + return ThreadLocalRandom.current().nextInt(size); + case ROUND_ROBIN: + return nextIndex.getAndUpdate(c -> Math.max(0, c + 1)) % size; + case THREAD_ID: + return (int)(Thread.currentThread().getId() % size); + default: + throw new IllegalArgumentException("Unknown strategy type: " + strategyType); } - return null; } /** @@ -278,17 +350,10 @@ public boolean release(Entry entry) if (closed) return false; - // first mark it as unused - boolean reusable = entry.tryRelease(); - - // then cache the released entry - if (cache != null && reusable) - { - List cachedList = cache.get(); - if (cachedList.size() < cacheSize) - cachedList.add(entry); - } - return reusable; + boolean released = entry.tryRelease(); + if (released && cache != null) + cache.set(entry); + return released; } /** @@ -309,12 +374,9 @@ public boolean remove(Entry entry) return false; } - boolean removed = sharedList.remove(entry); - if (!removed) - { - if (LOGGER.isDebugEnabled()) - LOGGER.debug("Attempt to remove an object from the pool that does not exist: {}", entry); - } + boolean removed = entries.remove(entry); + if (!removed && LOGGER.isDebugEnabled()) + LOGGER.debug("Attempt to remove an object from the pool that does not exist: {}", entry); return removed; } @@ -331,8 +393,8 @@ public void close() try (Locker.Lock l = locker.lock()) { closed = true; - copy = new ArrayList<>(sharedList); - sharedList.clear(); + copy = new ArrayList<>(entries); + entries.clear(); } // iterate the copy and close its entries @@ -345,29 +407,30 @@ public void close() public int size() { - return sharedList.size(); + return entries.size(); } public Collection values() { - return Collections.unmodifiableCollection(sharedList); + return Collections.unmodifiableCollection(entries); } @Override public void dump(Appendable out, String indent) throws IOException { - Dumpable.dumpObjects(out, indent, this); + Dumpable.dumpObjects(out, indent, this, + new DumpableCollection("entries", entries)); } @Override public String toString() { - return String.format("%s@%x[size=%d closed=%s entries=%s]", + return String.format("%s@%x[size=%d closed=%s pending=%d]", getClass().getSimpleName(), hashCode(), - sharedList.size(), + entries.size(), closed, - sharedList); + pending.get()); } public class Entry @@ -500,6 +563,13 @@ boolean tryRelease() return !(overUsed && newMultiplexingCount == 0); } + public boolean isOverUsed() + { + int currentMaxUsageCount = maxUsageCount; + int usageCount = state.getHi(); + return currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount; + } + /** * Try to mark the entry as removed. * @return true if the entry has to be removed from the containing pool, false otherwise. @@ -549,12 +619,17 @@ public int getUsageCount() public String toString() { long encoded = state.get(); - return String.format("%s@%x{usage=%d/%d,multiplex=%d/%d,pooled=%s}", + int usageCount = AtomicBiInteger.getHi(encoded); + int multiplexCount = AtomicBiInteger.getLo(encoded); + + String state = usageCount < 0 ? "CLOSED" : multiplexCount == 0 ? "IDLE" : "INUSE"; + + return String.format("%s@%x{%s, usage=%d, multiplex=%d/%d, pooled=%s}", getClass().getSimpleName(), hashCode(), - AtomicBiInteger.getHi(encoded), - getMaxUsageCount(), - AtomicBiInteger.getLo(encoded), + state, + Math.max(usageCount, 0), + Math.max(multiplexCount, 0), getMaxMultiplex(), pooled); } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java index bf589e59dd46..00071f247af4 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java @@ -21,17 +21,27 @@ import java.io.Closeable; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import static java.util.stream.Collectors.toList; +import static org.eclipse.jetty.util.Pool.StrategyType.FIRST; +import static org.eclipse.jetty.util.Pool.StrategyType.RANDOM; +import static org.eclipse.jetty.util.Pool.StrategyType.ROUND_ROBIN; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; @@ -40,20 +50,27 @@ public class PoolTest { - public static Stream cacheSize() + + interface Factory + { + Pool getPool(int maxSize); + } + + public static Stream strategy() { List data = new ArrayList<>(); - data.add(new Object[]{0}); - data.add(new Object[]{1}); - data.add(new Object[]{2}); + data.add(new Object[]{(Factory)s -> new Pool<>(FIRST, s)}); + data.add(new Object[]{(Factory)s -> new Pool<>(RANDOM, s)}); + data.add(new Object[]{(Factory)s -> new Pool<>(FIRST, s, true)}); + data.add(new Object[]{(Factory)s -> new Pool<>(ROUND_ROBIN, s)}); return data.stream(); } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testAcquireRelease(int cacheSize) + @MethodSource(value = "strategy") + public void testAcquireRelease(Factory factory) { - Pool pool = new Pool<>(1,cacheSize); + Pool pool = factory.getPool(1); pool.reserve(-1).enable("aaa", false); assertThat(pool.size(), is(1)); assertThat(pool.getReservedCount(), is(0)); @@ -94,10 +111,10 @@ public void testAcquireRelease(int cacheSize) } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testRemoveBeforeRelease(int cacheSize) + @MethodSource(value = "strategy") + public void testRemoveBeforeRelease(Factory factory) { - Pool pool = new Pool<>(1, cacheSize); + Pool pool = factory.getPool(1); pool.reserve(-1).enable("aaa", false); Pool.Entry e1 = pool.acquire(); @@ -107,10 +124,10 @@ public void testRemoveBeforeRelease(int cacheSize) } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testCloseBeforeRelease(int cacheSize) + @MethodSource(value = "strategy") + public void testCloseBeforeRelease(Factory factory) { - Pool pool = new Pool<>(1, cacheSize); + Pool pool = factory.getPool(1); pool.reserve(-1).enable("aaa", false); Pool.Entry e1 = pool.acquire(); @@ -121,10 +138,10 @@ public void testCloseBeforeRelease(int cacheSize) } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testMaxPoolSize(int cacheSize) + @MethodSource(value = "strategy") + public void testMaxPoolSize(Factory factory) { - Pool pool = new Pool<>(1, cacheSize); + Pool pool = factory.getPool(1); assertThat(pool.size(), is(0)); assertThat(pool.reserve(-1), notNullValue()); assertThat(pool.size(), is(1)); @@ -133,10 +150,10 @@ public void testMaxPoolSize(int cacheSize) } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testReserve(int cacheSize) + @MethodSource(value = "strategy") + public void testReserve(Factory factory) { - Pool pool = new Pool<>(2, cacheSize); + Pool pool = factory.getPool(2); // Reserve an entry Pool.Entry e1 = pool.reserve(-1); @@ -196,10 +213,10 @@ public void testReserve(int cacheSize) } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testReserveMaxPending(int cacheSize) + @MethodSource(value = "strategy") + public void testReserveMaxPending(Factory factory) { - Pool pool = new Pool<>(2, cacheSize); + Pool pool = factory.getPool(2); assertThat(pool.reserve(0), nullValue()); assertThat(pool.reserve(1), notNullValue()); assertThat(pool.reserve(1), nullValue()); @@ -210,20 +227,20 @@ public void testReserveMaxPending(int cacheSize) } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testReserveNegativeMaxPending(int cacheSize) + @MethodSource(value = "strategy") + public void testReserveNegativeMaxPending(Factory factory) { - Pool pool = new Pool<>(2, cacheSize); + Pool pool = factory.getPool(2); assertThat(pool.reserve(-1), notNullValue()); assertThat(pool.reserve(-1), notNullValue()); assertThat(pool.reserve(-1), nullValue()); } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testClose(int cacheSize) + @MethodSource(value = "strategy") + public void testClose(Factory factory) { - Pool pool = new Pool<>(1, cacheSize); + Pool pool = factory.getPool(1); pool.reserve(-1).enable("aaa", false); assertThat(pool.isClosed(), is(false)); pool.close(); @@ -235,12 +252,11 @@ public void testClose(int cacheSize) assertThat(pool.reserve(-1), nullValue()); } - @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testClosingCloseable(int cacheSize) + @Test + public void testClosingCloseable() { AtomicBoolean closed = new AtomicBoolean(); - Pool pool = new Pool<>(1,0); + Pool pool = new Pool<>(FIRST, 1); Closeable pooled = () -> closed.set(true); pool.reserve(-1).enable(pooled, false); assertThat(closed.get(), is(false)); @@ -249,10 +265,10 @@ public void testClosingCloseable(int cacheSize) } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testRemove(int cacheSize) + @MethodSource(value = "strategy") + public void testRemove(Factory factory) { - Pool pool = new Pool<>(1, cacheSize); + Pool pool = factory.getPool(1); pool.reserve(-1).enable("aaa", false); Pool.Entry e1 = pool.acquire(); @@ -264,10 +280,10 @@ public void testRemove(int cacheSize) } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testValuesSize(int cacheSize) + @MethodSource(value = "strategy") + public void testValuesSize(Factory factory) { - Pool pool = new Pool<>(2, cacheSize); + Pool pool = factory.getPool(2); assertThat(pool.size(), is(0)); assertThat(pool.values().isEmpty(), is(true)); @@ -278,10 +294,10 @@ public void testValuesSize(int cacheSize) } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testValuesContainsAcquiredEntries(int cacheSize) + @MethodSource(value = "strategy") + public void testValuesContainsAcquiredEntries(Factory factory) { - Pool pool = new Pool<>(2, cacheSize); + Pool pool = factory.getPool(2); pool.reserve(-1).enable("aaa", false); pool.reserve(-1).enable("bbb", false); @@ -292,10 +308,10 @@ public void testValuesContainsAcquiredEntries(int cacheSize) } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testAcquireAt(int cacheSize) + @MethodSource(value = "strategy") + public void testAcquireAt(Factory factory) { - Pool pool = new Pool<>(2, cacheSize); + Pool pool = factory.getPool(2); pool.reserve(-1).enable("aaa", false); pool.reserve(-1).enable("bbb", false); @@ -308,10 +324,10 @@ public void testAcquireAt(int cacheSize) } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testMaxUsageCount(int cacheSize) + @MethodSource(value = "strategy") + public void testMaxUsageCount(Factory factory) { - Pool pool = new Pool<>(1, cacheSize); + Pool pool = factory.getPool(1); pool.setMaxUsageCount(3); pool.reserve(-1).enable("aaa", false); @@ -331,39 +347,54 @@ public void testMaxUsageCount(int cacheSize) } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testMaxMultiplex(int cacheSize) + @MethodSource(value = "strategy") + public void testMaxMultiplex(Factory factory) { - Pool pool = new Pool<>(2, cacheSize); + Pool pool = factory.getPool(2); pool.setMaxMultiplex(3); - pool.reserve(-1).enable("aaa", false); - pool.reserve(-1).enable("bbb", false); - Pool.Entry e1 = pool.acquire(); - Pool.Entry e2 = pool.acquire(); - Pool.Entry e3 = pool.acquire(); - Pool.Entry e4 = pool.acquire(); - assertThat(e1.getPooled(), equalTo("aaa")); - assertThat(e1, sameInstance(e2)); - assertThat(e1, sameInstance(e3)); - assertThat(e4.getPooled(), equalTo("bbb")); - assertThat(pool.release(e1), is(true)); - Pool.Entry e5 = pool.acquire(); - assertThat(e2, sameInstance(e5)); - Pool.Entry e6 = pool.acquire(); - assertThat(e4, sameInstance(e6)); + Map counts = new HashMap<>(); + AtomicInteger a = new AtomicInteger(); + AtomicInteger b = new AtomicInteger(); + counts.put("a", a); + counts.put("b", b); + pool.reserve(-1).enable("a", false); + pool.reserve(-1).enable("b", false); + + counts.get(pool.acquire().getPooled()).incrementAndGet(); + counts.get(pool.acquire().getPooled()).incrementAndGet(); + counts.get(pool.acquire().getPooled()).incrementAndGet(); + counts.get(pool.acquire().getPooled()).incrementAndGet(); + + assertThat(a.get(), greaterThan(0)); + assertThat(a.get(), lessThanOrEqualTo(3)); + assertThat(b.get(), greaterThan(0)); + assertThat(b.get(), lessThanOrEqualTo(3)); + + counts.get(pool.acquire().getPooled()).incrementAndGet(); + counts.get(pool.acquire().getPooled()).incrementAndGet(); + + assertThat(a.get(), is(3)); + assertThat(b.get(), is(3)); + + assertNull(pool.acquire()); } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testRemoveMultiplexed(int cacheSize) + @MethodSource(value = "strategy") + public void testRemoveMultiplexed(Factory factory) { - Pool pool = new Pool<>(1, cacheSize); + Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); pool.reserve(-1).enable("aaa", false); Pool.Entry e1 = pool.acquire(); + assertThat(e1, notNullValue()); Pool.Entry e2 = pool.acquire(); + assertThat(e2, notNullValue()); + assertThat(e2, sameInstance(e1)); + assertThat(e2.getUsageCount(), is(2)); + assertThat(pool.values().stream().findFirst().get().isIdle(), is(false)); assertThat(pool.remove(e1), is(false)); @@ -380,10 +411,10 @@ public void testRemoveMultiplexed(int cacheSize) } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testMultiplexRemoveThenAcquireThenReleaseRemove(int cacheSize) + @MethodSource(value = "strategy") + public void testMultiplexRemoveThenAcquireThenReleaseRemove(Factory factory) { - Pool pool = new Pool<>(1, cacheSize); + Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); pool.reserve(-1).enable("aaa", false); @@ -398,10 +429,10 @@ public void testMultiplexRemoveThenAcquireThenReleaseRemove(int cacheSize) } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testNonMultiplexRemoveAfterAcquire(int cacheSize) + @MethodSource(value = "strategy") + public void testNonMultiplexRemoveAfterAcquire(Factory factory) { - Pool pool = new Pool<>(1, cacheSize); + Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); pool.reserve(-1).enable("aaa", false); @@ -411,10 +442,10 @@ public void testNonMultiplexRemoveAfterAcquire(int cacheSize) } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testMultiplexRemoveAfterAcquire(int cacheSize) + @MethodSource(value = "strategy") + public void testMultiplexRemoveAfterAcquire(Factory factory) { - Pool pool = new Pool<>(1, cacheSize); + Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); pool.reserve(-1).enable("aaa", false); @@ -436,10 +467,10 @@ public void testMultiplexRemoveAfterAcquire(int cacheSize) } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testReleaseThenRemoveNonEnabledEntry(int cacheSize) + @MethodSource(value = "strategy") + public void testReleaseThenRemoveNonEnabledEntry(Factory factory) { - Pool pool = new Pool<>(1, cacheSize); + Pool pool = factory.getPool(1); Pool.Entry e = pool.reserve(-1); assertThat(pool.size(), is(1)); assertThat(pool.release(e), is(false)); @@ -449,10 +480,10 @@ public void testReleaseThenRemoveNonEnabledEntry(int cacheSize) } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testRemoveNonEnabledEntry(int cacheSize) + @MethodSource(value = "strategy") + public void testRemoveNonEnabledEntry(Factory factory) { - Pool pool = new Pool<>(1, cacheSize); + Pool pool = factory.getPool(1); Pool.Entry e = pool.reserve(-1); assertThat(pool.size(), is(1)); assertThat(pool.remove(e), is(true)); @@ -460,10 +491,10 @@ public void testRemoveNonEnabledEntry(int cacheSize) } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testMultiplexMaxUsageReachedAcquireThenRemove(int cacheSize) + @MethodSource(value = "strategy") + public void testMultiplexMaxUsageReachedAcquireThenRemove(Factory factory) { - Pool pool = new Pool<>(1, cacheSize); + Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); pool.setMaxUsageCount(3); pool.reserve(-1).enable("aaa", false); @@ -479,12 +510,12 @@ public void testMultiplexMaxUsageReachedAcquireThenRemove(int cacheSize) assertThat(pool.remove(e0), is(true)); assertThat(pool.size(), is(0)); } - + @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove(int cacheSize) + @MethodSource(value = "strategy") + public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove(Factory factory) { - Pool pool = new Pool<>(1, cacheSize); + Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); pool.setMaxUsageCount(3); pool.reserve(-1).enable("aaa", false); @@ -506,10 +537,10 @@ public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove(int cacheSi } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testUsageCountAfterReachingMaxMultiplexLimit(int cacheSize) + @MethodSource(value = "strategy") + public void testUsageCountAfterReachingMaxMultiplexLimit(Factory factory) { - Pool pool = new Pool<>(1, cacheSize); + Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); pool.setMaxUsageCount(10); pool.reserve(-1).enable("aaa", false); @@ -517,25 +548,25 @@ public void testUsageCountAfterReachingMaxMultiplexLimit(int cacheSize) Pool.Entry e1 = pool.acquire(); assertThat(e1.getUsageCount(), is(1)); Pool.Entry e2 = pool.acquire(); + assertThat(e2, sameInstance(e1)); assertThat(e1.getUsageCount(), is(2)); assertThat(pool.acquire(), nullValue()); assertThat(e1.getUsageCount(), is(2)); } - @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testConfigLimits(int cacheSize) + @Test + public void testConfigLimits() { - assertThrows(IllegalArgumentException.class, () -> new Pool(1, 0).setMaxMultiplex(0)); - assertThrows(IllegalArgumentException.class, () -> new Pool(1, 0).setMaxMultiplex(-1)); - assertThrows(IllegalArgumentException.class, () -> new Pool(1, 0).setMaxUsageCount(0)); + assertThrows(IllegalArgumentException.class, () -> new Pool(FIRST, 1).setMaxMultiplex(0)); + assertThrows(IllegalArgumentException.class, () -> new Pool(FIRST, 1).setMaxMultiplex(-1)); + assertThrows(IllegalArgumentException.class, () -> new Pool(FIRST, 1).setMaxUsageCount(0)); } @ParameterizedTest - @MethodSource(value = "cacheSize") - public void testAcquireWithCreator(int cacheSize) + @MethodSource(value = "strategy") + public void testAcquireWithCreator(Factory factory) { - Pool pool = new Pool<>(2, cacheSize); + Pool pool = factory.getPool(2); assertThat(pool.size(), is(0)); assertThat(pool.acquire(e -> null), nullValue()); @@ -590,6 +621,67 @@ public void testAcquireWithCreator(int cacheSize) assertThat(pool.size(), is(1)); assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getInUseCount(), is(1)); + } + + @Test + public void testRoundRobinStrategy() + { + Pool pool = new Pool<>(ROUND_ROBIN, 4); + + Pool.Entry e1 = pool.acquire(e -> new AtomicInteger()); + Pool.Entry e2 = pool.acquire(e -> new AtomicInteger()); + Pool.Entry e3 = pool.acquire(e -> new AtomicInteger()); + Pool.Entry e4 = pool.acquire(e -> new AtomicInteger()); + assertNull(pool.acquire(e -> new AtomicInteger())); + + pool.release(e1); + pool.release(e2); + pool.release(e3); + pool.release(e4); + Pool.Entry last = null; + for (int i = 0; i < 8; i++) + { + Pool.Entry e = pool.acquire(); + if (last != null) + assertThat(e, not(sameInstance(last))); + e.getPooled().incrementAndGet(); + pool.release(e); + last = e; + } + + assertThat(e1.getPooled().get(), is(2)); + assertThat(e2.getPooled().get(), is(2)); + assertThat(e3.getPooled().get(), is(2)); + assertThat(e4.getPooled().get(), is(2)); + } + + @Test + public void testRandomStrategy() + { + Pool pool = new Pool<>(RANDOM, 4); + + Pool.Entry e1 = pool.acquire(e -> new AtomicInteger()); + Pool.Entry e2 = pool.acquire(e -> new AtomicInteger()); + Pool.Entry e3 = pool.acquire(e -> new AtomicInteger()); + Pool.Entry e4 = pool.acquire(e -> new AtomicInteger()); + assertNull(pool.acquire(e -> new AtomicInteger())); + + pool.release(e1); + pool.release(e2); + pool.release(e3); + pool.release(e4); + + for (int i = 0; i < 400; i++) + { + Pool.Entry e = pool.acquire(); + e.getPooled().incrementAndGet(); + pool.release(e); + } + + assertThat(e1.getPooled().get(), greaterThan(10)); + assertThat(e2.getPooled().get(), greaterThan(10)); + assertThat(e3.getPooled().get(), greaterThan(10)); + assertThat(e4.getPooled().get(), greaterThan(10)); } } diff --git a/jetty-xml/src/main/java/org/eclipse/jetty/xml/XmlConfiguration.java b/jetty-xml/src/main/java/org/eclipse/jetty/xml/XmlConfiguration.java index c79664d7b956..24541c2eff43 100644 --- a/jetty-xml/src/main/java/org/eclipse/jetty/xml/XmlConfiguration.java +++ b/jetty-xml/src/main/java/org/eclipse/jetty/xml/XmlConfiguration.java @@ -101,7 +101,7 @@ public class XmlConfiguration }; private static final Iterable __factoryLoader = ServiceLoader.load(ConfigurationProcessorFactory.class); private static final Pool __parsers = - new Pool<>(Math.min(8, Runtime.getRuntime().availableProcessors()),1); + new Pool<>(Pool.StrategyType.THREAD_ID, Math.min(8, Runtime.getRuntime().availableProcessors())); public static final Comparator EXECUTABLE_COMPARATOR = (e1, e2) -> { // Favour methods with less parameters