Skip to content

Commit

Permalink
Raft auto-clean up inactive sessions
Browse files Browse the repository at this point in the history
When a lock() call fails with operation timeout, we don't release the session count on the caller, even if forceUnlock() is unlocked. So it will continue to send heartbeats even if it has not acquired the lock or has no pending lock wait keys, etc. To close such inactive but heart-beating sessions, a clean-up task runs in the background. If a session sends heartbeats for some time but holds no locks / semaphores or has no wait keys, it is automatically closed.
  • Loading branch information
metanet authored and mdogan committed Feb 1, 2019
1 parent e70df3c commit bd2eb6d
Show file tree
Hide file tree
Showing 16 changed files with 435 additions and 65 deletions.
Expand Up @@ -146,6 +146,12 @@ public final void onSessionInvalidated(RaftGroupId groupId, long sessionId) {
}
}

@Override
public final Collection<Long> getActiveSessions(RaftGroupId groupId) {
RR registry = getRegistryOrNull(groupId);
return registry != null ? registry.getActiveSessions() : Collections.<Long>emptyList();
}

@Override
public final void onGroupDestroy(final RaftGroupId groupId) {
ResourceRegistry<W, R> registry = registries.get(groupId);
Expand Down Expand Up @@ -224,7 +230,7 @@ protected final void notifyWaitKeys(RaftGroupId groupId, Collection<W> keys, Obj
completeFutures(groupId, indices, result);
}

protected final void completeFutures(RaftGroupId groupId, Collection<Long> indices, Object result) {
private void completeFutures(RaftGroupId groupId, Collection<Long> indices, Object result) {
if (!indices.isEmpty()) {
RaftNodeImpl raftNode = (RaftNodeImpl) raftService.getRaftNode(groupId);
for (Long index : indices) {
Expand Down
Expand Up @@ -24,6 +24,7 @@
import com.hazelcast.util.collection.Long2ObjectHashMap;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -48,6 +49,8 @@ protected BlockingResource(RaftGroupId groupId, String name) {
this.name = name;
}

public abstract Collection<Long> getOwnerSessionIds();

public final RaftGroupId getGroupId() {
return groupId;
}
Expand Down
Expand Up @@ -45,7 +45,7 @@
public abstract class ResourceRegistry<W extends WaitKey, R extends BlockingResource<W>> implements DataSerializable {

private RaftGroupId groupId;
private final Map<String, R> resources = new HashMap<String, R>();
private final Map<String, R> resources = new ConcurrentHashMap<String, R>();
private final Set<String> destroyedNames = new HashSet<String>();
// value.element1: timeout duration, value.element2: deadline timestamp (transient)
private final Map<W, Tuple2<Long, Long>> waitTimeouts = new ConcurrentHashMap<W, Tuple2<Long, Long>>();
Expand Down Expand Up @@ -148,6 +148,18 @@ public final Map<W, Tuple2<Long, Long>> getWaitTimeouts() {
return unmodifiableMap(waitTimeouts);
}

public final Collection<Long> getActiveSessions() {
Set<Long> sessions = new HashSet<Long>();
for (R res : resources.values()) {
sessions.addAll(res.getOwnerSessionIds());
for (WaitKey key : res.getWaitKeys()) {
sessions.add(key.sessionId());
}
}

return sessions;
}

public final Collection<Long> destroyResource(String name) {
destroyedNames.add(name);
BlockingResource<W> resource = resources.remove(name);
Expand Down
Expand Up @@ -51,6 +51,11 @@ public RaftCountDownLatch() {
super(groupId, name);
}

@Override
public Collection<Long> getOwnerSessionIds() {
return Collections.emptyList();
}

@Override
protected void onInvalidateSession(long sessionId, Long2ObjectHashMap<Object> result) {
}
Expand Down
Expand Up @@ -44,7 +44,7 @@ public interface FencedLock extends DistributedObject {

int getLockCount();

RaftGroupId getRaftGroupId();
RaftGroupId getGroupId();

String getName();
}
Expand Up @@ -50,6 +50,11 @@ public RaftLock() {
super(groupId, name);
}

@Override
public Collection<Long> getOwnerSessionIds() {
return owner != null ? Collections.singleton(owner.sessionId()) : Collections.<Long>emptyList();
}

long acquire(LockEndpoint endpoint, long commitIndex, UUID invocationUid, boolean wait) {
// if acquire() is being retried
if (owner != null && owner.invocationUid().equals(invocationUid)) {
Expand Down
Expand Up @@ -148,11 +148,6 @@ public final int getLockCount() {
return doGetLockCount(groupId, name, NO_SESSION_ID, 0).join();
}

@Override
public final RaftGroupId getRaftGroupId() {
return groupId;
}

@Override
public final String getName() {
return name;
Expand Down
Expand Up @@ -37,6 +37,7 @@

import static com.hazelcast.raft.service.session.AbstractSessionManager.NO_SESSION_ID;
import static com.hazelcast.util.Preconditions.checkPositive;
import static java.util.Collections.unmodifiableCollection;

/**
* TODO: Javadoc Pending...
Expand Down Expand Up @@ -188,6 +189,11 @@ int drain(long sessionId, UUID invocationUid) {
return drained;
}

@Override
public Collection<Long> getOwnerSessionIds() {
return unmodifiableCollection(sessionStates.keySet());
}

Tuple2<Boolean, Collection<SemaphoreInvocationKey>> change(int permits) {
if (permits == 0) {
Collection<SemaphoreInvocationKey> c = Collections.emptyList();
Expand Down
Expand Up @@ -11,6 +11,7 @@
import com.hazelcast.raft.impl.service.HazelcastRaftTestSupport;
import com.hazelcast.raft.impl.service.RaftService;
import com.hazelcast.raft.impl.service.operation.snapshot.RestoreSnapshotOp;
import com.hazelcast.raft.impl.session.RaftSessionService;
import com.hazelcast.raft.service.blocking.ResourceRegistry;
import com.hazelcast.raft.service.lock.proxy.RaftFencedLockProxy;
import com.hazelcast.raft.service.session.SessionManagerService;
Expand All @@ -33,7 +34,9 @@

import static com.hazelcast.raft.impl.RaftUtil.getSnapshotEntry;
import static com.hazelcast.raft.service.lock.RaftFencedLockBasicTest.lockByOtherThread;
import static com.hazelcast.raft.service.session.AbstractSessionManager.NO_SESSION_ID;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -81,6 +84,8 @@ protected Config createConfig(int groupSize, int metadataGroupSize) {
RaftConfig raftConfig = config.getRaftConfig();
raftConfig.getRaftAlgorithmConfig().setCommitIndexAdvanceCountToSnapshot(LOG_ENTRY_COUNT_TO_SNAPSHOT);
raftConfig.addGroupConfig(new RaftGroupConfig(name, groupSize));
raftConfig.setSessionTimeToLiveSeconds(10);
raftConfig.setSessionHeartbeatIntervalMillis(SECONDS.toMillis(1));

RaftLockConfig lockConfig = new RaftLockConfig(name, name);
config.addRaftLockConfig(lockConfig);
Expand All @@ -95,7 +100,7 @@ protected HazelcastInstance[] createInstances() {
public void testSuccessfulTryLockClearsWaitTimeouts() {
lock.lock();

RaftGroupId groupId = getGroupId(lock);
RaftGroupId groupId = lock.getGroupId();
HazelcastInstance leader = getLeaderInstance(instances, groupId);
RaftLockService service = getNodeEngineImpl(leader).getService(RaftLockService.SERVICE_NAME);
final LockRegistry registry = service.getRegistryOrNull(groupId);
Expand Down Expand Up @@ -131,7 +136,7 @@ public void run() {
public void testFailedTryLockClearsWaitTimeouts() throws InterruptedException {
lockByOtherThread(lock);

RaftGroupId groupId = getGroupId(lock);
RaftGroupId groupId = lock.getGroupId();
HazelcastInstance leader = getLeaderInstance(instances, groupId);
RaftLockService service = getNodeEngineImpl(leader).getService(RaftLockService.SERVICE_NAME);
LockRegistry registry = service.getRegistryOrNull(groupId);
Expand All @@ -146,7 +151,7 @@ public void testFailedTryLockClearsWaitTimeouts() throws InterruptedException {
public void testDestroyClearsWaitTimeouts() {
lockByOtherThread(lock);

RaftGroupId groupId = getGroupId(lock);
RaftGroupId groupId = lock.getGroupId();
HazelcastInstance leader = getLeaderInstance(instances, groupId);
RaftLockService service = getNodeEngineImpl(leader).getService(RaftLockService.SERVICE_NAME);
final LockRegistry registry = service.getRegistryOrNull(groupId);
Expand Down Expand Up @@ -190,7 +195,7 @@ public void run() {
}
});

final RaftGroupId groupId = getGroupId(this.lock);
final RaftGroupId groupId = this.lock.getGroupId();

spawn(new Runnable() {
@Override
Expand Down Expand Up @@ -256,7 +261,61 @@ public void run() {
});
}

private RaftGroupId getGroupId(FencedLock lock) {
return ((RaftFencedLockProxy) lock).getGroupId();
@Test
public void testInactiveSessionsAreEventuallyClosed() {
lock.lock();

final RaftGroupId groupId = lock.getGroupId();

assertTrueEventually(new AssertTask() {
@Override
public void run() {
for (HazelcastInstance instance : instances) {
RaftSessionService sessionService = getNodeEngineImpl(instance).getService(RaftSessionService.SERVICE_NAME);
assertFalse(sessionService.getSessions(groupId).isEmpty());
}
}
});

lock.forceUnlock();

assertTrueEventually(new AssertTask() {
@Override
public void run() {
for (HazelcastInstance instance : instances) {
RaftSessionService service = getNodeEngineImpl(instance).getService(RaftSessionService.SERVICE_NAME);
assertTrue(service.getSessions(groupId).isEmpty());
}

SessionManagerService service = getNodeEngineImpl(lockInstance).getService(SessionManagerService.SERVICE_NAME);
assertEquals(NO_SESSION_ID, service.getSession(groupId));
}
});
}

@Test
public void testActiveSessionIsNotClosed() {
lock.lock();

assertTrueEventually(new AssertTask() {
@Override
public void run() {
for (HazelcastInstance instance : instances) {
RaftSessionService sessionService = getNodeEngineImpl(instance).getService(RaftSessionService.SERVICE_NAME);
assertFalse(sessionService.getSessions(lock.getGroupId()).isEmpty());
}
}
});

assertTrueAllTheTime(new AssertTask() {
@Override
public void run() {
for (HazelcastInstance instance : instances) {
RaftSessionService sessionService = getNodeEngineImpl(instance).getService(RaftSessionService.SERVICE_NAME);
assertFalse(sessionService.getSessions(lock.getGroupId()).isEmpty());
}
}
}, 20);
}

}
Expand Up @@ -328,7 +328,7 @@ public void test_ReentrantLockFails_whenSessionClosed() throws ExecutionExceptio
assertTrue(fence > 0);

final AbstractSessionManager sessionManager = getSessionManager(lockInstance);
final RaftGroupId groupId = lock.getRaftGroupId();
final RaftGroupId groupId = lock.getGroupId();
final long sessionId = sessionManager.getSession(groupId);
assertNotEquals(AbstractSessionManager.NO_SESSION_ID, sessionId);

Expand All @@ -353,7 +353,7 @@ public void test_ReentrantTryLockFails_whenSessionClosed() throws ExecutionExcep
assertTrue(fence > 0);

final AbstractSessionManager sessionManager = getSessionManager(lockInstance);
final RaftGroupId groupId = lock.getRaftGroupId();
final RaftGroupId groupId = lock.getGroupId();
final long sessionId = sessionManager.getSession(groupId);
assertNotEquals(AbstractSessionManager.NO_SESSION_ID, sessionId);

Expand All @@ -378,7 +378,7 @@ public void test_ReentrantTryLockWithTimeoutFails_whenSessionClosed() throws Exe
assertTrue(fence > 0);

final AbstractSessionManager sessionManager = getSessionManager(lockInstance);
final RaftGroupId groupId = lock.getRaftGroupId();
final RaftGroupId groupId = lock.getGroupId();
final long sessionId = sessionManager.getSession(groupId);
assertNotEquals(AbstractSessionManager.NO_SESSION_ID, sessionId);

Expand Down Expand Up @@ -461,7 +461,7 @@ public void test_failedTryLock_doesNotAcquireSession() {
lockByOtherThread(lock);

final AbstractSessionManager sessionManager = getSessionManager(lockInstance);
final RaftGroupId groupId = lock.getRaftGroupId();
final RaftGroupId groupId = lock.getGroupId();
final long sessionId = sessionManager.getSession(groupId);
assertNotEquals(AbstractSessionManager.NO_SESSION_ID, sessionId);
assertEquals(1, sessionManager.getSessionUsageCount(groupId, sessionId));
Expand Down

0 comments on commit bd2eb6d

Please sign in to comment.