Skip to content

Commit

Permalink
More on RaftLock operation timeout fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
metanet authored and mdogan committed Feb 1, 2019
1 parent fdd134c commit a871011
Show file tree
Hide file tree
Showing 13 changed files with 210 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import com.hazelcast.raft.impl.RaftGroupIdImpl;
import com.hazelcast.raft.impl.session.SessionExpiredException;
import com.hazelcast.raft.service.lock.RaftLockService;
import com.hazelcast.raft.service.lock.exception.LockRequestCancelledException;
import com.hazelcast.raft.service.exception.WaitKeyCancelledException;
import com.hazelcast.raft.service.session.SessionAwareProxy;
import com.hazelcast.raft.service.session.SessionManagerProvider;
import com.hazelcast.spi.InternalCompletableFuture;
Expand Down Expand Up @@ -133,7 +133,7 @@ public boolean tryLock(long time, TimeUnit unit) {
releaseSession(sessionId);
}
return locked;
} catch (LockRequestCancelledException e) {
} catch (WaitKeyCancelledException e) {
return false;
} catch (SessionExpiredException e) {
invalidateSession(sessionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import com.hazelcast.raft.impl.session.SessionExpiredException;
import com.hazelcast.raft.service.lock.FencedLock;
import com.hazelcast.raft.service.lock.RaftFencedLockBasicTest;
import com.hazelcast.raft.service.lock.exception.LockRequestCancelledException;
import com.hazelcast.raft.service.exception.WaitKeyCancelledException;
import com.hazelcast.raft.service.session.AbstractSessionManager;
import com.hazelcast.raft.service.session.SessionManagerProvider;
import com.hazelcast.test.HazelcastSerialClassRunner;
Expand Down Expand Up @@ -34,7 +34,7 @@ protected HazelcastInstance[] createInstances() {
lockInstance = f.newHazelcastClient();
HazelcastClientInstanceImpl client = getClient(lockInstance);
SessionExpiredException.register(client.getClientExceptionFactory());
LockRequestCancelledException.register(client.getClientExceptionFactory());
WaitKeyCancelledException.register(client.getClientExceptionFactory());
return instances;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import com.hazelcast.raft.RaftGroupId;
import com.hazelcast.raft.impl.session.SessionExpiredException;
import com.hazelcast.raft.service.lock.RaftLockBasicTest;
import com.hazelcast.raft.service.lock.exception.LockRequestCancelledException;
import com.hazelcast.raft.service.exception.WaitKeyCancelledException;
import com.hazelcast.raft.service.session.AbstractSessionManager;
import com.hazelcast.raft.service.session.SessionManagerProvider;
import com.hazelcast.test.HazelcastSerialClassRunner;
Expand Down Expand Up @@ -36,7 +36,7 @@ protected HazelcastInstance[] createInstances() {
TestHazelcastFactory f = (TestHazelcastFactory) factory;
client = f.newHazelcastClient();
SessionExpiredException.register(getClient(client).getClientExceptionFactory());
LockRequestCancelledException.register(getClient(client).getClientExceptionFactory());
WaitKeyCancelledException.register(getClient(client).getClientExceptionFactory());
return instances;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ public class RaftAtomicLongProxy implements IAtomicLong {

private final String name;
private final RaftGroupId groupId;
private final RaftInvocationManager raftInvocationManager;
private final RaftInvocationManager invocationManager;

public RaftAtomicLongProxy(RaftInvocationManager invocationManager, RaftGroupId groupId, String name) {
this.name = name;
this.groupId = groupId;
this.raftInvocationManager = invocationManager;
this.invocationManager = invocationManager;
}

@Override
Expand Down Expand Up @@ -105,7 +105,7 @@ public void set(long newValue) {

@Override
public InternalCompletableFuture<Long> addAndGetAsync(final long delta) {
return raftInvocationManager.invoke(groupId, new AddAndGetOp(name, delta));
return invocationManager.invoke(groupId, new AddAndGetOp(name, delta));
}

@Override
Expand All @@ -120,12 +120,12 @@ public InternalCompletableFuture<Long> decrementAndGetAsync() {

@Override
public InternalCompletableFuture<Boolean> compareAndSetAsync(final long expect, final long update) {
return raftInvocationManager.invoke(groupId, new CompareAndSetOp(name, expect, update));
return invocationManager.invoke(groupId, new CompareAndSetOp(name, expect, update));
}

@Override
public InternalCompletableFuture<Long> getAndAddAsync(final long delta) {
return raftInvocationManager.invoke(groupId, new GetAndAddOp(name, delta));
return invocationManager.invoke(groupId, new GetAndAddOp(name, delta));
}

@Override
Expand All @@ -140,7 +140,7 @@ public InternalCompletableFuture<Long> getAndIncrementAsync() {

@Override
public InternalCompletableFuture<Long> getAndSetAsync(final long newValue) {
return raftInvocationManager.invoke(groupId, new GetAndSetOp(name, newValue));
return invocationManager.invoke(groupId, new GetAndSetOp(name, newValue));
}

@Override
Expand Down Expand Up @@ -169,7 +169,7 @@ private long doAlter(IFunction<Long, Long> function, AlterResultType alterResult
}

private InternalCompletableFuture<Long> doAlterAsync(IFunction<Long, Long> function, AlterResultType alterResultType) {
return raftInvocationManager.invoke(groupId, new AlterOp(name, function, alterResultType));
return invocationManager.invoke(groupId, new AlterOp(name, function, alterResultType));
}

@Override
Expand All @@ -195,7 +195,7 @@ public InternalCompletableFuture<Long> getAndAlterAsync(IFunction<Long, Long> fu

@Override
public <R> InternalCompletableFuture<R> applyAsync(final IFunction<Long, R> function) {
return raftInvocationManager.invoke(groupId, new ApplyOp<R>(name, function));
return invocationManager.invoke(groupId, new ApplyOp<R>(name, function));
}

public long localGet(QueryPolicy queryPolicy) {
Expand All @@ -210,7 +210,7 @@ public long localGet(QueryPolicy queryPolicy) {
public ICompletableFuture<Long> localGetAsync(final QueryPolicy queryPolicy) {
final SimpleCompletableFuture<Long> resultFuture = new SimpleCompletableFuture<Long>(null, null);
ICompletableFuture<Long> localFuture =
raftInvocationManager.queryOnLocal(groupId, new LocalGetOp(name), queryPolicy);
invocationManager.queryOnLocal(groupId, new LocalGetOp(name), queryPolicy);

localFuture.andThen(new ExecutionCallback<Long>() {
@Override
Expand All @@ -220,7 +220,7 @@ public void onResponse(Long response) {

@Override
public void onFailure(Throwable t) {
ICompletableFuture<Long> future = raftInvocationManager.query(groupId, new LocalGetOp(name), queryPolicy);
ICompletableFuture<Long> future = invocationManager.query(groupId, new LocalGetOp(name), queryPolicy);
future.andThen(new ExecutionCallback<Long>() {
@Override
public void onResponse(Long response) {
Expand Down Expand Up @@ -255,7 +255,7 @@ public String getServiceName() {

@Override
public void destroy() {
raftInvocationManager.invoke(groupId, new DestroyRaftObjectOp(getServiceName(), name)).join();
invocationManager.invoke(groupId, new DestroyRaftObjectOp(getServiceName(), name)).join();
}

public RaftGroupId getGroupId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ public class RaftAtomicRefProxy<T> implements IAtomicReference<T> {

private final String name;
private final RaftGroupId groupId;
private final RaftInvocationManager raftInvocationManager;
private final RaftInvocationManager invocationManager;
private final SerializationService serializationService;

public RaftAtomicRefProxy(RaftInvocationManager invocationManager, SerializationService serializationService,
RaftGroupId groupId, String name) {
this.name = name;
this.groupId = groupId;
this.raftInvocationManager = invocationManager;
this.invocationManager = invocationManager;
this.serializationService = serializationService;
}

Expand Down Expand Up @@ -117,22 +117,22 @@ public <R> R apply(IFunction<T, R> function) {

@Override
public InternalCompletableFuture<Boolean> compareAndSetAsync(T expect, T update) {
return raftInvocationManager.invoke(groupId, new CompareAndSetOp(name, toData(expect), toData(update)));
return invocationManager.invoke(groupId, new CompareAndSetOp(name, toData(expect), toData(update)));
}

@Override
public InternalCompletableFuture<T> getAsync() {
return raftInvocationManager.invoke(groupId, new GetOp(name));
return invocationManager.invoke(groupId, new GetOp(name));
}

@Override
public InternalCompletableFuture<Void> setAsync(T newValue) {
return raftInvocationManager.invoke(groupId, new SetOp(name, toData(newValue), false));
return invocationManager.invoke(groupId, new SetOp(name, toData(newValue), false));
}

@Override
public InternalCompletableFuture<T> getAndSetAsync(T newValue) {
return raftInvocationManager.invoke(groupId, new SetOp(name, toData(newValue), true));
return invocationManager.invoke(groupId, new SetOp(name, toData(newValue), true));
}

@Override
Expand All @@ -147,31 +147,31 @@ public InternalCompletableFuture<Void> clearAsync() {

@Override
public InternalCompletableFuture<Boolean> containsAsync(T expected) {
return raftInvocationManager.invoke(groupId, new ContainsOp(name, toData(expected)));
return invocationManager.invoke(groupId, new ContainsOp(name, toData(expected)));
}

@Override
public InternalCompletableFuture<Void> alterAsync(IFunction<T, T> function) {
checkTrue(function != null, "Function cannot be null");
return raftInvocationManager.invoke(groupId, new ApplyOp(name, toData(function), NO_RETURN_VALUE, true));
return invocationManager.invoke(groupId, new ApplyOp(name, toData(function), NO_RETURN_VALUE, true));
}

@Override
public InternalCompletableFuture<T> alterAndGetAsync(IFunction<T, T> function) {
checkTrue(function != null, "Function cannot be null");
return raftInvocationManager.invoke(groupId, new ApplyOp(name, toData(function), RETURN_NEW_VALUE, true));
return invocationManager.invoke(groupId, new ApplyOp(name, toData(function), RETURN_NEW_VALUE, true));
}

@Override
public InternalCompletableFuture<T> getAndAlterAsync(IFunction<T, T> function) {
checkTrue(function != null, "Function cannot be null");
return raftInvocationManager.invoke(groupId, new ApplyOp(name, toData(function), RETURN_OLD_VALUE, true));
return invocationManager.invoke(groupId, new ApplyOp(name, toData(function), RETURN_OLD_VALUE, true));
}

@Override
public <R> InternalCompletableFuture<R> applyAsync(IFunction<T, R> function) {
checkTrue(function != null, "Function cannot be null");
return raftInvocationManager.invoke(groupId, new ApplyOp(name, toData(function), RETURN_NEW_VALUE, false));
return invocationManager.invoke(groupId, new ApplyOp(name, toData(function), RETURN_NEW_VALUE, false));
}

@Override
Expand All @@ -191,7 +191,7 @@ public String getServiceName() {

@Override
public void destroy() {
raftInvocationManager.invoke(groupId, new DestroyRaftObjectOp(getServiceName(), name)).join();
invocationManager.invoke(groupId, new DestroyRaftObjectOp(getServiceName(), name)).join();
}

public RaftGroupId getGroupId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ public class RaftCountDownLatchProxy implements ICountDownLatch {

private final RaftGroupId groupId;
private final String name;
private final RaftInvocationManager raftInvocationManager;
private final RaftInvocationManager invocationManager;

public RaftCountDownLatchProxy(RaftInvocationManager invocationManager, RaftGroupId groupId, String name) {
this.raftInvocationManager = invocationManager;
this.invocationManager = invocationManager;
this.groupId = groupId;
this.name = name;
}
Expand All @@ -52,23 +52,23 @@ public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(unit);

long timeoutMillis = Math.max(0, unit.toMillis(timeout));
return raftInvocationManager.<Boolean>invoke(groupId, new AwaitOp(name, timeoutMillis)).join();
return invocationManager.<Boolean>invoke(groupId, new AwaitOp(name, timeoutMillis)).join();
}

@Override
public void countDown() {
int round = raftInvocationManager.<Integer>invoke(groupId, new GetRoundOp(name)).join();
raftInvocationManager.invoke(groupId, new CountDownOp(name, round, newUnsecureUUID())).join();
int round = invocationManager.<Integer>invoke(groupId, new GetRoundOp(name)).join();
invocationManager.invoke(groupId, new CountDownOp(name, round, newUnsecureUUID())).join();
}

@Override
public int getCount() {
return raftInvocationManager.<Integer>invoke(groupId, new GetRemainingCountOp(name)).join();
return invocationManager.<Integer>invoke(groupId, new GetRemainingCountOp(name)).join();
}

@Override
public boolean trySetCount(int count) {
return raftInvocationManager.<Boolean>invoke(groupId, new TrySetCountOp(name, count)).join();
return invocationManager.<Boolean>invoke(groupId, new TrySetCountOp(name, count)).join();
}

@Override
Expand All @@ -92,7 +92,7 @@ public RaftGroupId getGroupId() {

@Override
public void destroy() {
raftInvocationManager.invoke(groupId, new DestroyRaftObjectOp(getServiceName(), name)).join();
invocationManager.invoke(groupId, new DestroyRaftObjectOp(getServiceName(), name)).join();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,36 @@
* limitations under the License.
*/

package com.hazelcast.raft.service.lock.exception;
package com.hazelcast.raft.service.exception;

import com.hazelcast.client.impl.protocol.ClientExceptionFactory;
import com.hazelcast.core.HazelcastException;

/**
* TODO: Javadoc Pending...
*/
public class LockRequestCancelledException extends HazelcastException {
public class WaitKeyCancelledException extends HazelcastException {

// TODO [basri] fixit
public static final int ERROR_CODE = 6768;


public LockRequestCancelledException() {
public WaitKeyCancelledException() {
}

public LockRequestCancelledException(String message) {
public WaitKeyCancelledException(String message) {
super(message);
}

public LockRequestCancelledException(String message, Throwable cause) {
public WaitKeyCancelledException(String message, Throwable cause) {
super(message, cause);
}

public static void register(ClientExceptionFactory factory) {
factory.register(ERROR_CODE, LockRequestCancelledException.class, new ClientExceptionFactory.ExceptionFactory() {
factory.register(ERROR_CODE, WaitKeyCancelledException.class, new ClientExceptionFactory.ExceptionFactory() {
@Override
public Throwable createException(String message, Throwable cause) {
return new LockRequestCancelledException(message, cause);
return new WaitKeyCancelledException(message, cause);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected RaftLock createNewResource(RaftGroupId groupId, String name) {
AcquireResult acquire(String name, LockEndpoint endpoint, long commitIndex, UUID invocationUid) {
AcquireResult result = getOrInitResource(name).acquire(endpoint, commitIndex, invocationUid, true);

for (LockInvocationKey waitKey : result.notifications) {
for (LockInvocationKey waitKey : result.cancelled) {
removeWaitKey(waitKey);
}

Expand All @@ -58,7 +58,7 @@ AcquireResult tryAcquire(String name, LockEndpoint endpoint, long commitIndex, U
AcquireResult result = getOrInitResource(name).acquire(endpoint, commitIndex, invocationUid, wait);
long fence = result.fence;

for (LockInvocationKey waitKey : result.notifications) {
for (LockInvocationKey waitKey : result.cancelled) {
removeWaitKey(waitKey);
}

Expand All @@ -72,7 +72,7 @@ AcquireResult tryAcquire(String name, LockEndpoint endpoint, long commitIndex, U
ReleaseResult release(String name, LockEndpoint endpoint, UUID invocationUid) {
RaftLock lock = getResourceOrNull(name);
if (lock == null) {
return ReleaseResult.NOT_RELEASED;
return ReleaseResult.FAILED;
}

ReleaseResult result = lock.release(endpoint, invocationUid);
Expand All @@ -86,7 +86,7 @@ ReleaseResult release(String name, LockEndpoint endpoint, UUID invocationUid) {
ReleaseResult forceRelease(String name, long expectedFence, UUID invocationUid) {
RaftLock lock = getResourceOrNull(name);
if (lock == null) {
return ReleaseResult.NOT_RELEASED;
return ReleaseResult.FAILED;
}

ReleaseResult result = lock.forceRelease(expectedFence, invocationUid);
Expand Down
Loading

0 comments on commit a871011

Please sign in to comment.