Skip to content

Commit

Permalink
DATAGEODE-153 - Add test asserting 2 concurrent threads entering a tr…
Browse files Browse the repository at this point in the history
…ansaction on the same entry leads to the proper outcome.
  • Loading branch information
jxblum committed Oct 25, 2018
1 parent 6534c0b commit ba98ab9
Show file tree
Hide file tree
Showing 2 changed files with 280 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.data.gemfire.transaction;

import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.data.gemfire.util.RuntimeExceptionFactory.newIllegalStateException;

import java.io.Serializable;
import java.util.function.Function;

import javax.transaction.Transactional;

import edu.umd.cs.mtc.MultithreadedTestCase;
import edu.umd.cs.mtc.TestFramework;

import org.apache.geode.cache.CacheTransactionManager;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.data.annotation.Id;
import org.springframework.data.gemfire.config.annotation.ClientCacheApplication;
import org.springframework.data.gemfire.config.annotation.EnableEntityDefinedRegions;
import org.springframework.data.gemfire.mapping.GemfireMappingContext;
import org.springframework.data.gemfire.mapping.annotation.Region;
import org.springframework.data.gemfire.repository.support.GemfireRepositoryFactoryBean;
import org.springframework.data.gemfire.transaction.config.EnableGemfireCacheTransactions;
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Service;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.Assert;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

