Skip to content

Commit

Permalink
KEYCLOAK-5716 KEYCLOAK-5738 Avoid infinispan deadlock. Ensure code-to…
Browse files Browse the repository at this point in the history
…-token works correctly in cross-dc
  • Loading branch information
mposolda committed Nov 7, 2017
1 parent 1db3134 commit 62a1c18
Show file tree
Hide file tree
Showing 28 changed files with 206 additions and 108 deletions.
@@ -1,5 +1,5 @@
/* /*
* Copyright 2016 Red Hat, Inc. and/or its affiliates * Copyright 2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags. * and other contributors as indicated by the @author tags.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -15,9 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */


package org.keycloak.testsuite; package org.keycloak.common.util;

import java.util.function.Supplier;


/** /**
* @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a> * @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
Expand All @@ -44,7 +42,9 @@ public static int execute(Runnable runnable, int attemptsCount, long intervalMil
executionIndex++; executionIndex++;
if (attemptsCount > 0) { if (attemptsCount > 0) {
try { try {
Thread.sleep(intervalMillis); if (intervalMillis > 0) {
Thread.sleep(intervalMillis);
}
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
ie.addSuppressed(e); ie.addSuppressed(e);
throw new RuntimeException(ie); throw new RuntimeException(ie);
Expand Down Expand Up @@ -73,7 +73,9 @@ public static <T> T call(Supplier<T> supplier, int attemptsCount, long intervalM
attemptsCount--; attemptsCount--;
if (attemptsCount > 0) { if (attemptsCount > 0) {
try { try {
Thread.sleep(intervalMillis); if (intervalMillis > 0) {
Thread.sleep(intervalMillis);
}
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
ie.addSuppressed(e); ie.addSuppressed(e);
throw new RuntimeException(ie); throw new RuntimeException(ie);
Expand All @@ -85,4 +87,19 @@ public static <T> T call(Supplier<T> supplier, int attemptsCount, long intervalM
} }
} }



/**
* Needed here just because java.util.function.Supplier defined from Java 8
*/
public interface Supplier<T> {

/**
* Gets a result.
*
* @return a result
*/
T get();
}


} }
5 changes: 4 additions & 1 deletion misc/CrossDataCenter.md
Expand Up @@ -95,15 +95,18 @@ Infinispan Server setup
<cache-container name="clustered" default-cache="default" statistics="true"> <cache-container name="clustered" default-cache="default" statistics="true">
... ...
<replicated-cache-configuration name="sessions-cfg" mode="SYNC" start="EAGER" batching="false"> <replicated-cache-configuration name="sessions-cfg" mode="SYNC" start="EAGER" batching="false">
<transaction mode="NON_XA" locking="PESSIMISTIC"/> <transaction mode="NON_DURABLE_XA" locking="PESSIMISTIC"/>
<locking acquire-timeout="0" />
<backups> <backups>
<backup site="site2" failure-policy="FAIL" strategy="SYNC" enabled="true"/> <backup site="site2" failure-policy="FAIL" strategy="SYNC" enabled="true"/>
</backups> </backups>
</replicated-cache-configuration> </replicated-cache-configuration>


<replicated-cache name="work" configuration="sessions-cfg"/> <replicated-cache name="work" configuration="sessions-cfg"/>
<replicated-cache name="sessions" configuration="sessions-cfg"/> <replicated-cache name="sessions" configuration="sessions-cfg"/>
<replicated-cache name="clientSessions" configuration="sessions-cfg"/>
<replicated-cache name="offlineSessions" configuration="sessions-cfg"/> <replicated-cache name="offlineSessions" configuration="sessions-cfg"/>
<replicated-cache name="offlineClientSessions" configuration="sessions-cfg"/>
<replicated-cache name="actionTokens" configuration="sessions-cfg"/> <replicated-cache name="actionTokens" configuration="sessions-cfg"/>
<replicated-cache name="loginFailures" configuration="sessions-cfg"/> <replicated-cache name="loginFailures" configuration="sessions-cfg"/>


Expand Down
Expand Up @@ -22,6 +22,8 @@
import java.util.function.Supplier; import java.util.function.Supplier;


