Skip to content
Permalink
Browse files

Improve RaftLock and RaftSemaphore idempotency

Results of lock/acquire and unlock/release operations are memorized
per <session, thread>. It is because we know that each <session, thread>
pair will have at most 1 active operation at a time and it will not send
a new operation before it is done with the current operation. Therefore,
we memorize result of the last operation for each <session id, thread>
pair and gc them on session close.
  • Loading branch information...
metanet authored and mdogan committed Sep 20, 2018
1 parent 962be76 commit 062cd77c6406524abff48a51d5f37e1e4189bdbd
@@ -49,7 +49,7 @@ protected BlockingResource(RaftGroupId groupId, String name) {
this.name = name;
}

public abstract Collection<Long> getOwnerSessionIds();
public abstract Collection<Long> getActiveSessions();

public final RaftGroupId getGroupId() {
return groupId;
@@ -151,7 +151,7 @@ protected final void removeWaitKey(W key) {
public final Collection<Long> getActiveSessions() {
Set<Long> sessions = new HashSet<Long>();
for (R res : resources.values()) {
sessions.addAll(res.getOwnerSessionIds());
sessions.addAll(res.getActiveSessions());
for (WaitKey key : res.getWaitKeys()) {
sessions.add(key.sessionId());
}
@@ -30,7 +30,7 @@
public class CountDownLatchRegistry extends ResourceRegistry<CountDownLatchInvocationKey, RaftCountDownLatch>
implements IdentifiedDataSerializable {

public CountDownLatchRegistry() {
CountDownLatchRegistry() {
}

CountDownLatchRegistry(RaftGroupId groupId) {
@@ -44,15 +44,15 @@
private int countDownFrom;
private final Set<UUID> countDownUids = new HashSet<UUID>();

public RaftCountDownLatch() {
RaftCountDownLatch() {
}

RaftCountDownLatch(RaftGroupId groupId, String name) {
super(groupId, name);
}

@Override
public Collection<Long> getOwnerSessionIds() {
public Collection<Long> getActiveSessions() {
return Collections.emptyList();
}

@@ -31,7 +31,7 @@
*/
class LockRegistry extends ResourceRegistry<LockInvocationKey, RaftLock> implements IdentifiedDataSerializable {

public LockRegistry() {
LockRegistry() {
}

LockRegistry(RaftGroupId groupId) {
@@ -28,8 +28,10 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static com.hazelcast.raft.service.lock.RaftLockService.INVALID_FENCE;
@@ -42,37 +44,41 @@

private LockInvocationKey owner;
private int lockCount;
private UUID releaseRefUid;
private Map<LockEndpoint, UUID> invocationRefUids = new HashMap<LockEndpoint, UUID>();

public RaftLock() {
RaftLock() {
}

RaftLock(RaftGroupId groupId, String name) {
super(groupId, name);
}

@Override
public Collection<Long> getOwnerSessionIds() {
public Collection<Long> getActiveSessions() {
return owner != null ? Collections.singleton(owner.sessionId()) : Collections.<Long>emptyList();
}

AcquireResult acquire(LockEndpoint endpoint, long commitIndex, UUID invocationUid, boolean wait) {
// if acquire() is being retried
if (owner != null && owner.invocationUid().equals(invocationUid)) {
if (invocationUid.equals(invocationRefUids.get(endpoint))
|| (owner != null && owner.invocationUid().equals(invocationUid))) {
return AcquireResult.successful(owner.commitIndex());
}

invocationRefUids.remove(endpoint);

LockInvocationKey key = new LockInvocationKey(name, endpoint, commitIndex, invocationUid);
if (owner == null) {
owner = key;
}

if (endpoint.equals(owner.endpoint())) {
invocationRefUids.put(endpoint, invocationUid);
lockCount++;
return AcquireResult.successful(owner.commitIndex());
}

Collection<LockInvocationKey> cancelledWaitKeys = cancelWaitKeys(endpoint, invocationUid);
Collection<LockInvocationKey> cancelledWaitKeys = cancelWaitKeys(endpoint);

if (wait) {
waitKeys.add(key);
@@ -81,12 +87,12 @@ AcquireResult acquire(LockEndpoint endpoint, long commitIndex, UUID invocationUi
return AcquireResult.failed(cancelledWaitKeys);
}

private Collection<LockInvocationKey> cancelWaitKeys(LockEndpoint endpoint, UUID invocationUid) {
private Collection<LockInvocationKey> cancelWaitKeys(LockEndpoint endpoint) {
List<LockInvocationKey> cancelled = new ArrayList<LockInvocationKey>(0);
Iterator<LockInvocationKey> it = waitKeys.iterator();
while (it.hasNext()) {
LockInvocationKey waitKey = it.next();
if (waitKey.endpoint().equals(endpoint) && !waitKey.invocationUid().equals(invocationUid)) {
if (waitKey.endpoint().equals(endpoint)) {
cancelled.add(waitKey);
it.remove();
}
@@ -101,12 +107,12 @@ ReleaseResult release(LockEndpoint endpoint, UUID invocationUuid) {

private ReleaseResult release(LockEndpoint endpoint, int releaseCount, UUID invocationUid) {
// if release() is being retried
if (invocationUid.equals(releaseRefUid)) {
if (invocationUid.equals(invocationRefUids.get(endpoint))) {
return ReleaseResult.SUCCESSFUL;
}

if (owner != null && endpoint.equals(owner.endpoint())) {
releaseRefUid = invocationUid;
invocationRefUids.put(endpoint, invocationUid);

lockCount -= Math.min(releaseCount, lockCount);
if (lockCount > 0) {
@@ -115,39 +121,21 @@ private ReleaseResult release(LockEndpoint endpoint, int releaseCount, UUID invo

LockInvocationKey newOwner = waitKeys.poll();
if (newOwner != null) {
List<LockInvocationKey> keys = new ArrayList<LockInvocationKey>();
keys.add(newOwner);

Iterator<LockInvocationKey> iter = waitKeys.iterator();
while (iter.hasNext()) {
LockInvocationKey key = iter.next();
if (newOwner.invocationUid().equals(key.invocationUid())) {
assert newOwner.endpoint().equals(key.endpoint());
keys.add(key);
iter.remove();
}
}

owner = newOwner;
lockCount = 1;

return ReleaseResult.successful(keys);
return ReleaseResult.successful(Collections.singleton(newOwner));
} else {
owner = null;
}

return ReleaseResult.SUCCESSFUL;
}

return ReleaseResult.failed(cancelWaitKeys(endpoint, invocationUid));
return ReleaseResult.failed(cancelWaitKeys(endpoint));
}

ReleaseResult forceRelease(long expectedFence, UUID invocationUid) {
// if forceRelease() is being retried
if (invocationUid.equals(releaseRefUid)) {
return ReleaseResult.SUCCESSFUL;
}

if (owner == null) {
return ReleaseResult.FAILED;
}
@@ -170,6 +158,13 @@ LockInvocationKey owner() {
@Override
protected void onInvalidateSession(long sessionId, Long2ObjectHashMap<Object> responses) {
if (owner != null && sessionId == owner.endpoint().sessionId()) {
Iterator<LockEndpoint> it = invocationRefUids.keySet().iterator();
while (it.hasNext()) {
if (it.next().sessionId() == sessionId) {
it.remove();
}
}

ReleaseResult result = release(owner.endpoint(), Integer.MAX_VALUE, UuidUtil.newUnsecureUUID());

if (!result.success) {
@@ -207,11 +202,12 @@ public void writeData(ObjectDataOutput out)
out.writeObject(owner);
}
out.writeInt(lockCount);
boolean hasRefUid = (releaseRefUid != null);
out.writeBoolean(hasRefUid);
if (hasRefUid) {
out.writeLong(releaseRefUid.getLeastSignificantBits());
out.writeLong(releaseRefUid.getMostSignificantBits());
out.writeInt(invocationRefUids.size());
for (Map.Entry<LockEndpoint, UUID> e : invocationRefUids.entrySet()) {
out.writeObject(e.getKey());
UUID releaseUid = e.getValue();
out.writeLong(releaseUid.getLeastSignificantBits());
out.writeLong(releaseUid.getMostSignificantBits());
}
}

@@ -224,18 +220,19 @@ public void readData(ObjectDataInput in)
owner = in.readObject();
}
lockCount = in.readInt();
boolean hasRefUid = in.readBoolean();
if (hasRefUid) {
int releaseRefUidCount = in.readInt();
for (int i = 0; i < releaseRefUidCount; i++) {
LockEndpoint endpoint = in.readObject();
long least = in.readLong();
long most = in.readLong();
releaseRefUid = new UUID(most, least);
invocationRefUids.put(endpoint, new UUID(most, least));
}
}

@Override
public String toString() {
return "RaftLock{" + "groupId=" + groupId + ", name='" + name + '\'' + ", owner=" + owner + ", lockCount=" + lockCount
+ ", releaseRefUid=" + releaseRefUid + ", waitKeys=" + waitKeys + '}';
+ ", invocationRefUids=" + invocationRefUids + ", waitKeys=" + waitKeys + '}';
}

static class AcquireResult {

0 comments on commit 062cd77

Please sign in to comment.
You can’t perform that action at this time.