From 26b6523b98171defdc5b73a48091f27f86948ef3 Mon Sep 17 00:00:00 2001 From: Pedro Ruivo Date: Wed, 1 Feb 2017 17:57:00 +0000 Subject: [PATCH] ISPN-7430 Slowdown when using PutAll with transactions * Fix lock availabilty to perform better with multiple keys --- .../locks/impl/DefaultLockManager.java | 46 +++++++++++-------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/infinispan/util/concurrent/locks/impl/DefaultLockManager.java b/core/src/main/java/org/infinispan/util/concurrent/locks/impl/DefaultLockManager.java index d5a48d652f46..e96cfdd46857 100644 --- a/core/src/main/java/org/infinispan/util/concurrent/locks/impl/DefaultLockManager.java +++ b/core/src/main/java/org/infinispan/util/concurrent/locks/impl/DefaultLockManager.java @@ -14,6 +14,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.infinispan.commons.util.Util; @@ -256,7 +257,7 @@ public Void call() throws Exception{ return null; } - public KeyAwareExtendedLockPromise scheduleLockTimeoutTask(ScheduledExecutorService executorService) { + KeyAwareExtendedLockPromise scheduleLockTimeoutTask(ScheduledExecutorService executorService) { if (executorService != null && timeoutMillis > 0 && !isAvailable()) { ScheduledFuture future = executorService.schedule(this, timeoutMillis, TimeUnit.MILLISECONDS); lockPromise.addListener((state -> future.cancel(false))); @@ -268,19 +269,21 @@ public KeyAwareExtendedLockPromise scheduleLockTimeoutTask(ScheduledExecutorServ private static class CompositeLockPromise implements KeyAwareLockPromise, LockListener, Callable { private final List lockPromiseList; - private final CompletableFuture notifier; + private final CompletableFuture notifier; volatile LockState lockState = LockState.ACQUIRED; + private final AtomicInteger countersLeft = new AtomicInteger(); private CompositeLockPromise(int size) { lockPromiseList = new ArrayList<>(size); notifier = new CompletableFuture<>(); } - public void addLock(KeyAwareExtendedLockPromise lockPromise) { + void addLock(KeyAwareExtendedLockPromise lockPromise) { lockPromiseList.add(lockPromise); } - public void markListAsFinal() { + void markListAsFinal() { + countersLeft.set(lockPromiseList.size()); for (LockPromise lockPromise : lockPromiseList) { lockPromise.addListener(this); } @@ -288,12 +291,7 @@ public void markListAsFinal() { @Override public boolean isAvailable() { - for (LockPromise lockPromise : lockPromiseList) { - if (!lockPromise.isAvailable()) { - return false; - } - } - return true; + return notifier.isDone(); } @Override @@ -329,18 +327,30 @@ public void lock() throws InterruptedException, TimeoutException { @Override public void addListener(LockListener listener) { - notifier.thenRun(() -> listener.onEvent(lockState)); + notifier.thenAccept(listener::onEvent); } @Override public void onEvent(LockState state) { - if (state != LockState.ACQUIRED && UPDATER.compareAndSet(this, LockState.ACQUIRED, state)) { - for (ExtendedLockPromise lockPromise : lockPromiseList) { - lockPromise.cancel(state); - } + if (notifier.isDone()) { + //already finished + return; } - if (isAvailable()) { - notifier.complete(null); + //each lock will invoke this + if (state != LockState.ACQUIRED) { + cancelAll(state); + return; + } + if (countersLeft.decrementAndGet() == 0) { + notifier.complete(lockState); + } + } + + private void cancelAll(LockState state) { + if (UPDATER.compareAndSet(this, LockState.ACQUIRED, state)) { + //complete the future before cancel other locks. the remaining locks will be invoke onEvent() + notifier.complete(state); + lockPromiseList.forEach(promise -> promise.cancel(state)); } } @@ -357,7 +367,7 @@ public Void call() throws Exception { return null; } - public CompositeLockPromise scheduleLockTimeoutTask(ScheduledExecutorService executorService, long time, TimeUnit unit) { + CompositeLockPromise scheduleLockTimeoutTask(ScheduledExecutorService executorService, long time, TimeUnit unit) { if (executorService != null && time > 0 && !isAvailable()) { ScheduledFuture future = executorService.schedule(this, time, unit); addListener((state -> future.cancel(false)));