From 164852944a4f113be48d1c0182ca621cc85b0c29 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 17 Dec 2025 16:31:45 -0500 Subject: [PATCH 1/2] GH-10623: Add `GrpcInboundGateway` Fixes: https://github.com/spring-projects/spring-integration/issues/10623 * Add `spring-integration-grpc` module * Manage respective new `io.grpc` dependencies * Add Checkstyle suppression for new Proto-generated classes * Add `GrpcHeaders` for convenient API * Implement `GrpcInboundGateway` with a proxy for gRPC service methods * Add `TestInProcessConfiguration` for a common in-process infrastructure * Document new feature --- build.gradle | 41 +++ .../integration/grpc/GrpcHeaders.java | 58 ++++ .../grpc/inbound/GrpcInboundGateway.java | 234 ++++++++++++++++ .../grpc/inbound/package-info.java | 5 + .../integration/grpc/package-info.java | 5 + .../grpc/TestInProcessConfiguration.java | 79 ++++++ .../grpc/inbound/GrpcInboundGatewayTests.java | 250 ++++++++++++++++++ .../src/test/proto/test_hello.proto | 40 +++ .../src/test/resources/log4j2-test.xml | 15 ++ src/checkstyle/checkstyle-suppressions.xml | 1 + src/reference/antora/modules/ROOT/nav.adoc | 1 + .../antora/modules/ROOT/pages/grpc.adoc | 154 +++++++++++ 12 files changed, 883 insertions(+) create mode 100644 spring-integration-grpc/src/main/java/org/springframework/integration/grpc/GrpcHeaders.java create mode 100644 spring-integration-grpc/src/main/java/org/springframework/integration/grpc/inbound/GrpcInboundGateway.java create mode 100644 spring-integration-grpc/src/main/java/org/springframework/integration/grpc/inbound/package-info.java create mode 100644 spring-integration-grpc/src/main/java/org/springframework/integration/grpc/package-info.java create mode 100644 spring-integration-grpc/src/test/java/org/springframework/integration/grpc/TestInProcessConfiguration.java create mode 100644 spring-integration-grpc/src/test/java/org/springframework/integration/grpc/inbound/GrpcInboundGatewayTests.java create mode 100644 spring-integration-grpc/src/test/proto/test_hello.proto create mode 100644 spring-integration-grpc/src/test/resources/log4j2-test.xml create mode 100644 src/reference/antora/modules/ROOT/pages/grpc.adoc diff --git a/build.gradle b/build.gradle index 6b741518574..d9a77c797b6 100644 --- a/build.gradle +++ b/build.gradle @@ -62,6 +62,7 @@ ext { graalvmVersion = '25.0.1' greenmailVersion = '2.1.8' groovyVersion = '5.0.3' + grpcVersion = '1.77.0' hamcrestVersion = '3.0' hazelcastVersion = '5.6.0' hibernateVersion = '7.1.12.Final' @@ -173,6 +174,7 @@ allprojects { dependencyManagement(platform("org.springframework.security:spring-security-bom:$springSecurityVersion")) dependencyManagement(platform("org.springframework.ws:spring-ws-bom:$springWsVersion")) dependencyManagement(platform("org.mongodb:mongodb-driver-bom:$mongoDriverVersion")) + dependencyManagement(platform("io.grpc:grpc-bom:$grpcVersion")) } } @@ -627,6 +629,45 @@ project('spring-integration-groovy') { } } +project('spring-integration-grpc') { + description = 'Spring Integration gRPC Support' + + apply plugin: 'com.google.protobuf' + + configurations { + [compileProtoPath, testCompileProtoPath].each { + it.extendsFrom(dependencyManagement) + } + } + + dependencies { + api 'io.grpc:grpc-stub' + + testImplementation 'io.grpc:grpc-protobuf' + testImplementation 'io.grpc:grpc-inprocess' + testImplementation "com.google.protobuf:protobuf-java:$protobufVersion" + } + + protobuf { + protoc { + artifact = "com.google.protobuf:protoc:$protobufVersion" + } + plugins { + grpc { + artifact = "io.grpc:protoc-gen-grpc-java:$grpcVersion" + } + } + generateProtoTasks { + // Only generate for test source set, not main + ofSourceSet('test')*.plugins { + grpc { + option '@generated=omit' + } + } + } + } +} + project('spring-integration-hazelcast') { description = 'Spring Integration Hazelcast Support' dependencies { diff --git a/spring-integration-grpc/src/main/java/org/springframework/integration/grpc/GrpcHeaders.java b/spring-integration-grpc/src/main/java/org/springframework/integration/grpc/GrpcHeaders.java new file mode 100644 index 00000000000..c93e7e20767 --- /dev/null +++ b/spring-integration-grpc/src/main/java/org/springframework/integration/grpc/GrpcHeaders.java @@ -0,0 +1,58 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.grpc; + +/** + * Constants for gRPC-specific message headers. + * + * @author Artem Bilan + * + * @since 7.1 + */ +public final class GrpcHeaders { + + /** + * The prefix for all gRPC-specific headers. + */ + public static final String PREFIX = "grpc_"; + + /** + * The header containing the called gRPC service name. + */ + public static final String SERVICE = PREFIX + "service"; + + /** + * The header containing the gRPC service method name. + */ + public static final String SERVICE_METHOD = PREFIX + "serviceMethod"; + + /** + * The header containing the gRPC service method type. + * One of the {@link io.grpc.MethodDescriptor.MethodType} + */ + public static final String METHOD_TYPE = PREFIX + "methodType"; + + /** + * The header containing the gRPC service method schema descriptor. + * A value from the {@link io.grpc.MethodDescriptor#getSchemaDescriptor()} + */ + public static final String SCHEMA_DESCRIPTOR = PREFIX + "schemaDescriptor"; + + private GrpcHeaders() { + } + +} diff --git a/spring-integration-grpc/src/main/java/org/springframework/integration/grpc/inbound/GrpcInboundGateway.java b/spring-integration-grpc/src/main/java/org/springframework/integration/grpc/inbound/GrpcInboundGateway.java new file mode 100644 index 00000000000..1eb9e46cdaf --- /dev/null +++ b/spring-integration-grpc/src/main/java/org/springframework/integration/grpc/inbound/GrpcInboundGateway.java @@ -0,0 +1,234 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.grpc.inbound; + +import java.lang.reflect.Method; +import java.util.Arrays; + +import io.grpc.BindableService; +import io.grpc.MethodDescriptor; +import io.grpc.ServerServiceDefinition; +import io.grpc.stub.StreamObserver; +import org.aopalliance.intercept.MethodInterceptor; +import org.aopalliance.intercept.MethodInvocation; +import org.jspecify.annotations.Nullable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; + +import org.springframework.aop.framework.ProxyFactory; +import org.springframework.core.log.LogMessage; +import org.springframework.integration.gateway.DefaultMethodInvokingMethodInterceptor; +import org.springframework.integration.gateway.MessagingGatewaySupport; +import org.springframework.integration.grpc.GrpcHeaders; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; +import org.springframework.util.ReflectionUtils; +import org.springframework.util.StringUtils; + +/** + * The {@link MessagingGatewaySupport} implementation for gRPC {@link BindableService}. + * An instance of this class requires a {@link BindableService} class from the gRPC service definition. + * Only standard 'grpc' services are supported which implements a generated {@code AsyncService} interface. + * This gateway is a {@link BindableService} by itself to be registered with the gRPC server. + * An internal proxy is created to intercept gRPC method calls and convert them to Spring Integration messages. + * A reply from the downstream flow is produced back to the gRPC response payload. + * The request payload is a Proto message from gRPC request. + * The reply payload must be a Proto message for gRPC response. + *

