Skip to content

Commit

Permalink
GH-3733 Configure TxManager for DefLockRepository (#3782)
Browse files Browse the repository at this point in the history
* GH-3733 Configure TxManager for DefLockRepository

Fixes #3733

The `@Transactional` resolves a primary `TransactionManager` bean
from the application context which might not be sufficient for all
the use-case.

To make it work with the custom (or specific) `TransactionManager`
we have to extend a `DefaultLockRepository` and override all those
`@Transactional` method

* Change the logic of the `DefaultLockRepository` from `@Transactional`
to the `TransactionTemplate` and use provided `TransactionManager`
or resolve one from the application context
* Adjust tests to use explicit `TransactionManager` and call
`afterSingletonsInstantiated()` to initialize a default `TransactionTemplate`
* Mention the change in the docs

* * Extracted all the `TransactionTemplate`s to the properties for caching
* Add `BeanInitializationException` for no-unique `PlatformTransactionManager`
bean in the `afterSingletonsInstantiated()`

* Fix language in the exception message

Co-authored-by: Gary Russell <grussell@vmware.com>

Co-authored-by: Gary Russell <grussell@vmware.com>
  • Loading branch information
artembilan and garyrussell authored Apr 12, 2022
1 parent cd84f16 commit 4b57363
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,22 +21,26 @@

import javax.sql.DataSource;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;

/**
* The default implementation of the {@link LockRepository} based on the
* table from the script presented in the {@code org/springframework/integration/jdbc/schema-*.sql}.
* <p>
* This repository can't be shared between different {@link JdbcLockRegistry} instances.
* Otherwise it opens a possibility to break {@link java.util.concurrent.locks.Lock} contract,
* Otherwise, it opens a possibility to break {@link java.util.concurrent.locks.Lock} contract,
* where {@link JdbcLockRegistry} uses non-shared {@link java.util.concurrent.locks.ReentrantLock}s
* for local synchronizations.
*
Expand All @@ -49,8 +53,8 @@
*
* @since 4.3
*/
@Repository
public class DefaultLockRepository implements LockRepository, InitializingBean {
public class DefaultLockRepository
implements LockRepository, InitializingBean, ApplicationContextAware, SmartInitializingSingleton {

/**
* Default value for the table prefix property.
Expand Down Expand Up @@ -89,12 +93,21 @@ public class DefaultLockRepository implements LockRepository, InitializingBean {

private String renewQuery = "UPDATE %sLOCK SET CREATED_DATE=? WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?";

private ApplicationContext applicationContext;

private PlatformTransactionManager transactionManager;

private TransactionTemplate defaultTransactionTemplate;

private TransactionTemplate readOnlyTransactionTemplate;

private TransactionTemplate serializableTransactionTemplate;

/**
* Constructor that initializes the client id that will be associated for
* all the locks persisted by the store instance to a random {@link UUID}.
* @param dataSource the {@link DataSource} used to maintain the lock repository.
*/
@Autowired
public DefaultLockRepository(DataSource dataSource) {
this(dataSource, UUID.randomUUID().toString());
}
Expand Down Expand Up @@ -124,21 +137,37 @@ public void setRegion(String region) {
}

/**
* Specify the prefix for target data base table used from queries.
* @param prefix the prefix to set (default INT_).
* Specify the prefix for target database table used from queries.
* @param prefix the prefix to set (default {@code INT_}).
*/
public void setPrefix(String prefix) {
this.prefix = prefix;
}

/**
* Specify the time (in milliseconds) to expire dead locks.
* @param timeToLive the time to expire dead locks.
* Specify the time (in milliseconds) to expire dead-locks.
* @param timeToLive the time to expire dead-locks.
*/
public void setTimeToLive(int timeToLive) {
this.ttl = timeToLive;
}

/**
* Set a {@link PlatformTransactionManager} for operations.
* Otherwise, a primary {@link PlatformTransactionManager} bean is obtained
* from the application context.
* @param transactionManager the {@link PlatformTransactionManager} to use.
* @since 6.0
*/
public void setTransactionManager(PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

@Override
public void afterPropertiesSet() {
this.deleteQuery = String.format(this.deleteQuery, this.prefix);
Expand All @@ -150,50 +179,84 @@ public void afterPropertiesSet() {
this.renewQuery = String.format(this.renewQuery, this.prefix);
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void afterSingletonsInstantiated() {
if (this.transactionManager == null) {
try {
this.transactionManager = this.applicationContext.getBean(PlatformTransactionManager.class);
}
catch (BeansException ex) {
throw new BeanInitializationException(
"A unique or primary 'PlatformTransactionManager' bean " +
"must be present in the application context.", ex);
}
}

DefaultTransactionDefinition transactionDefinition =
new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRES_NEW);

this.defaultTransactionTemplate =
new TransactionTemplate(this.transactionManager, transactionDefinition);

// It is safe to reuse the transactionDefinition - the TransactionTemplate makes copy of its properties.
transactionDefinition.setReadOnly(true);

this.readOnlyTransactionTemplate = new TransactionTemplate(this.transactionManager, transactionDefinition);

transactionDefinition.setReadOnly(false);
transactionDefinition.setIsolationLevel(TransactionDefinition.ISOLATION_SERIALIZABLE);

this.serializableTransactionTemplate = new TransactionTemplate(this.transactionManager, transactionDefinition);
}

@Override
public void close() {
this.template.update(this.deleteAllQuery, this.region, this.id);
this.defaultTransactionTemplate.executeWithoutResult(
transactionStatus -> this.template.update(this.deleteAllQuery, this.region, this.id));
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void delete(String lock) {
this.template.update(this.deleteQuery, this.region, lock, this.id);
this.defaultTransactionTemplate.executeWithoutResult(
transactionStatus -> this.template.update(this.deleteQuery, this.region, lock, this.id));
}

@Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.SERIALIZABLE)
@Override
public boolean acquire(String lock) {
if (this.template.update(this.updateQuery, this.id, new Date(), this.region, lock, this.id,
new Date(System.currentTimeMillis() - this.ttl)) > 0) {
return true;
}
try {
return this.template.update(this.insertQuery, this.region, lock, this.id, new Date()) > 0;
}
catch (DuplicateKeyException e) {
return false;
}
return this.serializableTransactionTemplate.execute(transactionStatus -> {
if (this.template.update(this.updateQuery, this.id, new Date(), this.region, lock, this.id,
new Date(System.currentTimeMillis() - this.ttl)) > 0) {
return true;
}
try {
return this.template.update(this.insertQuery, this.region, lock, this.id, new Date()) > 0;
}
catch (DuplicateKeyException e) {
return false;
}
});
}

@Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
@Override
public boolean isAcquired(String lock) {
return this.template.queryForObject(this.countQuery, Integer.class, // NOSONAR query never returns null
this.region, lock, this.id, new Date(System.currentTimeMillis() - this.ttl)) == 1;
return this.readOnlyTransactionTemplate.execute(transactionStatus ->
this.template.queryForObject(this.countQuery, // NOSONAR query never returns null
Integer.class, this.region, lock, this.id, new Date(System.currentTimeMillis() - this.ttl))
== 1);
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void deleteExpired() {
this.template.update(this.deleteExpiredQuery, this.region, new Date(System.currentTimeMillis() - this.ttl));
this.defaultTransactionTemplate.executeWithoutResult(
transactionStatus ->
this.template.update(this.deleteExpiredQuery, this.region,
new Date(System.currentTimeMillis() - this.ttl)));
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public boolean renew(String lock) {
return this.template.update(this.renewQuery, new Date(), this.region, lock, this.id) > 0;
return this.defaultTransactionTemplate.execute(
transactionStatus -> this.template.update(this.renewQuery, new Date(), this.region, lock, this.id) > 0);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,17 +23,18 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import org.springframework.integration.jdbc.lock.DefaultLockRepository;
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
import org.springframework.integration.leader.Context;
import org.springframework.integration.leader.DefaultCandidate;
import org.springframework.integration.leader.event.LeaderEventPublisher;
import org.springframework.integration.support.leader.LockRegistryLeaderInitiator;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
Expand All @@ -49,7 +50,7 @@ public class JdbcLockRegistryLeaderInitiatorTests {

public static EmbeddedDatabase dataSource;

@BeforeClass
@BeforeAll
public static void init() {
dataSource = new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.H2)
Expand All @@ -58,7 +59,7 @@ public static void init() {
.build();
}

@AfterClass
@AfterAll
public static void destroy() {
dataSource.shutdown();
}
Expand All @@ -70,7 +71,9 @@ public void testDistributedLeaderElection() throws Exception {
List<LockRegistryLeaderInitiator> initiators = new ArrayList<>();
for (int i = 0; i < 2; i++) {
DefaultLockRepository lockRepository = new DefaultLockRepository(dataSource);
lockRepository.setTransactionManager(new DataSourceTransactionManager(dataSource));
lockRepository.afterPropertiesSet();
lockRepository.afterSingletonsInstantiated();
LockRegistryLeaderInitiator initiator = new LockRegistryLeaderInitiator(
new JdbcLockRegistry(lockRepository),
new DefaultCandidate("foo", "bar"));
Expand Down Expand Up @@ -170,7 +173,7 @@ public void publishOnRevoked(Object source, Context context, String role) {
}

@Test
@Ignore("Looks like an embedded DBd is not fully cleared if we don't close application context")
@Disabled("Looks like an embedded DBd is not fully cleared if we don't close application context")
public void testLostConnection() throws InterruptedException {
CountDownLatch granted = new CountDownLatch(1);
CountingPublisher countingPublisher = new CountingPublisher(granted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void clear() {
this.registry.expireUnusedOlderThan(0);
this.client.close();
this.child = new AnnotationConfigApplicationContext();
this.child.register(DefaultLockRepository.class);
this.child.registerBean("childLockRepository", DefaultLockRepository.class, this.dataSource);
this.child.setParent(this.context);
this.child.refresh();
}
Expand Down Expand Up @@ -195,7 +195,9 @@ public void testOnlyOneLock() throws Exception {
for (int j = 0; j < 20; j++) {
Callable<Boolean> task = () -> {
DefaultLockRepository client = new DefaultLockRepository(this.dataSource);
client.setApplicationContext(this.context);
client.afterPropertiesSet();
client.afterSingletonsInstantiated();
Lock lock = new JdbcLockRegistry(client).obtain("foo");
try {
if (locked.isEmpty() && lock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS)) {
Expand Down Expand Up @@ -231,9 +233,13 @@ public void testOnlyOneLock() throws Exception {
@Test
public void testExclusiveAccess() throws Exception {
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
client1.setApplicationContext(this.context);
client1.afterPropertiesSet();
final DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
client1.afterSingletonsInstantiated();
DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
client2.setApplicationContext(this.context);
client2.afterPropertiesSet();
client2.afterSingletonsInstantiated();
Lock lock1 = new JdbcLockRegistry(client1).obtain("foo");
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();
final CountDownLatch latch1 = new CountDownLatch(1);
Expand Down Expand Up @@ -278,10 +284,14 @@ public void testExclusiveAccess() throws Exception {
public void testOutOfDateLockTaken() throws Exception {
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
client1.setTimeToLive(100);
client1.setApplicationContext(this.context);
client1.afterPropertiesSet();
client1.afterSingletonsInstantiated();
DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
client2.setTimeToLive(100);
client2.setApplicationContext(this.context);
client2.afterPropertiesSet();
client2.afterSingletonsInstantiated();
Lock lock1 = new JdbcLockRegistry(client1).obtain("foo");
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();
final CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -316,10 +326,14 @@ public void testOutOfDateLockTaken() throws Exception {
public void testRenewLock() throws Exception {
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
client1.setTimeToLive(500);
client1.setApplicationContext(this.context);
client1.afterPropertiesSet();
client1.afterSingletonsInstantiated();
DefaultLockRepository client2 = new DefaultLockRepository(dataSource);
client2.setTimeToLive(500);
client2.setApplicationContext(this.context);
client2.afterPropertiesSet();
client2.afterSingletonsInstantiated();
JdbcLockRegistry registry = new JdbcLockRegistry(client1);
Lock lock1 = registry.obtain("foo");
final BlockingQueue<Integer> data = new LinkedBlockingQueue<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,6 +36,7 @@
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.integration.test.util.TestUtils;
Expand Down Expand Up @@ -67,6 +68,9 @@ public class JdbcLockRegistryTests {
@Autowired
private DataSource dataSource;

@Autowired
private ApplicationContext context;

@BeforeEach
public void clear() {
this.registry.expireUnusedOlderThan(0);
Expand Down Expand Up @@ -143,7 +147,9 @@ public void testReentrantLockInterruptibly() throws Exception {
public void testReentrantLockAfterExpiration() throws Exception {
DefaultLockRepository client = new DefaultLockRepository(dataSource);
client.setTimeToLive(1);
client.setApplicationContext(this.context);
client.afterPropertiesSet();
client.afterSingletonsInstantiated();
JdbcLockRegistry registry = new JdbcLockRegistry(client);
Lock lock1 = registry.obtain("foo");
assertThat(lock1.tryLock()).isTrue();
Expand Down
2 changes: 2 additions & 0 deletions src/reference/asciidoc/jdbc.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,8 @@ NOTE: The lock renewal can be done only if the lock is held by the current threa
String with version 5.5.6, the `JdbcLockRegistry` is support automatically clean up cache for JdbcLock in `JdbcLockRegistry.locks` via `JdbcLockRegistry.setCacheCapacity()`.
See its JavaDocs for more information.

String with version 6.0, the `DefaultLockRepository` can be supplied with a `PlatformTransactionManager` instead of relying on the primary bean from the application context.

[[jdbc-metadata-store]]
=== JDBC Metadata Store

Expand Down
Loading

0 comments on commit 4b57363

Please sign in to comment.