Skip to content

Commit

Permalink
Raft lock operation timeout fixes
Browse files Browse the repository at this point in the history
Consider the following scenarios where client1 and client2 run on
different threads:

SCENARIO #1:
- STEP 1: client1.lock();
  Lock is acquired by client1.
- STEP 2: client2.lock();
  Wait key is added to the lock for client2.
  After some time, the client2.lock() call fails with an operation timeout.
- STEP 3: client2.unlock();
  Fails with IllegalMonitorStateException because client2 is not
  the holder of the lock. However, its wait key is still present,
  so the lock will be assigned to client2 when client1 releases it.

SCENARIO #2:
- STEP 1: client1.lock();
  Lock is acquired by client1.
- STEP 2: client2.lock();
  Wait key is added to the lock for client2.
  After some time, the client2.lock() call fails with an operation timeout.
- STEP 3: client2.lock();
  A new wait key is added to the lock for client2, because its second
  lock() call has a new invocation uuid.
- STEP 4: client1.unlock();
  The lock will be assigned to client2 for its first lock() call.
  Now the lock is held by client2 and its lock count is just 1.
  However, nothing will be done for its other wait key in the list.
- STEP 5: client2.unlock();
  Client2 releases the lock, but since it has another wait key
  in the list, it will get the lock again.

 We need two separate fixes to resolve these issues.

 First, an unlock() call should cancel all pending wait keys
 of the lock endpoint if that endpoint does not currently hold the lock.

 Second, if a lock() call is received while there is another wait key
 from the same client, the earlier lock() call must be cancelled by
 deleting its wait key from the wait list of the lock.
  • Loading branch information
