From baa8deac646b3456f61549348b9684c93ea941a0 Mon Sep 17 00:00:00 2001 From: Igor Melnichenko Date: Sat, 26 Oct 2024 19:43:45 +0300 Subject: [PATCH 1/8] synchronized blocks were replaced with ReentrantLock in SessionBase --- .../java/tech/ydb/topic/impl/SessionBase.java | 93 +++++++++++-------- 1 file changed, 56 insertions(+), 37 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/impl/SessionBase.java b/topic/src/main/java/tech/ydb/topic/impl/SessionBase.java index 5c73d3277..da6d4cc4f 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/SessionBase.java +++ b/topic/src/main/java/tech/ydb/topic/impl/SessionBase.java @@ -3,6 +3,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; @@ -19,6 +20,7 @@ public abstract class SessionBase implements Session { protected final GrpcReadWriteStream streamConnection; protected final AtomicBoolean isWorking = new AtomicBoolean(true); + private final ReentrantLock lock = new ReentrantLock(); private String token; public SessionBase(GrpcReadWriteStream streamConnection) { @@ -32,41 +34,53 @@ public SessionBase(GrpcReadWriteStream streamConnection) { protected abstract void onStop(); - protected synchronized CompletableFuture start(GrpcReadStream.Observer streamObserver) { - getLogger().info("Session start"); - return streamConnection.start(message -> { - if (getLogger().isTraceEnabled()) { - getLogger().trace("Message received:\n{}", message); - } else { - getLogger().debug("Message received"); - } + protected CompletableFuture start(GrpcReadStream.Observer streamObserver) { + lock.lock(); + + try { + getLogger().info("Session start"); + return streamConnection.start(message -> { + if (getLogger().isTraceEnabled()) { + getLogger().trace("Message received:\n{}", message); + } else { + getLogger().debug("Message received"); + } + + if (isWorking.get()) { + streamObserver.onNext(message); + } + }); + } finally { + lock.unlock(); + } + } + + public void send(W request) { + lock.lock(); - if (isWorking.get()) { - streamObserver.onNext(message); + try { + if (!isWorking.get()) { + if (getLogger().isTraceEnabled()) { + getLogger().trace("Session is already closed. This message is NOT sent:\n{}", request); + } + return; + } + String currentToken = streamConnection.authToken(); + if (!Objects.equals(token, currentToken)) { + token = currentToken; + getLogger().info("Sending new token"); + sendUpdateTokenRequest(token); } - }); - } - public synchronized void send(W request) { - if (!isWorking.get()) { if (getLogger().isTraceEnabled()) { - getLogger().trace("Session is already closed. This message is NOT sent:\n{}", request); + getLogger().trace("Sending request:\n{}", request); + } else { + getLogger().debug("Sending request"); } - return; + streamConnection.sendNext(request); + } finally { + lock.unlock(); } - String currentToken = streamConnection.authToken(); - if (!Objects.equals(token, currentToken)) { - token = currentToken; - getLogger().info("Sending new token"); - sendUpdateTokenRequest(token); - } - - if (getLogger().isTraceEnabled()) { - getLogger().trace("Sending request:\n{}", request); - } else { - getLogger().debug("Sending request"); - } - streamConnection.sendNext(request); } private boolean stop() { @@ -74,15 +88,20 @@ private boolean stop() { return isWorking.compareAndSet(true, false); } - @Override - public synchronized boolean shutdown() { - getLogger().info("Session shutdown"); - if (stop()) { - onStop(); - streamConnection.close(); - return true; + public boolean shutdown() { + lock.lock(); + + try { + getLogger().info("Session shutdown"); + if (stop()) { + onStop(); + streamConnection.close(); + return true; + } + return false; + } finally { + lock.unlock(); } - return false; } } From 192ed9e5799a34e879d16f3920d9d4f34445c501 Mon Sep 17 00:00:00 2001 From: Igor Melnichenko Date: Sat, 26 Oct 2024 19:54:11 +0300 Subject: [PATCH 2/8] synchronized blocks were replaced with ReentrantLock in SyncReaderImpl --- .../ydb/topic/read/impl/SyncReaderImpl.java | 33 ++++++++++++------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java index 88456043f..6c3bd37cb 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java @@ -8,6 +8,8 @@ import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import javax.annotation.Nullable; @@ -34,6 +36,8 @@ public class SyncReaderImpl extends ReaderImpl implements SyncReader { private static final Logger logger = LoggerFactory.getLogger(SyncReaderImpl.class); private static final int POLL_INTERVAL_SECONDS = 5; private final Queue batchesInQueue = new LinkedList<>(); + private final ReentrantLock queueLock = new ReentrantLock(); + private final Condition queueIsNotEmptyCondition = queueLock.newCondition(); private int currentMessageIndex = 0; public SyncReaderImpl(TopicRpc topicRpc, ReaderSettings settings) { @@ -66,22 +70,21 @@ public Message receiveInternal(ReceiveSettings receiveSettings, long timeout, Ti if (isStopped.get()) { throw new RuntimeException("Reader was stopped"); } - synchronized (batchesInQueue) { + + queueLock.lock(); + + try { if (batchesInQueue.isEmpty()) { long millisToWait = TimeUnit.MILLISECONDS.convert(timeout, unit); Instant deadline = Instant.now().plusMillis(millisToWait); - while (true) { - if (!batchesInQueue.isEmpty()) { - break; - } - Instant now = Instant.now(); - if (now.isAfter(deadline)) { + while (batchesInQueue.isEmpty()) { + millisToWait = Duration.between(Instant.now(), deadline).toMillis(); + if (millisToWait <= 0) { break; } - // Using Math.max to prevent rounding duration to 0 which would lead to infinite wait - millisToWait = Math.max(1, Duration.between(now, deadline).toMillis()); + logger.trace("No messages in queue. Waiting for {} ms...", millisToWait); - batchesInQueue.wait(millisToWait); + queueIsNotEmptyCondition.await(millisToWait, TimeUnit.MILLISECONDS); } if (batchesInQueue.isEmpty()) { @@ -112,6 +115,8 @@ public Message receiveInternal(ReceiveSettings receiveSettings, long timeout, Ti } } return result; + } finally { + queueLock.unlock(); } } @@ -143,10 +148,14 @@ protected CompletableFuture handleDataReceivedEvent(DataReceivedEvent even return resultFuture; } - synchronized (batchesInQueue) { + queueLock.lock(); + + try { logger.debug("Putting a message batch into queue and notifying in case receive method is waiting"); batchesInQueue.add(new MessageBatchWrapper(event.getMessages(), resultFuture)); - batchesInQueue.notify(); + queueIsNotEmptyCondition.signal(); + } finally { + queueLock.unlock(); } return resultFuture; } From 9bcdad67de1b33c7b5fd3cd7b42a87b95a50e83d Mon Sep 17 00:00:00 2001 From: Igor Melnichenko Date: Sat, 26 Oct 2024 20:00:50 +0300 Subject: [PATCH 3/8] synchronized blocks were replaced with ReentrantLock in TransactionMessageAccumulatorImpl --- .../impl/TransactionMessageAccumulatorImpl.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/TransactionMessageAccumulatorImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/TransactionMessageAccumulatorImpl.java index d8096a7ab..cddf919dc 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/TransactionMessageAccumulatorImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/TransactionMessageAccumulatorImpl.java @@ -5,6 +5,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -26,7 +27,7 @@ * @author Nikolay Perfilov */ public class TransactionMessageAccumulatorImpl implements TransactionMessageAccumulator { - private static final Logger logger = LoggerFactory.getLogger(DeferredCommitterImpl.class); + private static final Logger logger = LoggerFactory.getLogger(TransactionMessageAccumulatorImpl.class); private final AsyncReader reader; private final Map> rangesByTopic = new ConcurrentHashMap<>(); @@ -34,6 +35,7 @@ public class TransactionMessageAccumulatorImpl implements TransactionMessageAccu private static class PartitionRanges { private final PartitionSession partitionSession; private final DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet(); + private final ReentrantLock rangesLock = new ReentrantLock(); private PartitionRanges(PartitionSession partitionSession) { this.partitionSession = partitionSession; @@ -41,8 +43,12 @@ private PartitionRanges(PartitionSession partitionSession) { private void add(OffsetsRange offsetRange) { try { - synchronized (ranges) { + rangesLock.lock(); + + try { ranges.add(offsetRange); + } finally { + rangesLock.unlock(); } } catch (RuntimeException exception) { String errorMessage = "Error adding new offset range to DeferredCommitter for partition session " + @@ -54,8 +60,12 @@ private void add(OffsetsRange offsetRange) { } private List getOffsetsRanges() { - synchronized (ranges) { + rangesLock.lock(); + + try { return ranges.getRangesAndClear(); + } finally { + rangesLock.unlock(); } } } From 83bb1413c1253218ec806da15d3cbefe7fe37656 Mon Sep 17 00:00:00 2001 From: Igor Melnichenko Date: Sat, 26 Oct 2024 20:02:31 +0300 Subject: [PATCH 4/8] synchronized blocks were replaced with ReentrantLock in DeferredCommitterImpl --- .../ydb/topic/read/impl/DeferredCommitterImpl.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java index 6abf30475..ee3dfa171 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java @@ -3,6 +3,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,6 +25,7 @@ public class DeferredCommitterImpl implements DeferredCommitter { private static class PartitionRanges { private final PartitionSessionImpl partitionSession; private final DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet(); + private final ReentrantLock rangesLock = new ReentrantLock(); private PartitionRanges(PartitionSessionImpl partitionSession) { this.partitionSession = partitionSession; @@ -31,8 +33,12 @@ private PartitionRanges(PartitionSessionImpl partitionSession) { private void add(OffsetsRange offsetRange) { try { - synchronized (ranges) { + rangesLock.lock(); + + try { ranges.add(offsetRange); + } finally { + rangesLock.unlock(); } } catch (RuntimeException exception) { String errorMessage = "Error adding new offset range to DeferredCommitter for partition session " + @@ -45,8 +51,11 @@ private void add(OffsetsRange offsetRange) { private void commit() { List rangesToCommit; - synchronized (ranges) { + rangesLock.lock(); + try { rangesToCommit = ranges.getRangesAndClear(); + } finally { + rangesLock.unlock(); } partitionSession.commitOffsetRanges(rangesToCommit); } From 0c8b5043c53cc67c6a7612eb39e4b60da1b275ef Mon Sep 17 00:00:00 2001 From: Igor Melnichenko Date: Sat, 26 Oct 2024 20:06:16 +0300 Subject: [PATCH 5/8] synchronized blocks were replaced with ReentrantLock in WriterImpl --- .../tech/ydb/topic/write/impl/WriterImpl.java | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index 101488db9..ab3cd2c7e 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -12,6 +12,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +45,7 @@ public abstract class WriterImpl extends GrpcStreamRetrier { private final AtomicReference> initResultFutureRef = new AtomicReference<>(null); // Messages that are waiting for being put into sending queue due to queue overflow private final Queue incomingQueue = new LinkedList<>(); + private final ReentrantLock incomingQueueLock = new ReentrantLock(); // Messages that are currently encoding private final Queue encodingMessages = new LinkedList<>(); // Messages that are taken into send buffer, are already compressed and are waiting for being sent @@ -99,7 +101,9 @@ private IncomingMessage(EnqueuedMessage message) { } public CompletableFuture tryToEnqueue(EnqueuedMessage message, boolean instant) { - synchronized (incomingQueue) { + incomingQueueLock.lock(); + + try { if (currentInFlightCount >= settings.getMaxSendBufferMessagesCount()) { if (instant) { logger.info("[{}] Rejecting a message due to reaching message queue in-flight limit of {}", id, @@ -137,10 +141,12 @@ public CompletableFuture tryToEnqueue(EnqueuedMessage message, boolean ins IncomingMessage incomingMessage = new IncomingMessage(message); incomingQueue.add(incomingMessage); return incomingMessage.future; + } finally { + incomingQueueLock.unlock(); } } - // should be done under synchronized incomingQueue + // should be done under incomingQueueLock private void acceptMessageIntoSendingQueue(EnqueuedMessage message) { this.lastAcceptedMessageFuture = message.getFuture(); this.currentInFlightCount++; @@ -187,7 +193,9 @@ private void moveEncodedMessagesToSendingQueue() { boolean haveNewMessagesToSend = false; // Working with encodingMessages under synchronized incomingQueue to prevent deadlocks // while working with free method - synchronized (incomingQueue) { + incomingQueueLock.lock(); + + try { // Taking all encoded messages to sending queue while (true) { EnqueuedMessage encodedMessage = encodingMessages.peek(); @@ -217,6 +225,8 @@ private void moveEncodedMessagesToSendingQueue() { break; } } + } finally { + incomingQueueLock.unlock(); } if (haveNewMessagesToSend) { session.sendDataRequestIfNeeded(); @@ -264,15 +274,21 @@ protected CompletableFuture flushImpl() { if (this.lastAcceptedMessageFuture == null) { return CompletableFuture.completedFuture(null); } - synchronized (incomingQueue) { + incomingQueueLock.lock(); + + try { return this.lastAcceptedMessageFuture.isDone() ? CompletableFuture.completedFuture(null) : this.lastAcceptedMessageFuture.thenApply(v -> null); + } finally { + incomingQueueLock.unlock(); } } private void free(int messageCount, long sizeBytes) { - synchronized (incomingQueue) { + incomingQueueLock.lock(); + + try { currentInFlightCount -= messageCount; availableSizeBytes += sizeBytes; if (logger.isTraceEnabled()) { @@ -301,6 +317,8 @@ private void free(int messageCount, long sizeBytes) { } logger.trace("[{}] All messages from incomingQueue are accepted into send buffer", id); } + } finally { + incomingQueueLock.unlock(); } } From fd324c41822b70943e5aa204032293f64ca70521 Mon Sep 17 00:00:00 2001 From: Igor Melnichenko Date: Sat, 26 Oct 2024 20:12:58 +0300 Subject: [PATCH 6/8] synchronized blocks were replaced with ReentrantLock in PartitionSessionImpl --- .../topic/read/impl/PartitionSessionImpl.java | 38 ++++++++++++++++--- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java index 27548ff90..0dbf09d0e 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java @@ -12,6 +12,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -44,11 +45,13 @@ public class PartitionSessionImpl { private final AtomicBoolean isWorking = new AtomicBoolean(true); private final Queue decodingBatches = new LinkedList<>(); + private final ReentrantLock decodingBatchesLock = new ReentrantLock(); private final Queue readingQueue = new ConcurrentLinkedQueue<>(); private final Function> dataEventCallback; private final AtomicBoolean isReadingNow = new AtomicBoolean(); private final Consumer> commitFunction; private final NavigableMap> commitFutures = new ConcurrentSkipListMap<>(); + private final ReentrantLock commitFuturesLock = new ReentrantLock(); // Offset of the last read message + 1 private long lastReadOffset; private long lastCommittedOffset; @@ -149,14 +152,21 @@ public CompletableFuture addBatches(List decode(newBatch), decompressionExecutor) .thenRun(() -> { boolean haveNewBatchesReady = false; - synchronized (decodingBatches) { + decodingBatchesLock.lock(); + + try { // Taking all encoded messages to sending queue while (true) { Batch decodingBatch = decodingBatches.peek(); @@ -176,7 +186,10 @@ public CompletableFuture addBatches(List addBatches(List[0])); } - // Сommit single offset range with result future + // Commit single offset range with result future public CompletableFuture commitOffsetRange(OffsetsRange rangeToCommit) { CompletableFuture resultFuture = new CompletableFuture<>(); - synchronized (commitFutures) { + commitFuturesLock.lock(); + + try { if (isWorking.get()) { if (logger.isDebugEnabled()) { logger.debug("[{}] Offset range [{}, {}) is requested to be committed for partition session {} " + @@ -205,6 +220,8 @@ public CompletableFuture commitOffsetRange(OffsetsRange rangeToCommit) { partitionId + ") for " + path + " is already closed")); return resultFuture; } + } finally { + commitFuturesLock.unlock(); } List rangeWrapper = new ArrayList<>(1); rangeWrapper.add(rangeToCommit); @@ -334,16 +351,25 @@ private void sendDataToReadersIfNeeded() { } public void shutdown() { - synchronized (commitFutures) { + commitFuturesLock.lock(); + + try { isWorking.set(false); logger.info("[{}] Partition session {} (partition {}) is shutting down. Failing {} commit futures...", path, id, partitionId, commitFutures.size()); commitFutures.values().forEach(f -> f.completeExceptionally(new RuntimeException("Partition session " + id + " (partition " + partitionId + ") for " + path + " is closed"))); + } finally { + commitFuturesLock.unlock(); } - synchronized (decodingBatches) { + + decodingBatchesLock.lock(); + + try { decodingBatches.forEach(Batch::complete); readingQueue.forEach(Batch::complete); + } finally { + decodingBatchesLock.unlock(); } } From 2f69214234281366ee244f4b81a87963263d759a Mon Sep 17 00:00:00 2001 From: Igor Melnichenko Date: Sat, 26 Oct 2024 20:13:32 +0300 Subject: [PATCH 7/8] Comment fix --- topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index ab3cd2c7e..74eb6841b 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -191,8 +191,7 @@ private void encode(EnqueuedMessage message) { private void moveEncodedMessagesToSendingQueue() { boolean haveNewMessagesToSend = false; - // Working with encodingMessages under synchronized incomingQueue to prevent deadlocks - // while working with free method + // Working with encodingMessages under incomingQueueLock to prevent deadlocks while working with free method incomingQueueLock.lock(); try { From f88734598e1f60aea38bd1f54f507434ed8ac4dd Mon Sep 17 00:00:00 2001 From: Igor Melnichenko Date: Mon, 28 Oct 2024 13:37:16 +0300 Subject: [PATCH 8/8] Trailing spaces were removed --- .../main/java/tech/ydb/topic/write/impl/WriterImpl.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index 74eb6841b..5c5e4852b 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -102,7 +102,7 @@ private IncomingMessage(EnqueuedMessage message) { public CompletableFuture tryToEnqueue(EnqueuedMessage message, boolean instant) { incomingQueueLock.lock(); - + try { if (currentInFlightCount >= settings.getMaxSendBufferMessagesCount()) { if (instant) { @@ -193,7 +193,7 @@ private void moveEncodedMessagesToSendingQueue() { boolean haveNewMessagesToSend = false; // Working with encodingMessages under incomingQueueLock to prevent deadlocks while working with free method incomingQueueLock.lock(); - + try { // Taking all encoded messages to sending queue while (true) { @@ -274,7 +274,7 @@ protected CompletableFuture flushImpl() { return CompletableFuture.completedFuture(null); } incomingQueueLock.lock(); - + try { return this.lastAcceptedMessageFuture.isDone() ? CompletableFuture.completedFuture(null) @@ -286,7 +286,7 @@ protected CompletableFuture flushImpl() { private void free(int messageCount, long sizeBytes) { incomingQueueLock.lock(); - + try { currentInFlightCount -= messageCount; availableSizeBytes += sizeBytes;