From 9d7154901ff56153b5f7d2b2a4d499963353478b Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Sun, 6 Aug 2023 14:25:39 +0200 Subject: [PATCH] Polishing (cherry picked from commit 6e5af9dccb453dd9adfe997c219eef31fd7ab903) --- ...ansactionalAnnotationIntegrationTests.java | 12 +-- .../adapter/ThrowsAdviceInterceptorTests.java | 14 +-- .../scheduling/quartz/QuartzSupportTests.java | 26 ++--- .../ClassPathBeanDefinitionScanner.java | 18 ++-- .../ClassPathBeanDefinitionScannerTests.java | 2 + .../core/ReactiveAdapterRegistry.java | 23 ++-- .../core/ReactiveTypeDescriptor.java | 14 +-- .../core/ReactiveAdapterRegistryTests.java | 26 ++--- .../R2dbcTransactionManagerUnitTests.java | 100 +++++++++--------- .../PlatformTransactionManager.java | 3 +- .../transaction/ReactiveTransaction.java | 5 +- .../transaction/TransactionStatus.java | 5 +- .../support/SmartTransactionObject.java | 8 +- 13 files changed, 125 insertions(+), 131 deletions(-) diff --git a/integration-tests/src/test/java/org/springframework/scheduling/annotation/ScheduledAndTransactionalAnnotationIntegrationTests.java b/integration-tests/src/test/java/org/springframework/scheduling/annotation/ScheduledAndTransactionalAnnotationIntegrationTests.java index 9062d6e3f615..be62d81ee013 100644 --- a/integration-tests/src/test/java/org/springframework/scheduling/annotation/ScheduledAndTransactionalAnnotationIntegrationTests.java +++ b/integration-tests/src/test/java/org/springframework/scheduling/annotation/ScheduledAndTransactionalAnnotationIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -60,8 +60,8 @@ void failsWhenJdkProxyAndScheduledMethodNotPresentOnInterface() { AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(); ctx.register(Config.class, JdkProxyTxConfig.class, RepoConfigA.class); assertThatExceptionOfType(BeanCreationException.class) - .isThrownBy(ctx::refresh) - .withCauseInstanceOf(IllegalStateException.class); + .isThrownBy(ctx::refresh) + .withCauseInstanceOf(IllegalStateException.class); } @Test @@ -70,7 +70,7 @@ void succeedsWhenSubclassProxyAndScheduledMethodNotPresentOnInterface() throws I ctx.register(Config.class, SubclassProxyTxConfig.class, RepoConfigA.class); ctx.refresh(); - Thread.sleep(100); // allow @Scheduled method to be called several times + Thread.sleep(200); // allow @Scheduled method to be called several times MyRepository repository = ctx.getBean(MyRepository.class); CallCountingTransactionManager txManager = ctx.getBean(CallCountingTransactionManager.class); @@ -85,7 +85,7 @@ void succeedsWhenJdkProxyAndScheduledMethodIsPresentOnInterface() throws Interru ctx.register(Config.class, JdkProxyTxConfig.class, RepoConfigB.class); ctx.refresh(); - Thread.sleep(100); // allow @Scheduled method to be called several times + Thread.sleep(200); // allow @Scheduled method to be called several times MyRepositoryWithScheduledMethod repository = ctx.getBean(MyRepositoryWithScheduledMethod.class); CallCountingTransactionManager txManager = ctx.getBean(CallCountingTransactionManager.class); @@ -100,7 +100,7 @@ void withAspectConfig() throws InterruptedException { ctx.register(AspectConfig.class, MyRepositoryWithScheduledMethodImpl.class); ctx.refresh(); - Thread.sleep(100); // allow @Scheduled method to be called several times + Thread.sleep(200); // allow @Scheduled method to be called several times MyRepositoryWithScheduledMethod repository = ctx.getBean(MyRepositoryWithScheduledMethod.class); assertThat(AopUtils.isCglibProxy(repository)).isTrue(); diff --git a/spring-aop/src/test/java/org/springframework/aop/framework/adapter/ThrowsAdviceInterceptorTests.java b/spring-aop/src/test/java/org/springframework/aop/framework/adapter/ThrowsAdviceInterceptorTests.java index 40b038b457ef..d1bfc13af5d5 100644 --- a/spring-aop/src/test/java/org/springframework/aop/framework/adapter/ThrowsAdviceInterceptorTests.java +++ b/spring-aop/src/test/java/org/springframework/aop/framework/adapter/ThrowsAdviceInterceptorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -77,9 +77,7 @@ public void testCorrectHandlerUsed() throws Throwable { given(mi.getMethod()).willReturn(Object.class.getMethod("hashCode")); given(mi.getThis()).willReturn(new Object()); given(mi.proceed()).willThrow(ex); - assertThatExceptionOfType(FileNotFoundException.class).isThrownBy(() -> - ti.invoke(mi)) - .isSameAs(ex); + assertThatExceptionOfType(FileNotFoundException.class).isThrownBy(() -> ti.invoke(mi)).isSameAs(ex); assertThat(th.getCalls()).isEqualTo(1); assertThat(th.getCalls("ioException")).isEqualTo(1); } @@ -92,9 +90,7 @@ public void testCorrectHandlerUsedForSubclass() throws Throwable { ConnectException ex = new ConnectException(""); MethodInvocation mi = mock(MethodInvocation.class); given(mi.proceed()).willThrow(ex); - assertThatExceptionOfType(ConnectException.class).isThrownBy(() -> - ti.invoke(mi)) - .isSameAs(ex); + assertThatExceptionOfType(ConnectException.class).isThrownBy(() -> ti.invoke(mi)).isSameAs(ex); assertThat(th.getCalls()).isEqualTo(1); assertThat(th.getCalls("remoteException")).isEqualTo(1); } @@ -117,9 +113,7 @@ public void afterThrowing(RemoteException ex) throws Throwable { ConnectException ex = new ConnectException(""); MethodInvocation mi = mock(MethodInvocation.class); given(mi.proceed()).willThrow(ex); - assertThatExceptionOfType(Throwable.class).isThrownBy(() -> - ti.invoke(mi)) - .isSameAs(t); + assertThatExceptionOfType(Throwable.class).isThrownBy(() -> ti.invoke(mi)).isSameAs(t); assertThat(th.getCalls()).isEqualTo(1); assertThat(th.getCalls("remoteException")).isEqualTo(1); } diff --git a/spring-context-support/src/test/java/org/springframework/scheduling/quartz/QuartzSupportTests.java b/spring-context-support/src/test/java/org/springframework/scheduling/quartz/QuartzSupportTests.java index 9d461d2e400f..597633475112 100644 --- a/spring-context-support/src/test/java/org/springframework/scheduling/quartz/QuartzSupportTests.java +++ b/spring-context-support/src/test/java/org/springframework/scheduling/quartz/QuartzSupportTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -114,7 +114,7 @@ void schedulerWithTaskExecutor() throws Exception { trigger.setName("myTrigger"); trigger.setJobDetail(jobDetail); trigger.setStartDelay(1); - trigger.setRepeatInterval(500); + trigger.setRepeatInterval(100); trigger.setRepeatCount(1); trigger.afterPropertiesSet(); @@ -126,14 +126,14 @@ void schedulerWithTaskExecutor() throws Exception { bean.start(); Thread.sleep(500); - assertThat(DummyJob.count > 0).as("DummyJob should have been executed at least once.").isTrue(); + assertThat(DummyJob.count).as("DummyJob should have been executed at least once.").isGreaterThan(0); assertThat(taskExecutor.count).isEqualTo(DummyJob.count); bean.destroy(); } @Test - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings({"unchecked", "rawtypes"}) void jobDetailWithRunnableInsteadOfJob() { JobDetailImpl jobDetail = new JobDetailImpl(); assertThatIllegalArgumentException().isThrownBy(() -> @@ -156,7 +156,7 @@ void schedulerWithQuartzJobBean() throws Exception { trigger.setName("myTrigger"); trigger.setJobDetail(jobDetail); trigger.setStartDelay(1); - trigger.setRepeatInterval(500); + trigger.setRepeatInterval(100); trigger.setRepeatCount(1); trigger.afterPropertiesSet(); @@ -168,7 +168,7 @@ void schedulerWithQuartzJobBean() throws Exception { Thread.sleep(500); assertThat(DummyJobBean.param).isEqualTo(10); - assertThat(DummyJobBean.count > 0).isTrue(); + assertThat(DummyJobBean.count).isGreaterThan(0); bean.destroy(); } @@ -190,7 +190,7 @@ void schedulerWithSpringBeanJobFactory() throws Exception { trigger.setName("myTrigger"); trigger.setJobDetail(jobDetail); trigger.setStartDelay(1); - trigger.setRepeatInterval(500); + trigger.setRepeatInterval(100); trigger.setRepeatCount(1); trigger.afterPropertiesSet(); @@ -203,7 +203,7 @@ void schedulerWithSpringBeanJobFactory() throws Exception { Thread.sleep(500); assertThat(DummyJob.param).isEqualTo(10); - assertThat(DummyJob.count > 0).as("DummyJob should have been executed at least once.").isTrue(); + assertThat(DummyJob.count).as("DummyJob should have been executed at least once.").isGreaterThan(0); bean.destroy(); } @@ -225,7 +225,7 @@ void schedulerWithSpringBeanJobFactoryAndParamMismatchNotIgnored() throws Except trigger.setName("myTrigger"); trigger.setJobDetail(jobDetail); trigger.setStartDelay(1); - trigger.setRepeatInterval(500); + trigger.setRepeatInterval(100); trigger.setRepeatCount(1); trigger.afterPropertiesSet(); @@ -239,7 +239,7 @@ void schedulerWithSpringBeanJobFactoryAndParamMismatchNotIgnored() throws Except Thread.sleep(500); assertThat(DummyJob.param).isEqualTo(0); - assertThat(DummyJob.count == 0).isTrue(); + assertThat(DummyJob.count).isEqualTo(0); bean.destroy(); } @@ -260,7 +260,7 @@ void schedulerWithSpringBeanJobFactoryAndQuartzJobBean() throws Exception { trigger.setName("myTrigger"); trigger.setJobDetail(jobDetail); trigger.setStartDelay(1); - trigger.setRepeatInterval(500); + trigger.setRepeatInterval(100); trigger.setRepeatCount(1); trigger.afterPropertiesSet(); @@ -273,7 +273,7 @@ void schedulerWithSpringBeanJobFactoryAndQuartzJobBean() throws Exception { Thread.sleep(500); assertThat(DummyJobBean.param).isEqualTo(10); - assertThat(DummyJobBean.count > 0).isTrue(); + assertThat(DummyJobBean.count).isGreaterThan(0); bean.destroy(); } @@ -292,7 +292,7 @@ void schedulerWithSpringBeanJobFactoryAndJobSchedulingData() throws Exception { Thread.sleep(500); assertThat(DummyJob.param).isEqualTo(10); - assertThat(DummyJob.count > 0).as("DummyJob should have been executed at least once.").isTrue(); + assertThat(DummyJob.count).as("DummyJob should have been executed at least once.").isGreaterThan(0); bean.destroy(); } diff --git a/spring-context/src/main/java/org/springframework/context/annotation/ClassPathBeanDefinitionScanner.java b/spring-context/src/main/java/org/springframework/context/annotation/ClassPathBeanDefinitionScanner.java index 65cbb9bdb9f2..7f41cd5962ce 100644 --- a/spring-context/src/main/java/org/springframework/context/annotation/ClassPathBeanDefinitionScanner.java +++ b/spring-context/src/main/java/org/springframework/context/annotation/ClassPathBeanDefinitionScanner.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -329,8 +329,8 @@ protected void registerBeanDefinition(BeanDefinitionHolder definitionHolder, Bea * @return {@code true} if the bean can be registered as-is; * {@code false} if it should be skipped because there is an * existing, compatible bean definition for the specified name - * @throws ConflictingBeanDefinitionException if an existing, incompatible - * bean definition has been found for the specified name + * @throws IllegalStateException if an existing, incompatible bean definition + * has been found for the specified name */ protected boolean checkCandidate(String beanName, BeanDefinition beanDefinition) throws IllegalStateException { if (!this.registry.containsBeanDefinition(beanName)) { @@ -354,16 +354,16 @@ protected boolean checkCandidate(String beanName, BeanDefinition beanDefinition) * the given existing bean definition. *

