Skip to content

Commit

Permalink
Implement configurable reentrancy for FencedLock
Browse files Browse the repository at this point in the history
  • Loading branch information
metanet authored and mdogan committed Feb 1, 2019
1 parent 4214776 commit b629fc9
Show file tree
Hide file tree
Showing 69 changed files with 2,214 additions and 1,062 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.hazelcast.client.spi.impl.ClientInvocationFuture;
import com.hazelcast.client.util.ClientDelegatingFuture;
import com.hazelcast.cp.CPGroupId;
import com.hazelcast.cp.FencedLock;
import com.hazelcast.cp.lock.FencedLock;
import com.hazelcast.cp.internal.RaftGroupId;
import com.hazelcast.cp.internal.datastructures.lock.RaftLockOwnershipState;
import com.hazelcast.cp.internal.datastructures.lock.RaftLockService;
Expand All @@ -40,7 +40,6 @@
import static com.hazelcast.client.impl.protocol.util.ParameterUtil.calculateDataSize;
import static com.hazelcast.cp.internal.RaftGroupId.dataSize;
import static com.hazelcast.cp.internal.datastructures.lock.client.LockMessageTaskFactoryProvider.DESTROY_TYPE;
import static com.hazelcast.cp.internal.datastructures.lock.client.LockMessageTaskFactoryProvider.FORCE_UNLOCK_TYPE;
import static com.hazelcast.cp.internal.datastructures.lock.client.LockMessageTaskFactoryProvider.LOCK_OWNERSHIP_STATE;
import static com.hazelcast.cp.internal.datastructures.lock.client.LockMessageTaskFactoryProvider.LOCK_TYPE;
import static com.hazelcast.cp.internal.datastructures.lock.client.LockMessageTaskFactoryProvider.TRY_LOCK_TYPE;
Expand All @@ -53,6 +52,7 @@ class RaftFencedLockProxy extends ClientProxy implements FencedLock {

private static final ClientMessageDecoder BOOLEAN_RESPONSE_DECODER = new BooleanResponseDecoder();
private static final ClientMessageDecoder LOCK_OWNERSHIP_STATE_RESPONSE_DECODER = new RaftLockOwnershipStateResponseDecoder();
private static final ClientMessageDecoder LONG_RESPONSE_DECODER = new LongResponseDecoder();


private final FencedLockImpl lock;
Expand Down Expand Up @@ -102,11 +102,6 @@ public void unlock() {
lock.unlock();
}

@Override
public void forceUnlock() {
lock.forceUnlock();
}

@Override
public long getFence() {
return lock.getFence();
Expand All @@ -123,8 +118,8 @@ public boolean isLockedByCurrentThread() {
}

@Override
public int getLockCountIfLockedByCurrentThread() {
return lock.getLockCountIfLockedByCurrentThread();
public int getLockCount() {
return lock.getLockCount();
}

@Override
Expand Down Expand Up @@ -166,17 +161,6 @@ private static ClientMessage encodeRequest(int messageTypeId, CPGroupId groupId,
return msg;
}

private static ClientMessage encodeRequest(int messageTypeId, CPGroupId groupId, String name, long sessionId,
long threadId, UUID invUid, int val) {
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 4
+ Bits.INT_SIZE_IN_BYTES;
ClientMessage msg = prepareClientMessage(groupId, name, dataSize, messageTypeId);
setRequestParams(msg, sessionId, threadId, invUid);
msg.set(val);
msg.updateFrameLength();
return msg;
}

private static ClientMessage encodeRequest(int messageTypeId, CPGroupId groupId, String name, long sessionId,
long threadId, UUID invUid, long val) {
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 5;
Expand Down Expand Up @@ -231,35 +215,35 @@ public RaftLockOwnershipState decodeClientMessage(ClientMessage msg) {
}
}

private static class LongResponseDecoder implements ClientMessageDecoder {
@Override
public Long decodeClientMessage(ClientMessage msg) {
return msg.getLong();
}
}

private class FencedLockImpl extends AbstractRaftFencedLockProxy {
FencedLockImpl(AbstractProxySessionManager sessionManager, CPGroupId groupId, String proxyName, String objectName) {
super(sessionManager, groupId, proxyName, objectName);
}

@Override
protected InternalCompletableFuture<RaftLockOwnershipState> doLock(long sessionId, long threadId, UUID invocationUid) {
protected InternalCompletableFuture<Long> doLock(long sessionId, long threadId, UUID invocationUid) {
ClientMessage msg = encodeRequest(LOCK_TYPE, groupId, objectName, sessionId, threadId, invocationUid);
return invoke(objectName, msg, LOCK_OWNERSHIP_STATE_RESPONSE_DECODER);
return invoke(objectName, msg, LONG_RESPONSE_DECODER);
}

@Override
protected InternalCompletableFuture<RaftLockOwnershipState> doTryLock(long sessionId, long threadId, UUID invocationUid,
long timeoutMillis) {
protected InternalCompletableFuture<Long> doTryLock(long sessionId, long threadId, UUID invocationUid,
long timeoutMillis) {
ClientMessage msg = encodeRequest(TRY_LOCK_TYPE, groupId, objectName, sessionId, threadId, invocationUid,
timeoutMillis);
return invoke(objectName, msg, LOCK_OWNERSHIP_STATE_RESPONSE_DECODER);
}

@Override
protected InternalCompletableFuture<Object> doUnlock(long sessionId, long threadId, UUID invocationUid,
int releaseCount) {
ClientMessage msg = encodeRequest(UNLOCK_TYPE, groupId, objectName, sessionId, threadId, invocationUid, releaseCount);
return invoke(objectName, msg, BOOLEAN_RESPONSE_DECODER);
return invoke(objectName, msg, LONG_RESPONSE_DECODER);
}

@Override
protected InternalCompletableFuture<Object> doForceUnlock(UUID invocationUid, long expectedFence) {
ClientMessage msg = encodeRequest(FORCE_UNLOCK_TYPE, groupId, objectName, -1, -1, invocationUid, expectedFence);
protected InternalCompletableFuture<Boolean> doUnlock(long sessionId, long threadId, UUID invocationUid) {
ClientMessage msg = encodeRequest(UNLOCK_TYPE, groupId, objectName, sessionId, threadId, invocationUid);
return invoke(objectName, msg, BOOLEAN_RESPONSE_DECODER);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@
import com.hazelcast.cp.CPGroupId;
import com.hazelcast.cp.internal.RaftGroupId;
import com.hazelcast.cp.internal.datastructures.spi.client.RaftGroupTaskFactoryProvider;
import com.hazelcast.cp.lock.FencedLock;
import com.hazelcast.spi.InternalCompletableFuture;

import static com.hazelcast.client.impl.protocol.util.ParameterUtil.calculateDataSize;
import static com.hazelcast.cp.internal.RaftService.getObjectNameForProxy;

/**
* Creates client-side proxies for
* Raft-based {@link com.hazelcast.cp.FencedLock}
* Raft-based {@link FencedLock}
*/
public class RaftFencedLockProxyFactory extends ClientProxyFactoryWithContext implements ClientProxyFactory {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@

/**
* Contains client-side proxy impl of
* Raft-based {@link com.hazelcast.cp.FencedLock}
* Raft-based {@link com.hazelcast.cp.lock.FencedLock}
*/
package com.hazelcast.client.cp.internal.datastructures.lock;
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,36 @@
import com.hazelcast.client.util.ClientDelegatingFuture;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.cp.CPGroupId;
import com.hazelcast.cp.lock.exception.LockAcquireLimitExceededException;
import com.hazelcast.cp.lock.exception.LockOwnershipLostException;
import com.hazelcast.cp.internal.RaftGroupId;
import com.hazelcast.cp.internal.datastructures.exception.WaitKeyCancelledException;
import com.hazelcast.cp.internal.session.AbstractProxySessionManager;
import com.hazelcast.cp.internal.session.SessionExpiredException;
import com.hazelcast.cp.internal.session.SessionResponse;
import com.hazelcast.cp.internal.session.client.SessionMessageTaskFactoryProvider;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Bits;
import com.hazelcast.spi.InternalCompletableFuture;

import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static com.hazelcast.client.impl.protocol.ClientProtocolErrorCodes.LOCK_ACQUIRE_LIMIT_EXCEEDED_EXCEPTION;
import static com.hazelcast.client.impl.protocol.ClientProtocolErrorCodes.LOCK_OWNERSHIP_LOST_EXCEPTION;
import static com.hazelcast.client.impl.protocol.ClientProtocolErrorCodes.SESSION_EXPIRED_EXCEPTION;
import static com.hazelcast.client.impl.protocol.ClientProtocolErrorCodes.WAIT_KEY_CANCELLED_EXCEPTION;
import static com.hazelcast.cp.internal.datastructures.semaphore.client.SemaphoreMessageTaskFactoryProvider.GENERATE_THREAD_ID_TYPE;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
* Client-side implementation of Raft proxy session manager
*/
public class ClientProxySessionManager extends AbstractProxySessionManager {

public static final long SHUTDOWN_TIMEOUT_SECONDS = 60;
public static final long SHUTDOWN_WAIT_SLEEP_MILLIS = 10;
private static final ClientMessageDecoder SESSION_RESPONSE_DECODER = new SessionResponseDecoder();
private static final ClientMessageDecoder BOOLEAN_RESPONSE_DECODER = new BooleanResponseDecoder();

Expand All @@ -67,6 +76,18 @@ public Throwable createException(String message, Throwable cause) {
return new WaitKeyCancelledException(message, cause);
}
});
factory.register(LOCK_ACQUIRE_LIMIT_EXCEEDED_EXCEPTION, LockAcquireLimitExceededException.class, new ExceptionFactory() {
@Override
public Throwable createException(String message, Throwable cause) {
return new LockAcquireLimitExceededException(message);
}
});
factory.register(LOCK_OWNERSHIP_LOST_EXCEPTION, LockOwnershipLostException.class, new ExceptionFactory() {
@Override
public Throwable createException(String message, Throwable cause) {
return new LockOwnershipLostException(message);
}
});
}

@Override
Expand Down Expand Up @@ -137,6 +158,49 @@ protected ICompletableFuture<Object> closeSession(CPGroupId groupId, Long sessio
return invoke(msg, BOOLEAN_RESPONSE_DECODER);
}

@Override
public Map<CPGroupId, ICompletableFuture<Object>> shutdown() {
Map<CPGroupId, ICompletableFuture<Object>> futures = super.shutdown();

ILogger logger = client.getLoggingService().getLogger(getClass());

long remainingTimeNanos = TimeUnit.SECONDS.toNanos(SHUTDOWN_TIMEOUT_SECONDS);

while (remainingTimeNanos > 0) {
int closed = 0;

for (Map.Entry<CPGroupId, ICompletableFuture<Object>> entry : futures.entrySet()) {
CPGroupId groupId = entry.getKey();
ICompletableFuture<Object> f = entry.getValue();
if (f.isDone()) {
closed++;
try {
f.get();
logger.fine("Session closed for " + groupId);
} catch (Exception e) {
logger.warning("Close session failed for " + groupId, e);

}
}
}

if (closed == futures.size()) {
break;
}

try {
Thread.sleep(SHUTDOWN_WAIT_SLEEP_MILLIS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return futures;
}

remainingTimeNanos -= MILLISECONDS.toNanos(SHUTDOWN_WAIT_SLEEP_MILLIS);
}

return futures;
}

private <T> InternalCompletableFuture<T> invoke(ClientMessage msg, ClientMessageDecoder decoder) {
ClientInvocationFuture future = new ClientInvocation(client, msg, "session").invoke();
return new ClientDelegatingFuture<T>(future, client.getSerializationService(), decoder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.hazelcast.core.ISemaphore;
import com.hazelcast.cp.CPSubsystem;
import com.hazelcast.cp.CPSubsystemManagementService;
import com.hazelcast.cp.FencedLock;
import com.hazelcast.cp.lock.FencedLock;
import com.hazelcast.cp.CPSessionManagementService;
import com.hazelcast.cp.internal.datastructures.atomiclong.RaftAtomicLongService;
import com.hazelcast.cp.internal.datastructures.atomicref.RaftAtomicRefService;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.client.cp.internal.datastructures.lock;

import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.internal.datastructures.lock.BoundedReentrantFencedLockTest;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.TestHazelcastInstanceFactory;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelTest.class})
public class BoundedReentrantFencedLockClientTest extends BoundedReentrantFencedLockTest {

@Override
protected TestHazelcastInstanceFactory createTestFactory() {
return new TestHazelcastFactory();
}

@Override
protected HazelcastInstance[] createInstances() {
HazelcastInstance[] instances = super.createInstances();
TestHazelcastFactory f = (TestHazelcastFactory) factory;
lockInstance = f.newHazelcastClient();
return instances;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.internal.datastructures.lock.RaftFencedLockBasicTest;
import com.hazelcast.cp.internal.datastructures.lock.FencedLockBasicTest;
import com.hazelcast.cp.internal.session.AbstractProxySessionManager;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.TestHazelcastInstanceFactory;
Expand All @@ -33,7 +33,7 @@

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelTest.class})
public class RaftFencedLockClientBasicTest extends RaftFencedLockBasicTest {
public class FencedLockClientBasicTest extends FencedLockBasicTest {

@Override
protected TestHazelcastInstanceFactory createTestFactory() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.client.cp.internal.datastructures.lock;

import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.internal.datastructures.lock.NonReentrantFencedLockTest;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.TestHazelcastInstanceFactory;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelTest.class})
public class NonReentrantFencedLockClientTest extends NonReentrantFencedLockTest {

@Override
protected TestHazelcastInstanceFactory createTestFactory() {
return new TestHazelcastFactory();
}

@Override
protected HazelcastInstance[] createInstances() {
HazelcastInstance[] instances = super.createInstances();
TestHazelcastFactory f = (TestHazelcastFactory) factory;
lockInstance = f.newHazelcastClient();
return instances;
}

}
Loading

0 comments on commit b629fc9

Please sign in to comment.