Skip to content

Commit

Permalink
polish and fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Apr 25, 2023
1 parent 50f29b8 commit 038c8dd
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ static boolean isReactive(Method method) {
Assert.isTrue(method.getParameterCount() == 1,"Kotlin suspending functions may only be"
+ " annotated with @Scheduled if declared without arguments");
Assert.isTrue(coroutinesReactorPresent, "Kotlin suspending functions may only be annotated with"
+ " @Scheduled if the Coroutine-Reactor bridge (kotlinx.coroutines.reactive) is present at runtime");
+ " @Scheduled if the Coroutine-Reactor bridge (kotlinx.coroutines.reactor) is present at runtime");
return true;
}
ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
Expand Down Expand Up @@ -165,8 +165,8 @@ public void onNext(Object o) {
}

@Override
public void onError(Throwable e) {
LOGGER.warn("Unexpected error occurred in scheduled reactive task", e);
public void onError(Throwable ex) {
LOGGER.warn("Unexpected error occurred in scheduled reactive task", ex);
latch.countDown();
}

Expand All @@ -178,8 +178,8 @@ public void onComplete() {
try {
latch.await();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
};
}
Expand All @@ -195,8 +195,8 @@ public void onNext(Object o) {
}

@Override
public void onError(Throwable e) {
LOGGER.warn("Unexpected error occurred in scheduled reactive task", e);
public void onError(Throwable ex) {
LOGGER.warn("Unexpected error occurred in scheduled reactive task", ex);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,15 @@ void rejectCantAccessMethod() {

@Test
void hasCheckpointToString() {
//FIXME test checkpointing
assertThat("FIXME").isEqualTo("@Scheduled 'mono()' in bean 'org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupportTests$ReactiveMethods'");
ReactiveMethods target = new ReactiveMethods();
Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "mono");
Publisher<?> p = getPublisherFor(m, target);

assertThat(p.getClass().getName())
.as("checkpoint class")
.isEqualTo("reactor.core.publisher.FluxOnAssembly");

assertThat(p).hasToString("checkpoint(\"@Scheduled 'mono()' in bean 'org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupportTests$ReactiveMethods'\")");
}

static class ReactiveMethods {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.ReactiveTask
import org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.getPublisherFor
import org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.isReactive
import org.springframework.util.ReflectionUtils
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.Continuation

Expand Down Expand Up @@ -122,11 +120,6 @@ class KotlinScheduledAnnotationReactiveSupportTests {
Assertions.assertThatIllegalArgumentException().isThrownBy { isReactive(m) }
.withMessage("Kotlin suspending functions may only be annotated with @Scheduled if declared without arguments")
.withNoCause()

//constructor of task doesn't reject
Assertions.assertThatNoException().isThrownBy {
ReactiveTask(m, target!!, Duration.ZERO, Duration.ZERO, false)
}
}

@Test
Expand All @@ -137,13 +130,6 @@ class KotlinScheduledAnnotationReactiveSupportTests {
Assertions.assertThatIllegalArgumentException().isThrownBy { getPublisherFor(m!!, target!!) }
.withMessage("Cannot convert the @Scheduled reactive method return type to Publisher")
.withNoCause()

//constructor of task
Assertions.assertThatIllegalArgumentException().isThrownBy {
ReactiveTask(m!!, target!!, Duration.ZERO, Duration.ZERO, false)
}
.withMessage("Cannot convert the @Scheduled reactive method return type to Publisher")
.withNoCause()
}

@Test
Expand All @@ -155,11 +141,6 @@ class KotlinScheduledAnnotationReactiveSupportTests {
Assertions.assertThatIllegalStateException().isThrownBy { mono.block() }
.withMessage("expected")
.withNoCause()

//constructor of task doesn't throw
Assertions.assertThatNoException().isThrownBy {
ReactiveTask(m, target!!, Duration.ZERO, Duration.ZERO, false)
}
}

@Test
Expand All @@ -171,74 +152,4 @@ class KotlinScheduledAnnotationReactiveSupportTests {
mono.block()
assertThat(target!!.subscription).describedAs("after subscription").hasValue(1)
}

@Test
fun hasCheckpointToString() {
val m = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "suspending", Continuation::class.java)
val reactiveTask = ReactiveTask(m!!, target!!, Duration.ZERO, Duration.ZERO, false)
assertThat(reactiveTask).hasToString("@Scheduled 'suspending()' in bean 'org.springframework.scheduling.annotation.KotlinScheduledAnnotationReactiveSupportTests\$SuspendingFunctions'")
}

@Test
fun cancelledEarlyPreventsSubscription() {
val m = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "suspendingTracking", Continuation::class.java)
val reactiveTask = ReactiveTask(m!!, target!!, Duration.ZERO, Duration.ofSeconds(10), false)
reactiveTask.cancel()
reactiveTask.subscribe()
assertThat(target!!.subscription).hasValue(0)
}

@Test
fun multipleSubscriptionsTracked() {
val m = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "suspendingTracking", Continuation::class.java)
val reactiveTask = ReactiveTask(m!!, target!!, Duration.ZERO, Duration.ofMillis(500), false)
reactiveTask.subscribe()
Thread.sleep(1500)
reactiveTask.cancel()
assertThat(target!!.subscription).hasValueGreaterThanOrEqualTo(3)
}

@Test
@Throws(InterruptedException::class)
fun noInitialDelayFixedDelay() {
val m = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "suspendingTracking", Continuation::class.java)
val reactiveTask = ReactiveTask(m!!, target!!, Duration.ZERO, Duration.ofSeconds(10), false)
reactiveTask.subscribe()
Thread.sleep(500)
reactiveTask.cancel()
assertThat(target!!.subscription).hasValue(1)
}

@Test
@Throws(InterruptedException::class)
fun noInitialDelayFixedRate() {
val m = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "suspendingTracking", Continuation::class.java)
val reactiveTask = ReactiveTask(m!!, target!!, Duration.ZERO, Duration.ofSeconds(10), true)
reactiveTask.subscribe()
Thread.sleep(500)
reactiveTask.cancel()
assertThat(target!!.subscription).hasValue(1)
}

@Test
@Throws(InterruptedException::class)
fun initialDelayFixedDelay() {
val m = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "suspendingTracking", Continuation::class.java)
val reactiveTask = ReactiveTask(m!!, target!!, Duration.ofSeconds(10), Duration.ofMillis(500), false)
reactiveTask.subscribe()
Thread.sleep(500)
reactiveTask.cancel()
assertThat(target!!.subscription).hasValue(0)
}

@Test
@Throws(InterruptedException::class)
fun initialDelayFixedRate() {
val m = ReflectionUtils.findMethod(SuspendingFunctions::class.java, "suspendingTracking", Continuation::class.java)
val reactiveTask = ReactiveTask(m!!, target!!, Duration.ofSeconds(10), Duration.ofMillis(500), true)
reactiveTask.subscribe()
Thread.sleep(500)
reactiveTask.cancel()
assertThat(target!!.subscription).hasValue(0)
}
}

0 comments on commit 038c8dd

Please sign in to comment.