diff --git a/.circleci/config.yml b/.circleci/config.yml index dcf444f56ded..c033ca37e018 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -3,10 +3,10 @@ jobs: j8_jvm_upgrade_dtests: docker: - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 1 + parallelism: 2 steps: - attach_workspace: at: /home/cassandra @@ -89,7 +89,7 @@ jobs: j8_cqlsh-dtests-py2-with-vnodes: docker: - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -165,7 +165,7 @@ jobs: j11_unit_tests: docker: - image: spod/cassandra-testing-ubuntu1810-java11:20181210 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -252,7 +252,7 @@ jobs: j11_cqlsh-dtests-py3-with-vnodes: docker: - image: spod/cassandra-testing-ubuntu1810-java11:20181210 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -329,7 +329,7 @@ jobs: j11_cqlsh-dtests-py3-no-vnodes: docker: - image: spod/cassandra-testing-ubuntu1810-java11:20181210 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -406,7 +406,7 @@ jobs: j8_cqlsh-dtests-py3-with-vnodes: docker: - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -482,7 +482,7 @@ jobs: j8_cqlsh-dtests-py2-no-vnodes: docker: - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -558,7 +558,7 @@ jobs: j11_cqlsh-dtests-py2-with-vnodes: docker: - image: spod/cassandra-testing-ubuntu1810-java11:20181210 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -635,7 +635,7 @@ jobs: j11_dtests-with-vnodes: docker: - image: spod/cassandra-testing-ubuntu1810-java11:20181210 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -715,7 +715,7 @@ jobs: j8_dtests-no-vnodes: docker: - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -772,7 +772,7 @@ jobs: j8_upgradetests-no-vnodes: docker: - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -870,7 +870,7 @@ jobs: utests_stress: docker: - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 1 @@ -915,7 +915,7 @@ jobs: j8_unit_tests: docker: - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -1001,10 +1001,10 @@ jobs: j11_jvm_dtests: docker: - image: spod/cassandra-testing-ubuntu1810-java11:20181210 - 
resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 1 + parallelism: 2 steps: - attach_workspace: at: /home/cassandra @@ -1169,7 +1169,7 @@ jobs: j11_cqlsh-dtests-py2-no-vnodes: docker: - image: spod/cassandra-testing-ubuntu1810-java11:20181210 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -1246,7 +1246,7 @@ jobs: j8_dtests-with-vnodes: docker: - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -1303,10 +1303,10 @@ jobs: j8_jvm_dtests: docker: - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 1 + parallelism: 2 steps: - attach_workspace: at: /home/cassandra @@ -1469,7 +1469,7 @@ jobs: j8_cqlsh-dtests-py3-no-vnodes: docker: - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -1545,7 +1545,7 @@ jobs: utests_long: docker: - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 1 @@ -1590,7 +1590,7 @@ jobs: utests_fqltool: docker: - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 1 @@ -1635,7 +1635,7 @@ jobs: j11_dtests-no-vnodes: docker: - image: spod/cassandra-testing-ubuntu1810-java11:20181210 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 @@ -1715,7 +1715,7 @@ jobs: utests_compression: docker: - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 4 diff --git a/src/java/org/apache/cassandra/cache/ChunkCache.java b/src/java/org/apache/cassandra/cache/ChunkCache.java index 0edb681a6296..aac7f59d27f1 100644 --- a/src/java/org/apache/cassandra/cache/ChunkCache.java +++ b/src/java/org/apache/cassandra/cache/ChunkCache.java @@ -34,6 +34,7 @@ import org.apache.cassandra.io.util.*; import org.apache.cassandra.metrics.ChunkCacheMetrics; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPoolManager; public class ChunkCache implements CacheLoader, RemovalListener, CacheSize @@ -45,6 +46,8 @@ public class ChunkCache private static boolean enabled = cacheSize > 0; public static final ChunkCache instance = enabled ? 
new ChunkCache() : null; + private static final BufferPool bufferPool = BufferPoolManager.longLived(); + private final LoadingCache cache; public final ChunkCacheMetrics metrics; @@ -130,7 +133,7 @@ public long offset() public void release() { if (references.decrementAndGet() == 0) - BufferPool.put(buffer); + bufferPool.put(buffer); } } @@ -149,7 +152,7 @@ private ChunkCache() @Override public Buffer load(Key key) { - ByteBuffer buffer = BufferPool.get(key.file.chunkSize(), key.file.preferredBufferType()); + ByteBuffer buffer = bufferPool.get(key.file.chunkSize(), key.file.preferredBufferType()); assert buffer != null; key.file.readChunk(key.position, buffer); return new Buffer(buffer, key.position); diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 0e72e2eb97be..dad897de40c9 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -301,6 +301,8 @@ public class Config private static boolean isClientMode = false; private static Supplier overrideLoadConfig = null; + public Integer network_cache_size_in_mb; + public Integer file_cache_size_in_mb; /** diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index a6ff089c82d7..fe27ca83aa55 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -453,6 +453,9 @@ else if (conf.disk_access_mode == Config.DiskAccessMode.mmap_index_only) if (conf.concurrent_replicates != null) logger.warn("concurrent_replicates has been deprecated and should be removed from cassandra.yaml"); + if (conf.network_cache_size_in_mb == null) + conf.network_cache_size_in_mb = Math.min(128, (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576))); + if (conf.file_cache_size_in_mb == null) conf.file_cache_size_in_mb = Math.min(512, (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576))); @@ -2417,6 +2420,17 @@ public static int getFileCacheSizeInMB() return conf.file_cache_size_in_mb; } + public static int getNetworkCacheSizeInMB() + { + if (conf.network_cache_size_in_mb == null) + { + // In client mode the value is not set. + assert DatabaseDescriptor.isClientInitialized(); + return 0; + } + return conf.network_cache_size_in_mb; + } + public static boolean getFileCacheRoundUp() { if (conf.file_cache_round_up == null) diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java index ffc663dd18cb..81f11530b560 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java @@ -41,7 +41,7 @@ import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.async.StreamCompressionSerializer; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPoolManager; import static org.apache.cassandra.net.MessagingService.current_version; @@ -153,7 +153,7 @@ protected long write(ChannelProxy proxy, ChecksumValidator validator, AsyncStrea // this buffer will hold the data from disk. as it will be compressed on the fly by // AsyncChannelCompressedStreamWriter.write(ByteBuffer), we can release this buffer as soon as we can. 
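The DatabaseDescriptor hunk above derives the default for the new network_cache_size_in_mb option the same way as the existing file_cache_size_in_mb default: a quarter of the JVM max heap expressed in MiB, capped at 128 MiB (the file cache keeps its 512 MiB cap). A minimal sketch of that sizing rule, using only the arithmetic visible in the patch; the class and method names are illustrative, not part of the change:

public final class CacheSizeDefaults
{
    // min(cap, maxHeapBytes / 4 MiB), as in the DatabaseDescriptor hunk above
    static int defaultCacheSizeInMb(int capInMb, long maxHeapBytes)
    {
        return Math.min(capInMb, (int) (maxHeapBytes / (4 * 1048576)));
    }

    public static void main(String[] args)
    {
        long maxHeap = Runtime.getRuntime().maxMemory();
        System.out.println("file_cache_size_in_mb default:    " + defaultCacheSizeInMb(512, maxHeap));
        System.out.println("network_cache_size_in_mb default: " + defaultCacheSizeInMb(128, maxHeap));
    }
}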
- ByteBuffer buffer = BufferPool.get(minReadable, BufferType.OFF_HEAP); + ByteBuffer buffer = BufferPoolManager.ephemeral().get(minReadable, BufferType.OFF_HEAP); try { int readCount = proxy.read(buffer, start); @@ -172,7 +172,7 @@ protected long write(ChannelProxy proxy, ChecksumValidator validator, AsyncStrea } finally { - BufferPool.put(buffer); + BufferPoolManager.ephemeral().put(buffer); } return toTransfer; diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java index 30d18fa7b66e..a1c940e835d0 100644 --- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java +++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java @@ -29,6 +29,7 @@ import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.NativeLibrary; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPoolManager; /** * A {@link RandomAccessReader} wrapper that calculates the CRC in place. @@ -43,6 +44,8 @@ */ public class ChecksummedDataInput extends RebufferingInputStream { + private static final BufferPool bufferPool = BufferPoolManager.longLived(); + private final CRC32 crc; private int crcPosition; private boolean crcUpdateDisabled; @@ -55,7 +58,7 @@ public class ChecksummedDataInput extends RebufferingInputStream ChecksummedDataInput(ChannelProxy channel, BufferType bufferType) { - super(BufferPool.get(RandomAccessReader.DEFAULT_BUFFER_SIZE, bufferType)); + super(bufferPool.get(RandomAccessReader.DEFAULT_BUFFER_SIZE, bufferType)); crc = new CRC32(); crcPosition = 0; @@ -244,7 +247,7 @@ private void updateCrc() @Override public void close() { - BufferPool.put(buffer); + bufferPool.put(buffer); channel.close(); } diff --git a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java index 0381b00e88d1..a3e5f79a4099 100644 --- a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java +++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java @@ -28,9 +28,12 @@ import org.apache.cassandra.io.util.ChannelProxy; import org.apache.cassandra.utils.memory.BufferPool; import org.apache.cassandra.utils.Throwables; +import org.apache.cassandra.utils.memory.BufferPoolManager; public final class CompressedChecksummedDataInput extends ChecksummedDataInput { + private static final BufferPool bufferPool = BufferPoolManager.longLived(); + private final ICompressor compressor; private volatile long filePosition = 0; // Current position in file, advanced when reading chunk. private volatile long sourcePosition = 0; // Current position in file to report, advanced after consuming chunk. 
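The streaming and hints changes above establish the calling convention after the pool split: per-operation buffers are borrowed from BufferPoolManager.ephemeral() and returned in a finally block, while longer-lived readers such as ChecksummedDataInput hold a reference to BufferPoolManager.longLived(). A hedged sketch of the borrow/fill/release pattern, assuming DatabaseDescriptor has been initialized so the pools can size themselves; the fill step is a placeholder rather than code from the patch:

import java.nio.ByteBuffer;

import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.utils.memory.BufferPool;
import org.apache.cassandra.utils.memory.BufferPoolManager;

final class EphemeralBorrowSketch
{
    static void withTransientBuffer(int size)
    {
        BufferPool pool = BufferPoolManager.ephemeral();
        ByteBuffer buffer = pool.get(size, BufferType.OFF_HEAP);
        try
        {
            // fill and drain the buffer here (application specific)
        }
        finally
        {
            pool.put(buffer); // always return the buffer to the pool it was taken from
        }
    }
}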
@@ -117,9 +120,9 @@ protected void readBuffer() int bufferSize = compressedSize + (compressedSize / 20); // allocate +5% to cover variability in compressed size if (compressedBuffer != null) { - BufferPool.put(compressedBuffer); + bufferPool.put(compressedBuffer); } - compressedBuffer = BufferPool.get(bufferSize, compressor.preferredBufferType()); + compressedBuffer = bufferPool.get(bufferSize, compressor.preferredBufferType()); } compressedBuffer.clear(); @@ -131,8 +134,8 @@ protected void readBuffer() if (buffer.capacity() < uncompressedSize) { int bufferSize = uncompressedSize + (uncompressedSize / 20); - BufferPool.put(buffer); - buffer = BufferPool.get(bufferSize, compressor.preferredBufferType()); + bufferPool.put(buffer); + buffer = bufferPool.get(bufferSize, compressor.preferredBufferType()); } buffer.clear(); @@ -151,7 +154,7 @@ protected void readBuffer() @Override public void close() { - BufferPool.put(compressedBuffer); + bufferPool.put(compressedBuffer); super.close(); } diff --git a/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java b/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java index f3b9a8824cf4..b5533192e1bc 100644 --- a/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java +++ b/src/java/org/apache/cassandra/io/util/BufferManagingRebufferer.java @@ -23,7 +23,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; -import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPoolManager; /** * Buffer manager used for reading from a ChunkReader when cache is not in use. Instances of this class are @@ -42,14 +42,14 @@ public abstract class BufferManagingRebufferer implements Rebufferer, Rebufferer protected BufferManagingRebufferer(ChunkReader wrapped) { this.source = wrapped; - buffer = BufferPool.get(wrapped.chunkSize(), wrapped.preferredBufferType()).order(ByteOrder.BIG_ENDIAN); + buffer = BufferPoolManager.longLived().get(wrapped.chunkSize(), wrapped.preferredBufferType()).order(ByteOrder.BIG_ENDIAN); buffer.limit(0); } @Override public void closeReader() { - BufferPool.put(buffer); + BufferPoolManager.longLived().put(buffer); offset = -1; } diff --git a/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java b/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java index c9c859a1422c..c5ed08ab1e9c 100644 --- a/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java +++ b/src/java/org/apache/cassandra/metrics/BufferPoolMetrics.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -25,7 +25,8 @@ public class BufferPoolMetrics { - private static final MetricNameFactory factory = new DefaultNameFactory("BufferPool"); + /** Total number of hits */ + public final Meter hits; /** Total number of misses */ public final Meter misses; @@ -33,16 +34,14 @@ public class BufferPoolMetrics /** Total size of buffer pools, in bytes */ public final Gauge size; - public BufferPoolMetrics() + public BufferPoolMetrics(String name, BufferPool bufferPool) { + MetricNameFactory factory = new DefaultNameFactory("BufferPool", name); + + hits = Metrics.meter(factory.createMetricName("Hits")); + misses = Metrics.meter(factory.createMetricName("Misses")); - size = Metrics.register(factory.createMetricName("Size"), new Gauge() - { - public Long getValue() - { - return BufferPool.sizeInBytes(); - } - }); + size = Metrics.register(factory.createMetricName("Size"), bufferPool::sizeInBytes); } } diff --git a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java index a2dff41a8ad7..873c1a1d1515 100644 --- a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java +++ b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java @@ -34,6 +34,7 @@ import org.apache.cassandra.net.SharedDefaultFileRegion.SharedFileChannel; import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPoolManager; import static java.lang.Math.min; @@ -50,6 +51,8 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus { private static final Logger logger = LoggerFactory.getLogger(AsyncStreamingOutputPlus.class); + private static final BufferPool bufferPool = BufferPoolManager.ephemeral(); + final int defaultLowWaterMark; final int defaultHighWaterMark; @@ -65,7 +68,7 @@ public AsyncStreamingOutputPlus(Channel channel) private void allocateBuffer() { // this buffer is only used for small quantities of data - buffer = BufferPool.getAtLeast(8 << 10, BufferType.OFF_HEAP); + buffer = bufferPool.getAtLeast(8 << 10, BufferType.OFF_HEAP); } @Override @@ -137,7 +140,7 @@ class Holder throw new IllegalStateException("Can only allocate one ByteBuffer"); limiter.acquire(size); holder.promise = beginFlush(size, defaultLowWaterMark, defaultHighWaterMark); - holder.buffer = BufferPool.get(size, BufferType.OFF_HEAP); + holder.buffer = bufferPool.get(size, BufferType.OFF_HEAP); return holder.buffer; }); } @@ -145,14 +148,14 @@ class Holder { // we don't currently support cancelling the flush, but at this point we are recoverable if we want if (holder.buffer != null) - BufferPool.put(holder.buffer); + bufferPool.put(holder.buffer); if (holder.promise != null) holder.promise.tryFailure(t); throw t; } ByteBuffer buffer = holder.buffer; - BufferPool.putUnusedPortion(buffer); + bufferPool.putUnusedPortion(buffer); int length = buffer.limit(); channel.writeAndFlush(GlobalBufferPoolAllocator.wrap(buffer), holder.promise); @@ -213,7 +216,7 @@ public void discard() { if (buffer != null) { - BufferPool.put(buffer); + bufferPool.put(buffer); buffer = null; } } diff --git a/src/java/org/apache/cassandra/net/BufferPoolAllocator.java b/src/java/org/apache/cassandra/net/BufferPoolAllocator.java index 8782c030693b..6173351d17fb 100644 --- a/src/java/org/apache/cassandra/net/BufferPoolAllocator.java +++ b/src/java/org/apache/cassandra/net/BufferPoolAllocator.java @@ 
-25,6 +25,7 @@ import io.netty.buffer.UnpooledUnsafeDirectByteBuf; import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPoolManager; /** * A trivial wrapper around BufferPool for integrating with Netty, but retaining ownership of pooling behaviour @@ -32,6 +33,8 @@ */ abstract class BufferPoolAllocator extends AbstractByteBufAllocator { + private static final BufferPool bufferPool = BufferPoolManager.ephemeral(); + BufferPoolAllocator() { super(true); @@ -60,22 +63,22 @@ protected ByteBuf newDirectBuffer(int minCapacity, int maxCapacity) ByteBuffer get(int size) { - return BufferPool.get(size, BufferType.OFF_HEAP); + return bufferPool.get(size, BufferType.OFF_HEAP); } ByteBuffer getAtLeast(int size) { - return BufferPool.getAtLeast(size, BufferType.OFF_HEAP); + return bufferPool.getAtLeast(size, BufferType.OFF_HEAP); } void put(ByteBuffer buffer) { - BufferPool.put(buffer); + bufferPool.put(buffer); } void putUnusedPortion(ByteBuffer buffer) { - BufferPool.putUnusedPortion(buffer); + bufferPool.putUnusedPortion(buffer); } void release() @@ -100,7 +103,7 @@ public static class Wrapped extends UnpooledUnsafeDirectByteBuf public void deallocate() { if (wrapped != null) - BufferPool.put(wrapped); + bufferPool.put(wrapped); } public ByteBuffer adopt() diff --git a/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java b/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java index f2556a5c880e..5f5d599ba1f4 100644 --- a/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java +++ b/src/java/org/apache/cassandra/net/FrameDecoderLegacyLZ4.java @@ -32,6 +32,7 @@ import net.jpountz.xxhash.XXHash32; import net.jpountz.xxhash.XXHashFactory; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPoolManager; import static java.lang.Integer.reverseBytes; import static java.lang.String.format; @@ -46,6 +47,8 @@ */ class FrameDecoderLegacyLZ4 extends FrameDecoderLegacy { + private static final BufferPool bufferPool = BufferPoolManager.ephemeral(); + FrameDecoderLegacyLZ4(BufferPoolAllocator allocator, int messagingVersion) { super(allocator, messagingVersion); @@ -122,7 +125,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws CorruptLZ4 assert msg instanceof BufferPoolAllocator.Wrapped; ByteBuffer buf = ((BufferPoolAllocator.Wrapped) msg).adopt(); // netty will probably have mis-predicted the space needed - BufferPool.putUnusedPortion(buf); + bufferPool.putUnusedPortion(buf); CorruptLZ4Frame error = null; try @@ -251,7 +254,7 @@ private ShareableBytes decompressFrame(ShareableBytes bytes, int begin, int end, } catch (Throwable t) { - BufferPool.put(out); + bufferPool.put(out); throw t; } } @@ -268,7 +271,7 @@ public void channelInactive(ChannelHandlerContext ctx) { if (null != stash) { - BufferPool.put(stash); + bufferPool.put(stash); stash = null; } @@ -347,7 +350,7 @@ private ByteBuffer ensureCapacity(ByteBuffer in, int capacity) ByteBuffer out = allocator.getAtLeast(capacity); in.flip(); out.put(in); - BufferPool.put(in); + bufferPool.put(in); return out; } diff --git a/src/java/org/apache/cassandra/net/FrameEncoder.java b/src/java/org/apache/cassandra/net/FrameEncoder.java index d9df1666b785..e04e9c61c7e3 100644 --- a/src/java/org/apache/cassandra/net/FrameEncoder.java +++ b/src/java/org/apache/cassandra/net/FrameEncoder.java @@ -25,9 +25,12 @@ import io.netty.channel.ChannelPromise; import 
org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPoolManager; abstract class FrameEncoder extends ChannelOutboundHandlerAdapter { + protected static final BufferPool bufferPool = BufferPoolManager.ephemeral(); + /** * An abstraction useful for transparently allocating buffers that can be written to upstream * of the {@code FrameEncoder} without knowledge of the encoder's frame layout, while ensuring @@ -57,7 +60,7 @@ static class Payload this.headerLength = headerLength; this.trailerLength = trailerLength; - buffer = BufferPool.getAtLeast(payloadCapacity + headerLength + trailerLength, BufferType.OFF_HEAP); + buffer = bufferPool.getAtLeast(payloadCapacity + headerLength + trailerLength, BufferType.OFF_HEAP); assert buffer.capacity() >= payloadCapacity + headerLength + trailerLength; buffer.position(headerLength); buffer.limit(buffer.capacity() - trailerLength); @@ -103,12 +106,12 @@ void finish() isFinished = true; buffer.limit(buffer.position() + trailerLength); buffer.position(0); - BufferPool.putUnusedPortion(buffer); + bufferPool.putUnusedPortion(buffer); } void release() { - BufferPool.put(buffer); + bufferPool.put(buffer); } } diff --git a/src/java/org/apache/cassandra/net/FrameEncoderCrc.java b/src/java/org/apache/cassandra/net/FrameEncoderCrc.java index 2d07d6d1cbc0..5049f292677d 100644 --- a/src/java/org/apache/cassandra/net/FrameEncoderCrc.java +++ b/src/java/org/apache/cassandra/net/FrameEncoderCrc.java @@ -91,7 +91,7 @@ ByteBuf encode(boolean isSelfContained, ByteBuffer frame) } catch (Throwable t) { - BufferPool.put(frame); + bufferPool.put(frame); throw t; } } diff --git a/src/java/org/apache/cassandra/net/FrameEncoderLZ4.java b/src/java/org/apache/cassandra/net/FrameEncoderLZ4.java index 12351ce887ba..2d76170b027d 100644 --- a/src/java/org/apache/cassandra/net/FrameEncoderLZ4.java +++ b/src/java/org/apache/cassandra/net/FrameEncoderLZ4.java @@ -27,7 +27,6 @@ import net.jpountz.lz4.LZ4Factory; import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.memory.BufferPool; import static org.apache.cassandra.net.Crc.*; @@ -74,7 +73,7 @@ public ByteBuf encode(boolean isSelfContained, ByteBuffer in) throw new IllegalArgumentException("Maximum uncompressed payload size is 128KiB"); int maxOutputLength = compressor.maxCompressedLength(uncompressedLength); - frame = BufferPool.getAtLeast(HEADER_AND_TRAILER_LENGTH + maxOutputLength, BufferType.OFF_HEAP); + frame = bufferPool.getAtLeast(HEADER_AND_TRAILER_LENGTH + maxOutputLength, BufferType.OFF_HEAP); int compressedLength = compressor.compress(in, in.position(), uncompressedLength, frame, HEADER_LENGTH, maxOutputLength); @@ -101,18 +100,18 @@ public ByteBuf encode(boolean isSelfContained, ByteBuffer in) frame.putInt(frameCrc); frame.position(0); - BufferPool.putUnusedPortion(frame); + bufferPool.putUnusedPortion(frame); return GlobalBufferPoolAllocator.wrap(frame); } catch (Throwable t) { if (frame != null) - BufferPool.put(frame); + bufferPool.put(frame); throw t; } finally { - BufferPool.put(in); + bufferPool.put(in); } } } diff --git a/src/java/org/apache/cassandra/net/FrameEncoderLegacyLZ4.java b/src/java/org/apache/cassandra/net/FrameEncoderLegacyLZ4.java index 3b29ecb7ae56..000fab7b0087 100644 --- a/src/java/org/apache/cassandra/net/FrameEncoderLegacyLZ4.java +++ b/src/java/org/apache/cassandra/net/FrameEncoderLegacyLZ4.java @@ -68,7 +68,7 @@ ByteBuf 
encode(boolean isSelfContained, ByteBuffer payload) ByteBuffer frame = null; try { - frame = BufferPool.getAtLeast(calculateMaxFrameLength(payload), BufferType.OFF_HEAP); + frame = bufferPool.getAtLeast(calculateMaxFrameLength(payload), BufferType.OFF_HEAP); int frameOffset = 0; int payloadOffset = 0; @@ -82,19 +82,19 @@ ByteBuf encode(boolean isSelfContained, ByteBuffer payload) } frame.limit(frameOffset); - BufferPool.putUnusedPortion(frame); + bufferPool.putUnusedPortion(frame); return GlobalBufferPoolAllocator.wrap(frame); } catch (Throwable t) { if (null != frame) - BufferPool.put(frame); + bufferPool.put(frame); throw t; } finally { - BufferPool.put(payload); + bufferPool.put(payload); } } diff --git a/src/java/org/apache/cassandra/net/FrameEncoderUnprotected.java b/src/java/org/apache/cassandra/net/FrameEncoderUnprotected.java index 3bca41c25532..6158713eae23 100644 --- a/src/java/org/apache/cassandra/net/FrameEncoderUnprotected.java +++ b/src/java/org/apache/cassandra/net/FrameEncoderUnprotected.java @@ -59,7 +59,7 @@ ByteBuf encode(boolean isSelfContained, ByteBuffer frame) } catch (Throwable t) { - BufferPool.put(frame); + bufferPool.put(frame); throw t; } } diff --git a/src/java/org/apache/cassandra/net/HandshakeProtocol.java b/src/java/org/apache/cassandra/net/HandshakeProtocol.java index 47d0ec6dffa3..f9a441ae7fbb 100644 --- a/src/java/org/apache/cassandra/net/HandshakeProtocol.java +++ b/src/java/org/apache/cassandra/net/HandshakeProtocol.java @@ -33,6 +33,7 @@ import org.apache.cassandra.io.util.DataOutputBufferFixed; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPoolManager; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer; @@ -135,7 +136,7 @@ int encodeFlags() ByteBuf encode() { - ByteBuffer buffer = BufferPool.get(MAX_LENGTH, BufferType.OFF_HEAP); + ByteBuffer buffer = BufferPoolManager.ephemeral().get(MAX_LENGTH, BufferType.OFF_HEAP); try (DataOutputBufferFixed out = new DataOutputBufferFixed(buffer)) { out.writeInt(Message.PROTOCOL_MAGIC); @@ -347,7 +348,7 @@ static class ConfirmOutboundPre40 ByteBuf encode() { - ByteBuffer buffer = BufferPool.get(MAX_LENGTH, BufferType.OFF_HEAP); + ByteBuffer buffer = BufferPoolManager.ephemeral().get(MAX_LENGTH, BufferType.OFF_HEAP); try (DataOutputBufferFixed out = new DataOutputBufferFixed(buffer)) { out.writeInt(maxMessagingVersion); diff --git a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java index c390ba4287e2..ce825343ef3f 100644 --- a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java +++ b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java @@ -53,6 +53,7 @@ import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.streaming.async.StreamingInboundHandler; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPoolManager; import static java.lang.Math.*; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -391,7 +392,7 @@ private void setupStreamingPipeline(InetAddressAndPort from, ChannelHandlerConte from = InetAddressAndPort.getByAddressOverrideDefaults(address.getAddress(), address.getPort()); } - BufferPool.setRecycleWhenFreeForCurrentThread(false); + 
BufferPoolManager.ephemeral().setRecycleWhenFreeForCurrentThread(false); pipeline.replace(this, "streamInbound", new StreamingInboundHandler(from, current_version, null)); } @@ -402,7 +403,7 @@ void setupMessagingPipeline(InetAddressAndPort from, int useMessagingVersion, in // record the "true" endpoint, i.e. the one the peer is identified with, as opposed to the socket it connected over instance().versions.set(from, maxMessagingVersion); - BufferPool.setRecycleWhenFreeForCurrentThread(false); + BufferPoolManager.ephemeral().setRecycleWhenFreeForCurrentThread(false); BufferPoolAllocator allocator = GlobalBufferPoolAllocator.instance; if (initiate.type == ConnectionType.LARGE_MESSAGES) { diff --git a/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java b/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java index b2d487f888be..d88d15686b12 100644 --- a/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java +++ b/src/java/org/apache/cassandra/net/LocalBufferPoolAllocator.java @@ -21,6 +21,7 @@ import io.netty.channel.EventLoop; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPoolManager; /** * Equivalent to {@link GlobalBufferPoolAllocator}, except explicitly using a specified @@ -36,7 +37,7 @@ class LocalBufferPoolAllocator extends BufferPoolAllocator LocalBufferPoolAllocator(EventLoop eventLoop) { - this.pool = new BufferPool.LocalPool().recycleWhenFree(false); + this.pool = BufferPoolManager.ephemeral().create().recycleWhenFree(false); this.eventLoop = eventLoop; } diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java index fdfb2dfa74e1..a406efa30e1c 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java +++ b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java @@ -55,6 +55,7 @@ import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPoolManager; import static java.util.concurrent.TimeUnit.*; import static org.apache.cassandra.net.MessagingService.VERSION_40; @@ -338,7 +339,7 @@ else if (peerMessagingVersion < settings.acceptVersions.min) ChannelPipeline pipeline = ctx.pipeline(); if (result.isSuccess()) { - BufferPool.setRecycleWhenFreeForCurrentThread(false); + BufferPoolManager.ephemeral().setRecycleWhenFreeForCurrentThread(false); if (type.isMessaging()) { assert frameEncoder != null; diff --git a/src/java/org/apache/cassandra/net/ShareableBytes.java b/src/java/org/apache/cassandra/net/ShareableBytes.java index e4f24608e4d2..89fcc344bcdf 100644 --- a/src/java/org/apache/cassandra/net/ShareableBytes.java +++ b/src/java/org/apache/cassandra/net/ShareableBytes.java @@ -21,9 +21,10 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPoolManager; /** - * A wrapper for possibly sharing portions of a single, {@link BufferPool} managed, {@link ByteBuffer}; + * A wrapper for possibly sharing portions of a single, {@link BufferPoolManager#ephemeral()} managed, {@link ByteBuffer}; * optimised for the case where no sharing is necessary. 
* * When sharing is necessary, {@link #share()} method must be invoked by the owning thread @@ -136,7 +137,7 @@ else if (count > 0) throw new IllegalStateException("Already released"); if (count == RELEASED) - BufferPool.put(bytes); + BufferPoolManager.ephemeral().put(bytes); } boolean isReleased() diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java index 3fc799205ae7..ea6a26939c9b 100644 --- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java +++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java @@ -29,6 +29,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.function.Supplier; @@ -41,7 +42,6 @@ import org.slf4j.LoggerFactory; import io.netty.util.concurrent.FastThreadLocal; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.BufferPoolMetrics; @@ -68,23 +68,21 @@ public class BufferPool public static final int TINY_ALLOCATION_UNIT = TINY_CHUNK_SIZE / 64; public static final int TINY_ALLOCATION_LIMIT = TINY_CHUNK_SIZE / 2; - private final static BufferPoolMetrics metrics = new BufferPoolMetrics(); - - // TODO: this should not be using FileCacheSizeInMB - @VisibleForTesting - public static long MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getFileCacheSizeInMB() * 1024L * 1024L; - - private static Debug debug; - private static final Logger logger = LoggerFactory.getLogger(BufferPool.class); private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 15L, TimeUnit.MINUTES); private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0); + private Debug debug; + + protected final String name; + protected final BufferPoolMetrics metrics; + private long memoryUsageThreshold; + /** A global pool of chunks (page aligned buffers) */ - private static final GlobalPool globalPool = new GlobalPool(); + private final GlobalPool globalPool; /** A thread local pool of chunks, where chunks come from the global pool */ - private static final FastThreadLocal localPool = new FastThreadLocal() + private final FastThreadLocal localPool = new FastThreadLocal() { @Override protected LocalPool initialValue() @@ -98,7 +96,23 @@ protected void onRemoval(LocalPool value) } }; - public static ByteBuffer get(int size, BufferType bufferType) + public BufferPool(String name, long memoryUsageThreshold) + { + this.name = name; + this.memoryUsageThreshold = memoryUsageThreshold; + this.globalPool = new GlobalPool(); + this.metrics = new BufferPoolMetrics(name, this); + } + + /** + * @return a local pool instance and caller is responsible to release the pool + */ + public LocalPool create() + { + return new LocalPool(); + } + + public ByteBuffer get(int size, BufferType bufferType) { if (bufferType == BufferType.ON_HEAP) return allocate(size, bufferType); @@ -106,7 +120,7 @@ public static ByteBuffer get(int size, BufferType bufferType) return localPool.get().get(size); } - public static ByteBuffer getAtLeast(int size, BufferType bufferType) + public ByteBuffer getAtLeast(int size, BufferType bufferType) { if (bufferType == BufferType.ON_HEAP) return allocate(size, bufferType); @@ -115,12 +129,12 @@ public static 
ByteBuffer getAtLeast(int size, BufferType bufferType) } /** Unlike the get methods, this will return null if the pool is exhausted */ - public static ByteBuffer tryGet(int size) + public ByteBuffer tryGet(int size) { return localPool.get().tryGet(size, false); } - public static ByteBuffer tryGetAtLeast(int size) + public ByteBuffer tryGetAtLeast(int size) { return localPool.get().tryGet(size, true); } @@ -132,13 +146,13 @@ private static ByteBuffer allocate(int size, BufferType bufferType) : ByteBuffer.allocateDirect(size); } - public static void put(ByteBuffer buffer) + public void put(ByteBuffer buffer) { if (isExactlyDirect(buffer)) localPool.get().put(buffer); } - public static void putUnusedPortion(ByteBuffer buffer) + public void putUnusedPortion(ByteBuffer buffer) { if (isExactlyDirect(buffer)) { @@ -150,30 +164,55 @@ public static void putUnusedPortion(ByteBuffer buffer) } } - public static void setRecycleWhenFreeForCurrentThread(boolean recycleWhenFree) + public void setRecycleWhenFreeForCurrentThread(boolean recycleWhenFree) { localPool.get().recycleWhenFree(recycleWhenFree); } - public static long sizeInBytes() + public long sizeInBytes() { return globalPool.sizeInBytes(); } + public long memoryUsageThreshold() + { + return memoryUsageThreshold; + } + + @VisibleForTesting + public void unsafeSetMemoryUsageThreshold(long memoryUsageThreshold) + { + this.memoryUsageThreshold = memoryUsageThreshold; + } + interface Debug { void registerNormal(Chunk chunk); void recycleNormal(Chunk oldVersion, Chunk newVersion); + void recyclePartial(Chunk chunk); } - public static void debug(Debug setDebug) + public void debug(Debug setDebug) { - debug = setDebug; + this.debug = setDebug; } interface Recycler { + /** + * Recycle a fully freed chunk + */ void recycle(Chunk chunk); + + /** + * @return true if chunk can be reused before fully freed. + */ + boolean canRecyclePartially(); + + /** + * Recycle a partially freed chunk + */ + void recyclePartially(Chunk chunk); } /** @@ -183,26 +222,26 @@ interface Recycler * * This class is shared by multiple thread local pools and must be thread-safe. */ - static final class GlobalPool implements Supplier, Recycler + final class GlobalPool implements Supplier, Recycler { /** The size of a bigger chunk, 1 MiB, must be a multiple of NORMAL_CHUNK_SIZE */ static final int MACRO_CHUNK_SIZE = 64 * NORMAL_CHUNK_SIZE; - static + private final Queue macroChunks = new ConcurrentLinkedQueue<>(); + // TODO (future): it would be preferable to use a CLStack to improve cache occupancy; it would also be preferable to use "CoreLocal" storage + // It contains fully free chunk and partially freed chunk which is recirculated whenever chunk has free spaces to + // improve buffer utilization when chunk cache is holding a piece of buffering for long period. + // Note: fragmentation still exists, as holes are with different space. 
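With the hunks above, BufferPool is no longer a static singleton: each instance is constructed with a name and a memory usage threshold, owns its global and thread-local pools plus metrics, and the former static entry points (get, getAtLeast, tryGet, put, putUnusedPortion, sizeInBytes) become instance methods. A rough sketch of exercising a pool in isolation under those signatures; the pool name and the 64 MiB threshold are arbitrary example values:

import java.nio.ByteBuffer;

import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.utils.memory.BufferPool;

final class StandalonePoolSketch
{
    public static void main(String[] args)
    {
        // name + memoryUsageThreshold, per the new constructor introduced by this patch
        BufferPool pool = new BufferPool("example", 64L * 1024 * 1024);
        ByteBuffer buffer = pool.get(4096, BufferType.OFF_HEAP);
        assert buffer != null && buffer.capacity() >= 4096;
        pool.put(buffer);
        System.out.println("bytes drawn from the global pool so far: " + pool.sizeInBytes());
    }
}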
+ private final Queue chunks = new ConcurrentLinkedQueue<>(); + private final AtomicLong memoryUsage = new AtomicLong(); + + public GlobalPool() { assert Integer.bitCount(NORMAL_CHUNK_SIZE) == 1; // must be a power of 2 assert Integer.bitCount(MACRO_CHUNK_SIZE) == 1; // must be a power of 2 assert MACRO_CHUNK_SIZE % NORMAL_CHUNK_SIZE == 0; // must be a multiple - - logger.info("Global buffer pool limit is {}", - prettyPrintMemory(MEMORY_USAGE_THRESHOLD)); } - private final Queue macroChunks = new ConcurrentLinkedQueue<>(); - // TODO (future): it would be preferable to use a CLStack to improve cache occupancy; it would also be preferable to use "CoreLocal" storage - private final Queue chunks = new ConcurrentLinkedQueue<>(); - private final AtomicLong memoryUsage = new AtomicLong(); - /** Return a chunk, the caller will take owership of the parent chunk. */ public Chunk get() { @@ -227,12 +266,13 @@ private Chunk allocateMoreChunks() while (true) { long cur = memoryUsage.get(); - if (cur + MACRO_CHUNK_SIZE > MEMORY_USAGE_THRESHOLD) + if (cur + MACRO_CHUNK_SIZE > memoryUsageThreshold) { - if (MEMORY_USAGE_THRESHOLD > 0) + if (memoryUsageThreshold > 0) { - noSpamLogger.info("Maximum memory usage reached ({}), cannot allocate chunk of {}", - prettyPrintMemory(MEMORY_USAGE_THRESHOLD), + noSpamLogger.info("{} pool maximum memory usage reached ({}), cannot allocate chunk of {}", + name, + prettyPrintMemory(memoryUsageThreshold), prettyPrintMemory(MACRO_CHUNK_SIZE)); } return null; @@ -249,9 +289,10 @@ private Chunk allocateMoreChunks() } catch (OutOfMemoryError oom) { - noSpamLogger.error("Buffer pool failed to allocate chunk of {}, current size {} ({}). " + + noSpamLogger.error("{} buffer pool failed to allocate chunk of {}, current size {} ({}). " + "Attempting to continue; buffers will be allocated in on-heap memory which can degrade performance. " + "Make sure direct memory size (-XX:MaxDirectMemorySize) is large enough to accommodate off-heap memtables and caches.", + name, prettyPrintMemory(MACRO_CHUNK_SIZE), prettyPrintMemory(sizeInBytes()), oom.toString()); @@ -274,6 +315,7 @@ private Chunk allocateMoreChunks() return callerChunk; } + @Override public void recycle(Chunk chunk) { Chunk recycleAs = new Chunk(chunk); @@ -282,6 +324,20 @@ public void recycle(Chunk chunk) chunks.add(recycleAs); } + @Override + public void recyclePartially(Chunk chunk) + { + if (debug != null) + debug.recyclePartial(chunk); + chunks.add(chunk); + } + + @Override + public boolean canRecyclePartially() + { + return true; + } + public long sizeInBytes() { return memoryUsage.get(); @@ -515,7 +571,7 @@ private void unsafeRecycle() * A thread local class that grabs chunks from the global pool for this thread allocations. * Only one thread can do the allocations but multiple threads can release the allocations. */ - public static final class LocalPool implements Recycler + public final class LocalPool implements Recycler { private final Queue reuseObjects; private final Supplier parent; @@ -545,9 +601,7 @@ private LocalPool(LocalPool parent) { this.parent = () -> { ByteBuffer buffer = parent.tryGetInternal(TINY_CHUNK_SIZE, false); - if (buffer == null) - return null; - return new Chunk(parent, buffer); + return buffer == null ? 
null : new Chunk(parent, buffer); }; this.tinyLimit = 0; // we only currently permit one layer of nesting (which brings us down to 32 byte allocations, so is plenty) this.reuseObjects = parent.reuseObjects; // we share the same ByteBuffer object reuse pool, as we both have the same exclusive access to it @@ -580,22 +634,36 @@ public void put(ByteBuffer buffer, Chunk chunk) } // ask the free method to take exclusive ownership of the act of recycling - // if we are either: already not owned by anyone, or owned by ourselves - long free = chunk.free(buffer, owner == null || (owner == this && recycleWhenFree)); + // if chunk is owned by ourselves + long free = chunk.free(buffer, owner == this && recycleWhenFree); + // free: + // * 0L: current pool must be the owner. we can fully recycle the chunk. + // * -1L: + // * for normal chunk: + // a) if current pool is owner, we can continue using the chunk. do nothing. + // b) if current pool is not owner, the chunk must have been partially recycled. do nothing + // * for tiny chunk: + // a) if current pool is owner, we can continue using the chunk. do nothing. + // b) if current pool is not owner, recycle the tiny chunk back to parent chunk + // * others: parent will try to partially recycle the chunk if (free == 0L) { + assert owner == this; // 0L => we own recycling responsibility, so must recycle; - // if we are the owner, we must remove the Chunk from our local queue - if (owner == this) - remove(chunk); + // We must remove the Chunk from our local queue + remove(chunk); chunk.recycle(); } - else if (((free == -1L) && owner != this) && chunk.owner == null) + else if (free == -1L && owner == null && !chunk.recycler.canRecyclePartially()) { - // although we try to take recycle ownership cheaply, it is not always possible to do so if the owner is racing to unset.
- // we must also check after completely freeing if the owner has since been unset, and try to recycle + // recycle evicted tiny chunk chunk.tryRecycle(); } + else if (chunk.recycler.canRecyclePartially() && chunk.status() == Chunk.Status.OFFLINE && chunk.setOnline()) + { + // re-circulate partially freed normal chunk to global list + chunk.partiallyRecycle(); + } if (owner == this) { @@ -628,7 +696,10 @@ private ByteBuffer get(int size, boolean sizeIsLowerBound) { ByteBuffer ret = tryGet(size, sizeIsLowerBound); if (ret != null) + { + metrics.hits.mark(); return ret; + } if (size > NORMAL_CHUNK_SIZE) { @@ -647,16 +718,6 @@ private ByteBuffer get(int size, boolean sizeIsLowerBound) return allocate(size, BufferType.OFF_HEAP); } - public ByteBuffer tryGet(int size) - { - return tryGet(size, false); - } - - public ByteBuffer tryGetAtLeast(int size) - { - return tryGet(size, true); - } - private ByteBuffer tryGet(int size, boolean sizeIsLowerBound) { LocalPool pool = this; @@ -685,7 +746,9 @@ private ByteBuffer tryGetInternal(int size, boolean sizeIsLowerBound) ByteBuffer reuse = this.reuseObjects.poll(); ByteBuffer buffer = chunks.get(size, sizeIsLowerBound, reuse); if (buffer != null) + { return buffer; + } // else ask the global pool Chunk chunk = addChunkFromParent(); @@ -701,7 +764,8 @@ private ByteBuffer tryGetInternal(int size, boolean sizeIsLowerBound) return null; } - // recycle + // recycle entire tiny chunk from tiny pool back to local pool + @Override public void recycle(Chunk chunk) { ByteBuffer buffer = chunk.slab; @@ -709,6 +773,18 @@ public void recycle(Chunk chunk) put(buffer, parentChunk); } + @Override + public void recyclePartially(Chunk chunk) + { + throw new UnsupportedOperationException("Tiny chunk doesn't support partial recycle."); + } + + @Override + public boolean canRecyclePartially() + { + return false; + } + private void remove(Chunk chunk) { chunks.remove(chunk); @@ -735,6 +811,7 @@ private void addChunk(Chunk chunk) if (tinyPool != null) tinyPool.chunks.removeIf((child, parent) -> Chunk.getParentChunk(child.slab) == parent, evict); evict.release(); + evict.setOffline(); } } @@ -778,12 +855,12 @@ public void release() } } - private static final Set localPoolReferences = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set localPoolReferences = Collections.newSetFromMap(new ConcurrentHashMap<>()); - private static final ReferenceQueue localPoolRefQueue = new ReferenceQueue<>(); - private static final InfiniteLoopExecutor EXEC = new InfiniteLoopExecutor("LocalPool-Cleaner", BufferPool::cleanupOneReference).start(); + private final ReferenceQueue localPoolRefQueue = new ReferenceQueue<>(); + private final InfiniteLoopExecutor EXEC = new InfiniteLoopExecutor("LocalPool-Cleaner", this::cleanupOneReference).start(); - private static void cleanupOneReference() throws InterruptedException + private void cleanupOneReference() throws InterruptedException { Object obj = localPoolRefQueue.remove(100); if (obj instanceof LocalPoolRef) @@ -835,8 +912,16 @@ private static ByteBuffer allocateDirectAligned(int capacity) */ final static class Chunk { + enum Status + { + /** The slab is serving requests */ ONLINE, + /** The slab is not serving requests (for example because it is full) */ OFFLINE + } + private final ByteBuffer slab; - private final long baseAddress; + final long baseAddress; private final int shift; private volatile long freeSlots; @@ -848,6 +933,10 @@ final static class Chunk private volatile LocalPool owner; private final Recycler recycler;
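The Chunk additions that follow guard partial recycling with an ONLINE/OFFLINE status flipped through an AtomicReferenceFieldUpdater, so only the thread that wins the OFFLINE to ONLINE transition re-circulates a partially freed chunk. A self-contained sketch of that compare-and-set idiom, outside the real Chunk class:

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

final class StatusToggleSketch
{
    enum Status { ONLINE, OFFLINE }

    private static final AtomicReferenceFieldUpdater<StatusToggleSketch, Status> statusUpdater =
        AtomicReferenceFieldUpdater.newUpdater(StatusToggleSketch.class, Status.class, "status");

    private volatile Status status = Status.ONLINE;

    boolean setOffline()
    {
        return statusUpdater.compareAndSet(this, Status.ONLINE, Status.OFFLINE);
    }

    boolean setOnline()
    {
        return statusUpdater.compareAndSet(this, Status.OFFLINE, Status.ONLINE);
    }

    public static void main(String[] args)
    {
        StatusToggleSketch chunk = new StatusToggleSketch();
        System.out.println(chunk.setOffline()); // true: chunk evicted from a local pool
        System.out.println(chunk.setOnline());  // true: this caller may re-circulate it
        System.out.println(chunk.setOnline());  // false: it is already online
    }
}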
+ private static final AtomicReferenceFieldUpdater statusUpdater = + AtomicReferenceFieldUpdater.newUpdater(Chunk.class, Status.class, "status"); + private volatile Status status = Status.ONLINE; + @VisibleForTesting Object debugAttachment; @@ -909,6 +998,13 @@ void recycle() recycler.recycle(this); } + public void partiallyRecycle() + { + assert owner == null; + assert free() > 0; + recycler.recyclePartially(this); + } + /** * We stash the chunk in the attachment of a buffer * that was returned by get(), this method simply @@ -1151,6 +1247,12 @@ public String toString() return String.format("[slab %s, slots bitmap %s, capacity %d, free %d]", slab, Long.toBinaryString(freeSlots), capacity(), free()); } + @VisibleForTesting + public LocalPool owner() + { + return this.owner; + } + @VisibleForTesting void unsafeFree() { @@ -1170,6 +1272,26 @@ static void unsafeRecycle(Chunk chunk) chunk.recycle(); } } + + Status status() + { + return status; + } + + private boolean setStatus(Status current, Status update) + { + return statusUpdater.compareAndSet(this, current, update); + } + + boolean setOnline() + { + return setStatus(Status.OFFLINE, Status.ONLINE); + } + + boolean setOffline() + { + return setStatus(Status.ONLINE, Status.OFFLINE); + } } @VisibleForTesting @@ -1188,20 +1310,20 @@ public static int roundUp(int size, int unit) } @VisibleForTesting - public static void shutdownLocalCleaner(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException + public void shutdownLocalCleaner(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { shutdownNow(of(EXEC)); awaitTermination(timeout, unit, of(EXEC)); } - public static long unsafeGetBytesInUse() + public long unsafeGetBytesInUse() { long totalMemory = globalPool.memoryUsage.get(); class L { long v; } final L availableMemory = new L(); for (Chunk chunk : globalPool.chunks) { - availableMemory.v += chunk.capacity(); + availableMemory.v += chunk.free(); } for (LocalPoolRef ref : localPoolReferences) { @@ -1212,20 +1334,20 @@ class L { long v; } /** This is not thread safe and should only be used for unit testing. */ @VisibleForTesting - static void unsafeReset() + void unsafeReset() { localPool.get().unsafeRecycle(); globalPool.unsafeFree(); } @VisibleForTesting - static Chunk unsafeCurrentChunk() + Chunk unsafeCurrentChunk() { return localPool.get().chunks.chunk0; } @VisibleForTesting - static int unsafeNumChunks() + int unsafeNumChunks() { LocalPool pool = localPool.get(); return (pool.chunks.chunk0 != null ? 1 : 0) diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPoolManager.java b/src/java/org/apache/cassandra/utils/memory/BufferPoolManager.java new file mode 100644 index 000000000000..4e306f417c3b --- /dev/null +++ b/src/java/org/apache/cassandra/utils/memory/BufferPoolManager.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.memory; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; + +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; + +public class BufferPoolManager +{ + private static final Logger logger = LoggerFactory.getLogger(BufferPoolManager.class); + + // TODO: this should not be using FileCacheSizeInMB + @VisibleForTesting + public static long FILE_MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getFileCacheSizeInMB() * 1024L * 1024L; + private static final BufferPool LONG_LIVED = new BufferPool("Permanent", FILE_MEMORY_USAGE_THRESHOLD); + + public static long NETWORK_MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getNetworkCacheSizeInMB() * 1024L * 1024L; + private static final BufferPool EPHEMERAL = new BufferPool("Ephemeral", NETWORK_MEMORY_USAGE_THRESHOLD); + + static + { + logger.info("Global buffer pool limit is {} for {} and {} for {}", + prettyPrintMemory(FILE_MEMORY_USAGE_THRESHOLD), LONG_LIVED.name, + prettyPrintMemory(NETWORK_MEMORY_USAGE_THRESHOLD), EPHEMERAL.name); + } + /** + * Long-lived buffers used for chunk cache and other disk access + */ + public static BufferPool longLived() + { + return LONG_LIVED; + } + + /** + * Short-lived buffers used for internode messaging or client-server connections. + */ + public static BufferPool ephemeral() + { + return EPHEMERAL; + } + + public static void shutdownLocalCleaner(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException + { + LONG_LIVED.shutdownLocalCleaner(timeout, unit); + EPHEMERAL.shutdownLocalCleaner(timeout, unit); + } + +} diff --git a/src/java/org/apache/cassandra/utils/memory/PermanentBufferPool.java b/src/java/org/apache/cassandra/utils/memory/PermanentBufferPool.java new file mode 100644 index 000000000000..6425c137db6b --- /dev/null +++ b/src/java/org/apache/cassandra/utils/memory/PermanentBufferPool.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.utils.memory; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; + +public class PermanentBufferPool +{ + public static void main(String args[]) + { + // netty + ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT; + } +} diff --git a/test/burn/org/apache/cassandra/net/ConnectionBurnTest.java b/test/burn/org/apache/cassandra/net/ConnectionBurnTest.java index 57eb726c4ecf..8ab2cd0f0526 100644 --- a/test/burn/org/apache/cassandra/net/ConnectionBurnTest.java +++ b/test/burn/org/apache/cassandra/net/ConnectionBurnTest.java @@ -58,6 +58,7 @@ import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.MonotonicClock; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPoolManager; import static java.lang.Math.min; import static org.apache.cassandra.net.MessagingService.current_version; @@ -430,7 +431,7 @@ public void run() throws ExecutionException, InterruptedException, NoSuchFieldEx checkStoppedTo .accept(endpoint, getConnections(endpoint, true )); checkStoppedFrom.accept(endpoint, getConnections(endpoint, false)); } - long inUse = BufferPool.unsafeGetBytesInUse(); + long inUse = BufferPoolManager.ephemeral().unsafeGetBytesInUse(); if (inUse > 0) { // try diff --git a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java index c8368dd626f0..adca3e0dc251 100644 --- a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java +++ b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -70,6 +71,8 @@ public class LongBufferPoolTest private static final int STDEV_BUFFER_SIZE = 10 << 10; // picked to ensure exceeding buffer size is rare, but occurs private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); + private static BufferPool bufferPool; + static final class Debug implements BufferPool.Debug { static class DebugChunk @@ -84,7 +87,6 @@ static DebugChunk get(BufferPool.Chunk chunk) } long recycleRound = 1; final List normalChunks = new ArrayList<>(); - final List tinyChunks = new ArrayList<>(); public synchronized void registerNormal(BufferPool.Chunk chunk) { chunk.debugAttachment = new DebugChunk(); @@ -95,13 +97,14 @@ public void recycleNormal(BufferPool.Chunk oldVersion, BufferPool.Chunk newVersi newVersion.debugAttachment = oldVersion.debugAttachment; DebugChunk.get(oldVersion).lastRecycled = recycleRound; } + public void recyclePartial(BufferPool.Chunk chunk) + { + DebugChunk.get(chunk).lastRecycled = recycleRound; + } public synchronized void check() { -// for (BufferPool.Chunk chunk : tinyChunks) -// assert DebugChunk.get(chunk).lastRecycled == recycleRound; for (BufferPool.Chunk chunk : normalChunks) assert DebugChunk.get(chunk).lastRecycled == recycleRound; - tinyChunks.clear(); // they don't survive a recycleRound recycleRound++; } } @@ -110,6 +113,13 @@ public synchronized void check() public static void setup() throws Exception { DatabaseDescriptor.daemonInitialization(); + bufferPool = BufferPoolManager.longLived(); + } + + @AfterClass + public static void teardown() + { + bufferPool.unsafeReset(); } @Test @@ -244,11 +254,11 @@ public void testAllocate(int threadCount, long 
duration, int poolSize) throws In DATE_FORMAT.format(new Date()), threadCount, TimeUnit.NANOSECONDS.toMinutes(duration))); - long prevPoolSize = BufferPool.MEMORY_USAGE_THRESHOLD; + long prevPoolSize = bufferPool.memoryUsageThreshold(); logger.info("Overriding configured BufferPool.MEMORY_USAGE_THRESHOLD={} and enabling BufferPool.DEBUG", poolSize); - BufferPool.MEMORY_USAGE_THRESHOLD = poolSize; + bufferPool.unsafeSetMemoryUsageThreshold(poolSize); Debug debug = new Debug(); - BufferPool.debug(debug); + bufferPool.debug(debug); TestEnvironment testEnv = new TestEnvironment(threadCount, duration, poolSize); @@ -281,15 +291,15 @@ public void testAllocate(int threadCount, long duration, int poolSize) throws In while ( null != (check = queue.poll()) ) { check.validate(); - BufferPool.put(check.buffer); + bufferPool.put(check.buffer); } } assertEquals(0, testEnv.executorService.shutdownNow().size()); logger.info("Reverting BufferPool.MEMORY_USAGE_THRESHOLD={}", prevPoolSize); - BufferPool.MEMORY_USAGE_THRESHOLD = prevPoolSize; - BufferPool.debug(null); + bufferPool.unsafeSetMemoryUsageThreshold(prevPoolSize); + bufferPool.debug(null); testEnv.assertCheckedThreadsSucceeded(); @@ -362,7 +372,7 @@ else if (!recycleFromNeighbour()) else { check.validate(); - BufferPool.put(check.buffer); + bufferPool.put(check.buffer); totalSize -= size; } } @@ -407,7 +417,7 @@ void cleanup() while (checks.size() > 0) { BufferCheck check = checks.get(0); - BufferPool.put(check.buffer); + bufferPool.put(check.buffer); checks.remove(check.listnode); } testEnv.latch.countDown(); @@ -419,13 +429,13 @@ boolean recycleFromNeighbour() if (check == null) return false; check.validate(); - BufferPool.put(check.buffer); + bufferPool.put(check.buffer); return true; } BufferCheck allocate(int size) { - ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP); assertNotNull(buffer); BufferCheck check = new BufferCheck(buffer, rand.nextLong()); assertEquals(size, buffer.capacity()); @@ -494,8 +504,9 @@ void testOne() throws Exception return; } - ByteBuffer buffer = rand.nextInt(4) < 1 ? BufferPool.tryGet(BufferPool.NORMAL_CHUNK_SIZE) - : BufferPool.tryGet(BufferPool.TINY_ALLOCATION_LIMIT); + ByteBuffer buffer = rand.nextInt(4) < 1 + ? bufferPool.tryGet(BufferPool.NORMAL_CHUNK_SIZE) + : bufferPool.tryGet(BufferPool.TINY_ALLOCATION_LIMIT); if (buffer == null) { Thread.yield(); @@ -505,7 +516,7 @@ void testOne() throws Exception // 50/50 chance of returning the buffer from the producer thread, or // pass it on to the consumer. 
if (rand.nextBoolean()) - BufferPool.put(buffer); + bufferPool.put(buffer); else burn.add(buffer); @@ -526,7 +537,7 @@ void testOne() throws Exception Thread.yield(); return; } - BufferPool.put(buffer); + bufferPool.put(buffer); } void cleanup() { @@ -562,7 +573,7 @@ public Boolean call() throws Exception { logger.error("Got exception {}, current chunk {}", ex.getMessage(), - BufferPool.unsafeCurrentChunk()); + bufferPool.unsafeCurrentChunk()); ex.printStackTrace(); return false; } @@ -570,7 +581,7 @@ public Boolean call() throws Exception { logger.error("Got throwable {}, current chunk {}", tr.getMessage(), - BufferPool.unsafeCurrentChunk()); + bufferPool.unsafeCurrentChunk()); tr.printStackTrace(); return false; } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 90d747ef9d88..73f976bc0b22 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -100,6 +100,7 @@ import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.concurrent.Ref; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPoolManager; import static java.util.concurrent.TimeUnit.MINUTES; import static org.apache.cassandra.distributed.api.Feature.GOSSIP; @@ -533,7 +534,7 @@ public Future shutdown(boolean graceful) () -> IndexSummaryManager.instance.shutdownAndWait(1L, MINUTES), () -> ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES), () -> PendingRangeCalculatorService.instance.shutdownAndWait(1L, MINUTES), - () -> BufferPool.shutdownLocalCleaner(1L, MINUTES), + () -> BufferPoolManager.shutdownLocalCleaner(1L, MINUTES), () -> Ref.shutdownReferenceReaper(1L, MINUTES), () -> Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES), () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES), diff --git a/test/unit/org/apache/cassandra/net/FramingTest.java b/test/unit/org/apache/cassandra/net/FramingTest.java index 8a7f4283bdfa..70e2f00f109c 100644 --- a/test/unit/org/apache/cassandra/net/FramingTest.java +++ b/test/unit/org/apache/cassandra/net/FramingTest.java @@ -42,6 +42,7 @@ import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.memory.BufferPool; +import org.apache.cassandra.utils.memory.BufferPoolManager; import org.apache.cassandra.utils.vint.VIntCoding; import static java.lang.Math.*; @@ -197,7 +198,7 @@ private static SequenceOfFrames sequenceOfFrames(Random random, FrameEncoder enc cumulativeCompressedLength[i] = (i == 0 ? 0 : cumulativeCompressedLength[i - 1]) + buffer.readableBytes(); } - ByteBuffer frames = BufferPool.getAtLeast(cumulativeCompressedLength[frameCount - 1], BufferType.OFF_HEAP); + ByteBuffer frames = BufferPoolManager.ephemeral().getAtLeast(cumulativeCompressedLength[frameCount - 1], BufferType.OFF_HEAP); for (ByteBuf buffer : compressed) { frames.put(buffer.internalNioBuffer(buffer.readerIndex(), buffer.readableBytes())); @@ -412,7 +413,7 @@ private static SequenceOfFrames sequenceOfMessages(Random random, float largeRat cumulativeLength[i] = (i == 0 ? 
0 : cumulativeLength[i - 1]) + message.length; } - ByteBuffer frames = BufferPool.getAtLeast(cumulativeLength[messageCount - 1], BufferType.OFF_HEAP); + ByteBuffer frames = BufferPoolManager.ephemeral().getAtLeast(cumulativeLength[messageCount - 1], BufferType.OFF_HEAP); for (byte[] buffer : messages) frames.put(buffer); frames.flip(); diff --git a/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java b/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java index 806f49d2a3f8..a0078486ef52 100644 --- a/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java +++ b/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java @@ -37,6 +37,8 @@ public class BufferPoolTest { + private BufferPool bufferPool = BufferPoolManager.longLived(); + @BeforeClass public static void setupDD() { @@ -46,13 +48,13 @@ public static void setupDD() @Before public void setUp() { - BufferPool.MEMORY_USAGE_THRESHOLD = 8 * 1024L * 1024L; + bufferPool.unsafeSetMemoryUsageThreshold(8 * 1024L * 1024L); } @After public void cleanUp() { - BufferPool.unsafeReset(); + bufferPool.unsafeReset(); } @Test @@ -60,18 +62,18 @@ public void testGetPut() throws InterruptedException { final int size = RandomAccessReader.DEFAULT_BUFFER_SIZE; - ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(size, buffer.capacity()); assertEquals(true, buffer.isDirect()); - BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk(); + BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk(); assertNotNull(chunk); - assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes()); + assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes()); - BufferPool.put(buffer); - assertEquals(null, BufferPool.unsafeCurrentChunk()); - assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes()); + bufferPool.put(buffer); + assertEquals(null, bufferPool.unsafeCurrentChunk()); + assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes()); } @@ -89,7 +91,7 @@ public void testPageAligned() private void checkPageAligned(int size) { - ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(size, buffer.capacity()); assertTrue(buffer.isDirect()); @@ -97,7 +99,7 @@ private void checkPageAligned(int size) long address = MemoryUtil.getAddress(buffer); assertTrue((address % MemoryUtil.pageSize()) == 0); - BufferPool.put(buffer); + bufferPool.put(buffer); } @Test @@ -106,23 +108,23 @@ public void testDifferentSizes() throws InterruptedException final int size1 = 1024; final int size2 = 2048; - ByteBuffer buffer1 = BufferPool.get(size1, BufferType.OFF_HEAP); + ByteBuffer buffer1 = bufferPool.get(size1, BufferType.OFF_HEAP); assertNotNull(buffer1); assertEquals(size1, buffer1.capacity()); - ByteBuffer buffer2 = BufferPool.get(size2, BufferType.OFF_HEAP); + ByteBuffer buffer2 = bufferPool.get(size2, BufferType.OFF_HEAP); assertNotNull(buffer2); assertEquals(size2, buffer2.capacity()); - BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk(); + BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk(); assertNotNull(chunk); - assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes()); + assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes()); - BufferPool.put(buffer1); - BufferPool.put(buffer2); + 
bufferPool.put(buffer1); + bufferPool.put(buffer2); - assertEquals(null, BufferPool.unsafeCurrentChunk()); - assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes()); + assertEquals(null, bufferPool.unsafeCurrentChunk()); + assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, bufferPool.sizeInBytes()); } @Test @@ -140,14 +142,14 @@ public void testMaxMemoryExceededHeap() @Test public void testMaxMemoryExceeded_SameAsChunkSize() { - BufferPool.MEMORY_USAGE_THRESHOLD = BufferPool.GlobalPool.MACRO_CHUNK_SIZE; + bufferPool.unsafeSetMemoryUsageThreshold(BufferPool.GlobalPool.MACRO_CHUNK_SIZE); requestDoubleMaxMemory(); } @Test public void testMaxMemoryExceeded_SmallerThanChunkSize() { - BufferPool.MEMORY_USAGE_THRESHOLD = BufferPool.GlobalPool.MACRO_CHUNK_SIZE / 2; + bufferPool.unsafeSetMemoryUsageThreshold(BufferPool.GlobalPool.MACRO_CHUNK_SIZE / 2); requestDoubleMaxMemory(); } @@ -159,7 +161,7 @@ public void testRecycle() private void requestDoubleMaxMemory() { - requestUpToSize(RandomAccessReader.DEFAULT_BUFFER_SIZE, (int)(2 * BufferPool.MEMORY_USAGE_THRESHOLD)); + requestUpToSize(RandomAccessReader.DEFAULT_BUFFER_SIZE, (int)(2 * bufferPool.memoryUsageThreshold())); } private void requestUpToSize(int bufferSize, int totalSize) @@ -169,7 +171,7 @@ private void requestUpToSize(int bufferSize, int totalSize) List buffers = new ArrayList<>(numBuffers); for (int i = 0; i < numBuffers; i++) { - ByteBuffer buffer = BufferPool.get(bufferSize, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(bufferSize, BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(bufferSize, buffer.capacity()); assertTrue(buffer.isDirect()); @@ -177,7 +179,7 @@ private void requestUpToSize(int bufferSize, int totalSize) } for (ByteBuffer buffer : buffers) - BufferPool.put(buffer); + bufferPool.put(buffer); } @Test @@ -185,10 +187,10 @@ public void testBigRequest() { final int size = BufferPool.NORMAL_CHUNK_SIZE + 1; - ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(size, buffer.capacity()); - BufferPool.put(buffer); + bufferPool.put(buffer); } @Test @@ -200,25 +202,25 @@ public void testFillUpChunks() List buffers1 = new ArrayList<>(numBuffers); List buffers2 = new ArrayList<>(numBuffers); for (int i = 0; i < numBuffers; i++) - buffers1.add(BufferPool.get(size, BufferType.OFF_HEAP)); + buffers1.add(bufferPool.get(size, BufferType.OFF_HEAP)); - BufferPool.Chunk chunk1 = BufferPool.unsafeCurrentChunk(); + BufferPool.Chunk chunk1 = bufferPool.unsafeCurrentChunk(); assertNotNull(chunk1); for (int i = 0; i < numBuffers; i++) - buffers2.add(BufferPool.get(size, BufferType.OFF_HEAP)); + buffers2.add(bufferPool.get(size, BufferType.OFF_HEAP)); - assertEquals(2, BufferPool.unsafeNumChunks()); + assertEquals(2, bufferPool.unsafeNumChunks()); for (ByteBuffer buffer : buffers1) - BufferPool.put(buffer); + bufferPool.put(buffer); - assertEquals(1, BufferPool.unsafeNumChunks()); + assertEquals(1, bufferPool.unsafeNumChunks()); for (ByteBuffer buffer : buffers2) - BufferPool.put(buffer); + bufferPool.put(buffer); - assertEquals(0, BufferPool.unsafeNumChunks()); + assertEquals(0, bufferPool.unsafeNumChunks()); buffers2.clear(); } @@ -254,16 +256,16 @@ public void testRandomFrees() { doTestRandomFrees(12345567878L); - BufferPool.unsafeReset(); + bufferPool.unsafeReset(); doTestRandomFrees(20452249587L); - BufferPool.unsafeReset(); + bufferPool.unsafeReset(); doTestRandomFrees(82457252948L); 
- BufferPool.unsafeReset(); + bufferPool.unsafeReset(); doTestRandomFrees(98759284579L); - BufferPool.unsafeReset(); + bufferPool.unsafeReset(); doTestRandomFrees(19475257244L); } @@ -294,10 +296,10 @@ private void doTestFrees(final int size, final int maxFreeSlots, final int[] toR List buffers = new ArrayList<>(maxFreeSlots); for (int i = 0; i < maxFreeSlots; i++) { - buffers.add(BufferPool.get(size, BufferType.OFF_HEAP)); + buffers.add(bufferPool.get(size, BufferType.OFF_HEAP)); } - BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk(); + BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk(); assertFalse(chunk.isFree()); int freeSize = BufferPool.NORMAL_CHUNK_SIZE - maxFreeSlots * size; @@ -309,7 +311,7 @@ private void doTestFrees(final int size, final int maxFreeSlots, final int[] toR assertNotNull(buffer); assertEquals(size, buffer.capacity()); - BufferPool.put(buffer); + bufferPool.put(buffer); freeSize += size; if (freeSize == chunk.capacity()) @@ -332,18 +334,18 @@ public void testDifferentSizeBuffersOnOneChunk() List buffers = new ArrayList<>(sizes.length); for (int i = 0; i < sizes.length; i++) { - ByteBuffer buffer = BufferPool.get(sizes[i], BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(sizes[i], BufferType.OFF_HEAP); assertNotNull(buffer); assertTrue(buffer.capacity() >= sizes[i]); buffers.add(buffer); - sum += BufferPool.unsafeCurrentChunk().roundUp(buffer.capacity()); + sum += bufferPool.unsafeCurrentChunk().roundUp(buffer.capacity()); } // else the test will fail, adjust sizes as required assertTrue(sum <= BufferPool.GlobalPool.MACRO_CHUNK_SIZE); - BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk(); + BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk(); assertNotNull(chunk); Random rnd = new Random(); @@ -353,11 +355,11 @@ public void testDifferentSizeBuffersOnOneChunk() int index = rnd.nextInt(buffers.size()); ByteBuffer buffer = buffers.remove(index); - BufferPool.put(buffer); + bufferPool.put(buffer); } - BufferPool.put(buffers.remove(0)); + bufferPool.put(buffers.remove(0)); - assertEquals(null, BufferPool.unsafeCurrentChunk()); + assertEquals(null, bufferPool.unsafeCurrentChunk()); assertEquals(0, chunk.free()); } @@ -372,7 +374,7 @@ public void testChunkExhausted() List buffers = new ArrayList<>(sizes.length); for (int i = 0; i < sizes.length; i++) { - ByteBuffer buffer = BufferPool.get(sizes[i], BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(sizes[i], BufferType.OFF_HEAP); assertNotNull(buffer); assertTrue(buffer.capacity() >= sizes[i]); buffers.add(buffer); @@ -383,15 +385,15 @@ public void testChunkExhausted() // else the test will fail, adjust sizes as required assertTrue(sum <= BufferPool.GlobalPool.MACRO_CHUNK_SIZE); - BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk(); + BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk(); assertNotNull(chunk); for (int i = 0; i < sizes.length; i++) { - BufferPool.put(buffers.get(i)); + bufferPool.put(buffers.get(i)); } - assertEquals(null, BufferPool.unsafeCurrentChunk()); + assertEquals(null, bufferPool.unsafeCurrentChunk()); assertEquals(0, chunk.free()); } @@ -406,22 +408,22 @@ public void testCompactIfOutOfCapacity() for (int i = 0; i < numBuffersInChunk; i++) { - ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP); buffers.add(buffer); addresses.add(MemoryUtil.getAddress(buffer)); } for (int i = numBuffersInChunk - 1; i >= 0; i--) - BufferPool.put(buffers.get(i)); + bufferPool.put(buffers.get(i)); 
buffers.clear(); for (int i = 0; i < numBuffersInChunk; i++) { - ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(size, buffer.capacity()); - addresses.remove(MemoryUtil.getAddress(buffer)); + assert addresses.remove(MemoryUtil.getAddress(buffer)); buffers.add(buffer); } @@ -429,18 +431,18 @@ public void testCompactIfOutOfCapacity() assertTrue(addresses.isEmpty()); // all 5 released buffers were used for (ByteBuffer buffer : buffers) - BufferPool.put(buffer); + bufferPool.put(buffer); } @Test public void testHeapBuffer() { - ByteBuffer buffer = BufferPool.get(1024, BufferType.ON_HEAP); + ByteBuffer buffer = bufferPool.get(1024, BufferType.ON_HEAP); assertNotNull(buffer); assertEquals(1024, buffer.capacity()); assertFalse(buffer.isDirect()); assertNotNull(buffer.array()); - BufferPool.put(buffer); + bufferPool.put(buffer); } @Test @@ -488,17 +490,17 @@ public void testSingleBufferOneChunk() private void checkBuffer(int size) { - ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP); assertEquals(size, buffer.capacity()); if (size > 0 && size < BufferPool.NORMAL_CHUNK_SIZE) { - BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk(); + BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk(); assertNotNull(chunk); assertEquals(chunk.capacity(), chunk.free() + chunk.roundUp(size)); } - BufferPool.put(buffer); + bufferPool.put(buffer); } @Test @@ -516,14 +518,14 @@ private void checkBuffers(int ... sizes) for (int size : sizes) { - ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP); assertEquals(size, buffer.capacity()); buffers.add(buffer); } for (ByteBuffer buffer : buffers) - BufferPool.put(buffer); + bufferPool.put(buffer); } @Test @@ -535,57 +537,58 @@ public void testBuffersWithGivenSlots() private void checkBufferWithGivenSlots(int size, long freeSlots) { //first allocate to make sure there is a chunk - ByteBuffer buffer = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(size, BufferType.OFF_HEAP); // now get the current chunk and override the free slots mask - BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk(); + BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk(); assertNotNull(chunk); long oldFreeSlots = chunk.setFreeSlots(freeSlots); // now check we can still get the buffer with the free slots mask changed - ByteBuffer buffer2 = BufferPool.get(size, BufferType.OFF_HEAP); + ByteBuffer buffer2 = bufferPool.get(size, BufferType.OFF_HEAP); assertEquals(size, buffer.capacity()); - BufferPool.put(buffer2); + bufferPool.put(buffer2); // unsafeReset the free slots chunk.setFreeSlots(oldFreeSlots); - BufferPool.put(buffer); + bufferPool.put(buffer); } @Test public void testZeroSizeRequest() { - ByteBuffer buffer = BufferPool.get(0, BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(0, BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(0, buffer.capacity()); - BufferPool.put(buffer); + bufferPool.put(buffer); } @Test(expected = IllegalArgumentException.class) public void testNegativeSizeRequest() { - BufferPool.get(-1, BufferType.OFF_HEAP); + bufferPool.get(-1, BufferType.OFF_HEAP); } + @Ignore // FIXME remove it @Test public void testBufferPoolDisabled() { - ByteBuffer buffer = BufferPool.get(1024, BufferType.ON_HEAP); - assertEquals(0, BufferPool.unsafeNumChunks()); + 
ByteBuffer buffer = bufferPool.get(1024, BufferType.ON_HEAP); + assertEquals(0, bufferPool.unsafeNumChunks()); assertNotNull(buffer); assertEquals(1024, buffer.capacity()); assertFalse(buffer.isDirect()); assertNotNull(buffer.array()); - BufferPool.put(buffer); - assertEquals(0, BufferPool.unsafeNumChunks()); + bufferPool.put(buffer); + assertEquals(0, bufferPool.unsafeNumChunks()); - buffer = BufferPool.get(1024, BufferType.OFF_HEAP); - assertEquals(0, BufferPool.unsafeNumChunks()); + buffer = bufferPool.get(1024, BufferType.OFF_HEAP); + assertEquals(0, bufferPool.unsafeNumChunks()); assertNotNull(buffer); assertEquals(1024, buffer.capacity()); assertTrue(buffer.isDirect()); - BufferPool.put(buffer); - assertEquals(0, BufferPool.unsafeNumChunks()); + bufferPool.put(buffer); + assertEquals(0, bufferPool.unsafeNumChunks()); } @Test @@ -701,7 +704,7 @@ public void run() for (int j = 0; j < threadSizes.length; j++) { - ByteBuffer buffer = BufferPool.get(threadSizes[j], BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(threadSizes[j], BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(threadSizes[j], buffer.capacity()); @@ -716,17 +719,17 @@ public void run() assertEquals(i, buffer.getInt()); if (returnImmediately) - BufferPool.put(buffer); + bufferPool.put(buffer); else toBeReturned.add(buffer); - assertTrue(BufferPool.sizeInBytes() > 0); + assertTrue(bufferPool.sizeInBytes() > 0); } Thread.sleep(rand.nextInt(3)); for (ByteBuffer buffer : toBeReturned) - BufferPool.put(buffer); + bufferPool.put(buffer); } catch (Exception ex) { @@ -770,13 +773,13 @@ private void doMultipleThreadsReleaseBuffers(final int threadCount, final int .. int sum = 0; for (int i = 0; i < sizes.length; i++) { - buffers[i] = BufferPool.get(sizes[i], BufferType.OFF_HEAP); + buffers[i] = bufferPool.get(sizes[i], BufferType.OFF_HEAP); assertNotNull(buffers[i]); assertEquals(sizes[i], buffers[i].capacity()); - sum += BufferPool.unsafeCurrentChunk().roundUp(buffers[i].capacity()); + sum += bufferPool.unsafeCurrentChunk().roundUp(buffers[i].capacity()); } - final BufferPool.Chunk chunk = BufferPool.unsafeCurrentChunk(); + final BufferPool.Chunk chunk = bufferPool.unsafeCurrentChunk(); assertNotNull(chunk); assertFalse(chunk.isFree()); @@ -798,8 +801,8 @@ public void run() { try { - assertNotSame(chunk, BufferPool.unsafeCurrentChunk()); - BufferPool.put(buffer); + assertNotSame(chunk, bufferPool.unsafeCurrentChunk()); + bufferPool.put(buffer); } catch (AssertionError ex) { //this is expected if we release a buffer more than once @@ -828,12 +831,123 @@ public void run() System.gc(); System.gc(); - assertTrue(BufferPool.unsafeCurrentChunk().isFree()); + assertTrue(bufferPool.unsafeCurrentChunk().isFree()); //make sure the main thread can still allocate buffers - ByteBuffer buffer = BufferPool.get(sizes[0], BufferType.OFF_HEAP); + ByteBuffer buffer = bufferPool.get(sizes[0], BufferType.OFF_HEAP); assertNotNull(buffer); assertEquals(sizes[0], buffer.capacity()); - BufferPool.put(buffer); + bufferPool.put(buffer); + } + + @Test + public void testRecyclePartialFreeChunk() + { + // normal chunk size is 128kb + int halfNormalChunk = 64 * 1024; // 64kb, half of normal chunk + List toRelease = new ArrayList<>(); + + // allocate three buffers on different chunks + ByteBuffer buffer0 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP); + BufferPool.Chunk chunk0 = BufferPool.Chunk.getParentChunk(buffer0); + assertFalse(chunk0.isFree()); + allocate(1, halfNormalChunk, toRelease); // allocate remaining buffers in the chunk 
+ + ByteBuffer buffer1 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP); + BufferPool.Chunk chunk1 = BufferPool.Chunk.getParentChunk(buffer1); + assertFalse(chunk1.isFree()); + assertNotEquals(chunk0, chunk1); + allocate(1, halfNormalChunk, toRelease); // allocate remaining buffers in the chunk + + ByteBuffer buffer2 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP); + BufferPool.Chunk chunk2 = BufferPool.Chunk.getParentChunk(buffer2); + assertFalse(chunk2.isFree()); + assertNotEquals(chunk0, chunk2); + assertNotEquals(chunk1, chunk2); + allocate(1, halfNormalChunk, toRelease); // allocate remaining buffers in the chunk + + // now all 3 chunks in the local pool are full; allocate one more buffer to evict chunk2 + ByteBuffer buffer4 = bufferPool.get(halfNormalChunk, BufferType.OFF_HEAP); + BufferPool.Chunk chunk3 = BufferPool.Chunk.getParentChunk(buffer4); + assertNotEquals(chunk0, chunk3); + assertNotEquals(chunk1, chunk3); + assertNotEquals(chunk2, chunk3); + + // verify chunk2 got evicted; it no longer has an owner + assertNotNull(chunk0.owner()); + assertEquals(BufferPool.Chunk.Status.ONLINE, chunk0.status()); + //assertNotNull(chunk1.owner()); + //assertEquals(BufferPool.Chunk.Status.ONLINE, chunk1.status()); + assertNull(chunk2.owner()); + assertEquals(BufferPool.Chunk.Status.OFFLINE, chunk2.status()); + + // release half of the buffers for chunk0/1/2 + release(toRelease); + + // try to recirculate chunk2 and verify freed space + assertEquals(BufferPool.Chunk.Status.ONLINE, chunk2.status()); + assertEquals(halfNormalChunk, chunk2.free()); + ByteBuffer buffer = chunk2.get(halfNormalChunk, false, null); + assertEquals(halfNormalChunk, buffer.capacity()); + } + + @Test + public void testTinyPool() + { + int total = 0; + final int size = BufferPool.TINY_ALLOCATION_UNIT; + final int allocationPerChunk = 64; + + // occupy 3 tiny chunks + List<ByteBuffer> buffers0 = new ArrayList<>(); + BufferPool.Chunk chunk0 = allocate(allocationPerChunk, size, buffers0); + List<ByteBuffer> buffers1 = new ArrayList<>(); + BufferPool.Chunk chunk1 = allocate(allocationPerChunk, size, buffers1); + List<ByteBuffer> buffers2 = new ArrayList<>(); + BufferPool.Chunk chunk2 = allocate(allocationPerChunk, size, buffers2); + total += 3 * BufferPool.TINY_CHUNK_SIZE; + assertEquals(total, bufferPool.unsafeGetBytesInUse()); + + // allocate another tiny chunk; chunk2 should be evicted + List<ByteBuffer> buffers3 = new ArrayList<>(); + BufferPool.Chunk chunk3 = allocate(allocationPerChunk, size, buffers3); + total += BufferPool.TINY_CHUNK_SIZE; + assertEquals(total, bufferPool.unsafeGetBytesInUse()); + + // verify chunk2 is full and evicted + assertEquals(0, chunk2.free()); + assertNull(chunk2.owner()); + + // release chunk2's buffers + for (int i = 0; i < buffers2.size(); i++) + { + bufferPool.put(buffers2.get(i)); + + if (i == buffers2.size() - 1) + { + // once the tiny chunk is fully recycled, the parent chunk's free space is updated + total -= BufferPool.TINY_CHUNK_SIZE; + assertEquals(total, bufferPool.unsafeGetBytesInUse()); + } + else + { + // until the tiny chunk is fully recycled, the parent normal chunk stays occupied + assertEquals(total, bufferPool.unsafeGetBytesInUse()); + } + } + } + + private BufferPool.Chunk allocate(int num, int bufferSize, List<ByteBuffer> buffers) + { + for (int i = 0; i < num; i++) + buffers.add(bufferPool.get(bufferSize, BufferType.OFF_HEAP)); + + return BufferPool.Chunk.getParentChunk(buffers.get(buffers.size() - 1)); + } + + private void release(List<ByteBuffer> toRelease) + { + for (ByteBuffer buffer : toRelease) + bufferPool.put(buffer); } }
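Editor's note (not part of the patch): for callers, the visible change in this diff is that buffers are no longer obtained through static BufferPool methods but from one of the two pools exposed by BufferPoolManager: longLived(), sized from DatabaseDescriptor.getFileCacheSizeInMB() and intended for the chunk cache and other disk access, and ephemeral(), sized from DatabaseDescriptor.getNetworkCacheSizeInMB() and intended for internode and client connections. The sketch below only exercises the accessors that the diff itself uses (get, getAtLeast, put, shutdownLocalCleaner); the class name, helper method names, and the import path assumed for BufferType are illustrative and not taken from the patch.

import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.cassandra.io.compress.BufferType; // assumed import path; not shown in this patch
import org.apache.cassandra.utils.memory.BufferPool;
import org.apache.cassandra.utils.memory.BufferPoolManager;

// Hypothetical caller-side sketch of the BufferPoolManager API introduced above.
public class BufferPoolManagerUsageSketch
{
    // Disk-oriented callers (e.g. the chunk cache) take buffers from the long-lived pool.
    static ByteBuffer acquireFileBuffer(int size)
    {
        return BufferPoolManager.longLived().get(size, BufferType.OFF_HEAP);
    }

    // Messaging/client code takes buffers from the ephemeral pool; getAtLeast(), as used in
    // FramingTest, may return a buffer with capacity larger than requested.
    static ByteBuffer acquireFrameBuffer(int minSize)
    {
        return BufferPoolManager.ephemeral().getAtLeast(minSize, BufferType.OFF_HEAP);
    }

    // Buffers are returned to the pool instance they were taken from.
    static void release(BufferPool pool, ByteBuffer buffer)
    {
        pool.put(buffer);
    }

    // Shutdown stops the local cleaner of both pools, mirroring the Instance.shutdown() change.
    static void shutdown() throws TimeoutException, InterruptedException
    {
        BufferPoolManager.shutdownLocalCleaner(1L, TimeUnit.MINUTES);
    }
}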