The default implementation considers them as compatible when the existing * bean definition comes from the same source or from a non-scanning source. - * @param newDefinition the new bean definition, originated from scanning - * @param existingDefinition the existing bean definition, potentially an + * @param newDef the new bean definition, originated from scanning + * @param existingDef the existing bean definition, potentially an * explicitly defined one or a previously generated one from scanning * @return whether the definitions are considered as compatible, with the * new definition to be skipped in favor of the existing definition */ - protected boolean isCompatible(BeanDefinition newDefinition, BeanDefinition existingDefinition) { - return (!(existingDefinition instanceof ScannedGenericBeanDefinition) || // explicitly registered overriding bean - (newDefinition.getSource() != null && newDefinition.getSource().equals(existingDefinition.getSource())) || // scanned same file twice - newDefinition.equals(existingDefinition)); // scanned equivalent class twice + protected boolean isCompatible(BeanDefinition newDef, BeanDefinition existingDef) { + return (!(existingDef instanceof ScannedGenericBeanDefinition) || // explicitly registered overriding bean + (newDef.getSource() != null && newDef.getSource().equals(existingDef.getSource())) || // scanned same file twice + newDef.equals(existingDef)); // scanned equivalent class twice } diff --git a/spring-context/src/test/java/org/springframework/context/annotation/ClassPathBeanDefinitionScannerTests.java b/spring-context/src/test/java/org/springframework/context/annotation/ClassPathBeanDefinitionScannerTests.java index 76b9fc53dd62..617b5579b7a6 100644 --- a/spring-context/src/test/java/org/springframework/context/annotation/ClassPathBeanDefinitionScannerTests.java +++ b/spring-context/src/test/java/org/springframework/context/annotation/ClassPathBeanDefinitionScannerTests.java @@ -197,6 +197,7 @@ public void testSimpleScanWithDefaultFiltersAndOverridingBean() { context.registerBeanDefinition("stubFooDao", new RootBeanDefinition(TestBean.class)); ClassPathBeanDefinitionScanner scanner = new ClassPathBeanDefinitionScanner(context); scanner.setIncludeAnnotationConfig(false); + // should not fail! scanner.scan(BASE_PACKAGE); } @@ -207,6 +208,7 @@ public void testSimpleScanWithDefaultFiltersAndDefaultBeanNameClash() { ClassPathBeanDefinitionScanner scanner = new ClassPathBeanDefinitionScanner(context); scanner.setIncludeAnnotationConfig(false); scanner.scan("org.springframework.context.annotation3"); + assertThatIllegalStateException().isThrownBy(() -> scanner.scan(BASE_PACKAGE)) .withMessageContaining("stubFooDao") .withMessageContaining(StubFooDao.class.getName()); diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java index 9e56b5d5e37d..c7e74b2ab2bb 100644 --- a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java +++ b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,8 +24,6 @@ import java.util.concurrent.CompletionStage; import java.util.function.Function; -import kotlinx.coroutines.CompletableDeferredKt; -import kotlinx.coroutines.Deferred; import org.reactivestreams.Publisher; import reactor.blockhound.BlockHound; import reactor.blockhound.integration.BlockHoundIntegration; @@ -39,13 +37,14 @@ import org.springframework.util.ReflectionUtils; /** - * A registry of adapters to adapt Reactive Streams {@link Publisher} to/from - * various async/reactive types such as {@code CompletableFuture}, RxJava - * {@code Flowable}, and others. + * A registry of adapters to adapt Reactive Streams {@link Publisher} to/from various + * async/reactive types such as {@code CompletableFuture}, RxJava {@code Flowable}, etc. + * This is designed to complement Spring's Reactor {@code Mono}/{@code Flux} support while + * also being usable without Reactor, e.g. just for {@code org.reactivestreams} bridging. * - *

By default, depending on classpath availability, adapters are registered - * for Reactor, RxJava 3, {@link CompletableFuture}, {@code Flow.Publisher}, - * and Kotlin Coroutines' {@code Deferred} and {@code Flow}. + *

By default, depending on classpath availability, adapters are registered for Reactor + * (including {@code CompletableFuture} and {@code Flow.Publisher} adapters), RxJava 3, + * Kotlin Coroutines' {@code Deferred} (bridged via Reactor) and SmallRye Mutiny 1.x. * *

Note: As of Spring Framework 5.3.11, support for * RxJava 1.x and 2.x is deprecated in favor of RxJava 3. @@ -401,9 +400,9 @@ private static class CoroutinesRegistrar { @SuppressWarnings("KotlinInternalInJava") void registerAdapters(ReactiveAdapterRegistry registry) { registry.registerReactiveType( - ReactiveTypeDescriptor.singleOptionalValue(Deferred.class, - () -> CompletableDeferredKt.CompletableDeferred(null)), - source -> CoroutinesUtils.deferredToMono((Deferred) source), + ReactiveTypeDescriptor.singleOptionalValue(kotlinx.coroutines.Deferred.class, + () -> kotlinx.coroutines.CompletableDeferredKt.CompletableDeferred(null)), + source -> CoroutinesUtils.deferredToMono((kotlinx.coroutines.Deferred) source), source -> CoroutinesUtils.monoToDeferred(Mono.from(source))); registry.registerReactiveType( diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java b/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java index f1d76c330e24..5e37afabf097 100644 --- a/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java +++ b/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,7 +37,7 @@ public final class ReactiveTypeDescriptor { private final boolean noValue; @Nullable - private final Supplier emptyValueSupplier; + private final Supplier emptySupplier; private final boolean deferred; @@ -55,7 +55,7 @@ private ReactiveTypeDescriptor(Class reactiveType, boolean multiValue, boolea this.reactiveType = reactiveType; this.multiValue = multiValue; this.noValue = noValue; - this.emptyValueSupplier = emptySupplier; + this.emptySupplier = emptySupplier; this.deferred = deferred; } @@ -89,16 +89,16 @@ public boolean isNoValue() { * Return {@code true} if the reactive type can complete with no values. */ public boolean supportsEmpty() { - return (this.emptyValueSupplier != null); + return (this.emptySupplier != null); } /** * Return an empty-value instance for the underlying reactive or async type. - * Use of this type implies {@link #supportsEmpty()} is {@code true}. + *

Use of this type implies {@link #supportsEmpty()} is {@code true}. */ public Object getEmptyValue() { - Assert.state(this.emptyValueSupplier != null, "Empty values not supported"); - return this.emptyValueSupplier.get(); + Assert.state(this.emptySupplier != null, "Empty values not supported"); + return this.emptySupplier.get(); } /** diff --git a/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java b/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java index 35683ac0cbee..98d7d0f59d2b 100644 --- a/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java +++ b/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -100,7 +100,7 @@ void toFlux() { List sequence = Arrays.asList(1, 2, 3); Publisher source = io.reactivex.rxjava3.core.Flowable.fromIterable(sequence); Object target = getAdapter(Flux.class).fromPublisher(source); - assertThat(target instanceof Flux).isTrue(); + assertThat(target).isInstanceOf(Flux.class); assertThat(((Flux) target).collectList().block(ONE_SECOND)).isEqualTo(sequence); } @@ -108,7 +108,7 @@ void toFlux() { void toMono() { Publisher source = io.reactivex.rxjava3.core.Flowable.fromArray(1, 2, 3); Object target = getAdapter(Mono.class).fromPublisher(source); - assertThat(target instanceof Mono).isTrue(); + assertThat(target).isInstanceOf(Mono.class); assertThat(((Mono) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1)); } @@ -116,7 +116,7 @@ void toMono() { void toCompletableFuture() throws Exception { Publisher source = Flux.fromArray(new Integer[] {1, 2, 3}); Object target = getAdapter(CompletableFuture.class).fromPublisher(source); - assertThat(target instanceof CompletableFuture).isTrue(); + assertThat(target).isInstanceOf(CompletableFuture.class); assertThat(((CompletableFuture) target).get()).isEqualTo(Integer.valueOf(1)); } @@ -125,7 +125,7 @@ void fromCompletableFuture() { CompletableFuture future = new CompletableFuture<>(); future.complete(1); Object target = getAdapter(CompletableFuture.class).toPublisher(future); - assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue(); + assertThat(target).as("Expected Mono Publisher: " + target.getClass().getName()).isInstanceOf(Mono.class); assertThat(((Mono) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1)); } } @@ -294,7 +294,7 @@ void toFlowable() { List sequence = Arrays.asList(1, 2, 3); Publisher source = Flux.fromIterable(sequence); Object target = getAdapter(io.reactivex.rxjava3.core.Flowable.class).fromPublisher(source); - assertThat(target instanceof io.reactivex.rxjava3.core.Flowable).isTrue(); + assertThat(target).isInstanceOf(io.reactivex.rxjava3.core.Flowable.class); assertThat(((io.reactivex.rxjava3.core.Flowable) target).toList().blockingGet()).isEqualTo(sequence); } @@ -303,7 +303,7 @@ void toObservable() { List sequence = Arrays.asList(1, 2, 3); Publisher source = Flux.fromIterable(sequence); Object target = getAdapter(io.reactivex.rxjava3.core.Observable.class).fromPublisher(source); - assertThat(target instanceof io.reactivex.rxjava3.core.Observable).isTrue(); + assertThat(target).isInstanceOf(io.reactivex.rxjava3.core.Observable.class); assertThat(((io.reactivex.rxjava3.core.Observable) target).toList().blockingGet()).isEqualTo(sequence); } @@ -311,7 +311,7 @@ void toObservable() { void toSingle() { Publisher source = Flux.fromArray(new Integer[] {1}); Object target = getAdapter(io.reactivex.rxjava3.core.Single.class).fromPublisher(source); - assertThat(target instanceof io.reactivex.rxjava3.core.Single).isTrue(); + assertThat(target).isInstanceOf(io.reactivex.rxjava3.core.Single.class); assertThat(((io.reactivex.rxjava3.core.Single) target).blockingGet()).isEqualTo(Integer.valueOf(1)); } @@ -319,7 +319,7 @@ void toSingle() { void toCompletable() { Publisher source = Flux.fromArray(new Integer[] {1, 2, 3}); Object target = getAdapter(io.reactivex.rxjava3.core.Completable.class).fromPublisher(source); - assertThat(target instanceof io.reactivex.rxjava3.core.Completable).isTrue(); + assertThat(target).isInstanceOf(io.reactivex.rxjava3.core.Completable.class); ((io.reactivex.rxjava3.core.Completable) target).blockingAwait(); } @@ -328,7 +328,7 @@ void fromFlowable() { List sequence = Arrays.asList(1, 2, 3); Object source = io.reactivex.rxjava3.core.Flowable.fromIterable(sequence); Object target = getAdapter(io.reactivex.rxjava3.core.Flowable.class).toPublisher(source); - assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue(); + assertThat(target).as("Expected Flux Publisher: " + target.getClass().getName()).isInstanceOf(Flux.class); assertThat(((Flux) target).collectList().block(ONE_SECOND)).isEqualTo(sequence); } @@ -337,7 +337,7 @@ void fromObservable() { List sequence = Arrays.asList(1, 2, 3); Object source = io.reactivex.rxjava3.core.Observable.fromIterable(sequence); Object target = getAdapter(io.reactivex.rxjava3.core.Observable.class).toPublisher(source); - assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue(); + assertThat(target).as("Expected Flux Publisher: " + target.getClass().getName()).isInstanceOf(Flux.class); assertThat(((Flux) target).collectList().block(ONE_SECOND)).isEqualTo(sequence); } @@ -345,7 +345,7 @@ void fromObservable() { void fromSingle() { Object source = io.reactivex.rxjava3.core.Single.just(1); Object target = getAdapter(io.reactivex.rxjava3.core.Single.class).toPublisher(source); - assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue(); + assertThat(target).as("Expected Mono Publisher: " + target.getClass().getName()).isInstanceOf(Mono.class); assertThat(((Mono) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1)); } @@ -353,7 +353,7 @@ void fromSingle() { void fromCompletable() { Object source = io.reactivex.rxjava3.core.Completable.complete(); Object target = getAdapter(io.reactivex.rxjava3.core.Completable.class).toPublisher(source); - assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue(); + assertThat(target).as("Expected Mono Publisher: " + target.getClass().getName()).isInstanceOf(Mono.class); ((Mono) target).block(ONE_SECOND); } } diff --git a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java index 9f6cfd890549..1c99636646c8 100644 --- a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java +++ b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java @@ -24,7 +24,6 @@ import io.r2dbc.spi.R2dbcBadGrammarException; import io.r2dbc.spi.R2dbcTimeoutException; import io.r2dbc.spi.Statement; -import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; @@ -55,6 +54,7 @@ * Unit tests for {@link R2dbcTransactionManager}. * * @author Mark Paluch + * @author Juergen Hoeller */ class R2dbcTransactionManagerUnitTests { @@ -85,8 +85,7 @@ void testSimpleTransaction() { ConnectionFactoryUtils.getConnection(connectionFactoryMock) .flatMap(connection -> TransactionSynchronizationManager.forCurrentTransaction() - .doOnNext(synchronizationManager -> synchronizationManager.registerSynchronization( - sync))) + .doOnNext(synchronizationManager -> synchronizationManager.registerSynchronization(sync))) .as(operator::transactional) .as(StepVerifier::create) .expectNextCount(1) @@ -118,12 +117,11 @@ void testBeginFails() { TransactionalOperator operator = TransactionalOperator.create(tm, definition); - ConnectionFactoryUtils.getConnection(connectionFactoryMock).as( - operator::transactional) + ConnectionFactoryUtils.getConnection(connectionFactoryMock) + .as(operator::transactional) .as(StepVerifier::create) .expectErrorSatisfies(actual -> assertThat(actual).isInstanceOf( - CannotCreateTransactionException.class).hasCauseInstanceOf( - R2dbcBadGrammarException.class)) + CannotCreateTransactionException.class).hasCauseInstanceOf(R2dbcBadGrammarException.class)) .verify(); } @@ -139,8 +137,8 @@ void appliesIsolationLevel() { TransactionalOperator operator = TransactionalOperator.create(tm, definition); - ConnectionFactoryUtils.getConnection(connectionFactoryMock).as( - operator::transactional) + ConnectionFactoryUtils.getConnection(connectionFactoryMock) + .as(operator::transactional) .as(StepVerifier::create) .expectNextCount(1) .verifyComplete(); @@ -164,8 +162,8 @@ void doesNotSetIsolationLevelIfMatch() { TransactionalOperator operator = TransactionalOperator.create(tm, definition); - ConnectionFactoryUtils.getConnection(connectionFactoryMock).as( - operator::transactional) + ConnectionFactoryUtils.getConnection(connectionFactoryMock) + .as(operator::transactional) .as(StepVerifier::create) .expectNextCount(1) .verifyComplete(); @@ -184,8 +182,8 @@ void doesNotSetAutoCommitDisabled() { TransactionalOperator operator = TransactionalOperator.create(tm, definition); - ConnectionFactoryUtils.getConnection(connectionFactoryMock).as( - operator::transactional) + ConnectionFactoryUtils.getConnection(connectionFactoryMock) + .as(operator::transactional) .as(StepVerifier::create) .expectNextCount(1) .verifyComplete(); @@ -232,8 +230,8 @@ void appliesReadOnly() { TransactionalOperator operator = TransactionalOperator.create(tm, definition); - ConnectionFactoryUtils.getConnection(connectionFactoryMock).as( - operator::transactional) + ConnectionFactoryUtils.getConnection(connectionFactoryMock) + .as(operator::transactional) .as(StepVerifier::create) .expectNextCount(1) .verifyComplete(); @@ -249,7 +247,6 @@ void appliesReadOnly() { @Test void testCommitFails() { when(connectionMock.commitTransaction()).thenReturn(Mono.defer(() -> Mono.error(new R2dbcBadGrammarException("Commit should fail")))); - when(connectionMock.rollbackTransaction()).thenReturn(Mono.empty()); TransactionalOperator operator = TransactionalOperator.create(tm); @@ -270,7 +267,6 @@ void testCommitFails() { @Test void testRollback() { - AtomicInteger commits = new AtomicInteger(); when(connectionMock.commitTransaction()).thenReturn( Mono.fromRunnable(commits::incrementAndGet)); @@ -282,11 +278,9 @@ void testRollback() { TransactionalOperator operator = TransactionalOperator.create(tm); ConnectionFactoryUtils.getConnection(connectionFactoryMock) - .doOnNext(connection -> { - throw new IllegalStateException(); - }).as(operator::transactional) - .as(StepVerifier::create) - .verifyError(IllegalStateException.class); + .doOnNext(connection -> { throw new IllegalStateException(); }) + .as(operator::transactional) + .as(StepVerifier::create).verifyError(IllegalStateException.class); assertThat(commits).hasValue(0); assertThat(rollbacks).hasValue(1); @@ -303,15 +297,11 @@ void testRollbackFails() { when(connectionMock.rollbackTransaction()).thenReturn(Mono.defer(() -> Mono.error(new R2dbcBadGrammarException("Commit should fail"))), Mono.empty()); TransactionalOperator operator = TransactionalOperator.create(tm); - operator.execute(reactiveTransaction -> { - reactiveTransaction.setRollbackOnly(); - return ConnectionFactoryUtils.getConnection(connectionFactoryMock) .doOnNext(connection -> connection.createStatement("foo")).then(); - }).as(StepVerifier::create) - .verifyError(IllegalTransactionStateException.class); + }).as(StepVerifier::create).verifyError(IllegalTransactionStateException.class); verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(); @@ -338,7 +328,7 @@ void testConnectionReleasedWhenRollbackFails() { .doOnNext(connection -> { throw new IllegalStateException("Intentional error to trigger rollback"); }).then()).as(StepVerifier::create) - .verifyErrorSatisfies(e -> Assertions.assertThat(e) + .verifyErrorSatisfies(ex -> assertThat(ex) .isInstanceOf(BadSqlGrammarException.class) .hasCause(new R2dbcBadGrammarException("Rollback should fail")) ); @@ -357,19 +347,15 @@ void testTransactionSetRollbackOnly() { TransactionSynchronization.STATUS_ROLLED_BACK); TransactionalOperator operator = TransactionalOperator.create(tm); - operator.execute(tx -> { - tx.setRollbackOnly(); assertThat(tx.isNewTransaction()).isTrue(); - return TransactionSynchronizationManager.forCurrentTransaction().doOnNext( synchronizationManager -> { assertThat(synchronizationManager.hasResource(connectionFactoryMock)).isTrue(); synchronizationManager.registerSynchronization(sync); }).then(); - }).as(StepVerifier::create) - .verifyComplete(); + }).as(StepVerifier::create).verifyComplete(); verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(); @@ -389,20 +375,16 @@ void testPropagationNeverWithExistingTransaction() { DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); - TransactionalOperator operator = TransactionalOperator.create(tm, definition); + TransactionalOperator operator = TransactionalOperator.create(tm, definition); operator.execute(tx1 -> { - assertThat(tx1.isNewTransaction()).isTrue(); - definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_NEVER); return operator.execute(tx2 -> { - fail("Should have thrown IllegalTransactionStateException"); return Mono.empty(); }); - }).as(StepVerifier::create) - .verifyError(IllegalTransactionStateException.class); + }).as(StepVerifier::create).verifyError(IllegalTransactionStateException.class); verify(connectionMock).rollbackTransaction(); verify(connectionMock).close(); @@ -414,32 +396,49 @@ void testPropagationSupportsAndRequiresNew() { DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS); - TransactionalOperator operator = TransactionalOperator.create(tm, definition); + TransactionalOperator operator = TransactionalOperator.create(tm, definition); operator.execute(tx1 -> { - assertThat(tx1.isNewTransaction()).isFalse(); - DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition(); - innerDef.setPropagationBehavior( - TransactionDefinition.PROPAGATION_REQUIRES_NEW); + innerDef.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); TransactionalOperator inner = TransactionalOperator.create(tm, innerDef); - return inner.execute(tx2 -> { - assertThat(tx2.isNewTransaction()).isTrue(); return Mono.empty(); }); - }).as(StepVerifier::create) - .verifyComplete(); + }).as(StepVerifier::create).verifyComplete(); verify(connectionMock).commitTransaction(); verify(connectionMock).close(); } + @Test + void testPropagationSupportsAndRequiresNewWithRollback() { + when(connectionMock.rollbackTransaction()).thenReturn(Mono.empty()); - private static class TestTransactionSynchronization - implements TransactionSynchronization { + DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); + definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS); + + TransactionalOperator operator = TransactionalOperator.create(tm, definition); + operator.execute(tx1 -> { + assertThat(tx1.isNewTransaction()).isFalse(); + DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition(); + innerDef.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); + TransactionalOperator inner = TransactionalOperator.create(tm, innerDef); + return inner.execute(tx2 -> { + assertThat(tx2.isNewTransaction()).isTrue(); + tx2.setRollbackOnly(); + return Mono.empty(); + }); + }).as(StepVerifier::create).verifyComplete(); + + verify(connectionMock).rollbackTransaction(); + verify(connectionMock).close(); + } + + + private static class TestTransactionSynchronization implements TransactionSynchronization { private int status; @@ -512,7 +511,6 @@ protected void doAfterCompletion(int status) { this.afterCompletionCalled = true; assertThat(status).isEqualTo(this.status); } - } } diff --git a/spring-tx/src/main/java/org/springframework/transaction/PlatformTransactionManager.java b/spring-tx/src/main/java/org/springframework/transaction/PlatformTransactionManager.java index 6122d0906d4e..ae192da79c18 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/PlatformTransactionManager.java +++ b/spring-tx/src/main/java/org/springframework/transaction/PlatformTransactionManager.java @@ -68,8 +68,7 @@ public interface PlatformTransactionManager extends TransactionManager { * @see TransactionDefinition#getTimeout * @see TransactionDefinition#isReadOnly */ - TransactionStatus getTransaction(@Nullable TransactionDefinition definition) - throws TransactionException; + TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException; /** * Commit the given transaction, with regard to its status. If the transaction diff --git a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransaction.java b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransaction.java index 11a33e681b52..2c3a03e37855 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransaction.java +++ b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransaction.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,7 @@ package org.springframework.transaction; /** - * Representation of an ongoing reactive transaction. + * Representation of an ongoing {@link ReactiveTransactionManager} transaction. * This is currently a marker interface extending {@link TransactionExecution} * but may acquire further methods in a future revision. * @@ -30,6 +30,7 @@ * @since 5.2 * @see #setRollbackOnly() * @see ReactiveTransactionManager#getReactiveTransaction + * @see org.springframework.transaction.reactive.TransactionCallback#doInTransaction */ public interface ReactiveTransaction extends TransactionExecution { diff --git a/spring-tx/src/main/java/org/springframework/transaction/TransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/TransactionStatus.java index 5968c57fd7bb..d61ed25f618d 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/TransactionStatus.java +++ b/spring-tx/src/main/java/org/springframework/transaction/TransactionStatus.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,8 @@ import java.io.Flushable; /** - * Representation of the status of a transaction. + * Representation of an ongoing {@link PlatformTransactionManager} transaction. + * Extends the common {@link TransactionExecution} interface. * *

Transactional code can use this to retrieve status information, * and to programmatically request a rollback (instead of throwing diff --git a/spring-tx/src/main/java/org/springframework/transaction/support/SmartTransactionObject.java b/spring-tx/src/main/java/org/springframework/transaction/support/SmartTransactionObject.java index 1bb2bf173173..d774381ccbde 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/support/SmartTransactionObject.java +++ b/spring-tx/src/main/java/org/springframework/transaction/support/SmartTransactionObject.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,13 +23,13 @@ * return an internal rollback-only marker, typically from another * transaction that has participated and marked it as rollback-only. * - *

Autodetected by DefaultTransactionStatus, to always return a - * current rollbackOnly flag even if not resulting from the current + *

Autodetected by {@link DefaultTransactionStatus} in order to always + * return a current rollbackOnly flag even if not resulting from the current * TransactionStatus. * * @author Juergen Hoeller * @since 1.1 - * @see DefaultTransactionStatus#isRollbackOnly + * @see DefaultTransactionStatus#isGlobalRollbackOnly() */ public interface SmartTransactionObject extends Flushable {