Skip to content

Commit

Permalink
KEYCLOAK-4189 - Cross DC testing
Browse files Browse the repository at this point in the history
  • Loading branch information
hmlnarik committed Jun 12, 2017
1 parent fd8a3dc commit a0f3a64
Show file tree
Hide file tree
Showing 45 changed files with 1,737 additions and 143 deletions.
Expand Up @@ -17,16 +17,30 @@


package org.keycloak.common.util; package org.keycloak.common.util;


import java.util.Collections;
import java.util.Map;
import java.util.Properties; import java.util.Properties;


/** /**
* @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a> * @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
*/ */
public class SystemEnvProperties extends Properties { public class SystemEnvProperties extends Properties {


private final Map<String, String> overrides;

public SystemEnvProperties(Map<String, String> overrides) {
this.overrides = overrides;
}

public SystemEnvProperties() {
this.overrides = Collections.EMPTY_MAP;
}

@Override @Override
public String getProperty(String key) { public String getProperty(String key) {
if (key.startsWith("env.")) { if (overrides.containsKey(key)) {
return overrides.get(key);
} else if (key.startsWith("env.")) {
return System.getenv().get(key.substring(4)); return System.getenv().get(key.substring(4));
} else { } else {
return System.getProperty(key); return System.getProperty(key);
Expand Down
4 changes: 3 additions & 1 deletion misc/CrossDataCenter.md
Expand Up @@ -3,6 +3,8 @@ Test Cross-Data-Center scenario (test with external JDG server)


These are temporary notes. This docs should be removed once we have cross-DC support finished and properly documented. These are temporary notes. This docs should be removed once we have cross-DC support finished and properly documented.


Note that these steps are already automated, see Cross-DC tests section in [HOW-TO-RUN.md](../testsuite/integration-arquillian/HOW-TO-RUN.md) document.

What is working right now is: What is working right now is:
- Propagating of invalidation messages for "realms" and "users" caches - Propagating of invalidation messages for "realms" and "users" caches
- All the other things provided by ClusterProvider, which is: - All the other things provided by ClusterProvider, which is:
Expand All @@ -18,7 +20,7 @@ Basic setup


This is setup with 2 keycloak nodes, which are NOT in cluster. They just share the same database and they will be configured with "work" infinispan cache with remoteStore, which will point This is setup with 2 keycloak nodes, which are NOT in cluster. They just share the same database and they will be configured with "work" infinispan cache with remoteStore, which will point
to external JDG server. to external JDG server.

JDG Server setup JDG Server setup
---------------- ----------------
- Download JDG 7.0 server and unzip to some folder - Download JDG 7.0 server and unzip to some folder
Expand Down
Expand Up @@ -154,7 +154,8 @@ protected void initEmbedded() {


if (clustered) { if (clustered) {
String nodeName = config.get("nodeName", System.getProperty(InfinispanConnectionProvider.JBOSS_NODE_NAME)); String nodeName = config.get("nodeName", System.getProperty(InfinispanConnectionProvider.JBOSS_NODE_NAME));
configureTransport(gcb, nodeName); String jgroupsUdpMcastAddr = config.get("jgroupsUdpMcastAddr", System.getProperty(InfinispanConnectionProvider.JGROUPS_UDP_MCAST_ADDR));
configureTransport(gcb, nodeName, jgroupsUdpMcastAddr);
} }
gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains); gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);


Expand Down Expand Up @@ -317,24 +318,40 @@ private Configuration getActionTokenCacheConfig() {
return cb.build(); return cb.build();
} }


protected void configureTransport(GlobalConfigurationBuilder gcb, String nodeName) { private static final Object CHANNEL_INIT_SYNCHRONIZER = new Object();

protected void configureTransport(GlobalConfigurationBuilder gcb, String nodeName, String jgroupsUdpMcastAddr) {
if (nodeName == null) { if (nodeName == null) {
gcb.transport().defaultTransport(); gcb.transport().defaultTransport();
} else { } else {
FileLookup fileLookup = FileLookupFactory.newInstance(); FileLookup fileLookup = FileLookupFactory.newInstance();


try { synchronized (CHANNEL_INIT_SYNCHRONIZER) {
// Compatibility with Wildfly String originalMcastAddr = System.getProperty(InfinispanConnectionProvider.JGROUPS_UDP_MCAST_ADDR);
JChannel channel = new JChannel(fileLookup.lookupFileLocation("default-configs/default-jgroups-udp.xml", this.getClass().getClassLoader())); if (jgroupsUdpMcastAddr == null) {
channel.setName(nodeName); System.getProperties().remove(InfinispanConnectionProvider.JGROUPS_UDP_MCAST_ADDR);
JGroupsTransport transport = new JGroupsTransport(channel); } else {

System.setProperty(InfinispanConnectionProvider.JGROUPS_UDP_MCAST_ADDR, jgroupsUdpMcastAddr);
gcb.transport().nodeName(nodeName); }
gcb.transport().transport(transport); try {

// Compatibility with Wildfly
logger.infof("Configured jgroups transport with the channel name: %s", nodeName); JChannel channel = new JChannel(fileLookup.lookupFileLocation("default-configs/default-jgroups-udp.xml", this.getClass().getClassLoader()));
} catch (Exception e) { channel.setName(nodeName);
throw new RuntimeException(e); JGroupsTransport transport = new JGroupsTransport(channel);

gcb.transport().nodeName(nodeName);
gcb.transport().transport(transport);

logger.infof("Configured jgroups transport with the channel name: %s", nodeName);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (originalMcastAddr == null) {
System.getProperties().remove(InfinispanConnectionProvider.JGROUPS_UDP_MCAST_ADDR);
} else {
System.setProperty(InfinispanConnectionProvider.JGROUPS_UDP_MCAST_ADDR, originalMcastAddr);
}
}
} }
} }
} }
Expand Down
Expand Up @@ -53,6 +53,7 @@ public interface InfinispanConnectionProvider extends Provider {


// System property used on Wildfly to identify distributedCache address and sticky session route // System property used on Wildfly to identify distributedCache address and sticky session route
String JBOSS_NODE_NAME = "jboss.node.name"; String JBOSS_NODE_NAME = "jboss.node.name";
String JGROUPS_UDP_MCAST_ADDR = "jgroups.udp.mcast_addr";




<K, V> Cache<K, V> getCache(String name); <K, V> Cache<K, V> getCache(String name);
Expand Down
Expand Up @@ -220,7 +220,7 @@ public void invalidationEventReceived(InvalidationEvent event) {


addInvalidationsFromEvent(event, invalidations); addInvalidationsFromEvent(event, invalidations);


getLogger().debugf("Invalidating %d cache items after received event %s", invalidations.size(), event); getLogger().debugf("[%s] Invalidating %d cache items after received event %s", cache.getCacheManager().getAddress(), invalidations.size(), event);


for (String invalidation : invalidations) { for (String invalidation : invalidations) {
invalidateObject(invalidation); invalidateObject(invalidation);
Expand Down
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.keycloak.models.cache.infinispan; package org.keycloak.models.cache.infinispan.events;


import org.keycloak.cluster.ClusterEvent; import org.keycloak.cluster.ClusterEvent;
import org.keycloak.models.sessions.infinispan.entities.ActionTokenReducedKey; import org.keycloak.models.sessions.infinispan.entities.ActionTokenReducedKey;
Expand Down
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.keycloak.models.cache.infinispan; package org.keycloak.models.cache.infinispan.events;


import org.keycloak.cluster.ClusterEvent; import org.keycloak.cluster.ClusterEvent;


Expand Down
Expand Up @@ -19,8 +19,8 @@
import org.keycloak.cluster.ClusterProvider; import org.keycloak.cluster.ClusterProvider;
import org.keycloak.models.*; import org.keycloak.models.*;


import org.keycloak.models.cache.infinispan.AddInvalidatedActionTokenEvent; import org.keycloak.models.cache.infinispan.events.AddInvalidatedActionTokenEvent;
import org.keycloak.models.cache.infinispan.RemoveActionTokensSpecificEvent; import org.keycloak.models.cache.infinispan.events.RemoveActionTokensSpecificEvent;
import org.keycloak.models.sessions.infinispan.entities.ActionTokenValueEntity; import org.keycloak.models.sessions.infinispan.entities.ActionTokenValueEntity;
import org.keycloak.models.sessions.infinispan.entities.ActionTokenReducedKey; import org.keycloak.models.sessions.infinispan.entities.ActionTokenReducedKey;
import java.util.*; import java.util.*;
Expand Down Expand Up @@ -58,7 +58,12 @@ public void put(ActionTokenKeyModel key, Map<String, String> notes) {
ActionTokenValueEntity tokenValue = new ActionTokenValueEntity(notes); ActionTokenValueEntity tokenValue = new ActionTokenValueEntity(notes);


ClusterProvider cluster = session.getProvider(ClusterProvider.class); ClusterProvider cluster = session.getProvider(ClusterProvider.class);
this.tx.notify(cluster, InfinispanActionTokenStoreProviderFactory.ACTION_TOKEN_EVENTS, new AddInvalidatedActionTokenEvent(tokenKey, key.getExpiration(), tokenValue), false); AddInvalidatedActionTokenEvent event = new AddInvalidatedActionTokenEvent(tokenKey, key.getExpiration(), tokenValue);
this.tx.notify(cluster, generateActionTokenEventId(), event, false);
}

private static String generateActionTokenEventId() {
return InfinispanActionTokenStoreProviderFactory.ACTION_TOKEN_EVENTS + "/" + UUID.randomUUID();
} }


@Override @Override
Expand Down Expand Up @@ -93,6 +98,6 @@ public void removeAll(String userId, String actionId) {
} }


ClusterProvider cluster = session.getProvider(ClusterProvider.class); ClusterProvider cluster = session.getProvider(ClusterProvider.class);
this.tx.notify(cluster, InfinispanActionTokenStoreProviderFactory.ACTION_TOKEN_EVENTS, new RemoveActionTokensSpecificEvent(userId, actionId), false); this.tx.notify(cluster, generateActionTokenEventId(), new RemoveActionTokensSpecificEvent(userId, actionId), false);
} }
} }
Expand Up @@ -23,21 +23,27 @@
import org.keycloak.connections.infinispan.InfinispanConnectionProvider; import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.*; import org.keycloak.models.*;


import org.keycloak.models.cache.infinispan.AddInvalidatedActionTokenEvent; import org.keycloak.models.cache.infinispan.events.AddInvalidatedActionTokenEvent;
import org.keycloak.models.cache.infinispan.RemoveActionTokensSpecificEvent; import org.keycloak.models.cache.infinispan.events.RemoveActionTokensSpecificEvent;
import org.keycloak.models.sessions.infinispan.entities.ActionTokenValueEntity; import org.keycloak.models.sessions.infinispan.entities.ActionTokenValueEntity;
import org.keycloak.models.sessions.infinispan.entities.ActionTokenReducedKey; import org.keycloak.models.sessions.infinispan.entities.ActionTokenReducedKey;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.infinispan.Cache; import org.infinispan.Cache;
import org.infinispan.context.Flag; import org.infinispan.context.Flag;
import org.infinispan.remoting.transport.Address;
import org.jboss.logging.Logger;


/** /**
* *
* @author hmlnarik * @author hmlnarik
*/ */
public class InfinispanActionTokenStoreProviderFactory implements ActionTokenStoreProviderFactory { public class InfinispanActionTokenStoreProviderFactory implements ActionTokenStoreProviderFactory {


private static final Logger LOG = Logger.getLogger(InfinispanActionTokenStoreProviderFactory.class);

private volatile Cache<ActionTokenReducedKey, ActionTokenValueEntity> actionTokenCache;

public static final String ACTION_TOKEN_EVENTS = "ACTION_TOKEN_EVENTS"; public static final String ACTION_TOKEN_EVENTS = "ACTION_TOKEN_EVENTS";


/** /**
Expand All @@ -49,43 +55,65 @@ public class InfinispanActionTokenStoreProviderFactory implements ActionTokenSto


@Override @Override
public ActionTokenStoreProvider create(KeycloakSession session) { public ActionTokenStoreProvider create(KeycloakSession session) {
return new InfinispanActionTokenStoreProvider(session, this.actionTokenCache);
}

@Override
public void init(Scope config) {
this.config = config;
}

private static Cache<ActionTokenReducedKey, ActionTokenValueEntity> initActionTokenCache(KeycloakSession session) {
InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class); InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class);
Cache<ActionTokenReducedKey, ActionTokenValueEntity> actionTokenCache = connections.getCache(InfinispanConnectionProvider.ACTION_TOKEN_CACHE); Cache<ActionTokenReducedKey, ActionTokenValueEntity> cache = connections.getCache(InfinispanConnectionProvider.ACTION_TOKEN_CACHE);
final Address cacheAddress = cache.getCacheManager().getAddress();


ClusterProvider cluster = session.getProvider(ClusterProvider.class); ClusterProvider cluster = session.getProvider(ClusterProvider.class);


cluster.registerListener(ACTION_TOKEN_EVENTS, event -> { cluster.registerListener(ClusterProvider.ALL, event -> {
if (event instanceof RemoveActionTokensSpecificEvent) { if (event instanceof RemoveActionTokensSpecificEvent) {
RemoveActionTokensSpecificEvent e = (RemoveActionTokensSpecificEvent) event; RemoveActionTokensSpecificEvent e = (RemoveActionTokensSpecificEvent) event;


actionTokenCache LOG.debugf("[%s] Removing token invalidation for user+action: userId=%s, actionId=%s", cacheAddress, e.getUserId(), e.getActionId());

cache
.getAdvancedCache() .getAdvancedCache()
.withFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_CACHE_LOAD) .withFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_CACHE_LOAD)
.keySet() .keySet()
.stream() .stream()
.filter(k -> Objects.equals(k.getUserId(), e.getUserId()) && Objects.equals(k.getActionId(), e.getActionId())) .filter(k -> Objects.equals(k.getUserId(), e.getUserId()) && Objects.equals(k.getActionId(), e.getActionId()))
.forEach(actionTokenCache::remove); .forEach(cache::remove);
} else if (event instanceof AddInvalidatedActionTokenEvent) { } else if (event instanceof AddInvalidatedActionTokenEvent) {
AddInvalidatedActionTokenEvent e = (AddInvalidatedActionTokenEvent) event; AddInvalidatedActionTokenEvent e = (AddInvalidatedActionTokenEvent) event;


LOG.debugf("[%s] Invalidating token %s", cacheAddress, e.getKey());
if (e.getExpirationInSecs() == DEFAULT_CACHE_EXPIRATION) { if (e.getExpirationInSecs() == DEFAULT_CACHE_EXPIRATION) {
actionTokenCache.put(e.getKey(), e.getTokenValue()); cache.put(e.getKey(), e.getTokenValue());
} else { } else {
actionTokenCache.put(e.getKey(), e.getTokenValue(), e.getExpirationInSecs() - Time.currentTime(), TimeUnit.SECONDS); cache.put(e.getKey(), e.getTokenValue(), e.getExpirationInSecs() - Time.currentTime(), TimeUnit.SECONDS);
} }
} }
}); });


return new InfinispanActionTokenStoreProvider(session, actionTokenCache); LOG.debugf("[%s] Registered cluster listeners", cacheAddress);
}


@Override return cache;
public void init(Scope config) {
this.config = config;
} }


@Override @Override
public void postInit(KeycloakSessionFactory factory) { public void postInit(KeycloakSessionFactory factory) {
Cache<ActionTokenReducedKey, ActionTokenValueEntity> cache = this.actionTokenCache;

// It is necessary to put the cache initialization here, otherwise the cache would be initialized lazily, that
// means also listeners will start only after first cache initialization - that would be too late
if (cache == null) {
synchronized (this) {
cache = this.actionTokenCache;
if (cache == null) {
this.actionTokenCache = initActionTokenCache(factory.create());
}
}
}
} }


@Override @Override
Expand Down
Expand Up @@ -92,7 +92,7 @@ private void lazyInit(KeycloakSession session) {
ClusterProvider cluster = session.getProvider(ClusterProvider.class); ClusterProvider cluster = session.getProvider(ClusterProvider.class);
cluster.registerListener(AUTHENTICATION_SESSION_EVENTS, this::updateAuthNotes); cluster.registerListener(AUTHENTICATION_SESSION_EVENTS, this::updateAuthNotes);


log.debug("Registered cluster listeners"); log.debugf("[%s] Registered cluster listeners", authSessionsCache.getCacheManager().getAddress());
} }
} }
} }
Expand Down
Expand Up @@ -81,6 +81,11 @@ public boolean equals(Object obj) {
&& Objects.equals(this.actionVerificationNonce, other.getActionVerificationNonce()); && Objects.equals(this.actionVerificationNonce, other.getActionVerificationNonce());
} }


@Override
public String toString() {
return "userId=" + userId + ", actionId=" + actionId + ", actionVerificationNonce=" + actionVerificationNonce;
}

public static class ExternalizerImpl implements Externalizer<ActionTokenReducedKey> { public static class ExternalizerImpl implements Externalizer<ActionTokenReducedKey> {


@Override @Override
Expand Down
Expand Up @@ -53,7 +53,7 @@ public static class ExternalizerImpl implements Externalizer<ActionTokenValueEnt
public void writeObject(ObjectOutput output, ActionTokenValueEntity t) throws IOException { public void writeObject(ObjectOutput output, ActionTokenValueEntity t) throws IOException {
output.writeByte(VERSION_1); output.writeByte(VERSION_1);


output.writeBoolean(! t.notes.isEmpty()); output.writeBoolean(t.notes.isEmpty());
if (! t.notes.isEmpty()) { if (! t.notes.isEmpty()) {
output.writeObject(t.notes); output.writeObject(t.notes);
} }
Expand Down
Expand Up @@ -70,8 +70,10 @@
import java.io.InputStream; import java.io.InputStream;
import java.net.URI; import java.net.URI;
import java.net.URL; import java.net.URL;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.StringTokenizer; import java.util.StringTokenizer;
Expand All @@ -88,6 +90,8 @@ public class KeycloakApplication extends Application {


public static final String KEYCLOAK_EMBEDDED = "keycloak.embedded"; public static final String KEYCLOAK_EMBEDDED = "keycloak.embedded";


public static final String SERVER_CONTEXT_CONFIG_PROPERTY_OVERRIDES = "keycloak.server.context.config.property-overrides";

private static final Logger logger = Logger.getLogger(KeycloakApplication.class); private static final Logger logger = Logger.getLogger(KeycloakApplication.class);


protected boolean embedded = false; protected boolean embedded = false;
Expand Down Expand Up @@ -262,7 +266,7 @@ public URI getBaseUri(UriInfo uriInfo) {
public static void loadConfig(ServletContext context) { public static void loadConfig(ServletContext context) {
try { try {
JsonNode node = null; JsonNode node = null;

String dmrConfig = loadDmrConfig(context); String dmrConfig = loadDmrConfig(context);
if (dmrConfig != null) { if (dmrConfig != null) {
node = new ObjectMapper().readTree(dmrConfig); node = new ObjectMapper().readTree(dmrConfig);
Expand All @@ -287,7 +291,13 @@ public static void loadConfig(ServletContext context) {
} }


if (node != null) { if (node != null) {
Properties properties = new SystemEnvProperties(); Map<String, String> propertyOverridesMap = new HashMap<>();
String propertyOverrides = context.getInitParameter(SERVER_CONTEXT_CONFIG_PROPERTY_OVERRIDES);
if (context.getInitParameter(SERVER_CONTEXT_CONFIG_PROPERTY_OVERRIDES) != null) {
JsonNode jsonObj = new ObjectMapper().readTree(propertyOverrides);
jsonObj.fields().forEachRemaining(e -> propertyOverridesMap.put(e.getKey(), e.getValue().asText()));
}
Properties properties = new SystemEnvProperties(propertyOverridesMap);
Config.init(new JsonConfigProvider(node, properties)); Config.init(new JsonConfigProvider(node, properties));
} else { } else {
throw new RuntimeException("Keycloak config not found."); throw new RuntimeException("Keycloak config not found.");
Expand Down
18 changes: 18 additions & 0 deletions testsuite/integration-arquillian/HOW-TO-RUN.md
Expand Up @@ -417,4 +417,22 @@ and argument: `-p 8181`


3) Run loadbalancer (class `SimpleUndertowLoadBalancer`) without arguments and system properties. Loadbalancer runs on port 8180, so you can access Keycloak on `http://localhost:8180/auth` 3) Run loadbalancer (class `SimpleUndertowLoadBalancer`) without arguments and system properties. Loadbalancer runs on port 8180, so you can access Keycloak on `http://localhost:8180/auth`


## Cross-DC tests


Cross-DC tests use 2 data centers, each with one automatically started and one manually controlled backend servers
(currently only Keycloak on Undertow), and 1 frontend loadbalancer server node that sits in front of all servers.
The browser usually communicates directly with the frontent node and the test controls where the HTTP requests
land by adjusting load balancer configuration (e.g. to direct the traffic to only a single DC).

For an example of a test, see [org.keycloak.testsuite.crossdc.ActionTokenCrossDCTest](tests/base/src/test/java/org/keycloak/testsuite/crossdc/ActionTokenCrossDCTest.java).

The cross DC requires setting a profile specifying used cache server (currently only Infinispan) by specifying
`cache-server-infinispan` profile in maven.

#### Run Cross-DC Tests from Maven

Run the following command (adjust the test specification according to your needs):

`mvn -Pcache-server-infinispan -Dtest=*.crossdc.* -pl testsuite/integration-arquillian/tests/base test`

_Someone using IntelliJ IDEA, please describe steps for that IDE_

0 comments on commit a0f3a64

Please sign in to comment.