Skip to content

Commit

Permalink
KEYCLOAK-2614 Refactor database lock to use 'SELECT FOR UPDATE' pessi…
Browse files Browse the repository at this point in the history
…mistic locking
  • Loading branch information
mposolda committed Apr 8, 2016
1 parent ed97a9b commit 90fc721
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 110 deletions.
Expand Up @@ -39,6 +39,7 @@
import org.keycloak.connections.jpa.updater.liquibase.LiquibaseJpaUpdaterProvider;
import org.keycloak.connections.jpa.updater.liquibase.PostgresPlusDatabase;
import org.keycloak.connections.jpa.updater.liquibase.lock.CustomInsertLockRecordGenerator;
import org.keycloak.connections.jpa.updater.liquibase.lock.CustomLockDatabaseChangeLogGenerator;
import org.keycloak.connections.jpa.updater.liquibase.lock.CustomLockService;
import org.keycloak.connections.jpa.updater.liquibase.lock.DummyLockService;
import org.keycloak.models.KeycloakSession;
Expand Down Expand Up @@ -93,6 +94,9 @@ protected void baseLiquibaseInitialization() {

// Change command for creating lock and drop DELETE lock record from it
SqlGeneratorFactory.getInstance().register(new CustomInsertLockRecordGenerator());

// Use "SELECT FOR UPDATE" for locking database
SqlGeneratorFactory.getInstance().register(new CustomLockDatabaseChangeLogGenerator());
}


Expand Down
@@ -0,0 +1,86 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.keycloak.connections.jpa.updater.liquibase.lock;

import liquibase.database.Database;
import liquibase.database.core.DB2Database;
import liquibase.database.core.H2Database;
import liquibase.database.core.MSSQLDatabase;
import liquibase.database.core.MySQLDatabase;
import liquibase.database.core.OracleDatabase;
import liquibase.database.core.PostgresDatabase;
import liquibase.sql.Sql;
import liquibase.sql.UnparsedSql;
import liquibase.sqlgenerator.SqlGeneratorChain;
import liquibase.sqlgenerator.core.LockDatabaseChangeLogGenerator;
import liquibase.statement.core.LockDatabaseChangeLogStatement;
import org.jboss.logging.Logger;

