Skip to content

Commit

Permalink
Support nested suspend calls for Kotlin coroutines
Browse files Browse the repository at this point in the history
Closes gh-13764
  • Loading branch information
sjohnr committed Sep 5, 2023
1 parent 1a45602 commit 92256f0
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ package org.springframework.security.config.annotation.method.configuration

import io.mockk.Called
import io.mockk.clearAllMocks
import io.mockk.coEvery
import io.mockk.coVerify
import io.mockk.mockk
import io.mockk.verify
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.springframework.beans.factory.annotation.Autowired
Expand All @@ -46,7 +49,7 @@ class KotlinEnableReactiveMethodSecurityNoAuthorizationManagerTests {
@Autowired
var messageService: KotlinReactiveMessageService? = null

@After
@AfterEach
fun cleanup() {
clearAllMocks()
}
Expand Down Expand Up @@ -125,6 +128,16 @@ class KotlinEnableReactiveMethodSecurityNoAuthorizationManagerTests {
verify { delegate wasNot Called }
}

@Test
@WithMockUser(authorities = ["ROLE_ADMIN"])
fun `suspendingPreAuthorizeDelegate when user has role then delegate called`() {
coEvery { delegate.suspendingPreAuthorizeHasRole() } returns "ok"
runBlocking {
messageService!!.suspendingPreAuthorizeDelegate()
}
coVerify(exactly = 1) { delegate.suspendingPreAuthorizeHasRole() }
}

@Test
@WithMockUser(authorities = ["ROLE_ADMIN"])
fun `suspendingFlowPreAuthorize when user has role then success`() {
Expand Down Expand Up @@ -168,6 +181,16 @@ class KotlinEnableReactiveMethodSecurityNoAuthorizationManagerTests {
verify { delegate wasNot Called }
}

@Test
@WithMockUser(authorities = ["ROLE_ADMIN"])
fun `suspendingFlowPreAuthorizeDelegate when user has role then delegate called`() {
coEvery { delegate.flowPreAuthorize() } returns flow { }
runBlocking {
messageService!!.suspendingFlowPreAuthorizeDelegate().collect()
}
coVerify(exactly = 1) { delegate.flowPreAuthorize() }
}

@Test
@WithMockUser(authorities = ["ROLE_ADMIN"])
fun `flowPreAuthorize when user has role then success`() {
Expand Down Expand Up @@ -211,6 +234,16 @@ class KotlinEnableReactiveMethodSecurityNoAuthorizationManagerTests {
verify { delegate wasNot Called }
}

@Test
@WithMockUser(authorities = ["ROLE_ADMIN"])
fun `flowPreAuthorizeDelegate when user has role then delegate called`() {
coEvery { delegate.flowPreAuthorize() } returns flow { }
runBlocking {
messageService!!.flowPreAuthorizeDelegate().collect()
}
coVerify(exactly = 1) { delegate.flowPreAuthorize() }
}

@Configuration
@EnableReactiveMethodSecurity(useAuthorizationManager = false)
open class Config {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,17 +19,14 @@
import java.lang.reflect.Method;
import java.util.Collection;

import kotlin.coroutines.Continuation;
import kotlinx.coroutines.reactive.ReactiveFlowKt;
import kotlinx.coroutines.reactor.MonoKt;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.core.CoroutinesUtils;
import org.springframework.core.KotlinDetector;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapter;
Expand Down Expand Up @@ -126,34 +123,23 @@ public Object invoke(final MethodInvocation invocation) {
.map((r) -> (attr != null) ? this.postAdvice.after(auth, invocation, attr, r) : r));
}
if (hasFlowReturnType) {
Flux<?> response;
if (isSuspendingFunction) {
response = toInvoke.flatMapMany((auth) -> Flux
.from(CoroutinesUtils.invokeSuspendingFunction(invocation.getMethod(), invocation.getThis(),
invocation.getArguments()))
.map((r) -> (attr != null) ? this.postAdvice.after(auth, invocation, attr, r) : r));
return toInvoke
.flatMapMany((auth) -> Flux.from(PrePostAdviceReactiveMethodInterceptor.proceed(invocation))
.map((r) -> (attr != null) ? this.postAdvice.after(auth, invocation, attr, r) : r));
}
else {
ReactiveAdapter adapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(returnType);
Assert.state(adapter != null, () -> "The returnType " + returnType + " on " + method
+ " must have a org.springframework.core.ReactiveAdapter registered");
response = toInvoke.flatMapMany((auth) -> Flux
Flux<?> response = toInvoke.flatMapMany((auth) -> Flux
.from(adapter.toPublisher(PrePostAdviceReactiveMethodInterceptor.flowProceed(invocation)))
.map((r) -> (attr != null) ? this.postAdvice.after(auth, invocation, attr, r) : r));
return KotlinDelegate.asFlow(response);
}
return KotlinDelegate.asFlow(response);
}
if (isSuspendingFunction) {
Mono<?> response = toInvoke.flatMap((auth) -> Mono
.from(CoroutinesUtils.invokeSuspendingFunction(invocation.getMethod(), invocation.getThis(),
invocation.getArguments()))
.map((r) -> (attr != null) ? this.postAdvice.after(auth, invocation, attr, r) : r));
return KotlinDelegate.awaitSingleOrNull(response,
invocation.getArguments()[invocation.getArguments().length - 1]);
}
return toInvoke.flatMapMany(
(auth) -> Flux.from(PrePostAdviceReactiveMethodInterceptor.<Publisher<?>>proceed(invocation))
.map((r) -> (attr != null) ? this.postAdvice.after(auth, invocation, attr, r) : r));
return toInvoke.flatMap((auth) -> Mono.from(PrePostAdviceReactiveMethodInterceptor.proceed(invocation))
.map((r) -> (attr != null) ? this.postAdvice.after(auth, invocation, attr, r) : r));
}

private static <T extends Publisher<?>> T proceed(final MethodInvocation invocation) {
Expand Down Expand Up @@ -201,10 +187,6 @@ private static Object asFlow(Publisher<?> publisher) {
return ReactiveFlowKt.asFlow(publisher);
}

private static Object awaitSingleOrNull(Mono<?> publisher, Object continuation) {
return MonoKt.awaitSingleOrNull(publisher, (Continuation<Object>) continuation);
}

}

}

0 comments on commit 92256f0

Please sign in to comment.