/**
* Integration tests asserting the proper configuration and behavior of Apache Geode/Pivotal GemFire
* cache Transactions inside a Spring application context when using SDG to configure
* the {@link CacheTransactionManager}.
*
* Specifically, this test asserts that 2 concurrent threads modifying the same entity inside a cache transaction
* leads to a {@link CommitConflictException}.
*
* @author John Blum
* @see java.util.function.Function
* @see edu.umd.cs.mtc.MultithreadedTestCase
* @see edu.umd.cs.mtc.TestFramework
* @see org.junit.Test
* @see org.apache.geode.cache.CacheTransactionManager
* @see org.apache.geode.cache.CommitConflictException
* @see org.springframework.data.gemfire.config.annotation.ClientCacheApplication
* @see org.springframework.data.gemfire.config.annotation.EnableEntityDefinedRegions
* @see org.springframework.data.gemfire.repository.config.EnableGemfireRepositories
* @see org.springframework.data.gemfire.transaction.config.EnableGemfireCacheTransactions
* @see org.springframework.test.context.ContextConfiguration
* @see org.springframework.test.context.junit4.SpringRunner
* @since 2.2.0
*/
@RunWith(SpringRunner.class)
@ContextConfiguration
@SuppressWarnings("unused")
public class CommitConflictExceptionTransactionalIntegrationTests {

@Autowired
private CustomerService customerService;

@Test
public void concurrentTransactionalThreadsCauseCommitConflictException() throws Throwable {
TestFramework.runOnce(new TransactionalCommitConflictMultithreadedTestCase(this.customerService));
}

static class TransactionalCommitConflictMultithreadedTestCase extends MultithreadedTestCase {

private final CustomerService customerService;

TransactionalCommitConflictMultithreadedTestCase(CustomerService customerService) {

Assert.notNull(customerService, "CustomerService is required");

this.customerService = customerService;
}

@Override
public void initialize() {

super.initialize();

Customer jonDoe = this.customerService.save(Customer.newCustomer(1L, "Jon Doe"));
Customer jonDoeLoaded = this.customerService.findById(jonDoe.getId());

assertThat(jonDoeLoaded).isEqualTo(jonDoe);
}

public void thread1() {

assertTick(0);

Thread.currentThread().setName("Customer Processing Thread One");

this.customerService.process(1L, customer -> {

assertThat(customer.getId()).isEqualTo(1L);
assertThat(customer.getName()).isEqualTo("Jon Doe");

customer.setName("Pie Doe");

waitForTick(2);
assertTick(2);

return customer;

}, Function.identity());
}

public void thread2() {

assertTick(0);

Thread.currentThread().setName("Customer Processing Thread Two");

waitForTick(1);
assertTick(1);

try {

this.customerService.process(1L, customer -> {

assertThat(customer.getId()).isEqualTo(1L);
assertThat(customer.getName()).isEqualTo("Jon Doe");

customer.setName("Sour Doe");

waitForTick(3);
assertTick(3);

return customer;

}, Function.identity());

fail("Expected CommitConflictException!");

}
catch (RuntimeException expected) {
assertThat(expected).isInstanceOf(GemfireTransactionCommitException.class);
assertThat(expected).hasCauseInstanceOf(CommitConflictException.class);
}
}

@Override
public void finish() {

Customer customer = this.customerService.findById(1L);

assertThat(customer).isNotNull();
assertThat(customer.getId()).isEqualTo(1L);
assertThat(customer.getName()).isEqualTo("Pie Doe");
}
}

@ClientCacheApplication
@EnableEntityDefinedRegions(
basePackageClasses = Customer.class,
clientRegionShortcut = ClientRegionShortcut.LOCAL
)
@EnableGemfireCacheTransactions
static class TestConfiguration {

@Bean
GemfireRepositoryFactoryBean<CustomerRepository, Customer, Long> customerRepositoryFactoryBean() {

GemfireRepositoryFactoryBean<CustomerRepository, Customer, Long> customerRepositoryFactoryBean
= new GemfireRepositoryFactoryBean<>(CustomerRepository.class);

customerRepositoryFactoryBean.setGemfireMappingContext(new GemfireMappingContext());

return customerRepositoryFactoryBean;
}

@Bean
CustomerService customerService(CustomerRepository customerRepository) {
return new CustomerService(customerRepository);
}
}

@Data
@EqualsAndHashCode
@Region("Customers")
@RequiredArgsConstructor(staticName = "newCustomer")
static class Customer implements Serializable {

@NonNull @Id
private Long id;

@NonNull
private String name;

}

public interface CustomerRepository extends CrudRepository<Customer, Long> { }

@Service
public static class CustomerService {

private final CustomerRepository customerRepository;

public CustomerService(CustomerRepository customerRepository) {

Assert.notNull(customerRepository, "CustomerRepository is required");

this.customerRepository = customerRepository;
}

protected Customer findById(Long id) {

Assert.notNull(id, "ID is required");

return this.customerRepository.findById(id)
.orElseThrow(() -> newIllegalStateException("No Customer with ID [%d] was found", id));
}

@Transactional
public Customer process(Long id, Function<Customer, Customer> beforeSave,
Function<Customer, Customer> afterSave) {

return afterSave.apply(save(beforeSave.apply(findById(id))));
}

protected Customer save(Customer customer) {

Assert.notNull(customer, "Customer is required");

return this.customerRepository.save(customer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@
*/
@RunWith(SpringRunner.class)
@ContextConfiguration
@SuppressWarnings("unused")
public class GemfireTransactionManagerIntegrationTests {

private static final String GEMFIRE_LOG_LEVEL = "warning";
private static final String GEMFIRE_LOG_LEVEL = "error";

@Resource(name = "Example")
@SuppressWarnings("unused")
private Region<Object, Object> example;

@Autowired
Expand All @@ -67,27 +67,32 @@ public class GemfireTransactionManagerIntegrationTests {

@Test(expected = IllegalArgumentException.class)
public void suspendAndResumeIsSuccessful() {

try {
assertThat(example).isEmpty();

service.doCacheTransactions();
assertThat(this.example).isEmpty();

this.service.doCacheTransactions();
}
catch (IllegalArgumentException e) {
assertThat(e).hasMessage("boom!");
assertThat(e).hasNoCause();
catch (IllegalArgumentException expected) {

assertThat(expected).hasMessage("BOOM!");
assertThat(expected).hasNoCause();

throw e;
throw expected;
}
finally {
assertThat(example).hasSize(1);
assertThat(example.containsKey("tx-1-op-1")).isFalse();
assertThat(example.containsKey("tx-2-op-1")).isTrue();
assertThat(this.example).hasSize(1);
assertThat(this.example.containsKey("tx-1-op-1")).isFalse();
assertThat(this.example.containsKey("tx-2-op-1")).isTrue();
}
}

@SuppressWarnings("unused")
@PeerCacheApplication(
name = "GemfireTransactionManagerIntegrationTests",
logLevel = GEMFIRE_LOG_LEVEL
)
@EnableGemfireCacheTransactions
@PeerCacheApplication(name = "GemfireTransactionManagerIntegrationTests", logLevel = GEMFIRE_LOG_LEVEL)
static class TestConfiguration {

@Bean(name = "Example")
Expand All @@ -104,14 +109,14 @@ LocalRegionFactoryBean<Object, Object> exampleRegion(GemFireCache gemfireCache)

@Bean
SuspendAndResumeCacheTransactionsRepository suspendAndResumeCacheTransactionsRepository(
GemFireCache gemFireCache) {
GemFireCache gemFireCache) {

return new SuspendAndResumeCacheTransactionsRepository(gemFireCache.getRegion("Example"));
}

@Bean
SuspendAndResumeCacheTransactionsService suspendAndResumeCacheTransactionsService(
SuspendAndResumeCacheTransactionsRepository repository) {
SuspendAndResumeCacheTransactionsRepository repository) {

return new SuspendAndResumeCacheTransactionsService(repository);
}
Expand All @@ -129,9 +134,9 @@ static class SuspendAndResumeCacheTransactionsService {

@Transactional
public void doCacheTransactions() {
repository.doOperationOneInTransactionOne();
repository.doOperationOneInTransactionTwo();
repository.doOperationTwoInTransactionOne();
this.repository.doOperationOneInTransactionOne();
this.repository.doOperationOneInTransactionTwo();
this.repository.doOperationTwoInTransactionOne();
}
}

Expand All @@ -148,17 +153,17 @@ static class SuspendAndResumeCacheTransactionsRepository {

@Transactional(propagation = Propagation.REQUIRED)
public void doOperationOneInTransactionOne() {
example.put("tx-1-op-1", "test");
this.example.put("tx-1-op-1", "test");
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void doOperationOneInTransactionTwo() {
example.put("tx-2-op-1", "test");
this.example.put("tx-2-op-1", "test");
}

@Transactional(propagation = Propagation.REQUIRED)
public void doOperationTwoInTransactionOne() {
throw new IllegalArgumentException("boom!");
throw new IllegalArgumentException("BOOM!");
}
}
}

0 comments on commit ba98ab9

Please sign in to comment.