diff --git a/build.gradle b/build.gradle index 4ec14970d0..2684b30b7c 100644 --- a/build.gradle +++ b/build.gradle @@ -44,6 +44,7 @@ ext { assertkVersion = '0.24' awaitilityVersion = '4.2.0' commonsCompressVersion = '1.20' + commonsHttpClientVersion = '5.1.3' commonsPoolVersion = '2.11.1' googleJsr305Version = '3.0.2' hamcrestVersion = '2.2' @@ -387,7 +388,9 @@ project('spring-rabbit') { api 'org.springframework:spring-messaging' api 'org.springframework:spring-tx' optionalApi 'org.springframework:spring-webflux' + optionalApi "org.apache.httpcomponents.client5:httpclient5:$commonsHttpClientVersion" optionalApi 'io.projectreactor:reactor-core' + optionalApi 'io.projectreactor.netty:reactor-netty-http' optionalApi "ch.qos.logback:logback-classic:$logbackVersion" optionalApi 'org.apache.logging.log4j:log4j-core' optionalApi 'io.micrometer:micrometer-core' @@ -473,6 +476,7 @@ project('spring-rabbit-stream') { testRuntimeOnly "com.github.luben:zstd-jni:$zstdJniVersion" testImplementation "org.testcontainers:rabbitmq:1.17.3" testImplementation "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion" + testImplementation 'org.springframework:spring-webflux' } } @@ -488,14 +492,12 @@ project('spring-rabbit-junit') { exclude group: 'org.hamcrest', module: 'hamcrest-core' } api "com.rabbitmq:amqp-client:$rabbitmqVersion" - api 'org.springframework:spring-webflux' + api 'org.springframework:spring-web' api 'org.junit.jupiter:junit-jupiter-api' api "org.assertj:assertj-core:$assertjVersion" optionalApi "ch.qos.logback:logback-classic:$logbackVersion" optionalApi 'org.apache.logging.log4j:log4j-core' compileOnly 'org.apiguardian:apiguardian-api:1.0.0' - testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core' - testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind' } } diff --git a/spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/BrokerRunningSupport.java b/spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/BrokerRunningSupport.java index 90b2d9f7e3..9caf727a57 100644 --- a/spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/BrokerRunningSupport.java +++ b/spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/BrokerRunningSupport.java @@ -17,11 +17,16 @@ package org.springframework.amqp.rabbit.junit; import java.io.IOException; +import java.net.Authenticator; +import java.net.PasswordAuthentication; import java.net.URI; import java.net.URISyntaxException; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandlers; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -33,12 +38,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.springframework.core.ParameterizedTypeReference; -import org.springframework.http.MediaType; import org.springframework.util.Base64Utils; import org.springframework.util.StringUtils; -import org.springframework.web.reactive.function.client.ExchangeFilterFunctions; -import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.util.UriUtils; import com.rabbitmq.client.Channel; @@ -388,22 +389,35 @@ private Channel createQueues(Connection connection) throws IOException, URISynta } private boolean alivenessTest() throws URISyntaxException { - WebClient client = WebClient.builder() - .filter(ExchangeFilterFunctions.basicAuthentication(this.adminUser, this.adminPassword)) + HttpClient client = HttpClient.newBuilder() + .authenticator(new Authenticator() { + + @Override + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(getAdminUser(), getAdminPassword().toCharArray()); + } + + }) .build(); URI uri = new URI(getAdminUri()) .resolve("/api/aliveness-test/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8)); - HashMap result = client.get() + HttpRequest request = HttpRequest.newBuilder() + .GET() .uri(uri) - .accept(MediaType.APPLICATION_JSON) - .retrieve() - .bodyToMono(new ParameterizedTypeReference>() { - }) - .block(Duration.ofSeconds(10)); // NOSONAR magic# - if (result != null) { - return result.get("status").equals("ok"); + .build(); + HttpResponse response; + try { + response = client.send(request, BodyHandlers.ofString()); + } + catch (IOException ex) { + LOGGER.error("Exception checking admin aliveness", ex); + return false; + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return false; } - return false; + return response.body().contentEquals("{\"status\":\"ok\"}"); } public static boolean fatal() { diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java index decb22fa1c..83885e82cc 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java @@ -35,6 +35,7 @@ import org.springframework.core.log.LogAccessor; import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; /** * A {@link RoutingConnectionFactory} that determines the node on which a queue is located and @@ -54,6 +55,13 @@ */ public class LocalizedQueueConnectionFactory implements ConnectionFactory, RoutingConnectionFactory, DisposableBean { + private static final boolean USING_WEBFLUX; + + static { + USING_WEBFLUX = ClassUtils.isPresent("org.springframework.web.reactive.function.client.WebClient", + LocalizedQueueConnectionFactory.class.getClassLoader()); + } + private final Log logger = LogFactory.getLog(getClass()); private final Map nodeFactories = new HashMap(); @@ -82,7 +90,7 @@ public class LocalizedQueueConnectionFactory implements ConnectionFactory, Routi private final String trustStorePassPhrase; - private NodeLocator nodeLocator = new WebFluxNodeLocator(); + private NodeLocator nodeLocator; /** * @param defaultConnectionFactory the fallback connection factory to use if the queue @@ -190,6 +198,12 @@ private LocalizedQueueConnectionFactory(ConnectionFactory defaultConnectionFacto this.trustStore = trustStore; this.keyStorePassPhrase = keyStorePassPhrase; this.trustStorePassPhrase = trustStorePassPhrase; + if (USING_WEBFLUX) { + this.nodeLocator = new WebFluxNodeLocator(); + } + else { + this.nodeLocator = new RestTemplateNodeLocator(); + } } private static Map nodesAddressesToMap(String[] nodes, String[] addresses) { @@ -206,6 +220,7 @@ private static Map nodesAddressesToMap(String[] nodes, String[] * @since 2.4.8 */ public void setNodeLocator(NodeLocator nodeLocator) { + Assert.notNull(nodeLocator, "'nodeLocator' cannot be null"); this.nodeLocator = nodeLocator; } @@ -378,7 +393,7 @@ default ConnectionFactory locate(String[] adminUris, Map nodeToA try { String uri = new URI(adminUri) .resolve("/api/queues/").toString(); - HashMap queueInfo = restCall(client, uri, vhost, queue); + Map queueInfo = restCall(client, uri, vhost, queue); if (queueInfo != null) { String node = (String) queueInfo.get("node"); if (node != null) { @@ -423,13 +438,15 @@ default void close(T client) { /** * Retrieve a map of queue properties using the RabbitMQ Management REST API. + * @param client the client. * @param baseUri the base uri. * @param vhost the virtual host. * @param queue the queue name. * @return the map of queue properties. * @throws URISyntaxException if the syntax is bad. */ - HashMap restCall(T client, String baseUri, String vhost, String queue) + @Nullable + Map restCall(T client, String baseUri, String vhost, String queue) throws URISyntaxException; } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RestTemplateHolder.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RestTemplateHolder.java new file mode 100644 index 0000000000..056435925c --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RestTemplateHolder.java @@ -0,0 +1,41 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.amqp.rabbit.connection; + +import org.springframework.web.client.RestTemplate; + +/** + * Holder for a {@link RestTemplate} and credentials. + * + * @author Gary Russell + * @since 2.4.8 + * + */ +class RestTemplateHolder { + + final String userName; // NOSONAR + + final String password; // NOSONAR + + RestTemplate template; // NOSONAR + + RestTemplateHolder(String userName, String password) { + this.userName = userName; + this.password = password; + } + +} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RestTemplateNodeLocator.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RestTemplateNodeLocator.java new file mode 100644 index 0000000000..be7fcae452 --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RestTemplateNodeLocator.java @@ -0,0 +1,97 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.amqp.rabbit.connection; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +import org.apache.hc.client5.http.auth.AuthCache; +import org.apache.hc.client5.http.impl.auth.BasicAuthCache; +import org.apache.hc.client5.http.impl.auth.BasicScheme; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.protocol.BasicHttpContext; +import org.apache.hc.core5.http.protocol.HttpContext; + +import org.springframework.amqp.rabbit.connection.LocalizedQueueConnectionFactory.NodeLocator; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; +import org.springframework.http.client.support.BasicAuthenticationInterceptor; +import org.springframework.lang.Nullable; +import org.springframework.web.client.RestTemplate; +import org.springframework.web.util.UriUtils; + +/** + * A {@link NodeLocator} using the {@link RestTemplate}. + * + * @author Gary Russell + * @since 3.0 + * + */ +public class RestTemplateNodeLocator implements NodeLocator { + + @Override + public RestTemplateHolder createClient(String userName, String password) { + return new RestTemplateHolder(userName, password); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + @Nullable + public Map restCall(RestTemplateHolder client, String baseUri, String vhost, String queue) + throws URISyntaxException { + + if (client.template == null) { + URI uri = new URI(baseUri); + HttpHost host = new HttpHost(uri.getHost(), uri.getPort()); + client.template = new RestTemplate(new HttpComponentsClientHttpRequestFactory() { + + @Override + @Nullable + protected HttpContext createHttpContext(HttpMethod httpMethod, URI uri) { + AuthCache cache = new BasicAuthCache(); + BasicScheme scheme = new BasicScheme(); + cache.put(host, scheme); + BasicHttpContext context = new BasicHttpContext(); + context.setAttribute(HttpClientContext.AUTH_CACHE, cache); + return context; + } + + }); + client.template.getInterceptors().add(new BasicAuthenticationInterceptor(client.userName, client.password)); + } + URI uri = new URI(baseUri) + .resolve("/api/queues/" + UriUtils.encodePathSegment(vhost, StandardCharsets.UTF_8) + "/" + queue); + ResponseEntity response = client.template.exchange(uri, HttpMethod.GET, null, Map.class); + return response.getStatusCode().equals(HttpStatus.OK) ? response.getBody() : null; + } + + @Override + public void close(RestTemplateHolder client) { + try { + client.template.close(); + } + catch (IOException e) { + } + } + +} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/WebFluxNodeLocator.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/WebFluxNodeLocator.java index bedf67f017..5179ea378a 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/WebFluxNodeLocator.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/WebFluxNodeLocator.java @@ -21,10 +21,12 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.HashMap; +import java.util.Map; import org.springframework.amqp.rabbit.connection.LocalizedQueueConnectionFactory.NodeLocator; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.MediaType; +import org.springframework.lang.Nullable; import org.springframework.web.reactive.function.client.ExchangeFilterFunctions; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.util.UriUtils; @@ -39,7 +41,8 @@ public class WebFluxNodeLocator implements NodeLocator { @Override - public HashMap restCall(WebClient client, String baseUri, String vhost, String queue) + @Nullable + public Map restCall(WebClient client, String baseUri, String vhost, String queue) throws URISyntaxException { URI uri = new URI(baseUri) diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryIntegrationTests.java index 9a20edf235..9c2a2dd7d2 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryIntegrationTests.java @@ -83,6 +83,9 @@ void findLocal() { ConnectionFactory cf = lqcf.getTargetConnectionFactory("[local]"); RabbitAdmin admin = new RabbitAdmin(cf); assertThat(admin.getQueueProperties("local")).isNotNull(); + lqcf.setNodeLocator(new RestTemplateNodeLocator()); + ConnectionFactory cf2 = lqcf.getTargetConnectionFactory("[local]"); + assertThat(cf2).isSameAs(cf); lqcf.destroy(); } diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index e206ebf140..36ea2bcacc 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -796,7 +796,9 @@ Notice that the first three parameters are arrays of `addresses`, `adminUris`, a These are positional in that, when a container attempts to connect to a queue, it uses the admin API to determine which node is the lead for the queue and connects to the address in the same array position as that node. IMPORTANT: Starting with version 3.0, the RabbitMQ `http-client` is no longer used to access the Rest API. -Instead, by default, the `WebClient` from Spring Webflux is used; which is not added to the class path by default. +Instead, by default, the `WebClient` from Spring Webflux is used if `spring-webflux` is on the class path; otherwise a `RestTemplate` is used. + +To add `WebFlux` to the class path: .Maven ==== @@ -821,7 +823,7 @@ You can also use other REST technology by implementing `LocalizedQueueConnection ==== [source, java] ---- -lqcf.setNodeLocator(new DefaultNodeLocator() { +lqcf.setNodeLocator(new NodeLocator() { @Override public MyClient createClient(String userName, String password) { @@ -837,6 +839,8 @@ lqcf.setNodeLocator(new DefaultNodeLocator() { ---- ==== +The framework provides the `WebFluxNodeLocator` and `RestTemplateNodeLocator`, with the default as discussed above. + [[cf-pub-conf-ret]] ===== Publisher Confirms and Returns