/**
* We use "SELECT FOR UPDATE" pessimistic locking (Same algorithm like Hibernate LockMode.PESSIMISTIC_WRITE )
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class CustomLockDatabaseChangeLogGenerator extends LockDatabaseChangeLogGenerator {

private static final Logger logger = Logger.getLogger(CustomLockDatabaseChangeLogGenerator.class);

@Override
public int getPriority() {
return super.getPriority() + 1; // Ensure bigger priority than LockDatabaseChangeLogGenerator
}

@Override
public Sql[] generateSql(LockDatabaseChangeLogStatement statement, Database database, SqlGeneratorChain sqlGeneratorChain) {

Sql selectForUpdateSql = generateSelectForUpdate(database);

return new Sql[] { selectForUpdateSql };
}


private Sql generateSelectForUpdate(Database database) {
String catalog = database.getLiquibaseCatalogName();
String schema = database.getLiquibaseSchemaName();
String rawLockTableName = database.getDatabaseChangeLogLockTableName();

String lockTableName = database.escapeTableName(catalog, schema, rawLockTableName);
String idColumnName = database.escapeColumnName(catalog, schema, rawLockTableName, "ID");

String sqlBase = "SELECT " + idColumnName + " FROM " + lockTableName;
String sqlWhere = " WHERE " + idColumnName + "=1";

String sql;
if (database instanceof MySQLDatabase || database instanceof PostgresDatabase || database instanceof H2Database ||
database instanceof OracleDatabase) {
sql = sqlBase + sqlWhere + " FOR UPDATE";
} else if (database instanceof MSSQLDatabase) {
sql = sqlBase + " WITH (UPDLOCK, ROWLOCK)" + sqlWhere;
} else if (database instanceof DB2Database) {
sql = sqlBase + sqlWhere + " FOR READ ONLY WITH RS USE AND KEEP UPDATE LOCKS";
} else {
sql = sqlBase + sqlWhere;
logger.warnf("No direct support for database %s . Database lock may not work correctly", database.getClass().getName());
}

logger.debugf("SQL command for pessimistic lock: %s", sql);

return new UnparsedSql(sql);
}

}
Expand Up @@ -18,25 +18,16 @@
package org.keycloak.connections.jpa.updater.liquibase.lock;

import java.lang.reflect.Field;
import java.text.DateFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;

import liquibase.database.Database;
import liquibase.database.core.DerbyDatabase;
import liquibase.exception.DatabaseException;
import liquibase.exception.LockException;
import liquibase.executor.Executor;
import liquibase.executor.ExecutorService;
import liquibase.lockservice.DatabaseChangeLogLock;
import liquibase.lockservice.StandardLockService;
import liquibase.logging.LogFactory;
import liquibase.sql.visitor.AbstractSqlVisitor;
import liquibase.sql.visitor.SqlVisitor;
import liquibase.statement.core.CreateDatabaseChangeLogLockTableStatement;
import liquibase.statement.core.DropTableStatement;
import liquibase.statement.core.InitializeDatabaseChangeLogLockTableStatement;
import liquibase.statement.core.LockDatabaseChangeLogStatement;
import liquibase.statement.core.RawSqlStatement;
import org.jboss.logging.Logger;
import org.keycloak.common.util.Time;
Expand All @@ -51,24 +42,6 @@ public class CustomLockService extends StandardLockService {

private static final Logger log = Logger.getLogger(CustomLockService.class);

private long changeLogLocRecheckTimeMillis = -1;

@Override
public void setChangeLogLockRecheckTime(long changeLogLocRecheckTime) {
super.setChangeLogLockRecheckTime(changeLogLocRecheckTime);
this.changeLogLocRecheckTimeMillis = changeLogLocRecheckTime;
}

// Bug in StandardLockService.getChangeLogLockRecheckTime()
@Override
public Long getChangeLogLockRecheckTime() {
if (changeLogLocRecheckTimeMillis == -1) {
return super.getChangeLogLockRecheckTime();
} else {
return changeLogLocRecheckTimeMillis;
}
}

@Override
public void init() throws DatabaseException {
boolean createdTable = false;
Expand All @@ -84,8 +57,8 @@ public void init() throws DatabaseException {
database.commit();
} catch (DatabaseException de) {
log.warn("Failed to create lock table. Maybe other transaction created in the meantime. Retrying...");
if (log.isDebugEnabled()) {
log.debug(de.getMessage(), de); //Log details at debug level
if (log.isTraceEnabled()) {
log.trace(de.getMessage(), de); //Log details at trace level
}
database.rollback();
throw new LockRetryException(de);
Expand Down Expand Up @@ -115,8 +88,8 @@ public void init() throws DatabaseException {

} catch (DatabaseException de) {
log.warn("Failed to insert first record to the lock table. Maybe other transaction inserted in the meantime. Retrying...");
if (log.isDebugEnabled()) {
log.debug(de.getMessage(), de); // Log details at debug level
if (log.isTraceEnabled()) {
log.trace(de.getMessage(), de); // Log details at trace level
}
database.rollback();
throw new LockRetryException(de);
Expand All @@ -140,34 +113,88 @@ public void init() throws DatabaseException {
}

@Override
public void waitForLock() throws LockException {
public void waitForLock() {
boolean locked = false;
long startTime = Time.toMillis(Time.currentTime());
long timeToGiveUp = startTime + (getChangeLogLockWaitTime());
boolean nextAttempt = true;

while (!locked && Time.toMillis(Time.currentTime()) < timeToGiveUp) {
while (nextAttempt) {
locked = acquireLock();
if (!locked) {
int remainingTime = ((int)(timeToGiveUp / 1000)) - Time.currentTime();
log.debugf("Waiting for changelog lock... Remaining time: %d seconds", remainingTime);
try {
Thread.sleep(getChangeLogLockRecheckTime());
} catch (InterruptedException e) {
e.printStackTrace();
if (remainingTime > 0) {
log.debugf("Will try to acquire log another time. Remaining time: %d seconds", remainingTime);
} else {
nextAttempt = false;
}
} else {
nextAttempt = false;
}
}

if (!locked) {
DatabaseChangeLogLock[] locks = listLocks();
String lockedBy;
if (locks.length > 0) {
DatabaseChangeLogLock lock = locks[0];
lockedBy = lock.getLockedBy() + " since " + DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.SHORT).format(lock.getLockGranted());
int timeout = ((int)(getChangeLogLockWaitTime() / 1000));
throw new IllegalStateException("Could not acquire change log lock within specified timeout " + timeout + " seconds. Currently locked by other transaction");
}
}

@Override
public boolean acquireLock() {
if (hasChangeLogLock) {
// We already have a lock
return true;
}

Executor executor = ExecutorService.getInstance().getExecutor(database);

try {
database.rollback();

// Ensure table created and lock record inserted
this.init();
} catch (DatabaseException de) {
throw new IllegalStateException("Failed to retrieve lock", de);
}

try {
log.debug("Trying to lock database");
executor.execute(new LockDatabaseChangeLogStatement());
log.debug("Successfully acquired database lock");

hasChangeLogLock = true;
database.setCanCacheLiquibaseTableInfo(true);
return true;

} catch (DatabaseException de) {
log.warn("Lock didn't yet acquired. Will possibly retry to acquire lock. Details: " + de.getMessage());
if (log.isTraceEnabled()) {
log.debug(de.getMessage(), de);
}
return false;
}
}


@Override
public void releaseLock() {
try {
if (hasChangeLogLock) {
log.debug("Going to release database lock");
database.commit();
} else {
lockedBy = "UNKNOWN";
log.warn("Attempt to release lock, which is not owned by current transaction");
}
} catch (Exception e) {
log.error("Database error during release lock", e);
} finally {
try {
hasChangeLogLock = false;
database.setCanCacheLiquibaseTableInfo(false);
database.rollback();
} catch (DatabaseException e) {
;
}
throw new LockException("Could not acquire change log lock. Currently locked by " + lockedBy);
}
}

Expand Down
Expand Up @@ -17,6 +17,7 @@

package org.keycloak.connections.jpa.updater.liquibase.lock;

import liquibase.exception.DatabaseException;
import liquibase.exception.LockException;
import liquibase.lockservice.StandardLockService;

Expand All @@ -27,6 +28,15 @@
*/
public class DummyLockService extends StandardLockService {

@Override
public int getPriority() {
return Integer.MAX_VALUE;
}

@Override
public void init() throws DatabaseException {
}

@Override
public void waitForLock() throws LockException {
}
Expand Down
Expand Up @@ -46,7 +46,7 @@ public class LiquibaseDBLockProvider implements DBLockProvider {
private final LiquibaseDBLockProviderFactory factory;
private final KeycloakSession session;

private LockService lockService;
private CustomLockService lockService;
private Connection dbConnection;

private int maxAttempts = DEFAULT_MAX_ATTEMPTS;
Expand All @@ -69,7 +69,6 @@ private void init() {

this.lockService = new CustomLockService();
lockService.setChangeLogLockWaitTime(factory.getLockWaitTimeoutMillis());
lockService.setChangeLogLockRecheckTime(factory.getLockRecheckTimeMillis());
lockService.setDatabase(liquibase.getDatabase());
} catch (LiquibaseException exception) {
safeRollbackConnection();
Expand All @@ -94,31 +93,32 @@ public void waitForLock() {
lockService.waitForLock();
this.maxAttempts = DEFAULT_MAX_ATTEMPTS;
return;
} catch (LockException le) {
if (le.getCause() != null && le.getCause() instanceof LockRetryException) {
// Indicates we should try to acquire lock again in different transaction
restart();
maxAttempts--;
} else {
throw new IllegalStateException("Failed to retrieve lock", le);

// TODO: Possibility to forcefully retrieve lock after timeout instead of just give-up?
}
} catch (LockRetryException le) {
// Indicates we should try to acquire lock again in different transaction
safeRollbackConnection();
restart();
maxAttempts--;
} catch (RuntimeException re) {
safeRollbackConnection();
safeCloseConnection();
throw re;
}
}
}


@Override
public void releaseLock() {
try {
lockService.releaseLock();
} catch (LockException e) {
logger.error("Could not release lock", e);
}
lockService.releaseLock();
lockService.reset();
}

@Override
public boolean supportsForcedUnlock() {
// Implementation based on "SELECT FOR UPDATE" can't force unlock as it's locked by other transaction
return false;
}

@Override
public void destroyLockInfo() {
try {
Expand Down

0 comments on commit 90fc721

Please sign in to comment.