Skip to content

Commit

Permalink
KEYCLOAK-2529 Concurrent startup by more cluster nodes at the same ti…
Browse files Browse the repository at this point in the history
…me. Added DBLockProvider
  • Loading branch information
mposolda committed Mar 7, 2016
1 parent ad63b18 commit 8da768a
Show file tree
Hide file tree
Showing 37 changed files with 1,751 additions and 240 deletions.
23 changes: 23 additions & 0 deletions docbook/auth-server-docs/reference/en/en-US/modules/clustering.xml
Expand Up @@ -57,6 +57,29 @@
database. This can be a relational database or Mongo. To make sure your database doesn't become a single
point of failure you may also want to deploy your database to a cluster.
</para>
<section>
<title>DB lock</title>
<para>Note that Keycloak supports concurrent startup by more cluster nodes at the same. This is ensured by DB lock, which prevents that some
startup actions (migrating database from previous version, importing realms at startup, initial bootstrap of admin user) are always executed just by one
cluster node at a time and other cluster nodes need to wait until the current node finishes startup actions and release the DB lock.
</para>
<para>
By default, the maximum timeout for lock is 900 seconds, so in case that second node is not able to acquire the lock within 900 seconds, it fails to start.
The lock checking is done every 2 seconds by default. Typically you won't need to increase/decrease the default value, but just in case
it's possible to configure it in <literal>standalone/configuration/keycloak-server.json</literal>:
<programlisting>
<![CDATA[
"dblock": {
"jpa": {
"lockWaitTimeout": 900,
"lockRecheckTime": 2
}
}
]]>
</programlisting>
or similarly if you're using Mongo (just by replace <literal>jpa</literal> with <literal>mongo</literal>)
</para>
</section>
</section>

<section>
Expand Down
Expand Up @@ -126,7 +126,7 @@ private void lazyInit(KeycloakSession session) {
properties.put("hibernate.dialect", driverDialect);
}

String schema = config.get("schema");
String schema = getSchema();
if (schema != null) {
properties.put(JpaUtils.HIBERNATE_DEFAULT_SCHEMA, schema);
}
Expand Down Expand Up @@ -167,7 +167,7 @@ private void lazyInit(KeycloakSession session) {
}

if (currentVersion == null || !JpaUpdaterProvider.LAST_VERSION.equals(currentVersion)) {
updater.update(session, connection, schema);
updater.update(connection, schema);
} else {
logger.debug("Database is up to date");
}
Expand Down Expand Up @@ -212,7 +212,8 @@ protected void prepareOperationalInfo(Connection connection) {
}
}