+ * This gateway supports all the gRPC {@link MethodDescriptor.MethodType} types. + * All the requests are produced to downstream flow in a reactive manner via {@link #sendAndReceiveMessageReactive(Object)}. + * The {@link MethodDescriptor.MethodType#UNARY} and {@link MethodDescriptor.MethodType#BIDI_STREAMING} + * are same from the downstream handling logic perspective. + * The {@link MethodDescriptor.MethodType#CLIENT_STREAMING} produces a {@link Flux} of gRPC request payloads. + * The {@link MethodDescriptor.MethodType#SERVER_STREAMING} reply can be a single entity or a {@link Flux} of them. + *

+ * For convenience, the {@link GrpcHeaders} are populated into a request message. + * Such information can be used, for example, in downstream flow for routing. + * + * @author Artem Bilan + * + * @since 7.1 + */ +public class GrpcInboundGateway extends MessagingGatewaySupport implements BindableService { + + private final Class grpcServiceClass; + + @SuppressWarnings("NullAway.Init") + private Object asyncService; + + @SuppressWarnings("NullAway.Init") + private ServerServiceDefinition serverServiceDefinition; + + public GrpcInboundGateway(Class grpcServiceClass) { + this.grpcServiceClass = grpcServiceClass; + } + + @Override + protected void onInit() { + super.onInit(); + Class[] serviceInterfaces = + ClassUtils.getAllInterfacesForClass(this.grpcServiceClass, getApplicationContext().getClassLoader()); + + for (Class serviceInterface : serviceInterfaces) { + if ("AsyncService".equals(serviceInterface.getSimpleName())) { + createServiceProxyAndServerDefinition(serviceInterface); + break; + } + } + + Assert.state(this.asyncService != null, + "Only standard 'grpc' service are supported providing an 'AsyncService' contract."); + } + + @SuppressWarnings("NullAway") + private void createServiceProxyAndServerDefinition(Class serviceInterface) { + ProxyFactory proxyFactory = new ProxyFactory(serviceInterface, (MethodInterceptor) this::interceptGrpc); + proxyFactory.addAdvice(new DefaultMethodInvokingMethodInterceptor()); + this.asyncService = proxyFactory.getProxy(getApplicationContext().getClassLoader()); + Method bindServiceMethod = + ClassUtils.getStaticMethod(this.grpcServiceClass.getEnclosingClass(), "bindService", serviceInterface); + + this.serverServiceDefinition = + (ServerServiceDefinition) ReflectionUtils.invokeMethod(bindServiceMethod, null, this.asyncService); + } + + @Override + public ServerServiceDefinition bindService() { + return this.serverServiceDefinition; + } + + @SuppressWarnings({"unchecked", "NullAway"}) + private @Nullable Object interceptGrpc(MethodInvocation invocation) { + Object[] arguments = invocation.getArguments(); + + String fullMethodName = + this.serverServiceDefinition.getServiceDescriptor().getName() + + '/' + + StringUtils.capitalize(invocation.getMethod().getName()); + + MethodDescriptor serviceMethod = + this.serverServiceDefinition.getMethod(fullMethodName) + .getMethodDescriptor(); + + logger.debug(LogMessage.format("gRPC request for [%s] with arguments %s", + fullMethodName, Arrays.toString(arguments))); + + switch (serviceMethod.getType()) { + case UNARY -> { + unary(serviceMethod, arguments[0], (StreamObserver) arguments[1]); + return null; + } + case SERVER_STREAMING -> { + serverStreaming(serviceMethod, arguments[0], (StreamObserver) arguments[1]); + return null; + } + case CLIENT_STREAMING -> { + return clientStreaming(serviceMethod, (StreamObserver) arguments[0]); + } + case BIDI_STREAMING -> { + return bidiStreaming(serviceMethod, (StreamObserver) arguments[0]); + } + default -> throw new IllegalStateException("Unknown gRPC method type: " + serviceMethod.getType()); + } + } + + private void unary(MethodDescriptor methodDescriptor, Object requestPayload, + StreamObserver responseObserver) { + + sendRequestAndProduceReply(methodDescriptor, requestPayload) + .subscribe(responseObserver::onNext, responseObserver::onError, responseObserver::onCompleted); + } + + private void serverStreaming(MethodDescriptor methodDescriptor, Object requestPayload, + StreamObserver responseObserver) { + + sendRequestAndProduceReply(methodDescriptor, requestPayload) + .flatMapMany(payload -> payload instanceof Flux flux ? flux : Flux.just(payload)) + .subscribe(responseObserver::onNext, responseObserver::onError, responseObserver::onCompleted); + } + + private StreamObserver clientStreaming(MethodDescriptor methodDescriptor, + StreamObserver responseObserver) { + + Sinks.Many requestPayload = Sinks.many().unicast().onBackpressureBuffer(); + + return new StreamObserver<>() { + + @Override + public void onNext(Object value) { + requestPayload.tryEmitNext(value); + } + + @Override + public void onError(Throwable t) { + throw new IllegalStateException( + "gRPC request [" + methodDescriptor.getFullMethodName() + "] has failed", t); + } + + @Override + public void onCompleted() { + requestPayload.tryEmitComplete(); + sendRequestAndProduceReply(methodDescriptor, requestPayload.asFlux()) + .subscribe(responseObserver::onNext, responseObserver::onError, + responseObserver::onCompleted); + } + + }; + } + + private StreamObserver bidiStreaming(MethodDescriptor methodDescriptor, + StreamObserver responseObserver) { + + return new StreamObserver<>() { + + @Override + public void onNext(Object value) { + sendRequestAndProduceReply(methodDescriptor, value) + .subscribe(responseObserver::onNext, responseObserver::onError); + } + + @Override + public void onError(Throwable t) { + throw new IllegalStateException( + "gRPC request [" + methodDescriptor.getFullMethodName() + "] has failed", t); + } + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + + }; + } + + private Mono sendRequestAndProduceReply(MethodDescriptor serviceMethod, Object requestPayload) { + Message requestMessage = + getMessageBuilderFactory() + .withPayload(requestPayload) + .setHeader(GrpcHeaders.SERVICE, serviceMethod.getServiceName()) + .setHeader(GrpcHeaders.SERVICE_METHOD, serviceMethod.getBareMethodName()) + .setHeader(GrpcHeaders.METHOD_TYPE, serviceMethod.getType()) + .setHeader(GrpcHeaders.SCHEMA_DESCRIPTOR, serviceMethod.getSchemaDescriptor()) + .build(); + + return sendAndReceiveMessageReactive(requestMessage) + .map(Message::getPayload); + } + +} diff --git a/spring-integration-grpc/src/main/java/org/springframework/integration/grpc/inbound/package-info.java b/spring-integration-grpc/src/main/java/org/springframework/integration/grpc/inbound/package-info.java new file mode 100644 index 00000000000..faf8382a178 --- /dev/null +++ b/spring-integration-grpc/src/main/java/org/springframework/integration/grpc/inbound/package-info.java @@ -0,0 +1,5 @@ +/** + * Components for server-side gRPC support. + */ +@org.jspecify.annotations.NullMarked +package org.springframework.integration.grpc.inbound; diff --git a/spring-integration-grpc/src/main/java/org/springframework/integration/grpc/package-info.java b/spring-integration-grpc/src/main/java/org/springframework/integration/grpc/package-info.java new file mode 100644 index 00000000000..c18c6a59e87 --- /dev/null +++ b/spring-integration-grpc/src/main/java/org/springframework/integration/grpc/package-info.java @@ -0,0 +1,5 @@ +/** + * Base package for gRPC support. + */ +@org.jspecify.annotations.NullMarked +package org.springframework.integration.grpc; diff --git a/spring-integration-grpc/src/test/java/org/springframework/integration/grpc/TestInProcessConfiguration.java b/spring-integration-grpc/src/test/java/org/springframework/integration/grpc/TestInProcessConfiguration.java new file mode 100644 index 00000000000..aa709a5cb8f --- /dev/null +++ b/spring-integration-grpc/src/test/java/org/springframework/integration/grpc/TestInProcessConfiguration.java @@ -0,0 +1,79 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.grpc; + +import java.io.IOException; + +import io.grpc.BindableService; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; + +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; + +/** + * @author Artem Bilan + * + * @since 7.1 + */ +@Configuration(proxyBeanMethods = false) +public class TestInProcessConfiguration implements DisposableBean { + + final String serverName = InProcessServerBuilder.generateName(); + + final InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName(this.serverName); + + volatile Server server; + + @Bean + ManagedChannel grpcChannel() { + return InProcessChannelBuilder.forName(this.serverName).build(); + } + + @Bean + BeanPostProcessor bindGrpcServicesPostProcessor() { + return new BeanPostProcessor() { + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + if (bean instanceof BindableService bindableService) { + TestInProcessConfiguration.this.serverBuilder.addService(bindableService); + } + return bean; + } + + }; + } + + @EventListener(ContextRefreshedEvent.class) + void startServer() throws IOException { + this.server = this.serverBuilder.build().start(); + } + + @Override + public void destroy() { + this.server.shutdown(); + } + +} diff --git a/spring-integration-grpc/src/test/java/org/springframework/integration/grpc/inbound/GrpcInboundGatewayTests.java b/spring-integration-grpc/src/test/java/org/springframework/integration/grpc/inbound/GrpcInboundGatewayTests.java new file mode 100644 index 00000000000..d01af0d46fc --- /dev/null +++ b/spring-integration-grpc/src/test/java/org/springframework/integration/grpc/inbound/GrpcInboundGatewayTests.java @@ -0,0 +1,250 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.grpc.inbound; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.grpc.GrpcHeaders; +import org.springframework.integration.grpc.TestInProcessConfiguration; +import org.springframework.integration.grpc.proto.HelloReply; +import org.springframework.integration.grpc.proto.HelloRequest; +import org.springframework.integration.grpc.proto.TestHelloWorldGrpc; +import org.springframework.messaging.Message; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.StringUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Artem Bilan + * + * @since 7.1 + */ +@SpringJUnitConfig +@DirtiesContext +class GrpcInboundGatewayTests { + + @Autowired + TestHelloWorldGrpc.TestHelloWorldBlockingStub testHelloWorldBlockingStub; + + @Autowired + TestHelloWorldGrpc.TestHelloWorldFutureStub testHelloWorldFutureStub; + + @Autowired + TestHelloWorldGrpc.TestHelloWorldStub testHelloWorldStub; + + @Test + void unary() { + HelloReply reply = this.testHelloWorldBlockingStub.sayHello(newHelloRequest("World")); + assertThat(reply).extracting(HelloReply::getMessage).isEqualTo("Hello World"); + } + + @Test + void unaryFuture() throws ExecutionException, InterruptedException { + ListenableFuture reply = this.testHelloWorldFutureStub.sayHello(newHelloRequest("Future World")); + assertThat(reply.get()).extracting(HelloReply::getMessage).isEqualTo("Hello Future World"); + } + + @Test + void unaryAsync() throws InterruptedException { + AtomicReference reply = new AtomicReference<>(); + CountDownLatch replyLatch = new CountDownLatch(1); + this.testHelloWorldStub.sayHello(newHelloRequest("Observed World"), newReplyObserver(reply::set, replyLatch)); + + assertThat(replyLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + assertThat(reply.get()).extracting(HelloReply::getMessage).isEqualTo("Hello Observed World"); + } + + @Test + void serverStreaming() throws InterruptedException { + List replies = new ArrayList<>(); + CountDownLatch replyLatch = new CountDownLatch(1); + this.testHelloWorldStub.streamSayHello(newHelloRequest("Stream World"), + newReplyObserver(replies::add, replyLatch)); + + assertThat(replyLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + assertThat(replies.get(0)).extracting(HelloReply::getMessage).isEqualTo("Hello Stream World"); + assertThat(replies.get(1)).extracting(HelloReply::getMessage).isEqualTo("Hello again!"); + } + + @Test + void clientStreaming() throws InterruptedException { + AtomicReference reply = new AtomicReference<>(); + CountDownLatch replyLatch = new CountDownLatch(1); + + StreamObserver requestStreamObserver = + this.testHelloWorldStub.helloToEveryOne(newReplyObserver(reply::set, replyLatch)); + + String[] names = {"Anna", "Bill", "Tom"}; + + for (String name : names) { + requestStreamObserver.onNext(newHelloRequest(name)); + } + requestStreamObserver.onCompleted(); + + assertThat(replyLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + assertThat(reply.get()).extracting(HelloReply::getMessage).isEqualTo("Hello " + String.join(", ", names)); + } + + @Test + void bidiStreaming() throws InterruptedException { + List replies = new ArrayList<>(); + CountDownLatch replyLatch = new CountDownLatch(1); + + StreamObserver requestStreamObserver = + this.testHelloWorldStub.bidiStreamHello(newReplyObserver(replies::add, replyLatch)); + + String[] names = {"Sofia", "Mark", "Paul", "Martha"}; + + for (String name : names) { + requestStreamObserver.onNext(newHelloRequest(name)); + } + requestStreamObserver.onCompleted(); + + assertThat(replyLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + assertThat(replies) + .extracting(HelloReply::getMessage) + .containsAll(Arrays.stream(names).map("Hello "::concat).toList()); + } + + private static HelloRequest newHelloRequest(String message) { + return HelloRequest.newBuilder().setName(message).build(); + } + + private static StreamObserver newReplyObserver(Consumer replyConsumer, + CountDownLatch completionLatch) { + + return new StreamObserver<>() { + + @Override + public void onNext(R value) { + replyConsumer.accept(value); + } + + @Override + public void onError(Throwable t) { + + } + + @Override + public void onCompleted() { + completionLatch.countDown(); + } + + }; + } + + @Configuration(proxyBeanMethods = false) + @Import(TestInProcessConfiguration.class) + @EnableIntegration + static class TestConfig { + + @Bean + TestHelloWorldGrpc.TestHelloWorldBlockingStub testHelloWorldBlockingStub(ManagedChannel grpcChannel) { + return TestHelloWorldGrpc.newBlockingStub(grpcChannel); + } + + @Bean + TestHelloWorldGrpc.TestHelloWorldFutureStub testHelloWorldFutureStub(ManagedChannel grpcChannel) { + return TestHelloWorldGrpc.newFutureStub(grpcChannel); + } + + @Bean + TestHelloWorldGrpc.TestHelloWorldStub testHelloWorldStub(ManagedChannel grpcChannel) { + return TestHelloWorldGrpc.newStub(grpcChannel); + } + + @Bean + GrpcInboundGateway helloWorldService() { + return new GrpcInboundGateway(TestHelloWorldGrpc.TestHelloWorldImplBase.class); + } + + @Bean + IntegrationFlow grpcIntegrationFlow(GrpcInboundGateway helloWorldService) { + return IntegrationFlow.from(helloWorldService) + .route(Message.class, message -> + message.getHeaders().get(GrpcHeaders.SERVICE_METHOD, String.class), + router -> router + + .subFlowMapping("SayHello", flow -> flow + .transform(this::requestReply)) + + .subFlowMapping("StreamSayHello", flow -> flow + .transform(this::streamReply)) + + .subFlowMapping("HelloToEveryOne", flow -> flow + .transformWith(transformSpec -> transformSpec + .transformer(this::streamRequest) + .async(true))) + + .subFlowMapping("BidiStreamHello", flow -> flow + .transform(this::requestReply)) + ) + .get(); + } + + private HelloReply requestReply(HelloRequest helloRequest) { + return newHelloReply("Hello " + helloRequest.getName()); + } + + private Flux streamReply(HelloRequest helloRequest) { + return Flux.just( + newHelloReply("Hello " + helloRequest.getName()), + newHelloReply("Hello again!")); + } + + private Mono streamRequest(Flux request) { + return request + .map(HelloRequest::getName) + .collectList() + .map(names -> StringUtils.collectionToDelimitedString(names, ", ")) + .map("Hello "::concat) + .map(TestConfig::newHelloReply); + } + + private static HelloReply newHelloReply(String message) { + return HelloReply.newBuilder().setMessage(message).build(); + } + + } + +} diff --git a/spring-integration-grpc/src/test/proto/test_hello.proto b/spring-integration-grpc/src/test/proto/test_hello.proto new file mode 100644 index 00000000000..1e77605b71c --- /dev/null +++ b/spring-integration-grpc/src/test/proto/test_hello.proto @@ -0,0 +1,40 @@ +syntax = "proto3"; + +package integration.grpc.test; + +option java_package = "org.springframework.integration.grpc.proto"; + +option java_multiple_files = true; + +option java_outer_classname = "HelloWorldProto"; + +// The greeting service definition. +service TestHelloWorld { + + // Sends a greeting + rpc SayHello(HelloRequest) returns (HelloReply) {} + + // Sends a greeting and something else + rpc StreamSayHello(HelloRequest) returns (stream HelloReply) {} + + // Sends a greeting to everyone presenting + rpc HelloToEveryOne(stream HelloRequest) returns (HelloReply) {} + + // Streams requests and replies + rpc BidiStreamHello(stream HelloRequest) returns (stream HelloReply) {} + +} + +// The request message containing the user's name. +message HelloRequest { + + string name = 1; + +} + +// The response message containing the greetings +message HelloReply { + + string message = 1; + +} \ No newline at end of file diff --git a/spring-integration-grpc/src/test/resources/log4j2-test.xml b/spring-integration-grpc/src/test/resources/log4j2-test.xml new file mode 100644 index 00000000000..b4768a0c870 --- /dev/null +++ b/spring-integration-grpc/src/test/resources/log4j2-test.xml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + diff --git a/src/checkstyle/checkstyle-suppressions.xml b/src/checkstyle/checkstyle-suppressions.xml index e769990b97f..7e2e12ab3b9 100644 --- a/src/checkstyle/checkstyle-suppressions.xml +++ b/src/checkstyle/checkstyle-suppressions.xml @@ -14,4 +14,5 @@ + diff --git a/src/reference/antora/modules/ROOT/nav.adoc b/src/reference/antora/modules/ROOT/nav.adoc index 1579ce0fb22..cd6d7d682d6 100644 --- a/src/reference/antora/modules/ROOT/nav.adoc +++ b/src/reference/antora/modules/ROOT/nav.adoc @@ -150,6 +150,7 @@ ** xref:ftp/server-events.adoc[] ** xref:ftp/remote-file-info.adoc[] * xref:graphql.adoc[] +* xref:grpc.adoc[] * xref:hazelcast.adoc[] * xref:http.adoc[] ** xref:http/inbound.adoc[] diff --git a/src/reference/antora/modules/ROOT/pages/grpc.adoc b/src/reference/antora/modules/ROOT/pages/grpc.adoc new file mode 100644 index 00000000000..4786a1b8083 --- /dev/null +++ b/src/reference/antora/modules/ROOT/pages/grpc.adoc @@ -0,0 +1,154 @@ +[[grpc]] += gRPC Support + +Starting with version 7.1, Spring Integration provides inbound and outbound gateways to communicate via https://grpc.io[gRPC] protocol. + +This dependency is required for the project: + +[tabs] +====== +Maven:: ++ +[source, xml, subs="normal", role="primary"] +---- + + org.springframework.integration + spring-integration-grpc + {project-version} + +---- + +Gradle:: ++ +[source, groovy, subs="normal", role="secondary"] +---- +compile "org.springframework.integration:spring-integration-grpc:{project-version}" +---- +====== + +Spring Integration components for gRPC are not generated from Protocol buffers, and they are not type-safe as typical gRPC service and stub implementations. +This is mostly due to the generic nature of the Spring Integration framework itself, where the unit of work is a `Message` abstraction and the payload type of this message is usually out of integration component internal logic scope. +Therefore, gRPC messages for service calls are sent and received as is without conversion assumptions. +For example, if gRCP service methods are like this: + +[source,protobuf] +---- +service TestHelloWorld { + + // Sends a greeting + rpc SayHello(HelloRequest) returns (HelloReply) {} + + // Sends a greeting and something else + rpc StreamSayHello(HelloRequest) returns (stream HelloReply) {} + + // Sends a greeting to everyone presenting + rpc HelloToEveryOne(stream HelloRequest) returns (HelloReply) {} + + // Streams requests and replies + rpc BidiStreamHello(stream HelloRequest) returns (stream HelloReply) {} + +} +---- + +The `HelloRequest` will be a request message payload on the inbound gateway (server) side, and has to be on the outbound gateway (client) side request. +Therefore, the `HelloReply` has to be a reply message payload on the inbound gateway, and will be received on the outbound gateway. + +The `GrpcHeaders` class contains convenient constants for header names used (and populated) in messages before and after gRPC gateways. +For example, the `GrpcHeaders.METHOD_TYPE` header contains a `io.grpc.MethodDescriptor.MethodType` enum value on the server side (inbound gateway) for easier downstream routing. +Another useful header is a `GrpcHeaders.SERVICE_METHOD` which indicates what gRPC service method was called on the server, or what gRPC service method to call from the client stub. + +NOTE: The `GrpcHeaders.SERVICE_METHOD` header on the inbound gateway has a value of the gRPC service method name exactly as it is declared in the Protobuf (see `.proto` example above) and how it is stored into the `io.grpc.MethodDescriptor` of the service definition. + +[[grpc-inbound-gateway]] +== Inbound Gateway for gRPC + +The `GrpcInboundGateway` is a `MessagingGatewaySupport` implementation to receive gRPC requests, send messages downstream flow and produce gRPC responses. +For initialization, the instance of this gateway requires only an abstract gRPC service class implementing `BindableService`, usually generated from Protobuf and comes with a `*ImplBase` class name. + +WARNING: Only standard gRPC services are supported: a generated `AsyncService` contract is what `GrpcInboundGateway` logic based on. +The Reactor and Kotlin-based service generation don't make sense in Spring Integration logic since those types are not exposed anyhow from the gateway definition. + +The gateway uses the mentioned `AsyncService` interface to create proxy and intercept gRPC service methods. + +The following example demonstrates how to configure a `GrpcInboundGateway`: + +[source, java] +---- +@Bean +GrpcInboundGateway helloWorldService() { + return new GrpcInboundGateway(TestHelloWorldGrpc.TestHelloWorldImplBase.class); +} +---- + +The `GrpcInboundGateway` implements a `BindableService` and exposes a `ServerServiceDefinition` based on the mentioned proxy for an `AsyncService` contract of the gRPC service. +Therefore, exactly instance of this gateway has to be registered into a `ServerBuilder` and no need in any other `*ImplBase` implementations in the application. + +IMPORTANT: With https://spring.io/projects/spring-grpc[Spring gRPC] and its auto-discovery for `BindableService` implementations, the `GrpcInboundGateway` has to be declared as a top-level bean. +Therefore, Java DSL API like `IntegrationFlow.from(new GrpcInboundGateway(TestHelloWorldGrpc.TestHelloWorldImplBase.class))` is not recommended because such a `BindableService` implementation won't make it visible for respective Spring gRPC infrastructure. + +The `GrpcInboundGateway` uses a `sendAndReceiveMessageReactive()` API to interact with the downstream flow and adapts a `Mono` reply to the gRPC `StreamObserver`. +As mentioned before, the request message payload is exactly a gRPC request message, and it expects a reply in a from of gRPC response message. +The downstream logic could be type-safe and deal with gRPC messages similar way as if `*ImplBase` would be implemented manually. + +The `MethodDescriptor.MethodType.UNARY` and `MethodDescriptor.MethodType.BIDI_STREAMING` are same from the downstream handling logic perspective. +In other words, the `BIDI_STREAMING` is handled as a loop on request items and the gateway produces a response item for each of them. +For different `BIDI_STREAMING` logic, the regular gRPC service implementation is recommended. + +The `MethodDescriptor.MethodType.CLIENT_STREAMING` mode produces a message with a `Flux` as a payload of gRPC request items. + +For the `MethodDescriptor.MethodType.SERVER_STREAMING` mode a reply payload can be a single gRPC response message or a `Flux` of them. + +The following example demonstrates an `IntegrationFlow` implementation for the mentioned `TestHelloWorldGrpc.TestHelloWorldImplBase` service: + +[source, java] +---- +@Bean +IntegrationFlow grpcIntegrationFlow(GrpcInboundGateway helloWorldService) { + return IntegrationFlow.from(helloWorldService) + .route(Message.class, message -> + message.getHeaders().get(GrpcHeaders.SERVICE_METHOD, String.class), + router -> router + + .subFlowMapping("SayHello", flow -> flow + .transform(this::requestReply)) + + .subFlowMapping("StreamSayHello", flow -> flow + .transform(this::streamReply)) + + .subFlowMapping("HelloToEveryOne", flow -> flow + .transformWith(transformSpec -> transformSpec + .transformer(this::streamRequest) + .async(true))) + + .subFlowMapping("BidiStreamHello", flow -> flow + .transform(this::requestReply)) + ) + .get(); +} + +private HelloReply requestReply(HelloRequest helloRequest) { + return newHelloReply("Hello " + helloRequest.getName()); +} + +private Flux streamReply(HelloRequest helloRequest) { + return Flux.just( + newHelloReply("Hello " + helloRequest.getName()), + newHelloReply("Hello again!")); +} + +private Mono streamRequest(Flux request) { + return request + .map(HelloRequest::getName) + .collectList() + .map(names -> StringUtils.collectionToDelimitedString(names, ", ")) + .map("Hello "::concat) + .map(TestConfig::newHelloReply); +} + +private static HelloReply newHelloReply(String message) { + return HelloReply.newBuilder().setMessage(message).build(); +} +---- + +The routing is done on the `GrpcHeaders.SERVICE_METHOD` header populated by the `GrpcInboundGateway`. +All the downstream transformer business methods are type-safe in regard to gRPC messages for the `TestHelloWorldGrpc.TestHelloWorldImplBase` service. \ No newline at end of file From 72be7934bb2e262f3c85106044d90e0f61d85646 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 18 Dec 2025 10:52:51 -0500 Subject: [PATCH 2/2] * Handle error as `StatusRuntimeException` * Add test with error from server * Fix typos in the doc --- .../grpc/inbound/GrpcInboundGateway.java | 32 ++++++++++++++----- .../grpc/TestInProcessConfiguration.java | 2 +- .../grpc/inbound/GrpcInboundGatewayTests.java | 21 ++++++++++++ .../src/test/proto/test_hello.proto | 3 ++ .../antora/modules/ROOT/pages/grpc.adoc | 8 ++--- 5 files changed, 53 insertions(+), 13 deletions(-) diff --git a/spring-integration-grpc/src/main/java/org/springframework/integration/grpc/inbound/GrpcInboundGateway.java b/spring-integration-grpc/src/main/java/org/springframework/integration/grpc/inbound/GrpcInboundGateway.java index 1eb9e46cdaf..34b93428f4e 100644 --- a/spring-integration-grpc/src/main/java/org/springframework/integration/grpc/inbound/GrpcInboundGateway.java +++ b/spring-integration-grpc/src/main/java/org/springframework/integration/grpc/inbound/GrpcInboundGateway.java @@ -22,6 +22,8 @@ import io.grpc.BindableService; import io.grpc.MethodDescriptor; import io.grpc.ServerServiceDefinition; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; @@ -152,7 +154,9 @@ private void unary(MethodDescriptor methodDescriptor, Object requestPayloa StreamObserver responseObserver) { sendRequestAndProduceReply(methodDescriptor, requestPayload) - .subscribe(responseObserver::onNext, responseObserver::onError, responseObserver::onCompleted); + .subscribe(responseObserver::onNext, + t -> responseObserver.onError(toGrpcStatusException(t)), + responseObserver::onCompleted); } private void serverStreaming(MethodDescriptor methodDescriptor, Object requestPayload, @@ -160,7 +164,9 @@ private void serverStreaming(MethodDescriptor methodDescriptor, Object req sendRequestAndProduceReply(methodDescriptor, requestPayload) .flatMapMany(payload -> payload instanceof Flux flux ? flux : Flux.just(payload)) - .subscribe(responseObserver::onNext, responseObserver::onError, responseObserver::onCompleted); + .subscribe(responseObserver::onNext, + t -> responseObserver.onError(toGrpcStatusException(t)), + responseObserver::onCompleted); } private StreamObserver clientStreaming(MethodDescriptor methodDescriptor, @@ -177,15 +183,15 @@ public void onNext(Object value) { @Override public void onError(Throwable t) { - throw new IllegalStateException( - "gRPC request [" + methodDescriptor.getFullMethodName() + "] has failed", t); + throw toGrpcStatusException(t, "gRPC request [" + methodDescriptor.getFullMethodName() + "] has failed"); } @Override public void onCompleted() { requestPayload.tryEmitComplete(); sendRequestAndProduceReply(methodDescriptor, requestPayload.asFlux()) - .subscribe(responseObserver::onNext, responseObserver::onError, + .subscribe(responseObserver::onNext, + t -> responseObserver.onError(toGrpcStatusException(t)), responseObserver::onCompleted); } @@ -200,13 +206,13 @@ private StreamObserver bidiStreaming(MethodDescriptor methodDescriptor, @Override public void onNext(Object value) { sendRequestAndProduceReply(methodDescriptor, value) - .subscribe(responseObserver::onNext, responseObserver::onError); + .subscribe(responseObserver::onNext, + t -> responseObserver.onError(toGrpcStatusException(t))); } @Override public void onError(Throwable t) { - throw new IllegalStateException( - "gRPC request [" + methodDescriptor.getFullMethodName() + "] has failed", t); + throw toGrpcStatusException(t, "gRPC request [" + methodDescriptor.getFullMethodName() + "] has failed"); } @Override @@ -231,4 +237,14 @@ private Mono sendRequestAndProduceReply(MethodDescriptor serviceMethod, .map(Message::getPayload); } + private static StatusRuntimeException toGrpcStatusException(Throwable throwable) { + return toGrpcStatusException(throwable, throwable.getMessage()); + } + + private static StatusRuntimeException toGrpcStatusException(Throwable throwable, @Nullable String description) { + return Status.fromThrowable(throwable) + .withDescription(description) + .asRuntimeException(); + } + } diff --git a/spring-integration-grpc/src/test/java/org/springframework/integration/grpc/TestInProcessConfiguration.java b/spring-integration-grpc/src/test/java/org/springframework/integration/grpc/TestInProcessConfiguration.java index aa709a5cb8f..0b969f38e98 100644 --- a/spring-integration-grpc/src/test/java/org/springframework/integration/grpc/TestInProcessConfiguration.java +++ b/spring-integration-grpc/src/test/java/org/springframework/integration/grpc/TestInProcessConfiguration.java @@ -73,7 +73,7 @@ void startServer() throws IOException { @Override public void destroy() { - this.server.shutdown(); + this.server.shutdownNow(); } } diff --git a/spring-integration-grpc/src/test/java/org/springframework/integration/grpc/inbound/GrpcInboundGatewayTests.java b/spring-integration-grpc/src/test/java/org/springframework/integration/grpc/inbound/GrpcInboundGatewayTests.java index d01af0d46fc..b3363eaa3c3 100644 --- a/spring-integration-grpc/src/test/java/org/springframework/integration/grpc/inbound/GrpcInboundGatewayTests.java +++ b/spring-integration-grpc/src/test/java/org/springframework/integration/grpc/inbound/GrpcInboundGatewayTests.java @@ -27,6 +27,8 @@ import com.google.common.util.concurrent.ListenableFuture; import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; @@ -49,6 +51,7 @@ import org.springframework.util.StringUtils; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; /** * @author Artem Bilan @@ -146,6 +149,18 @@ void bidiStreaming() throws InterruptedException { .containsAll(Arrays.stream(names).map("Hello "::concat).toList()); } + @Test + void errorFromServer() { + assertThatExceptionOfType(StatusRuntimeException.class) + .isThrownBy(() -> this.testHelloWorldBlockingStub.errorOnHello(newHelloRequest("Error"))) + .satisfies(e -> { + assertThat(e.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE); + assertThat(e.getStatus().getDescription()) + .contains("Failed to transform Message in bean " + + "'grpcIntegrationFlow.subFlow#4.method-invoking-transformer#1'"); + }); + } + private static HelloRequest newHelloRequest(String message) { return HelloRequest.newBuilder().setName(message).build(); } @@ -218,6 +233,12 @@ IntegrationFlow grpcIntegrationFlow(GrpcInboundGateway helloWorldService) { .subFlowMapping("BidiStreamHello", flow -> flow .transform(this::requestReply)) + + .subFlowMapping("ErrorOnHello", flow -> flow + .transform(p -> { + throw Status.UNAVAILABLE.withDescription("intentional") + .asRuntimeException(); + })) ) .get(); } diff --git a/spring-integration-grpc/src/test/proto/test_hello.proto b/spring-integration-grpc/src/test/proto/test_hello.proto index 1e77605b71c..186f2d84def 100644 --- a/spring-integration-grpc/src/test/proto/test_hello.proto +++ b/spring-integration-grpc/src/test/proto/test_hello.proto @@ -23,6 +23,9 @@ service TestHelloWorld { // Streams requests and replies rpc BidiStreamHello(stream HelloRequest) returns (stream HelloReply) {} + // Fail with error + rpc ErrorOnHello(HelloRequest) returns (HelloReply) {} + } // The request message containing the user's name. diff --git a/src/reference/antora/modules/ROOT/pages/grpc.adoc b/src/reference/antora/modules/ROOT/pages/grpc.adoc index 4786a1b8083..973a5ab8e8c 100644 --- a/src/reference/antora/modules/ROOT/pages/grpc.adoc +++ b/src/reference/antora/modules/ROOT/pages/grpc.adoc @@ -66,7 +66,7 @@ The `GrpcInboundGateway` is a `MessagingGatewaySupport` implementation to receiv For initialization, the instance of this gateway requires only an abstract gRPC service class implementing `BindableService`, usually generated from Protobuf and comes with a `*ImplBase` class name. WARNING: Only standard gRPC services are supported: a generated `AsyncService` contract is what `GrpcInboundGateway` logic based on. -The Reactor and Kotlin-based service generation don't make sense in Spring Integration logic since those types are not exposed anyhow from the gateway definition. +The Reactor and Kotlin-based service generation don't make sense in Spring Integration logic since those types are not exposed from the gateway definition. The gateway uses the mentioned `AsyncService` interface to create proxy and intercept gRPC service methods. @@ -81,17 +81,17 @@ GrpcInboundGateway helloWorldService() { ---- The `GrpcInboundGateway` implements a `BindableService` and exposes a `ServerServiceDefinition` based on the mentioned proxy for an `AsyncService` contract of the gRPC service. -Therefore, exactly instance of this gateway has to be registered into a `ServerBuilder` and no need in any other `*ImplBase` implementations in the application. +Therefore, an instance of this gateway has to be registered into a `ServerBuilder` and no need in any other `*ImplBase` implementations in the application. IMPORTANT: With https://spring.io/projects/spring-grpc[Spring gRPC] and its auto-discovery for `BindableService` implementations, the `GrpcInboundGateway` has to be declared as a top-level bean. Therefore, Java DSL API like `IntegrationFlow.from(new GrpcInboundGateway(TestHelloWorldGrpc.TestHelloWorldImplBase.class))` is not recommended because such a `BindableService` implementation won't make it visible for respective Spring gRPC infrastructure. The `GrpcInboundGateway` uses a `sendAndReceiveMessageReactive()` API to interact with the downstream flow and adapts a `Mono` reply to the gRPC `StreamObserver`. As mentioned before, the request message payload is exactly a gRPC request message, and it expects a reply in a from of gRPC response message. -The downstream logic could be type-safe and deal with gRPC messages similar way as if `*ImplBase` would be implemented manually. +The downstream logic can be type-safe and deal with gRPC messages similar way as if `*ImplBase` would be implemented manually. The `MethodDescriptor.MethodType.UNARY` and `MethodDescriptor.MethodType.BIDI_STREAMING` are same from the downstream handling logic perspective. -In other words, the `BIDI_STREAMING` is handled as a loop on request items and the gateway produces a response item for each of them. +In other words, the `BIDI_STREAMING` is handled as a loop on request items and the gateway produces a response item immediately into the response `StreamObserver`. For different `BIDI_STREAMING` logic, the regular gRPC service implementation is recommended. The `MethodDescriptor.MethodType.CLIENT_STREAMING` mode produces a message with a `Flux` as a payload of gRPC request items.