metanet authored and mdogan committed Feb 1, 2019
1 parent dee6b20 commit fdd134c
Show file tree
Hide file tree
Showing 16 changed files with 424 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.hazelcast.raft.impl.RaftGroupIdImpl;
import com.hazelcast.raft.impl.session.SessionExpiredException;
import com.hazelcast.raft.service.lock.RaftLockService;
import com.hazelcast.raft.service.lock.exception.LockRequestCancelledException;
import com.hazelcast.raft.service.session.SessionAwareProxy;
import com.hazelcast.raft.service.session.SessionManagerProvider;
import com.hazelcast.spi.InternalCompletableFuture;
Expand Down Expand Up @@ -132,6 +133,8 @@ public boolean tryLock(long time, TimeUnit unit) {
releaseSession(sessionId);
}
return locked;
} catch (LockRequestCancelledException e) {
return false;
} catch (SessionExpiredException e) {
invalidateSession(sessionId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.hazelcast.raft.impl.session.SessionExpiredException;
import com.hazelcast.raft.service.lock.FencedLock;
import com.hazelcast.raft.service.lock.RaftFencedLockBasicTest;
import com.hazelcast.raft.service.lock.exception.LockRequestCancelledException;
import com.hazelcast.raft.service.session.AbstractSessionManager;
import com.hazelcast.raft.service.session.SessionManagerProvider;
import com.hazelcast.test.HazelcastSerialClassRunner;
Expand Down Expand Up @@ -33,6 +34,7 @@ protected HazelcastInstance[] createInstances() {
lockInstance = f.newHazelcastClient();
HazelcastClientInstanceImpl client = getClient(lockInstance);
SessionExpiredException.register(client.getClientExceptionFactory());
LockRequestCancelledException.register(client.getClientExceptionFactory());
return instances;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.hazelcast.raft.RaftGroupId;
import com.hazelcast.raft.impl.session.SessionExpiredException;
import com.hazelcast.raft.service.lock.RaftLockBasicTest;
import com.hazelcast.raft.service.lock.exception.LockRequestCancelledException;
import com.hazelcast.raft.service.session.AbstractSessionManager;
import com.hazelcast.raft.service.session.SessionManagerProvider;
import com.hazelcast.test.HazelcastSerialClassRunner;
Expand Down Expand Up @@ -35,6 +36,7 @@ protected HazelcastInstance[] createInstances() {
TestHazelcastFactory f = (TestHazelcastFactory) factory;
client = f.newHazelcastClient();
SessionExpiredException.register(getClient(client).getClientExceptionFactory());
LockRequestCancelledException.register(getClient(client).getClientExceptionFactory());
return instances;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.hazelcast.core.ISemaphore;
import com.hazelcast.raft.RaftGroupId;
import com.hazelcast.raft.impl.session.SessionExpiredException;
import com.hazelcast.raft.service.lock.exception.LockRequestCancelledException;
import com.hazelcast.raft.service.semaphore.RaftSessionAwareSemaphoreBasicTest;
import com.hazelcast.raft.service.session.AbstractSessionManager;
import com.hazelcast.raft.service.session.SessionManagerProvider;
Expand Down Expand Up @@ -33,6 +34,7 @@ protected HazelcastInstance[] createInstances() {
TestHazelcastFactory f = (TestHazelcastFactory) factory;
semaphoreInstance = f.newHazelcastClient();
SessionExpiredException.register(getClient(semaphoreInstance).getClientExceptionFactory());
LockRequestCancelledException.register(getClient(semaphoreInstance).getClientExceptionFactory());
return instances;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.hazelcast.core.ISemaphore;
import com.hazelcast.raft.RaftGroupId;
import com.hazelcast.raft.impl.session.SessionExpiredException;
import com.hazelcast.raft.service.lock.exception.LockRequestCancelledException;
import com.hazelcast.raft.service.semaphore.RaftSessionlessSemaphoreBasicTest;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.TestHazelcastInstanceFactory;
Expand Down Expand Up @@ -33,6 +34,7 @@ protected HazelcastInstance[] createInstances() {
TestHazelcastFactory f = (TestHazelcastFactory) factory;
client = f.newHazelcastClient();
SessionExpiredException.register(getClient(client).getClientExceptionFactory());
LockRequestCancelledException.register(getClient(client).getClientExceptionFactory());
return instances;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.hazelcast.spi.ManagedService;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.exception.DistributedObjectDestroyedException;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.util.Clock;

import java.util.ArrayList;
Expand Down Expand Up @@ -61,15 +62,15 @@ public abstract class AbstractBlockingService<W extends WaitKey, R extends Block
public static final long WAIT_TIMEOUT_TASK_UPPER_BOUND_MILLIS = 1500;
private static final long WAIT_TIMEOUT_TASK_PERIOD_MILLIS = 500;

protected final NodeEngine nodeEngine;
protected final NodeEngineImpl nodeEngine;
protected final ILogger logger;
protected volatile RaftService raftService;

private final ConcurrentMap<RaftGroupId, RR> registries = new ConcurrentHashMap<RaftGroupId, RR>();
private volatile SessionAccessor sessionAccessor;

protected AbstractBlockingService(NodeEngine nodeEngine) {
this.nodeEngine = nodeEngine;
this.nodeEngine = (NodeEngineImpl) nodeEngine;
this.logger = nodeEngine.getLogger(getClass());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.raft.RaftGroupId;
import com.hazelcast.raft.service.blocking.ResourceRegistry;
import com.hazelcast.raft.service.lock.RaftLock.AcquireResult;
import com.hazelcast.raft.service.lock.RaftLock.ReleaseResult;

import java.util.Collection;
import java.util.Collections;
import java.util.UUID;

import static com.hazelcast.raft.service.lock.RaftLockService.INVALID_FENCE;
Expand All @@ -43,46 +43,58 @@ protected RaftLock createNewResource(RaftGroupId groupId, String name) {
return new RaftLock(groupId, name);
}

long acquire(String name, LockEndpoint endpoint, long commitIndex, UUID invocationUid) {
return getOrInitResource(name).acquire(endpoint, commitIndex, invocationUid, true);
AcquireResult acquire(String name, LockEndpoint endpoint, long commitIndex, UUID invocationUid) {
AcquireResult result = getOrInitResource(name).acquire(endpoint, commitIndex, invocationUid, true);

for (LockInvocationKey waitKey : result.notifications) {
removeWaitKey(waitKey);
}

return result;
}

long tryAcquire(String name, LockEndpoint endpoint, long commitIndex, UUID invocationUid, long timeoutMs) {
AcquireResult tryAcquire(String name, LockEndpoint endpoint, long commitIndex, UUID invocationUid, long timeoutMs) {
boolean wait = (timeoutMs > 0);
long fence = getOrInitResource(name).acquire(endpoint, commitIndex, invocationUid, wait);
AcquireResult result = getOrInitResource(name).acquire(endpoint, commitIndex, invocationUid, wait);
long fence = result.fence;

for (LockInvocationKey waitKey : result.notifications) {
removeWaitKey(waitKey);
}

if (wait && fence == INVALID_FENCE) {
addWaitKey(new LockInvocationKey(name, endpoint, commitIndex, invocationUid), timeoutMs);
}

return fence;
return result;
}

Collection<LockInvocationKey> release(String name, LockEndpoint endpoint, UUID invocationUid) {
ReleaseResult release(String name, LockEndpoint endpoint, UUID invocationUid) {
RaftLock lock = getResourceOrNull(name);
if (lock == null) {
return Collections.emptyList();
return ReleaseResult.NOT_RELEASED;
}

Collection<LockInvocationKey> keys = lock.release(endpoint, invocationUid);
for (LockInvocationKey key : keys) {
ReleaseResult result = lock.release(endpoint, invocationUid);
for (LockInvocationKey key : result.notifications) {
removeWaitKey(key);
}

return keys;
return result;
}

Collection<LockInvocationKey> forceRelease(String name, long expectedFence, UUID invocationUid) {
ReleaseResult forceRelease(String name, long expectedFence, UUID invocationUid) {
RaftLock lock = getResourceOrNull(name);
if (lock == null) {
return Collections.emptyList();
return ReleaseResult.NOT_RELEASED;
}

Collection<LockInvocationKey> keys = lock.forceRelease(expectedFence, invocationUid);
for (LockInvocationKey key : keys) {
ReleaseResult result = lock.forceRelease(expectedFence, invocationUid);
for (LockInvocationKey key : result.notifications) {
removeWaitKey(key);
}

return keys;
return result;
}

int getLockCount(String name, LockEndpoint endpoint) {
Expand Down

0 comments on commit fdd134c

Please sign in to comment.