private Connection getConnection() {
@Override
public Connection getConnection() {
try {
String dataSourceLookup = config.get("dataSource");
if (dataSourceLookup != null) {
Expand All @@ -226,6 +227,11 @@ private Connection getConnection() {
throw new RuntimeException("Failed to connect to database", e);
}
}

@Override
public String getSchema() {
return config.get("schema");
}

@Override
public Map<String,String> getOperationalInfo() {
Expand Down
Expand Up @@ -17,11 +17,18 @@

package org.keycloak.connections.jpa;

import java.sql.Connection;

import org.keycloak.provider.ProviderFactory;

/**
* @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
*/
public interface JpaConnectionProviderFactory extends ProviderFactory<JpaConnectionProvider> {

// Caller is responsible for closing connection
Connection getConnection();

String getSchema();

}
Expand Up @@ -17,7 +17,6 @@

package org.keycloak.connections.jpa.updater;

import org.keycloak.models.KeycloakSession;
import org.keycloak.provider.Provider;

import java.sql.Connection;
Expand All @@ -33,7 +32,7 @@ public interface JpaUpdaterProvider extends Provider {

public String getCurrentVersionSql(String defaultSchema);

public void update(KeycloakSession session, Connection connection, String defaultSchema);
public void update(Connection connection, String defaultSchema);

public void validate(Connection connection, String defaultSchema);

Expand Down
Expand Up @@ -20,18 +20,10 @@
import liquibase.Contexts;
import liquibase.Liquibase;
import liquibase.changelog.ChangeSet;
import liquibase.changelog.DatabaseChangeLog;
import liquibase.changelog.RanChangeSet;
import liquibase.database.Database;
import liquibase.database.DatabaseFactory;
import liquibase.database.core.DB2Database;
import liquibase.database.jvm.JdbcConnection;
import liquibase.logging.LogFactory;
import liquibase.logging.LogLevel;
import liquibase.resource.ClassLoaderResourceAccessor;
import liquibase.servicelocator.ServiceLocator;
import org.jboss.logging.Logger;
import org.keycloak.connections.jpa.updater.JpaUpdaterProvider;
import org.keycloak.connections.jpa.updater.liquibase.conn.LiquibaseConnectionProvider;
import org.keycloak.models.KeycloakSession;

import java.sql.Connection;
Expand All @@ -46,16 +38,22 @@ public class LiquibaseJpaUpdaterProvider implements JpaUpdaterProvider {

private static final Logger logger = Logger.getLogger(LiquibaseJpaUpdaterProvider.class);

private static final String CHANGELOG = "META-INF/jpa-changelog-master.xml";
private static final String DB2_CHANGELOG = "META-INF/db2-jpa-changelog-master.xml";
public static final String CHANGELOG = "META-INF/jpa-changelog-master.xml";
public static final String DB2_CHANGELOG = "META-INF/db2-jpa-changelog-master.xml";

private final KeycloakSession session;

public LiquibaseJpaUpdaterProvider(KeycloakSession session) {
this.session = session;
}

@Override
public String getCurrentVersionSql(String defaultSchema) {
return "SELECT ID from " + getTable("DATABASECHANGELOG", defaultSchema) + " ORDER BY DATEEXECUTED DESC LIMIT 1";
}

@Override
public void update(KeycloakSession session, Connection connection, String defaultSchema) {
public void update(Connection connection, String defaultSchema) {
logger.debug("Starting database update");

// Need ThreadLocal as liquibase doesn't seem to have API to inject custom objects into tasks
Expand Down Expand Up @@ -108,145 +106,14 @@ public void validate(Connection connection, String defaultSchema) {
}

private Liquibase getLiquibase(Connection connection, String defaultSchema) throws Exception {
ServiceLocator sl = ServiceLocator.getInstance();

if (!System.getProperties().containsKey("liquibase.scan.packages")) {
if (sl.getPackages().remove("liquibase.core")) {
sl.addPackageToScan("liquibase.core.xml");
}

if (sl.getPackages().remove("liquibase.parser")) {
sl.addPackageToScan("liquibase.parser.core.xml");
}

if (sl.getPackages().remove("liquibase.serializer")) {
sl.addPackageToScan("liquibase.serializer.core.xml");
}

sl.getPackages().remove("liquibase.ext");
sl.getPackages().remove("liquibase.sdk");
}

LogFactory.setInstance(new LogWrapper());

// Adding PostgresPlus support to liquibase
DatabaseFactory.getInstance().register(new PostgresPlusDatabase());

Database database = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(new JdbcConnection(connection));
if (defaultSchema != null) {
database.setDefaultSchemaName(defaultSchema);
}

String changelog = (database instanceof DB2Database) ? DB2_CHANGELOG : CHANGELOG;
logger.debugf("Using changelog file: %s", changelog);
return new Liquibase(changelog, new ClassLoaderResourceAccessor(getClass().getClassLoader()), database);
LiquibaseConnectionProvider liquibaseProvider = session.getProvider(LiquibaseConnectionProvider.class);
return liquibaseProvider.getLiquibase(connection, defaultSchema);
}

@Override
public void close() {
}

private static class LogWrapper extends LogFactory {

private liquibase.logging.Logger logger = new liquibase.logging.Logger() {
@Override
public void setName(String name) {
}

@Override
public void setLogLevel(String level) {
}

@Override
public void setLogLevel(LogLevel level) {
}

@Override
public void setLogLevel(String logLevel, String logFile) {
}

@Override
public void severe(String message) {
LiquibaseJpaUpdaterProvider.logger.error(message);
}

@Override
public void severe(String message, Throwable e) {
LiquibaseJpaUpdaterProvider.logger.error(message, e);
}

@Override
public void warning(String message) {
// Ignore this warning as cascaded drops doesn't work anyway with all DBs, which we need to support
if ("Database does not support drop with cascade".equals(message)) {
LiquibaseJpaUpdaterProvider.logger.debug(message);
} else {
LiquibaseJpaUpdaterProvider.logger.warn(message);
}
}

@Override
public void warning(String message, Throwable e) {
LiquibaseJpaUpdaterProvider.logger.warn(message, e);
}

@Override
public void info(String message) {
LiquibaseJpaUpdaterProvider.logger.debug(message);
}

@Override
public void info(String message, Throwable e) {
LiquibaseJpaUpdaterProvider.logger.debug(message, e);
}

@Override
public void debug(String message) {
LiquibaseJpaUpdaterProvider.logger.trace(message);
}

@Override
public LogLevel getLogLevel() {
if (LiquibaseJpaUpdaterProvider.logger.isTraceEnabled()) {
return LogLevel.DEBUG;
} else if (LiquibaseJpaUpdaterProvider.logger.isDebugEnabled()) {
return LogLevel.INFO;
} else {
return LogLevel.WARNING;
}
}

@Override
public void debug(String message, Throwable e) {
LiquibaseJpaUpdaterProvider.logger.trace(message, e);
}

@Override
public void setChangeLog(DatabaseChangeLog databaseChangeLog) {
}

@Override
public void setChangeSet(ChangeSet changeSet) {
}

@Override
public int getPriority() {
return 0;
}
};

@Override
public liquibase.logging.Logger getLog(String name) {
return logger;
}

@Override
public liquibase.logging.Logger getLog() {
return logger;
}

}

public static String getTable(String table, String defaultSchema) {
return defaultSchema != null ? defaultSchema + "." + table : table;
}
Expand Down
Expand Up @@ -30,7 +30,7 @@ public class LiquibaseJpaUpdaterProviderFactory implements JpaUpdaterProviderFac

@Override
public JpaUpdaterProvider create(KeycloakSession session) {
return new LiquibaseJpaUpdaterProvider();
return new LiquibaseJpaUpdaterProvider(session);
}

@Override
Expand Down

0 comments on commit 8da768a

Please sign in to comment.