import org.infinispan.commons.api.BasicCache; import org.infinispan.commons.api.BasicCache;
import org.jboss.logging.Logger;
import org.keycloak.common.util.Retry;
import org.keycloak.models.CodeToTokenStoreProvider; import org.keycloak.models.CodeToTokenStoreProvider;
import org.keycloak.models.KeycloakSession; import org.keycloak.models.KeycloakSession;
import org.keycloak.models.sessions.infinispan.entities.ActionTokenValueEntity; import org.keycloak.models.sessions.infinispan.entities.ActionTokenValueEntity;
Expand All @@ -31,6 +33,8 @@
*/ */
public class InfinispanCodeToTokenStoreProvider implements CodeToTokenStoreProvider { public class InfinispanCodeToTokenStoreProvider implements CodeToTokenStoreProvider {


public static final Logger logger = Logger.getLogger(InfinispanCodeToTokenStoreProvider.class);

private final Supplier<BasicCache<UUID, ActionTokenValueEntity>> codeCache; private final Supplier<BasicCache<UUID, ActionTokenValueEntity>> codeCache;
private final KeycloakSession session; private final KeycloakSession session;


Expand All @@ -45,9 +49,24 @@ public boolean putIfAbsent(UUID codeId) {


int lifespanInSeconds = session.getContext().getRealm().getAccessCodeLifespan(); int lifespanInSeconds = session.getContext().getRealm().getAccessCodeLifespan();


BasicCache<UUID, ActionTokenValueEntity> cache = codeCache.get(); boolean codeAlreadyExists = Retry.call(() -> {
ActionTokenValueEntity existing = cache.putIfAbsent(codeId, tokenValue, lifespanInSeconds, TimeUnit.SECONDS);
return existing == null; try {
BasicCache<UUID, ActionTokenValueEntity> cache = codeCache.get();
ActionTokenValueEntity existing = cache.putIfAbsent(codeId, tokenValue, lifespanInSeconds, TimeUnit.SECONDS);
return existing == null;
} catch (RuntimeException re) {
if (logger.isDebugEnabled()) {
logger.debugf(re, "Failed when adding code %s", codeId);
}

// Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
throw re;
}

}, 3, 0);

return codeAlreadyExists;
} }


@Override @Override
Expand Down
Expand Up @@ -255,6 +255,10 @@ private <K, V extends SessionEntity> boolean checkRemoteCache(KeycloakSession se


RemoteCache<K, SessionEntityWrapper<V>> remoteCache = (RemoteCache) remoteStores.iterator().next().getRemoteCache(); RemoteCache<K, SessionEntityWrapper<V>> remoteCache = (RemoteCache) remoteStores.iterator().next().getRemoteCache();


if (remoteCache == null) {
throw new IllegalStateException("No remote cache available for the infinispan cache: " + ispnCache.getName());
}

remoteCacheInvoker.addRemoteCache(ispnCache.getName(), remoteCache, maxIdleLoader); remoteCacheInvoker.addRemoteCache(ispnCache.getName(), remoteCache, maxIdleLoader);


RemoteCacheSessionListener hotrodListener = RemoteCacheSessionListener.createListener(session, ispnCache, remoteCache); RemoteCacheSessionListener hotrodListener = RemoteCacheSessionListener.createListener(session, ispnCache, remoteCache);
Expand Down
Expand Up @@ -17,6 +17,7 @@


package org.keycloak.models.sessions.infinispan.remotestore; package org.keycloak.models.sessions.infinispan.remotestore;


import org.keycloak.common.util.Retry;
import org.keycloak.common.util.Time; import org.keycloak.common.util.Time;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
Expand All @@ -32,6 +33,7 @@
import org.keycloak.models.RealmModel; import org.keycloak.models.RealmModel;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask; import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity; import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;


Expand Down Expand Up @@ -71,14 +73,28 @@ public <K, V extends SessionEntity> void runTask(KeycloakSession kcSession, Real
return; return;
} }


long maxIdleTimeMs = context.maxIdleTimeLoader.getMaxIdleTimeMs(realm); long loadedMaxIdleTimeMs = context.maxIdleTimeLoader.getMaxIdleTimeMs(realm);


// Double the timeout to ensure that entry won't expire on remoteCache in case that write of some entities to remoteCache is postponed (eg. userSession.lastSessionRefresh) // Double the timeout to ensure that entry won't expire on remoteCache in case that write of some entities to remoteCache is postponed (eg. userSession.lastSessionRefresh)
maxIdleTimeMs = maxIdleTimeMs * 2; final long maxIdleTimeMs = loadedMaxIdleTimeMs * 2;


logger.debugf("Running task '%s' on remote cache '%s' . Key is '%s'", operation, cacheName, key); logger.debugf("Running task '%s' on remote cache '%s' . Key is '%s'", operation, cacheName, key);


runOnRemoteCache(context.remoteCache, maxIdleTimeMs, key, task, sessionWrapper); Retry.execute(() -> {

try {
runOnRemoteCache(context.remoteCache, maxIdleTimeMs, key, task, sessionWrapper);
} catch (RuntimeException re) {
if (logger.isDebugEnabled()) {
logger.debugf(re, "Failed running task '%s' on remote cache '%s' . Key: '%s' . Will try to retry the task",
operation, cacheName, key);
}

// Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
throw re;
}

}, 10, 0);
} }




Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.infinispan.client.hotrod.annotation.ClientListener; import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent; import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent; import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.manager.PersistenceManager; import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.remote.RemoteStore; import org.infinispan.persistence.remote.RemoteStore;
Expand All @@ -47,9 +48,13 @@ public class ConcurrencyJDGRemoteCacheTest {


private static Map<String, EntryInfo> state = new HashMap<>(); private static Map<String, EntryInfo> state = new HashMap<>();


private RemoteCache remoteCache1;
private RemoteCache remoteCache2;


public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// Init map somehow // Init map somehow
for (int i=0 ; i<30 ; i++) { for (int i=0 ; i<3000 ; i++) {
String key = "key-" + i; String key = "key-" + i;
state.put(key, new EntryInfo()); state.put(key, new EntryInfo());
} }
Expand All @@ -58,19 +63,25 @@ public static void main(String[] args) throws Exception {
Worker worker1 = createWorker(1); Worker worker1 = createWorker(1);
Worker worker2 = createWorker(2); Worker worker2 = createWorker(2);


long start = System.currentTimeMillis();

// Start and join workers // Start and join workers
worker1.start(); worker1.start();
worker2.start(); worker2.start();


worker1.join(); worker1.join();
worker2.join(); worker2.join();


long took = System.currentTimeMillis() - start;

// Output // Output
for (Map.Entry<String, EntryInfo> entry : state.entrySet()) { for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
System.out.println(entry.getKey() + ":::" + entry.getValue()); System.out.println(entry.getKey() + ":::" + entry.getValue());
worker1.cache.remove(entry.getKey()); worker1.cache.remove(entry.getKey());
} }


System.out.println("Took: " + took + " ms");

// Finish JVM // Finish JVM
worker1.cache.getCacheManager().stop(); worker1.cache.getCacheManager().stop();
worker2.cache.getCacheManager().stop(); worker2.cache.getCacheManager().stop();
Expand Down Expand Up @@ -127,7 +138,7 @@ public void run() {
String cacheKey = entry.getKey(); String cacheKey = entry.getKey();
EntryInfo wrapper = state.get(cacheKey); EntryInfo wrapper = state.get(cacheKey);


int val = getClusterStartupTime(this.cache, cacheKey, wrapper); int val = getClusterStartupTime(this.cache, cacheKey, wrapper, myThreadId);
if (myThreadId == 1) { if (myThreadId == 1) {
wrapper.th1.set(val); wrapper.th1.set(val);
} else { } else {
Expand All @@ -141,8 +152,8 @@ public void run() {


} }


public static int getClusterStartupTime(Cache<String, Integer> cache, String cacheKey, EntryInfo wrapper) { public static int getClusterStartupTime(Cache<String, Integer> cache, String cacheKey, EntryInfo wrapper, int myThreadId) {
Integer startupTime = new Random().nextInt(1024); Integer startupTime = myThreadId==1 ? Integer.parseInt(cacheKey.substring(4)) : Integer.parseInt(cacheKey.substring(4)) * 2;


// Concurrency doesn't work correctly with this // Concurrency doesn't work correctly with this
//Integer existingClusterStartTime = (Integer) cache.putIfAbsent(cacheKey, startupTime); //Integer existingClusterStartTime = (Integer) cache.putIfAbsent(cacheKey, startupTime);
Expand All @@ -154,21 +165,25 @@ public static int getClusterStartupTime(Cache<String, Integer> cache, String cac
for (int i=0 ; i<10 ; i++) { for (int i=0 ; i<10 ; i++) {
try { try {
existingClusterStartTime = (Integer) remoteCache.withFlags(Flag.FORCE_RETURN_VALUE).putIfAbsent(cacheKey, startupTime); existingClusterStartTime = (Integer) remoteCache.withFlags(Flag.FORCE_RETURN_VALUE).putIfAbsent(cacheKey, startupTime);
} catch (Exception ce) { break;
} catch (HotRodClientException ce) {
if (i == 9) { if (i == 9) {
throw ce; throw ce;
//break; //break;
} else { } else {
System.err.println("EXception: i=" + i); wrapper.exceptions.incrementAndGet();
System.err.println("Exception: i=" + i + " for key: " + cacheKey + " and myThreadId: " + myThreadId);
} }
} }
} }


if (existingClusterStartTime == null || startupTime.equals(remoteCache.get(cacheKey))) { if (existingClusterStartTime == null
// || startupTime.equals(remoteCache.get(cacheKey))
) {
wrapper.successfulInitializations.incrementAndGet(); wrapper.successfulInitializations.incrementAndGet();
return startupTime; return startupTime;
} else { } else {
System.err.println("Not equal!!! startupTime=" + startupTime + ", existingClusterStartTime=" + existingClusterStartTime ); wrapper.failedInitializations.incrementAndGet();
return existingClusterStartTime; return existingClusterStartTime;
} }
} }
Expand All @@ -178,10 +193,13 @@ public static class EntryInfo {
AtomicInteger successfulListenerWrites = new AtomicInteger(0); AtomicInteger successfulListenerWrites = new AtomicInteger(0);
AtomicInteger th1 = new AtomicInteger(); AtomicInteger th1 = new AtomicInteger();
AtomicInteger th2 = new AtomicInteger(); AtomicInteger th2 = new AtomicInteger();
AtomicInteger failedInitializations = new AtomicInteger();
AtomicInteger exceptions = new AtomicInteger();


@Override @Override
public String toString() { public String toString() {
return String.format("Inits: %d, listeners: %d, th1: %d, th2: %d", successfulInitializations.get(), successfulListenerWrites.get(), th1.get(), th2.get()); return String.format("Inits: %d, listeners: %d, failedInits: %d, exceptions: %s, th1: %d, th2: %d", successfulInitializations.get(), successfulListenerWrites.get(),
failedInitializations.get(), exceptions.get(), th1.get(), th2.get());
} }
} }


Expand Down
Expand Up @@ -121,6 +121,8 @@ public static void main(String[] args) throws Exception {


logger.info("SESSIONS NOT AVAILABLE ON DC2"); logger.info("SESSIONS NOT AVAILABLE ON DC2");


long took = System.currentTimeMillis() - start;
logger.infof("took %d ms", took);


// // Start and join workers // // Start and join workers
// worker1.start(); // worker1.start();
Expand All @@ -137,8 +139,6 @@ public static void main(String[] args) throws Exception {
cache2.getCacheManager().stop(); cache2.getCacheManager().stop();
} }


long took = System.currentTimeMillis() - start;

// // Output // // Output
// for (Map.Entry<String, EntryInfo> entry : state.entrySet()) { // for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
// System.out.println(entry.getKey() + ":::" + entry.getValue()); // System.out.println(entry.getKey() + ":::" + entry.getValue());
Expand Down
Expand Up @@ -187,6 +187,10 @@ public static void main(String[] args) throws Exception {
", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() + ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get()); ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get());



System.out.println("remoteCache1.notes: " + ((UserSessionEntity) remoteCache1.get("123")).getNotes().size() );
System.out.println("remoteCache2.notes: " + ((UserSessionEntity) remoteCache2.get("123")).getNotes().size() );

System.out.println("Histogram: "); System.out.println("Histogram: ");
//histogram.dumpStats(); //histogram.dumpStats();


Expand Down Expand Up @@ -314,14 +318,26 @@ public void run() {
// In case it's hardcoded (eg. all the replaces are doing same change, so session is defacto not changed), then histogram may contain bigger value than 1 on some places. // In case it's hardcoded (eg. all the replaces are doing same change, so session is defacto not changed), then histogram may contain bigger value than 1 on some places.
//String noteKey = "some"; //String noteKey = "some";


boolean replaced = false; ReplaceStatus replaced = ReplaceStatus.NOT_REPLACED;
while (!replaced) { while (replaced != ReplaceStatus.REPLACED) {
VersionedValue<UserSessionEntity> versioned = remoteCache.getVersioned("123"); VersionedValue<UserSessionEntity> versioned = remoteCache.getVersioned("123");
UserSessionEntity oldSession = versioned.getValue(); UserSessionEntity oldSession = versioned.getValue();
//UserSessionEntity clone = DistributedCacheConcurrentWritesTest.cloneSession(oldSession); //UserSessionEntity clone = DistributedCacheConcurrentWritesTest.cloneSession(oldSession);
UserSessionEntity clone = oldSession; UserSessionEntity clone = oldSession;


clone.getNotes().put(noteKey, "someVal"); // In case that exception was thrown (ReplaceStatus.ERROR), the remoteCache may have the note. Seems that transactions are not fully rolled-back on the JDG side
// in case that backup fails
if (replaced == ReplaceStatus.NOT_REPLACED) {
clone.getNotes().put(noteKey, "someVal");
} else if (replaced == ReplaceStatus.ERROR) {
if (clone.getNotes().containsKey(noteKey)) {
System.err.println("I HAVE THE KEY: " + noteKey);
} else {
System.err.println("I DON'T HAVE THE KEY: " + noteKey);
clone.getNotes().put(noteKey, "someVal");
}
}

//cache.replace("123", clone); //cache.replace("123", clone);
replaced = cacheReplace(versioned, clone); replaced = cacheReplace(versioned, clone);
} }
Expand All @@ -336,7 +352,7 @@ public void run() {


} }


private boolean cacheReplace(VersionedValue<UserSessionEntity> oldSession, UserSessionEntity newSession) { private ReplaceStatus cacheReplace(VersionedValue<UserSessionEntity> oldSession, UserSessionEntity newSession) {
try { try {
boolean replaced = remoteCache.replaceWithVersion("123", newSession, oldSession.getVersion()); boolean replaced = remoteCache.replaceWithVersion("123", newSession, oldSession.getVersion());
//boolean replaced = true; //boolean replaced = true;
Expand All @@ -348,15 +364,19 @@ private boolean cacheReplace(VersionedValue<UserSessionEntity> oldSession, UserS
} else { } else {
histogram.increaseSuccessOpsCount(oldSession.getVersion()); histogram.increaseSuccessOpsCount(oldSession.getVersion());
} }
return replaced; return replaced ? ReplaceStatus.REPLACED : ReplaceStatus.NOT_REPLACED;
} catch (Exception re) { } catch (Exception re) {
failedReplaceCounter2.incrementAndGet(); failedReplaceCounter2.incrementAndGet();
return false; return ReplaceStatus.ERROR;
} }
//return replaced; //return replaced;
} }


} }

private enum ReplaceStatus {
REPLACED, NOT_REPLACED, ERROR
}
/* /*
// Worker, which operates on "classic" cache and rely on operations delegated to the second cache // Worker, which operates on "classic" cache and rely on operations delegated to the second cache
private static class CacheWorker extends Thread { private static class CacheWorker extends Thread {
Expand Down
Expand Up @@ -36,7 +36,8 @@
<xsl:apply-templates select="@* | node()" /> <xsl:apply-templates select="@* | node()" />


<replicated-cache-configuration name="sessions-cfg" mode="SYNC" start="EAGER" batching="false"> <replicated-cache-configuration name="sessions-cfg" mode="SYNC" start="EAGER" batching="false">
<transaction mode="NON_XA" locking="PESSIMISTIC"/> <transaction mode="NON_DURABLE_XA" locking="PESSIMISTIC"/>
<locking acquire-timeout="0" />
<backups> <backups>
<backup site="{$remote.site}" failure-policy="FAIL" strategy="SYNC" enabled="true"/> <backup site="{$remote.site}" failure-policy="FAIL" strategy="SYNC" enabled="true"/>
</backups> </backups>
Expand Down

0 comments on commit 62a1c18

Please sign in to comment.