Skip to content

Commit

Permalink
ISPN-7430 Slowdown when using PutAll with transactions
Browse files Browse the repository at this point in the history
* Fix lock availabilty to perform better with multiple keys
  • Loading branch information
pruivo committed Feb 2, 2017
1 parent 206f36b commit 26b6523
Showing 1 changed file with 28 additions and 18 deletions.
Expand Up @@ -14,6 +14,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.infinispan.commons.util.Util;
Expand Down Expand Up @@ -256,7 +257,7 @@ public Void call() throws Exception{
return null;
}

public KeyAwareExtendedLockPromise scheduleLockTimeoutTask(ScheduledExecutorService executorService) {
KeyAwareExtendedLockPromise scheduleLockTimeoutTask(ScheduledExecutorService executorService) {
if (executorService != null && timeoutMillis > 0 && !isAvailable()) {
ScheduledFuture<?> future = executorService.schedule(this, timeoutMillis, TimeUnit.MILLISECONDS);
lockPromise.addListener((state -> future.cancel(false)));
Expand All @@ -268,32 +269,29 @@ public KeyAwareExtendedLockPromise scheduleLockTimeoutTask(ScheduledExecutorServ
private static class CompositeLockPromise implements KeyAwareLockPromise, LockListener, Callable<Void> {

private final List<KeyAwareExtendedLockPromise> lockPromiseList;
private final CompletableFuture<Void> notifier;
private final CompletableFuture<LockState> notifier;
volatile LockState lockState = LockState.ACQUIRED;
private final AtomicInteger countersLeft = new AtomicInteger();

private CompositeLockPromise(int size) {
lockPromiseList = new ArrayList<>(size);
notifier = new CompletableFuture<>();
}

public void addLock(KeyAwareExtendedLockPromise lockPromise) {
void addLock(KeyAwareExtendedLockPromise lockPromise) {
lockPromiseList.add(lockPromise);
}

public void markListAsFinal() {
void markListAsFinal() {
countersLeft.set(lockPromiseList.size());
for (LockPromise lockPromise : lockPromiseList) {
lockPromise.addListener(this);
}
}

@Override
public boolean isAvailable() {
for (LockPromise lockPromise : lockPromiseList) {
if (!lockPromise.isAvailable()) {
return false;
}
}
return true;
return notifier.isDone();
}

@Override
Expand Down Expand Up @@ -329,18 +327,30 @@ public void lock() throws InterruptedException, TimeoutException {

@Override
public void addListener(LockListener listener) {
notifier.thenRun(() -> listener.onEvent(lockState));
notifier.thenAccept(listener::onEvent);
}

@Override
public void onEvent(LockState state) {
if (state != LockState.ACQUIRED && UPDATER.compareAndSet(this, LockState.ACQUIRED, state)) {
for (ExtendedLockPromise lockPromise : lockPromiseList) {
lockPromise.cancel(state);
}
if (notifier.isDone()) {
//already finished
return;
}
if (isAvailable()) {
notifier.complete(null);
//each lock will invoke this
if (state != LockState.ACQUIRED) {
cancelAll(state);
return;
}
if (countersLeft.decrementAndGet() == 0) {
notifier.complete(lockState);
}
}

private void cancelAll(LockState state) {
if (UPDATER.compareAndSet(this, LockState.ACQUIRED, state)) {
//complete the future before cancel other locks. the remaining locks will be invoke onEvent()
notifier.complete(state);
lockPromiseList.forEach(promise -> promise.cancel(state));
}
}

Expand All @@ -357,7 +367,7 @@ public Void call() throws Exception {
return null;
}

public CompositeLockPromise scheduleLockTimeoutTask(ScheduledExecutorService executorService, long time, TimeUnit unit) {
CompositeLockPromise scheduleLockTimeoutTask(ScheduledExecutorService executorService, long time, TimeUnit unit) {
if (executorService != null && time > 0 && !isAvailable()) {
ScheduledFuture<?> future = executorService.schedule(this, time, unit);
addListener((state -> future.cancel(false)));
Expand Down

0 comments on commit 26b6523

Please sign in to comment.