From a871011e35ccc1fcee040ad13469510c50f4482c Mon Sep 17 00:00:00 2001 From: Ensar Basri Kahveci Date: Mon, 17 Sep 2018 14:52:01 +0300 Subject: [PATCH] More on RaftLock operation timeout fixes --- .../service/lock/client/RaftLockProxy.java | 4 +- .../client/RaftFencedLockClientBasicTest.java | 4 +- .../lock/client/RaftLockClientBasicTest.java | 4 +- .../atomiclong/proxy/RaftAtomicLongProxy.java | 22 ++-- .../atomicref/proxy/RaftAtomicRefProxy.java | 24 ++-- .../proxy/RaftCountDownLatchProxy.java | 16 +-- .../WaitKeyCancelledException.java} | 14 +- .../raft/service/lock/LockRegistry.java | 8 +- .../hazelcast/raft/service/lock/RaftLock.java | 93 +++++++------- .../raft/service/lock/RaftLockService.java | 12 +- .../proxy/AbstractRaftFencedLockProxy.java | 4 +- .../service/lock/proxy/RaftLockProxy.java | 24 ++-- .../service/lock/RaftLockAdvancedTest.java | 120 +++++++++++++++--- 13 files changed, 210 insertions(+), 139 deletions(-) rename hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/{lock/exception/LockRequestCancelledException.java => exception/WaitKeyCancelledException.java} (69%) diff --git a/hazelcast-raft-client/src/main/java/com/hazelcast/raft/service/lock/client/RaftLockProxy.java b/hazelcast-raft-client/src/main/java/com/hazelcast/raft/service/lock/client/RaftLockProxy.java index 5b5f4376b9af..068aad01dd3d 100644 --- a/hazelcast-raft-client/src/main/java/com/hazelcast/raft/service/lock/client/RaftLockProxy.java +++ b/hazelcast-raft-client/src/main/java/com/hazelcast/raft/service/lock/client/RaftLockProxy.java @@ -30,7 +30,7 @@ import com.hazelcast.raft.impl.RaftGroupIdImpl; import com.hazelcast.raft.impl.session.SessionExpiredException; import com.hazelcast.raft.service.lock.RaftLockService; -import com.hazelcast.raft.service.lock.exception.LockRequestCancelledException; +import com.hazelcast.raft.service.exception.WaitKeyCancelledException; import com.hazelcast.raft.service.session.SessionAwareProxy; import com.hazelcast.raft.service.session.SessionManagerProvider; import com.hazelcast.spi.InternalCompletableFuture; @@ -133,7 +133,7 @@ public boolean tryLock(long time, TimeUnit unit) { releaseSession(sessionId); } return locked; - } catch (LockRequestCancelledException e) { + } catch (WaitKeyCancelledException e) { return false; } catch (SessionExpiredException e) { invalidateSession(sessionId); diff --git a/hazelcast-raft-client/src/test/java/com/hazelcast/raft/service/lock/client/RaftFencedLockClientBasicTest.java b/hazelcast-raft-client/src/test/java/com/hazelcast/raft/service/lock/client/RaftFencedLockClientBasicTest.java index 9c985dac2d5e..ea7e1cc87620 100644 --- a/hazelcast-raft-client/src/test/java/com/hazelcast/raft/service/lock/client/RaftFencedLockClientBasicTest.java +++ b/hazelcast-raft-client/src/test/java/com/hazelcast/raft/service/lock/client/RaftFencedLockClientBasicTest.java @@ -6,7 +6,7 @@ import com.hazelcast.raft.impl.session.SessionExpiredException; import com.hazelcast.raft.service.lock.FencedLock; import com.hazelcast.raft.service.lock.RaftFencedLockBasicTest; -import com.hazelcast.raft.service.lock.exception.LockRequestCancelledException; +import com.hazelcast.raft.service.exception.WaitKeyCancelledException; import com.hazelcast.raft.service.session.AbstractSessionManager; import com.hazelcast.raft.service.session.SessionManagerProvider; import com.hazelcast.test.HazelcastSerialClassRunner; @@ -34,7 +34,7 @@ protected HazelcastInstance[] createInstances() { lockInstance = f.newHazelcastClient(); HazelcastClientInstanceImpl client = getClient(lockInstance); SessionExpiredException.register(client.getClientExceptionFactory()); - LockRequestCancelledException.register(client.getClientExceptionFactory()); + WaitKeyCancelledException.register(client.getClientExceptionFactory()); return instances; } diff --git a/hazelcast-raft-client/src/test/java/com/hazelcast/raft/service/lock/client/RaftLockClientBasicTest.java b/hazelcast-raft-client/src/test/java/com/hazelcast/raft/service/lock/client/RaftLockClientBasicTest.java index 60cd4988fdfa..8dfeebdfb810 100644 --- a/hazelcast-raft-client/src/test/java/com/hazelcast/raft/service/lock/client/RaftLockClientBasicTest.java +++ b/hazelcast-raft-client/src/test/java/com/hazelcast/raft/service/lock/client/RaftLockClientBasicTest.java @@ -6,7 +6,7 @@ import com.hazelcast.raft.RaftGroupId; import com.hazelcast.raft.impl.session.SessionExpiredException; import com.hazelcast.raft.service.lock.RaftLockBasicTest; -import com.hazelcast.raft.service.lock.exception.LockRequestCancelledException; +import com.hazelcast.raft.service.exception.WaitKeyCancelledException; import com.hazelcast.raft.service.session.AbstractSessionManager; import com.hazelcast.raft.service.session.SessionManagerProvider; import com.hazelcast.test.HazelcastSerialClassRunner; @@ -36,7 +36,7 @@ protected HazelcastInstance[] createInstances() { TestHazelcastFactory f = (TestHazelcastFactory) factory; client = f.newHazelcastClient(); SessionExpiredException.register(getClient(client).getClientExceptionFactory()); - LockRequestCancelledException.register(getClient(client).getClientExceptionFactory()); + WaitKeyCancelledException.register(getClient(client).getClientExceptionFactory()); return instances; } diff --git a/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/atomiclong/proxy/RaftAtomicLongProxy.java b/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/atomiclong/proxy/RaftAtomicLongProxy.java index 5abee39bce91..a675aae68f2c 100644 --- a/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/atomiclong/proxy/RaftAtomicLongProxy.java +++ b/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/atomiclong/proxy/RaftAtomicLongProxy.java @@ -50,12 +50,12 @@ public class RaftAtomicLongProxy implements IAtomicLong { private final String name; private final RaftGroupId groupId; - private final RaftInvocationManager raftInvocationManager; + private final RaftInvocationManager invocationManager; public RaftAtomicLongProxy(RaftInvocationManager invocationManager, RaftGroupId groupId, String name) { this.name = name; this.groupId = groupId; - this.raftInvocationManager = invocationManager; + this.invocationManager = invocationManager; } @Override @@ -105,7 +105,7 @@ public void set(long newValue) { @Override public InternalCompletableFuture addAndGetAsync(final long delta) { - return raftInvocationManager.invoke(groupId, new AddAndGetOp(name, delta)); + return invocationManager.invoke(groupId, new AddAndGetOp(name, delta)); } @Override @@ -120,12 +120,12 @@ public InternalCompletableFuture decrementAndGetAsync() { @Override public InternalCompletableFuture compareAndSetAsync(final long expect, final long update) { - return raftInvocationManager.invoke(groupId, new CompareAndSetOp(name, expect, update)); + return invocationManager.invoke(groupId, new CompareAndSetOp(name, expect, update)); } @Override public InternalCompletableFuture getAndAddAsync(final long delta) { - return raftInvocationManager.invoke(groupId, new GetAndAddOp(name, delta)); + return invocationManager.invoke(groupId, new GetAndAddOp(name, delta)); } @Override @@ -140,7 +140,7 @@ public InternalCompletableFuture getAndIncrementAsync() { @Override public InternalCompletableFuture getAndSetAsync(final long newValue) { - return raftInvocationManager.invoke(groupId, new GetAndSetOp(name, newValue)); + return invocationManager.invoke(groupId, new GetAndSetOp(name, newValue)); } @Override @@ -169,7 +169,7 @@ private long doAlter(IFunction function, AlterResultType alterResult } private InternalCompletableFuture doAlterAsync(IFunction function, AlterResultType alterResultType) { - return raftInvocationManager.invoke(groupId, new AlterOp(name, function, alterResultType)); + return invocationManager.invoke(groupId, new AlterOp(name, function, alterResultType)); } @Override @@ -195,7 +195,7 @@ public InternalCompletableFuture getAndAlterAsync(IFunction fu @Override public InternalCompletableFuture applyAsync(final IFunction function) { - return raftInvocationManager.invoke(groupId, new ApplyOp(name, function)); + return invocationManager.invoke(groupId, new ApplyOp(name, function)); } public long localGet(QueryPolicy queryPolicy) { @@ -210,7 +210,7 @@ public long localGet(QueryPolicy queryPolicy) { public ICompletableFuture localGetAsync(final QueryPolicy queryPolicy) { final SimpleCompletableFuture resultFuture = new SimpleCompletableFuture(null, null); ICompletableFuture localFuture = - raftInvocationManager.queryOnLocal(groupId, new LocalGetOp(name), queryPolicy); + invocationManager.queryOnLocal(groupId, new LocalGetOp(name), queryPolicy); localFuture.andThen(new ExecutionCallback() { @Override @@ -220,7 +220,7 @@ public void onResponse(Long response) { @Override public void onFailure(Throwable t) { - ICompletableFuture future = raftInvocationManager.query(groupId, new LocalGetOp(name), queryPolicy); + ICompletableFuture future = invocationManager.query(groupId, new LocalGetOp(name), queryPolicy); future.andThen(new ExecutionCallback() { @Override public void onResponse(Long response) { @@ -255,7 +255,7 @@ public String getServiceName() { @Override public void destroy() { - raftInvocationManager.invoke(groupId, new DestroyRaftObjectOp(getServiceName(), name)).join(); + invocationManager.invoke(groupId, new DestroyRaftObjectOp(getServiceName(), name)).join(); } public RaftGroupId getGroupId() { diff --git a/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/atomicref/proxy/RaftAtomicRefProxy.java b/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/atomicref/proxy/RaftAtomicRefProxy.java index bde81315dc92..7bc6c8178919 100644 --- a/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/atomicref/proxy/RaftAtomicRefProxy.java +++ b/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/atomicref/proxy/RaftAtomicRefProxy.java @@ -43,14 +43,14 @@ public class RaftAtomicRefProxy implements IAtomicReference { private final String name; private final RaftGroupId groupId; - private final RaftInvocationManager raftInvocationManager; + private final RaftInvocationManager invocationManager; private final SerializationService serializationService; public RaftAtomicRefProxy(RaftInvocationManager invocationManager, SerializationService serializationService, RaftGroupId groupId, String name) { this.name = name; this.groupId = groupId; - this.raftInvocationManager = invocationManager; + this.invocationManager = invocationManager; this.serializationService = serializationService; } @@ -117,22 +117,22 @@ public R apply(IFunction function) { @Override public InternalCompletableFuture compareAndSetAsync(T expect, T update) { - return raftInvocationManager.invoke(groupId, new CompareAndSetOp(name, toData(expect), toData(update))); + return invocationManager.invoke(groupId, new CompareAndSetOp(name, toData(expect), toData(update))); } @Override public InternalCompletableFuture getAsync() { - return raftInvocationManager.invoke(groupId, new GetOp(name)); + return invocationManager.invoke(groupId, new GetOp(name)); } @Override public InternalCompletableFuture setAsync(T newValue) { - return raftInvocationManager.invoke(groupId, new SetOp(name, toData(newValue), false)); + return invocationManager.invoke(groupId, new SetOp(name, toData(newValue), false)); } @Override public InternalCompletableFuture getAndSetAsync(T newValue) { - return raftInvocationManager.invoke(groupId, new SetOp(name, toData(newValue), true)); + return invocationManager.invoke(groupId, new SetOp(name, toData(newValue), true)); } @Override @@ -147,31 +147,31 @@ public InternalCompletableFuture clearAsync() { @Override public InternalCompletableFuture containsAsync(T expected) { - return raftInvocationManager.invoke(groupId, new ContainsOp(name, toData(expected))); + return invocationManager.invoke(groupId, new ContainsOp(name, toData(expected))); } @Override public InternalCompletableFuture alterAsync(IFunction function) { checkTrue(function != null, "Function cannot be null"); - return raftInvocationManager.invoke(groupId, new ApplyOp(name, toData(function), NO_RETURN_VALUE, true)); + return invocationManager.invoke(groupId, new ApplyOp(name, toData(function), NO_RETURN_VALUE, true)); } @Override public InternalCompletableFuture alterAndGetAsync(IFunction function) { checkTrue(function != null, "Function cannot be null"); - return raftInvocationManager.invoke(groupId, new ApplyOp(name, toData(function), RETURN_NEW_VALUE, true)); + return invocationManager.invoke(groupId, new ApplyOp(name, toData(function), RETURN_NEW_VALUE, true)); } @Override public InternalCompletableFuture getAndAlterAsync(IFunction function) { checkTrue(function != null, "Function cannot be null"); - return raftInvocationManager.invoke(groupId, new ApplyOp(name, toData(function), RETURN_OLD_VALUE, true)); + return invocationManager.invoke(groupId, new ApplyOp(name, toData(function), RETURN_OLD_VALUE, true)); } @Override public InternalCompletableFuture applyAsync(IFunction function) { checkTrue(function != null, "Function cannot be null"); - return raftInvocationManager.invoke(groupId, new ApplyOp(name, toData(function), RETURN_NEW_VALUE, false)); + return invocationManager.invoke(groupId, new ApplyOp(name, toData(function), RETURN_NEW_VALUE, false)); } @Override @@ -191,7 +191,7 @@ public String getServiceName() { @Override public void destroy() { - raftInvocationManager.invoke(groupId, new DestroyRaftObjectOp(getServiceName(), name)).join(); + invocationManager.invoke(groupId, new DestroyRaftObjectOp(getServiceName(), name)).join(); } public RaftGroupId getGroupId() { diff --git a/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/countdownlatch/proxy/RaftCountDownLatchProxy.java b/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/countdownlatch/proxy/RaftCountDownLatchProxy.java index ebc5e956149c..ac82875491a5 100644 --- a/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/countdownlatch/proxy/RaftCountDownLatchProxy.java +++ b/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/countdownlatch/proxy/RaftCountDownLatchProxy.java @@ -39,10 +39,10 @@ public class RaftCountDownLatchProxy implements ICountDownLatch { private final RaftGroupId groupId; private final String name; - private final RaftInvocationManager raftInvocationManager; + private final RaftInvocationManager invocationManager; public RaftCountDownLatchProxy(RaftInvocationManager invocationManager, RaftGroupId groupId, String name) { - this.raftInvocationManager = invocationManager; + this.invocationManager = invocationManager; this.groupId = groupId; this.name = name; } @@ -52,23 +52,23 @@ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(unit); long timeoutMillis = Math.max(0, unit.toMillis(timeout)); - return raftInvocationManager.invoke(groupId, new AwaitOp(name, timeoutMillis)).join(); + return invocationManager.invoke(groupId, new AwaitOp(name, timeoutMillis)).join(); } @Override public void countDown() { - int round = raftInvocationManager.invoke(groupId, new GetRoundOp(name)).join(); - raftInvocationManager.invoke(groupId, new CountDownOp(name, round, newUnsecureUUID())).join(); + int round = invocationManager.invoke(groupId, new GetRoundOp(name)).join(); + invocationManager.invoke(groupId, new CountDownOp(name, round, newUnsecureUUID())).join(); } @Override public int getCount() { - return raftInvocationManager.invoke(groupId, new GetRemainingCountOp(name)).join(); + return invocationManager.invoke(groupId, new GetRemainingCountOp(name)).join(); } @Override public boolean trySetCount(int count) { - return raftInvocationManager.invoke(groupId, new TrySetCountOp(name, count)).join(); + return invocationManager.invoke(groupId, new TrySetCountOp(name, count)).join(); } @Override @@ -92,7 +92,7 @@ public RaftGroupId getGroupId() { @Override public void destroy() { - raftInvocationManager.invoke(groupId, new DestroyRaftObjectOp(getServiceName(), name)).join(); + invocationManager.invoke(groupId, new DestroyRaftObjectOp(getServiceName(), name)).join(); } } diff --git a/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/exception/LockRequestCancelledException.java b/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/exception/WaitKeyCancelledException.java similarity index 69% rename from hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/exception/LockRequestCancelledException.java rename to hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/exception/WaitKeyCancelledException.java index 615cbdbf3121..5cd5ba05b79c 100644 --- a/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/exception/LockRequestCancelledException.java +++ b/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/exception/WaitKeyCancelledException.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.hazelcast.raft.service.lock.exception; +package com.hazelcast.raft.service.exception; import com.hazelcast.client.impl.protocol.ClientExceptionFactory; import com.hazelcast.core.HazelcastException; @@ -22,28 +22,28 @@ /** * TODO: Javadoc Pending... */ -public class LockRequestCancelledException extends HazelcastException { +public class WaitKeyCancelledException extends HazelcastException { // TODO [basri] fixit public static final int ERROR_CODE = 6768; - public LockRequestCancelledException() { + public WaitKeyCancelledException() { } - public LockRequestCancelledException(String message) { + public WaitKeyCancelledException(String message) { super(message); } - public LockRequestCancelledException(String message, Throwable cause) { + public WaitKeyCancelledException(String message, Throwable cause) { super(message, cause); } public static void register(ClientExceptionFactory factory) { - factory.register(ERROR_CODE, LockRequestCancelledException.class, new ClientExceptionFactory.ExceptionFactory() { + factory.register(ERROR_CODE, WaitKeyCancelledException.class, new ClientExceptionFactory.ExceptionFactory() { @Override public Throwable createException(String message, Throwable cause) { - return new LockRequestCancelledException(message, cause); + return new WaitKeyCancelledException(message, cause); } }); } diff --git a/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/LockRegistry.java b/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/LockRegistry.java index 86df89f38fe3..b5845efa396f 100644 --- a/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/LockRegistry.java +++ b/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/LockRegistry.java @@ -46,7 +46,7 @@ protected RaftLock createNewResource(RaftGroupId groupId, String name) { AcquireResult acquire(String name, LockEndpoint endpoint, long commitIndex, UUID invocationUid) { AcquireResult result = getOrInitResource(name).acquire(endpoint, commitIndex, invocationUid, true); - for (LockInvocationKey waitKey : result.notifications) { + for (LockInvocationKey waitKey : result.cancelled) { removeWaitKey(waitKey); } @@ -58,7 +58,7 @@ AcquireResult tryAcquire(String name, LockEndpoint endpoint, long commitIndex, U AcquireResult result = getOrInitResource(name).acquire(endpoint, commitIndex, invocationUid, wait); long fence = result.fence; - for (LockInvocationKey waitKey : result.notifications) { + for (LockInvocationKey waitKey : result.cancelled) { removeWaitKey(waitKey); } @@ -72,7 +72,7 @@ AcquireResult tryAcquire(String name, LockEndpoint endpoint, long commitIndex, U ReleaseResult release(String name, LockEndpoint endpoint, UUID invocationUid) { RaftLock lock = getResourceOrNull(name); if (lock == null) { - return ReleaseResult.NOT_RELEASED; + return ReleaseResult.FAILED; } ReleaseResult result = lock.release(endpoint, invocationUid); @@ -86,7 +86,7 @@ ReleaseResult release(String name, LockEndpoint endpoint, UUID invocationUid) { ReleaseResult forceRelease(String name, long expectedFence, UUID invocationUid) { RaftLock lock = getResourceOrNull(name); if (lock == null) { - return ReleaseResult.NOT_RELEASED; + return ReleaseResult.FAILED; } ReleaseResult result = lock.forceRelease(expectedFence, invocationUid); diff --git a/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/RaftLock.java b/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/RaftLock.java index 9db640fa35fc..d1d4c0c5a43e 100644 --- a/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/RaftLock.java +++ b/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/RaftLock.java @@ -59,7 +59,7 @@ public Collection getOwnerSessionIds() { AcquireResult acquire(LockEndpoint endpoint, long commitIndex, UUID invocationUid, boolean wait) { // if acquire() is being retried if (owner != null && owner.invocationUid().equals(invocationUid)) { - return AcquireResult.locked(owner.commitIndex()); + return AcquireResult.successful(owner.commitIndex()); } LockInvocationKey key = new LockInvocationKey(name, endpoint, commitIndex, invocationUid); @@ -69,26 +69,30 @@ AcquireResult acquire(LockEndpoint endpoint, long commitIndex, UUID invocationUi if (endpoint.equals(owner.endpoint())) { lockCount++; - return AcquireResult.locked(owner.commitIndex()); + return AcquireResult.successful(owner.commitIndex()); } - if (wait) { - List cancelledWaitKeys = new ArrayList(); - Iterator it = waitKeys.iterator(); - while (it.hasNext()) { - LockInvocationKey waitKey = it.next(); - if (waitKey.endpoint().equals(endpoint) && !waitKey.invocationUid().equals(invocationUid)) { - cancelledWaitKeys.add(waitKey); - it.remove(); - } - } + Collection cancelledWaitKeys = cancelWaitKeys(endpoint, invocationUid); + if (wait) { waitKeys.add(key); + } + + return AcquireResult.failed(cancelledWaitKeys); + } - return AcquireResult.waitKeysInvalidated(cancelledWaitKeys); + private Collection cancelWaitKeys(LockEndpoint endpoint, UUID invocationUid) { + List cancelled = new ArrayList(0); + Iterator it = waitKeys.iterator(); + while (it.hasNext()) { + LockInvocationKey waitKey = it.next(); + if (waitKey.endpoint().equals(endpoint) && !waitKey.invocationUid().equals(invocationUid)) { + cancelled.add(waitKey); + it.remove(); + } } - return AcquireResult.NO_ACQUIRE_NO_WAIT; + return cancelled; } ReleaseResult release(LockEndpoint endpoint, UUID invocationUuid) { @@ -98,7 +102,7 @@ ReleaseResult release(LockEndpoint endpoint, UUID invocationUuid) { private ReleaseResult release(LockEndpoint endpoint, int releaseCount, UUID invocationUid) { // if release() is being retried if (invocationUid.equals(releaseRefUid)) { - return ReleaseResult.RELEASED_NO_NOTIFICATION; + return ReleaseResult.SUCCESSFUL; } if (owner != null && endpoint.equals(owner.endpoint())) { @@ -106,7 +110,7 @@ private ReleaseResult release(LockEndpoint endpoint, int releaseCount, UUID invo lockCount -= Math.min(releaseCount, lockCount); if (lockCount > 0) { - return ReleaseResult.RELEASED_NO_NOTIFICATION; + return ReleaseResult.SUCCESSFUL; } LockInvocationKey newOwner = waitKeys.poll(); @@ -127,42 +131,32 @@ private ReleaseResult release(LockEndpoint endpoint, int releaseCount, UUID invo owner = newOwner; lockCount = 1; - return ReleaseResult.released(keys); + return ReleaseResult.successful(keys); } else { owner = null; } - return ReleaseResult.RELEASED_NO_NOTIFICATION; - } - - List keys = new ArrayList(); - Iterator iter = waitKeys.iterator(); - while (iter.hasNext()) { - LockInvocationKey key = iter.next(); - if (key.endpoint().equals(endpoint)) { - keys.add(key); - iter.remove(); - } + return ReleaseResult.SUCCESSFUL; } - return ReleaseResult.waitKeysInvalidated(keys); + return ReleaseResult.failed(cancelWaitKeys(endpoint, invocationUid)); } ReleaseResult forceRelease(long expectedFence, UUID invocationUid) { // if forceRelease() is being retried if (invocationUid.equals(releaseRefUid)) { - return ReleaseResult.RELEASED_NO_NOTIFICATION; + return ReleaseResult.SUCCESSFUL; } if (owner == null) { - return ReleaseResult.NOT_RELEASED; + return ReleaseResult.FAILED; } if (owner.commitIndex() == expectedFence) { return release(owner.endpoint(), lockCount, invocationUid); } - return ReleaseResult.NOT_RELEASED; + return ReleaseResult.FAILED; } int lockCount() { @@ -246,46 +240,45 @@ public String toString() { static class AcquireResult { - private static final AcquireResult NO_ACQUIRE_NO_WAIT - = new AcquireResult(INVALID_FENCE, Collections.emptyList()); - - final long fence; - final Collection notifications; - - private static AcquireResult locked(long fence) { + private static AcquireResult successful(long fence) { return new AcquireResult(fence, Collections.emptyList()); } - private static AcquireResult waitKeysInvalidated(Collection notifications) { - return new AcquireResult(INVALID_FENCE, notifications); + private static AcquireResult failed(Collection cancelled) { + return new AcquireResult(INVALID_FENCE, cancelled); } - private AcquireResult(long fence, Collection notifications) { + final long fence; + + final Collection cancelled; + + private AcquireResult(long fence, Collection cancelled) { this.fence = fence; - this.notifications = unmodifiableCollection(notifications); + this.cancelled = unmodifiableCollection(cancelled); } } static class ReleaseResult { - static final ReleaseResult NOT_RELEASED + static final ReleaseResult FAILED = new ReleaseResult(false, Collections.emptyList()); - private static final ReleaseResult RELEASED_NO_NOTIFICATION + private static final ReleaseResult SUCCESSFUL = new ReleaseResult(true, Collections.emptyList()); - final boolean success; - final Collection notifications; - - private static ReleaseResult released(Collection notifications) { + private static ReleaseResult successful(Collection notifications) { return new ReleaseResult(true, notifications); } - private static ReleaseResult waitKeysInvalidated(Collection notifications) { + private static ReleaseResult failed(Collection notifications) { return new ReleaseResult(false, notifications); } + final boolean success; + + final Collection notifications; + private ReleaseResult(boolean success, Collection notifications) { this.success = success; this.notifications = unmodifiableCollection(notifications); diff --git a/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/RaftLockService.java b/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/RaftLockService.java index 64d31e5a0142..faebf6ba357f 100644 --- a/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/RaftLockService.java +++ b/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/RaftLockService.java @@ -26,7 +26,7 @@ import com.hazelcast.raft.service.blocking.AbstractBlockingService; import com.hazelcast.raft.service.lock.RaftLock.AcquireResult; import com.hazelcast.raft.service.lock.RaftLock.ReleaseResult; -import com.hazelcast.raft.service.lock.exception.LockRequestCancelledException; +import com.hazelcast.raft.service.exception.WaitKeyCancelledException; import com.hazelcast.raft.service.lock.proxy.RaftLockProxy; import com.hazelcast.raft.service.session.SessionManagerService; import com.hazelcast.spi.NodeEngine; @@ -54,7 +54,7 @@ protected void initImpl() { super.initImpl(); ClientExceptionFactory clientExceptionFactory = this.nodeEngine.getNode().clientEngine.getClientExceptionFactory(); - LockRequestCancelledException.register(clientExceptionFactory); + WaitKeyCancelledException.register(clientExceptionFactory); } @Override @@ -96,7 +96,7 @@ public long acquire(RaftGroupId groupId, String name, LockEndpoint endpoint, lon } if (!acquired) { - notifyCancelledWaitKeys(groupId, name, result.notifications); + notifyCancelledWaitKeys(groupId, name, result.cancelled); } return fence; @@ -116,7 +116,7 @@ public long tryAcquire(RaftGroupId groupId, String name, LockEndpoint endpoint, if (!acquired) { scheduleTimeout(groupId, new LockInvocationKey(name, endpoint, commitIndex, invocationUid), timeoutMs); - notifyCancelledWaitKeys(groupId, name, result.notifications); + notifyCancelledWaitKeys(groupId, name, result.cancelled); } return fence; @@ -175,9 +175,9 @@ private void notifyCancelledWaitKeys(RaftGroupId groupId, String name, Collectio return; } - logger.warning("Wait keys: " + waitKeys + " for Lock[" + name + "] in " + groupId + " are cancelled."); + logger.warning("Wait keys: " + waitKeys + " for Lock[" + name + "] in " + groupId + " are notifications."); - notifyWaitKeys(groupId, waitKeys, new LockRequestCancelledException()); + notifyWaitKeys(groupId, waitKeys, new WaitKeyCancelledException()); } public int getLockCount(RaftGroupId groupId, String name, LockEndpoint endpoint) { diff --git a/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/proxy/AbstractRaftFencedLockProxy.java b/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/proxy/AbstractRaftFencedLockProxy.java index 89a32ab9eea7..4239ed9c7765 100644 --- a/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/proxy/AbstractRaftFencedLockProxy.java +++ b/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/proxy/AbstractRaftFencedLockProxy.java @@ -20,7 +20,7 @@ import com.hazelcast.raft.impl.session.SessionExpiredException; import com.hazelcast.raft.service.lock.FencedLock; import com.hazelcast.raft.service.lock.RaftLockService; -import com.hazelcast.raft.service.lock.exception.LockRequestCancelledException; +import com.hazelcast.raft.service.exception.WaitKeyCancelledException; import com.hazelcast.raft.service.session.AbstractSessionManager; import com.hazelcast.raft.service.session.SessionAwareProxy; import com.hazelcast.spi.InternalCompletableFuture; @@ -101,7 +101,7 @@ public final long tryLock(long time, TimeUnit unit) { releaseSession(sessionId); } return fence; - } catch (LockRequestCancelledException e) { + } catch (WaitKeyCancelledException e) { return INVALID_FENCE; } catch (SessionExpiredException e) { invalidateSession(sessionId); diff --git a/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/proxy/RaftLockProxy.java b/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/proxy/RaftLockProxy.java index 6f39a2153102..0ebb634c24bd 100644 --- a/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/proxy/RaftLockProxy.java +++ b/hazelcast-raft-dataservices/src/main/java/com/hazelcast/raft/service/lock/proxy/RaftLockProxy.java @@ -22,7 +22,7 @@ import com.hazelcast.raft.impl.RaftOp; import com.hazelcast.raft.impl.service.RaftInvocationManager; import com.hazelcast.raft.impl.session.SessionExpiredException; -import com.hazelcast.raft.service.lock.exception.LockRequestCancelledException; +import com.hazelcast.raft.service.exception.WaitKeyCancelledException; import com.hazelcast.raft.service.lock.operation.ForceUnlockOp; import com.hazelcast.raft.service.lock.operation.GetLockCountOp; import com.hazelcast.raft.service.lock.operation.GetLockFenceOp; @@ -51,13 +51,13 @@ public class RaftLockProxy extends SessionAwareProxy implements ILock { private final String name; - private final RaftInvocationManager raftInvocationManager; + private final RaftInvocationManager invocationManager; public RaftLockProxy(RaftInvocationManager invocationManager, SessionManagerService sessionManager, RaftGroupId groupId, String name) { super(sessionManager, groupId); this.name = name; - this.raftInvocationManager = invocationManager; + this.invocationManager = invocationManager; } @Override @@ -67,7 +67,7 @@ public void lock() { for (;;) { long sessionId = acquireSession(); try { - raftInvocationManager.invoke(groupId, new LockOp(name, sessionId, threadId, invUid)).join(); + invocationManager.invoke(groupId, new LockOp(name, sessionId, threadId, invUid)).join(); break; } catch (SessionExpiredException e) { invalidateSession(sessionId); @@ -90,13 +90,13 @@ public boolean tryLock(long time, TimeUnit unit) { long sessionId = acquireSession(); RaftOp op = new TryLockOp(name, sessionId, threadId, invUid, timeoutMs); try { - InternalCompletableFuture f = raftInvocationManager.invoke(groupId, op); + InternalCompletableFuture f = invocationManager.invoke(groupId, op); boolean locked = (f.join() != INVALID_FENCE); if (!locked) { releaseSession(sessionId); } return locked; - } catch (LockRequestCancelledException e) { + } catch (WaitKeyCancelledException e) { return false; } catch (SessionExpiredException e) { invalidateSession(sessionId); @@ -111,7 +111,7 @@ public void unlock() { throw new IllegalMonitorStateException(); } try { - raftInvocationManager.invoke(groupId, new UnlockOp(name, sessionId, getThreadId(), newUnsecureUUID())).join(); + invocationManager.invoke(groupId, new UnlockOp(name, sessionId, getThreadId(), newUnsecureUUID())).join(); } catch (SessionExpiredException e) { invalidateSession(sessionId); throw new IllegalMonitorStateException("Current thread is not owner of the lock!"); @@ -132,8 +132,8 @@ public void lock(long leaseTime, TimeUnit timeUnit) { @Override public void forceUnlock() { - long fence = raftInvocationManager.invoke(groupId, new GetLockFenceOp(name, NO_SESSION_ID, 0)).join(); - raftInvocationManager.invoke(groupId, new ForceUnlockOp(name, fence, newUnsecureUUID())).join(); + long fence = invocationManager.invoke(groupId, new GetLockFenceOp(name, NO_SESSION_ID, 0)).join(); + invocationManager.invoke(groupId, new ForceUnlockOp(name, fence, newUnsecureUUID())).join(); } @Override @@ -158,12 +158,12 @@ public boolean isLockedByCurrentThread() { return false; } - return raftInvocationManager.invoke(groupId, new GetLockCountOp(name, sessionId, getThreadId())).join() > 0; + return invocationManager.invoke(groupId, new GetLockCountOp(name, sessionId, getThreadId())).join() > 0; } @Override public int getLockCount() { - return raftInvocationManager.invoke(groupId, new GetLockCountOp(name, NO_SESSION_ID, 0)).join(); + return invocationManager.invoke(groupId, new GetLockCountOp(name, NO_SESSION_ID, 0)).join(); } @Override @@ -198,6 +198,6 @@ public String getServiceName() { @Override public void destroy() { - raftInvocationManager.invoke(groupId, new DestroyRaftObjectOp(getServiceName(), name)).join(); + invocationManager.invoke(groupId, new DestroyRaftObjectOp(getServiceName(), name)).join(); } } diff --git a/hazelcast-raft-dataservices/src/test/java/com/hazelcast/raft/service/lock/RaftLockAdvancedTest.java b/hazelcast-raft-dataservices/src/test/java/com/hazelcast/raft/service/lock/RaftLockAdvancedTest.java index b4f2036efe32..54a80665bb5e 100644 --- a/hazelcast-raft-dataservices/src/test/java/com/hazelcast/raft/service/lock/RaftLockAdvancedTest.java +++ b/hazelcast-raft-dataservices/src/test/java/com/hazelcast/raft/service/lock/RaftLockAdvancedTest.java @@ -4,13 +4,14 @@ import com.hazelcast.raft.RaftGroupId; import com.hazelcast.raft.impl.service.HazelcastRaftTestSupport; import com.hazelcast.raft.impl.service.RaftInvocationManager; -import com.hazelcast.raft.service.lock.exception.LockRequestCancelledException; +import com.hazelcast.raft.service.exception.WaitKeyCancelledException; import com.hazelcast.raft.service.lock.operation.LockOp; import com.hazelcast.raft.service.lock.operation.TryLockOp; import com.hazelcast.raft.service.lock.proxy.RaftLockProxy; import com.hazelcast.raft.service.session.AbstractSessionManager; import com.hazelcast.raft.service.session.SessionManagerService; import com.hazelcast.spi.InternalCompletableFuture; +import com.hazelcast.test.AssertTask; import com.hazelcast.test.HazelcastSerialClassRunner; import com.hazelcast.test.annotation.ParallelTest; import com.hazelcast.test.annotation.QuickTest; @@ -27,12 +28,14 @@ import static com.hazelcast.util.ThreadUtil.getThreadId; import static com.hazelcast.util.UuidUtil.newUnsecureUUID; import static java.util.concurrent.TimeUnit.MINUTES; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; @RunWith(HazelcastSerialClassRunner.class) @Category({QuickTest.class, ParallelTest.class}) -public class RaftLockAdvancedTest extends HazelcastRaftTestSupport { +public class RaftLockAdvancedTest + extends HazelcastRaftTestSupport { private HazelcastInstance[] instances; private HazelcastInstance lockInstance; @@ -57,79 +60,154 @@ private AbstractSessionManager getSessionManager() { } @Test - public void testLock() { + public void testLockCancelsPendingLockRequests() { lockByOtherThread(lock); // there is a session id now - RaftGroupId groupId = lock.getGroupId(); + final RaftGroupId groupId = lock.getGroupId(); long sessionId = getSessionManager().getSession(groupId); RaftInvocationManager invocationManager = getRaftInvocationManager(lockInstance); UUID invUid1 = newUnsecureUUID(); UUID invUid2 = newUnsecureUUID(); - InternalCompletableFuture f1 = invocationManager.invoke(groupId, new LockOp(name, sessionId, getThreadId(), invUid1)); - InternalCompletableFuture f2 = invocationManager.invoke(groupId, new LockOp(name, sessionId, getThreadId(), invUid1)); + InternalCompletableFuture f1 = invocationManager + .invoke(groupId, new TryLockOp(name, sessionId, getThreadId(), invUid1, MINUTES.toMillis(5))); + InternalCompletableFuture f2 = invocationManager + .invoke(groupId, new TryLockOp(name, sessionId, getThreadId(), invUid1, MINUTES.toMillis(5))); + + assertTrueEventually(new AssertTask() { + @Override + public void run() { + RaftLockService service = getNodeEngineImpl(lockInstance).getService(RaftLockService.SERVICE_NAME); + LockRegistry registry = service.getRegistryOrNull(groupId); + assertNotNull(registry); + assertEquals(2, registry.getWaitTimeouts().size()); + } + }); invocationManager.invoke(groupId, new LockOp(name, sessionId, getThreadId(), invUid2)); try { f1.join(); fail(); - } catch (LockRequestCancelledException ignored) { + } catch (WaitKeyCancelledException ignored) { } try { f2.join(); fail(); - } catch (LockRequestCancelledException ignored) { + } catch (WaitKeyCancelledException ignored) { } } @Test - public void testTryLock() { + public void testTryLockWithTimeoutCancelsPendingLockRequests() { lockByOtherThread(lock); // there is a session id now - RaftGroupId groupId = lock.getGroupId(); + final RaftGroupId groupId = lock.getGroupId(); long sessionId = getSessionManager().getSession(groupId); RaftInvocationManager invocationManager = getRaftInvocationManager(lockInstance); UUID invUid1 = newUnsecureUUID(); UUID invUid2 = newUnsecureUUID(); - InternalCompletableFuture f1 = - invocationManager.invoke(groupId, new TryLockOp(name, sessionId, getThreadId(), invUid1, MINUTES.toMillis(5))); - InternalCompletableFuture f2 = - invocationManager.invoke(groupId, new TryLockOp(name, sessionId, getThreadId(), invUid1, MINUTES.toMillis(5))); + InternalCompletableFuture f1 = invocationManager + .invoke(groupId, new TryLockOp(name, sessionId, getThreadId(), invUid1, MINUTES.toMillis(5))); + InternalCompletableFuture f2 = invocationManager + .invoke(groupId, new TryLockOp(name, sessionId, getThreadId(), invUid1, MINUTES.toMillis(5))); - invocationManager.invoke(groupId, new LockOp(name, sessionId, getThreadId(), invUid2)); + assertTrueEventually(new AssertTask() { + @Override + public void run() { + RaftLockService service = getNodeEngineImpl(lockInstance).getService(RaftLockService.SERVICE_NAME); + LockRegistry registry = service.getRegistryOrNull(groupId); + assertNotNull(registry); + assertEquals(2, registry.getWaitTimeouts().size()); + } + }); + + invocationManager.invoke(groupId, new TryLockOp(name, sessionId, getThreadId(), invUid2, 30)); try { f1.join(); fail(); - } catch (LockRequestCancelledException ignored) { + } catch (WaitKeyCancelledException ignored) { } try { f2.join(); fail(); - } catch (LockRequestCancelledException ignored) { + } catch (WaitKeyCancelledException ignored) { } } @Test - public void testUnlock() { + public void testTryLockWithoutTimeoutCancelsPendingLockRequests() { lockByOtherThread(lock); // there is a session id now - RaftGroupId groupId = lock.getGroupId(); + final RaftGroupId groupId = lock.getGroupId(); long sessionId = getSessionManager().getSession(groupId); RaftInvocationManager invocationManager = getRaftInvocationManager(lockInstance); UUID invUid1 = newUnsecureUUID(); + UUID invUid2 = newUnsecureUUID(); + + InternalCompletableFuture f1 = invocationManager + .invoke(groupId, new TryLockOp(name, sessionId, getThreadId(), invUid1, MINUTES.toMillis(5))); + InternalCompletableFuture f2 = invocationManager + .invoke(groupId, new TryLockOp(name, sessionId, getThreadId(), invUid1, MINUTES.toMillis(5))); + + assertTrueEventually(new AssertTask() { + @Override + public void run() { + RaftLockService service = getNodeEngineImpl(lockInstance).getService(RaftLockService.SERVICE_NAME); + LockRegistry registry = service.getRegistryOrNull(groupId); + assertNotNull(registry); + assertEquals(2, registry.getWaitTimeouts().size()); + } + }); + + invocationManager.invoke(groupId, new TryLockOp(name, sessionId, getThreadId(), invUid2, 0)); + + try { + f1.join(); + fail(); + } catch (WaitKeyCancelledException ignored) { + } + + try { + f2.join(); + fail(); + } catch (WaitKeyCancelledException ignored) { + } + } + + @Test + public void testUnlockCancelsPendingLockRequests() { + lockByOtherThread(lock); + + // there is a session id now + + final RaftGroupId groupId = lock.getGroupId(); + long sessionId = getSessionManager().getSession(groupId); + RaftInvocationManager invocationManager = getRaftInvocationManager(lockInstance); + UUID invUid1 = newUnsecureUUID(); + + InternalCompletableFuture f1 = invocationManager + .invoke(groupId, new TryLockOp(name, sessionId, getThreadId(), invUid1, MINUTES.toMillis(5))); - InternalCompletableFuture f1 = invocationManager.invoke(groupId, new LockOp(name, sessionId, getThreadId(), invUid1)); + assertTrueEventually(new AssertTask() { + @Override + public void run() { + RaftLockService service = getNodeEngineImpl(lockInstance).getService(RaftLockService.SERVICE_NAME); + LockRegistry registry = service.getRegistryOrNull(groupId); + assertNotNull(registry); + assertEquals(1, registry.getWaitTimeouts().size()); + } + }); try { lock.unlock(); @@ -140,7 +218,7 @@ public void testUnlock() { try { f1.join(); fail(); - } catch (LockRequestCancelledException ignored) { + } catch (WaitKeyCancelledException ignored) { } }