Skip to content

Commit

Permalink
KEYCLOAK-2412 Added ClusterProvider. Avoid concurrent federation sync…
Browse files Browse the repository at this point in the history
… execution by more cluster nodes at the same time.

Clustering - more progress
  • Loading branch information
mposolda committed Feb 17, 2016
1 parent 61f2baf commit 1328531
Show file tree
Hide file tree
Showing 34 changed files with 1,087 additions and 107 deletions.
@@ -0,0 +1,186 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.keycloak.cluster.infinispan;

import java.io.Serializable;
import java.util.concurrent.Callable;

import org.infinispan.Cache;
import org.infinispan.context.Flag;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.remoting.transport.Transport;
import org.jboss.logging.Logger;
import org.keycloak.cluster.ClusterEvent;
import org.keycloak.cluster.ClusterListener;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ExecutionResult;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.KeycloakSession;

/**
* Various utils related to clustering
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class InfinispanClusterProvider implements ClusterProvider {

protected static final Logger logger = Logger.getLogger(InfinispanClusterProvider.class);

public static final String CLUSTER_STARTUP_TIME_KEY = "cluster-start-time";
private static final String TASK_KEY_PREFIX = "task::";

private final InfinispanClusterProviderFactory factory;
private final KeycloakSession session;
private final Cache<String, Serializable> cache;

public InfinispanClusterProvider(InfinispanClusterProviderFactory factory, KeycloakSession session, Cache<String, Serializable> cache) {
this.factory = factory;
this.session = session;
this.cache = cache;
}


@Override
public int getClusterStartupTime() {
Integer existingClusterStartTime = (Integer) cache.get(InfinispanClusterProvider.CLUSTER_STARTUP_TIME_KEY);
if (existingClusterStartTime != null) {
return existingClusterStartTime;
} else {
// clusterStartTime not yet initialized. Let's try to put our startupTime
int serverStartTime = (int) (session.getKeycloakSessionFactory().getServerStartupTimestamp() / 1000);

existingClusterStartTime = (Integer) cache.putIfAbsent(InfinispanClusterProvider.CLUSTER_STARTUP_TIME_KEY, serverStartTime);
if (existingClusterStartTime == null) {
logger.infof("Initialized cluster startup time to %s", Time.toDate(serverStartTime).toString());
return serverStartTime;
} else {
return existingClusterStartTime;
}
}
}


@Override
public void close() {
}


@Override
public <T> ExecutionResult<T> executeIfNotExecuted(String taskKey, int taskTimeoutInSeconds, Callable<T> task) {
String cacheKey = TASK_KEY_PREFIX + taskKey;
boolean locked = tryLock(cacheKey, taskTimeoutInSeconds);
if (locked) {
try {
try {
T result = task.call();
return ExecutionResult.executed(result);
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
throw new RuntimeException("Unexpected exception when executed task " + taskKey, e);
}
} finally {
removeFromCache(cacheKey);
}
} else {
return ExecutionResult.notExecuted();
}
}


@Override
public void registerListener(String taskKey, ClusterListener task) {
factory.registerListener(taskKey, task);
}


@Override
public void notify(String taskKey, ClusterEvent event) {
// Put the value to the cache to notify listeners on all the nodes
cache.put(taskKey, event);
}


private String getCurrentNode(Cache<String, Serializable> cache) {
Transport transport = cache.getCacheManager().getTransport();
return transport==null ? "local" : transport.getAddress().toString();
}


private LockEntry createLockEntry(Cache<String, Serializable> cache) {
LockEntry lock = new LockEntry();
lock.setNode(getCurrentNode(cache));
lock.setTimestamp(Time.currentTime());
return lock;
}


private boolean tryLock(String cacheKey, int taskTimeoutInSeconds) {
LockEntry myLock = createLockEntry(cache);

LockEntry existingLock = (LockEntry) cache.putIfAbsent(cacheKey, myLock);
if (existingLock != null) {
// Task likely already in progress. Check if timestamp is not outdated
int thatTime = existingLock.getTimestamp();
int currentTime = Time.currentTime();
if (thatTime + taskTimeoutInSeconds < currentTime) {
logger.infof("Task %s outdated when in progress by node %s. Will try to replace task with our node %s", cacheKey, existingLock.getNode(), myLock.getNode());
boolean replaced = cache.replace(cacheKey, existingLock, myLock);
// TODO: trace
if (!replaced) {
logger.infof("Failed to replace the task %s. Other thread replaced in the meantime. Ignoring task.", cacheKey);
}
return replaced;
} else {
logger.infof("Task %s in progress already by node %s. Ignoring task.", cacheKey, existingLock.getNode());
return false;
}
} else {
logger.infof("Successfully acquired lock for task %s. Our node is %s", cacheKey, myLock.getNode());
return true;
}
}


private void removeFromCache(String cacheKey) {
// 3 attempts to send the message (it may fail if some node fails in the meantime)
int retry = 3;
while (true) {
try {
cache.getAdvancedCache()
.withFlags(Flag.IGNORE_RETURN_VALUES, Flag.FORCE_SYNCHRONOUS)
.remove(cacheKey);
logger.infof("Task %s removed from the cache", cacheKey);
return;
} catch (RuntimeException e) {
ComponentStatus status = cache.getStatus();
if (status.isStopping() || status.isTerminated()) {
logger.warnf("Failed to remove task %s from the cache. Cache is already terminating", cacheKey);
logger.debug(e.getMessage(), e);
return;
}
retry--;
if (retry == 0) {
throw e;
}
}
}
}

}
@@ -0,0 +1,199 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.keycloak.cluster.infinispan;

import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.infinispan.Cache;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.cluster.ClusterEvent;
import org.keycloak.cluster.ClusterListener;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ClusterProviderFactory;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;

/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class InfinispanClusterProviderFactory implements ClusterProviderFactory {

public static final String PROVIDER_ID = "infinispan";

protected static final Logger logger = Logger.getLogger(InfinispanClusterProviderFactory.class);

private volatile Cache<String, Serializable> workCache;

private Map<String, ClusterListener> listeners = new HashMap<>();

@Override
public ClusterProvider create(KeycloakSession session) {
lazyInit(session);
return new InfinispanClusterProvider(this, session, workCache);
}

private void lazyInit(KeycloakSession session) {
if (workCache == null) {
synchronized (this) {
if (workCache == null) {
workCache = session.getProvider(InfinispanConnectionProvider.class).getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
workCache.getCacheManager().addListener(new ViewChangeListener());
workCache.addListener(new CacheEntryListener());
}
}
}
}

@Override
public void init(Config.Scope config) {
}

@Override
public void postInit(KeycloakSessionFactory factory) {
}


@Override
public void close() {

}

@Override
public String getId() {
return PROVIDER_ID;
}


@Listener
public class ViewChangeListener {

@ViewChanged
public void viewChanged(ViewChangedEvent event) {
EmbeddedCacheManager cacheManager = event.getCacheManager();
Transport transport = cacheManager.getTransport();

// Coordinator makes sure that entries for outdated nodes are cleaned up
if (transport != null && transport.isCoordinator()) {

Set<String> newAddresses = convertAddresses(event.getNewMembers());
Set<String> removedNodesAddresses = convertAddresses(event.getOldMembers());
removedNodesAddresses.removeAll(newAddresses);

if (removedNodesAddresses.isEmpty()) {
return;
}

logger.infof("Nodes %s removed from cluster. Removing tasks locked by this nodes", removedNodesAddresses.toString());

Cache<String, Serializable> cache = cacheManager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);

Iterator<String> toRemove = cache.entrySet().stream().filter(new Predicate<Map.Entry<String, Serializable>>() {

@Override
public boolean test(Map.Entry<String, Serializable> entry) {
if (!(entry.getValue() instanceof LockEntry)) {
return false;
}

LockEntry lock = (LockEntry) entry.getValue();
return removedNodesAddresses.contains(lock.getNode());
}

}).map(new Function<Map.Entry<String, Serializable>, String>() {

@Override
public String apply(Map.Entry<String, Serializable> entry) {
return entry.getKey();
}

}).iterator();

while (toRemove.hasNext()) {
String rem = toRemove.next();
logger.infof("Removing task %s due it's node left cluster", rem);
cache.remove(rem);
}
}
}

private Set<String> convertAddresses(Collection<Address> addresses) {
return addresses.stream().map(new Function<Address, String>() {

@Override
public String apply(Address address) {
return address.toString();
}

}).collect(Collectors.toSet());
}

}


<T> void registerListener(String taskKey, ClusterListener task) {
listeners.put(taskKey, task);
}

@Listener
public class CacheEntryListener {

@CacheEntryCreated
public void cacheEntryCreated(CacheEntryCreatedEvent<String, Object> event) {
if (!event.isPre()) {
trigger(event.getKey(), event.getValue());
}
}

@CacheEntryModified
public void cacheEntryModified(CacheEntryModifiedEvent<String, Object> event) {
if (!event.isPre()) {
trigger(event.getKey(), event.getValue());
}
}

private void trigger(String key, Object value) {
ClusterListener task = listeners.get(key);
if (task != null) {
ClusterEvent event = (ClusterEvent) value;
task.run(event);
}
}
}

}

0 comments on commit 1328531

Please sign in to comment.