diff --git a/build.gradle b/build.gradle index d7366429f..ac8e51c65 100644 --- a/build.gradle +++ b/build.gradle @@ -378,6 +378,7 @@ project('spring-xd-dirt') { compile "org.springframework.cloud:spring-cloud-cloudfoundry-connector" compile "org.springframework.boot:spring-boot-autoconfigure" compile "org.springframework.boot:spring-boot-actuator" + compile "org.springframework:spring-web" compile("org.springframework.boot:spring-boot-starter-security") { exclude group: 'org.springframework.boot', module: "spring-boot-starter-logging" } @@ -1086,6 +1087,7 @@ project('spring-xd-test') { compile "org.springframework:spring-context" compile "org.springframework:spring-context-support" compile "org.springframework:spring-tx" + compile "org.springframework:spring-web" compile "org.springframework:spring-test" compile "org.springframework.data:spring-data-hadoop-test" compile "redis.clients:jedis" diff --git a/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/integration/bus/BusCleaner.java b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/integration/bus/BusCleaner.java new file mode 100644 index 000000000..a9c83a5ee --- /dev/null +++ b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/integration/bus/BusCleaner.java @@ -0,0 +1,39 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.xd.dirt.integration.bus; + +import java.util.List; +import java.util.Map; + + +/** + * Interface for implementations that perform cleanup for message buses. + * + * @author Gary Russell + * @since 1.2 + */ +public interface BusCleaner { + + /** + * Clean up all resources for the supplied stream/job. + * @param entity the stream or job. + * @param isJob true if the entity is a job. + * @return a map of lists of resources removed. + */ + Map> clean(String entity, boolean isJob); + +} diff --git a/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/integration/bus/rabbit/NothingToDeleteException.java b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/integration/bus/rabbit/NothingToDeleteException.java new file mode 100644 index 000000000..15b792d6a --- /dev/null +++ b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/integration/bus/rabbit/NothingToDeleteException.java @@ -0,0 +1,33 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.xd.dirt.integration.bus.rabbit; + + +/** + * Thrown when a delete operation has nothing to delete. + * + * @author Gary Russell + * @since 1.2 + */ +@SuppressWarnings("serial") +public class NothingToDeleteException extends RabbitAdminException { + + public NothingToDeleteException(String message) { + super(message); + } + +} diff --git a/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/integration/bus/rabbit/RabbitAdminException.java b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/integration/bus/rabbit/RabbitAdminException.java new file mode 100644 index 000000000..75003c81e --- /dev/null +++ b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/integration/bus/rabbit/RabbitAdminException.java @@ -0,0 +1,39 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.xd.dirt.integration.bus.rabbit; + +import org.springframework.xd.dirt.XDRuntimeException; + + +/** + * Exceptions thrown while interfacing with the RabbitMQ admin plugin. + * + * @author Gary Russell + * @since 1.2 + */ +@SuppressWarnings("serial") +public class RabbitAdminException extends XDRuntimeException { + + public RabbitAdminException(String message, Throwable cause) { + super(message, cause); + } + + public RabbitAdminException(String message) { + super(message); + } + +} diff --git a/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/integration/bus/rabbit/RabbitBusCleaner.java b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/integration/bus/rabbit/RabbitBusCleaner.java new file mode 100644 index 000000000..bafec10de --- /dev/null +++ b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/integration/bus/rabbit/RabbitBusCleaner.java @@ -0,0 +1,299 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.xd.dirt.integration.bus.rabbit; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.AuthCache; +import org.apache.http.client.HttpClient; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.impl.auth.BasicScheme; +import org.apache.http.impl.client.BasicAuthCache; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.protocol.HttpContext; + +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; +import org.springframework.http.converter.HttpMessageConverter; +import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; +import org.springframework.web.client.HttpClientErrorException; +import org.springframework.web.client.RestTemplate; +import org.springframework.web.util.UriComponentsBuilder; +import org.springframework.xd.dirt.integration.bus.BusCleaner; +import org.springframework.xd.dirt.integration.bus.MessageBusSupport; +import org.springframework.xd.dirt.plugins.AbstractJobPlugin; +import org.springframework.xd.dirt.plugins.AbstractStreamPlugin; +import org.springframework.xd.dirt.plugins.job.JobEventsListenerPlugin; + +import com.google.common.annotations.VisibleForTesting; + + +/** + * Implementation of {@link BusCleaner} for the {@code RabbitMessageBus}. + * + * @author Gary Russell + * @since 1.2 + */ +public class RabbitBusCleaner implements BusCleaner { + + private final static Log logger = LogFactory.getLog(RabbitBusCleaner.class); + + @Override + public Map> clean(String entity, boolean isJob) { + return clean("http://localhost:15672", "guest", "guest", "/", "xdbus.", entity, isJob); + } + + public Map> clean(String adminUri, String user, String pw, String vhost, + String busPrefix, String entity, boolean isJob) { + return doClean( + adminUri == null ? "http://localhost:15672" : adminUri, + user == null ? "guest" : user, + pw == null ? "guest" : pw, + vhost == null ? "/" : vhost, + busPrefix == null ? "xdbus." : busPrefix, + entity, isJob); + } + + private Map> doClean(String adminUri, String user, String pw, String vhost, + String busPrefix, String entity, boolean isJob) { + RestTemplate restTemplate = buildRestTemplate(adminUri, user, pw); + List removedQueues = isJob + ? findJobQueues(adminUri, vhost, busPrefix, entity, restTemplate) + : findStreamQueues(adminUri, vhost, busPrefix, entity, restTemplate); + ExchangeCandidateCallback callback; + if (isJob) { + Collection exchangeNames = JobEventsListenerPlugin.getEventListenerChannels(entity).values(); + final Set jobExchanges = new HashSet<>(); + for (String exchange : exchangeNames) { + jobExchanges.add(MessageBusSupport.applyPrefix(busPrefix, MessageBusSupport.applyPubSub(exchange))); + } + jobExchanges.add(MessageBusSupport.applyPrefix(busPrefix, MessageBusSupport.applyPubSub( + JobEventsListenerPlugin.getEventListenerChannelName(entity)))); + callback = new ExchangeCandidateCallback() { + + @Override + public boolean isCandidate(String exchangeName) { + return jobExchanges.contains(exchangeName); + } + + }; + } + else { + final String tapPrefix = MessageBusSupport.applyPrefix(busPrefix, + MessageBusSupport.applyPubSub(AbstractStreamPlugin.constructTapPrefix(entity))); + callback = new ExchangeCandidateCallback() { + + @Override + public boolean isCandidate(String exchangeName) { + return exchangeName.startsWith(tapPrefix); + } + }; + } + List removedExchanges = findExchanges(adminUri, vhost, busPrefix, entity, restTemplate, callback); + // Delete the queues in reverse order to enable re-running after a partial success. + // The queue search above starts with 0 and terminates on a not found. + for (int i = removedQueues.size() - 1; i >= 0; i--) { + String queueName = removedQueues.get(i); + URI uri = UriComponentsBuilder.fromUriString(adminUri + "/api") + .pathSegment("queues", "{vhost}", "{stream}") + .buildAndExpand(vhost, queueName).encode().toUri(); + restTemplate.delete(uri); + if (logger.isDebugEnabled()) { + logger.debug("deleted queue: " + queueName); + } + } + Map> results = new HashMap<>(); + if (removedQueues.size() > 0) { + results.put("queues", removedQueues); + } + // Fanout exchanges for taps + for (String exchange : removedExchanges) { + URI uri = UriComponentsBuilder.fromUriString(adminUri + "/api") + .pathSegment("exchanges", "{vhost}", "{name}") + .buildAndExpand(vhost, exchange).encode().toUri(); + restTemplate.delete(uri); + if (logger.isDebugEnabled()) { + logger.debug("deleted exchange: " + exchange); + } + } + if (removedExchanges.size() > 0) { + results.put("exchanges", removedExchanges); + } + return results; + } + + private List findStreamQueues(String adminUri, String vhost, String busPrefix, String stream, + RestTemplate restTemplate) { + List removedQueues = new ArrayList<>(); + int n = 0; + while (true) { // exits when no queue found + String queueName = MessageBusSupport.applyPrefix(busPrefix, + AbstractStreamPlugin.constructPipeName(stream, n++)); + URI uri = UriComponentsBuilder.fromUriString(adminUri + "/api") + .pathSegment("queues", "{vhost}", "{stream}") + .buildAndExpand(vhost, queueName).encode().toUri(); + try { + getQueueDetails(restTemplate, queueName, uri); + removedQueues.add(queueName); + } + catch (HttpClientErrorException e) { + if (e.getStatusCode().equals(HttpStatus.NOT_FOUND)) { + break; // No more for this stream + } + throw new RabbitAdminException("Failed to lookup queue " + queueName, e); + } + queueName = MessageBusSupport.constructDLQName(queueName); + uri = UriComponentsBuilder.fromUriString(adminUri + "/api") + .pathSegment("queues", "{vhost}", "{stream}") + .buildAndExpand(vhost, queueName).encode().toUri(); + try { + getQueueDetails(restTemplate, queueName, uri); + removedQueues.add(queueName); + } + catch (HttpClientErrorException e) { + if (e.getStatusCode().equals(HttpStatus.NOT_FOUND)) { + continue; // DLQs are not mandatory + } + throw new RabbitAdminException("Failed to lookup queue " + queueName, e); + } + } + return removedQueues; + } + + private List findJobQueues(String adminUri, String vhost, String busPrefix, String job, + RestTemplate restTemplate) { + List removedQueues = new ArrayList<>(); + String jobQueueName = MessageBusSupport.applyPrefix(busPrefix, AbstractJobPlugin.getJobChannelName(job)); + URI uri = UriComponentsBuilder.fromUriString(adminUri + "/api") + .pathSegment("queues", "{vhost}", "{stream}") + .buildAndExpand(vhost, jobQueueName).encode().toUri(); + try { + getQueueDetails(restTemplate, jobQueueName, uri); + removedQueues.add(jobQueueName); + } + catch (HttpClientErrorException e) { + if (!e.getStatusCode().equals(HttpStatus.NOT_FOUND)) { + throw new RabbitAdminException("Failed to lookup queue " + jobQueueName, e); + } + } + return removedQueues; + } + + @SuppressWarnings("unchecked") + private void getQueueDetails(RestTemplate restTemplate, String queueName, URI uri) { + Map queue = restTemplate.getForObject(uri, Map.class); + if (queue.get("consumers") != Integer.valueOf(0)) { + throw new RabbitAdminException("Queue " + queueName + " is in use"); + } + } + + @SuppressWarnings("unchecked") + private List findExchanges(String adminUri, String vhost, String busPrefix, String entity, + RestTemplate restTemplate, ExchangeCandidateCallback callback) { + List removedExchanges = new ArrayList<>(); + URI uri = UriComponentsBuilder.fromUriString(adminUri + "/api") + .pathSegment("exchanges", "{vhost}") + .buildAndExpand(vhost).encode().toUri(); + List> exchanges = restTemplate.getForObject(uri, List.class); + for (Map exchange : exchanges) { + String exchangeName = (String) exchange.get("name"); + if (callback.isCandidate(exchangeName)) { + uri = UriComponentsBuilder.fromUriString(adminUri + "/api") + .pathSegment("exchanges", "{vhost}", "{name}", "bindings", "source") + .buildAndExpand(vhost, exchangeName).encode().toUri(); + List> bindings = restTemplate.getForObject(uri, List.class); + if (bindings.size() == 0) { + uri = UriComponentsBuilder.fromUriString(adminUri + "/api") + .pathSegment("exchanges", "{vhost}", "{name}", "bindings", "destination") + .buildAndExpand(vhost, exchangeName).encode().toUri(); + bindings = restTemplate.getForObject(uri, List.class); + if (bindings.size() == 0) { + removedExchanges.add((String) exchange.get("name")); + } + else { + throw new RabbitAdminException("Cannot delete exchange " + exchangeName + + "; it is a destination: " + bindings); + } + } + else { + throw new RabbitAdminException("Cannot delete exchange " + exchangeName + "; it has bindings: " + + bindings); + } + } + } + return removedExchanges; + } + + private interface ExchangeCandidateCallback { + + boolean isCandidate(String exchangeName); + } + + @VisibleForTesting + static RestTemplate buildRestTemplate(String adminUri, String user, String password) { + BasicCredentialsProvider credsProvider = new BasicCredentialsProvider(); + credsProvider.setCredentials( + new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT), + new UsernamePasswordCredentials(user, password)); + HttpClient httpClient = HttpClients.custom().setDefaultCredentialsProvider(credsProvider).build(); + // Set up pre-emptive basic Auth because the rabbit plugin doesn't currently support challenge/response for PUT + // Create AuthCache instance + AuthCache authCache = new BasicAuthCache(); + // Generate BASIC scheme object and add it to the local; from the apache docs... + // auth cache + BasicScheme basicAuth = new BasicScheme(); + URI uri; + try { + uri = new URI(adminUri); + } + catch (URISyntaxException e) { + throw new RabbitAdminException("Invalid URI", e); + } + authCache.put(new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme()), basicAuth); + // Add AuthCache to the execution context + final HttpClientContext localContext = HttpClientContext.create(); + localContext.setAuthCache(authCache); + RestTemplate restTemplate = new RestTemplate(new HttpComponentsClientHttpRequestFactory(httpClient) { + + @Override + protected HttpContext createHttpContext(HttpMethod httpMethod, URI uri) { + return localContext; + } + + }); + restTemplate.setMessageConverters(Collections.> singletonList( + new MappingJackson2HttpMessageConverter())); + return restTemplate; + } + +} diff --git a/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/integration/bus/rabbit/package-info.java b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/integration/bus/rabbit/package-info.java new file mode 100644 index 000000000..2c114d3b0 --- /dev/null +++ b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/integration/bus/rabbit/package-info.java @@ -0,0 +1,6 @@ +/** + * Package for classes related to the RabbitMessageBus. + */ + +package org.springframework.xd.dirt.integration.bus.rabbit; + diff --git a/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/plugins/AbstractJobPlugin.java b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/plugins/AbstractJobPlugin.java index fe7d5106f..ad8679363 100644 --- a/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/plugins/AbstractJobPlugin.java +++ b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/plugins/AbstractJobPlugin.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2014 the original author or authors. + * Copyright 2013-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ * job plugins. * * @author Ilayaperumal Gopinathan + * @author Gary Russell */ public class AbstractJobPlugin extends AbstractMessageBusBinderPlugin { @@ -33,9 +34,14 @@ public AbstractJobPlugin(MessageBus messageBus) { super(messageBus); } + public static String getJobChannelName(String jobName) { + return JOB_CHANNEL_PREFIX + jobName; + } + @Override protected String getInputChannelName(Module module) { - return JOB_CHANNEL_PREFIX + module.getDescriptor().getGroup(); + String group = module.getDescriptor().getGroup(); + return getJobChannelName(group); } @Override diff --git a/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/plugins/AbstractStreamPlugin.java b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/plugins/AbstractStreamPlugin.java index 48ce2930f..4c680f2e1 100644 --- a/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/plugins/AbstractStreamPlugin.java +++ b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/plugins/AbstractStreamPlugin.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2014 the original author or authors. + * Copyright 2013-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ * * @author Ilayaperumal Gopinathan * @author Mark Fisher + * @author Gary Russell */ public abstract class AbstractStreamPlugin extends AbstractMessageBusBinderPlugin { @@ -41,18 +42,36 @@ public AbstractStreamPlugin(MessageBus messageBus, ZooKeeperConnection zkConnect super(messageBus, zkConnection); } + /** + * Construct a pipe name from the group and index. + * @param group the group. + * @param index the index. + * @return the name. + */ + public static String constructPipeName(String group, int index) { + return group + "." + index; + } + + public static String constructTapPrefix(String group) { + return TAP_CHANNEL_PREFIX + "stream:" + group; + } + @Override protected String getInputChannelName(Module module) { ModuleDescriptor descriptor = module.getDescriptor(); String sourceChannel = descriptor.getSourceChannelName(); - return (sourceChannel != null) ? sourceChannel : descriptor.getGroup() + "." + (descriptor.getIndex() - 1); + return (sourceChannel != null) + ? sourceChannel + : constructPipeName(descriptor.getGroup(), descriptor.getIndex() - 1); } @Override protected String getOutputChannelName(Module module) { ModuleDescriptor descriptor = module.getDescriptor(); String sinkChannel = descriptor.getSinkChannelName(); - return (sinkChannel != null) ? sinkChannel : descriptor.getGroup() + "." + descriptor.getIndex(); + return (sinkChannel != null) + ? sinkChannel + : constructPipeName(descriptor.getGroup(), descriptor.getIndex()); } @Override @@ -60,14 +79,14 @@ protected String buildTapChannelName(Module module) { Assert.isTrue(module.getType() != ModuleType.job, "Job module type not supported."); ModuleDescriptor descriptor = module.getDescriptor(); // for Stream return channel name with indexed elements - return String.format("%s%s%s.%s.%s", TAP_CHANNEL_PREFIX, "stream:", descriptor.getGroup(), + return String.format("%s.%s.%s", constructTapPrefix(descriptor.getGroup()), module.getName(), descriptor.getIndex()); } @Override public boolean supports(Module module) { ModuleType moduleType = module.getType(); - return (module.shouldBind() && - (moduleType == ModuleType.source || moduleType == ModuleType.processor || moduleType == ModuleType.sink)); + return (module.shouldBind() && (moduleType == ModuleType.source || moduleType == ModuleType.processor || moduleType == ModuleType.sink)); } + } diff --git a/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/plugins/job/JobEventsListenerPlugin.java b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/plugins/job/JobEventsListenerPlugin.java index cafb815a8..c0af2d9d9 100644 --- a/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/plugins/job/JobEventsListenerPlugin.java +++ b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/plugins/job/JobEventsListenerPlugin.java @@ -45,7 +45,7 @@ public JobEventsListenerPlugin(MessageBus messageBus) { @Override public void postProcessModule(Module module) { boolean disableListeners = true; - Map eventChannels = getEventListenerChannels(module); + Map eventChannels = getEventListenerChannels(module.getDescriptor().getGroup()); for (Map.Entry entry : eventChannels.entrySet()) { MessageChannel eventChannel = module.getComponent(entry.getKey(), SubscribableChannel.class); if (eventChannel != null) { @@ -60,13 +60,12 @@ public void postProcessModule(Module module) { } /** - * @param module + * @param jobName the job name. * @return the map containing the entries for the channels used by the job listeners with bean name of the channel * as the key and channel name as the value. */ - private Map getEventListenerChannels(Module module) { + public static Map getEventListenerChannels(String jobName) { Map eventListenerChannels = new HashMap(); - String jobName = module.getDescriptor().getGroup(); Assert.notNull(jobName, "Job name should not be null"); eventListenerChannels.put(XD_JOB_EXECUTION_EVENTS_CHANNEL, getEventListenerChannelName(jobName, JOB_EXECUTION_EVENTS_SUFFIX)); @@ -79,11 +78,15 @@ private Map getEventListenerChannels(Module module) { } - private String getEventListenerChannelName(String jobName, String channelNameSuffix) { + private static String getEventListenerChannelName(String jobName, String channelNameSuffix) { return String.format("%s%s.%s", JOB_TAP_CHANNEL_PREFIX, jobName, channelNameSuffix); } - private String getEventListenerChannelName(String jobName) { + /** + * @param jobName the job name. + * @return the aggregated event channel name. + */ + public static String getEventListenerChannelName(String jobName) { return String.format("%s%s", JOB_TAP_CHANNEL_PREFIX, jobName); } @@ -97,7 +100,7 @@ private void bindAggregatedEventsChannel(Module module) { @Override public void removeModule(Module module) { - Map eventListenerChannels = getEventListenerChannels(module); + Map eventListenerChannels = getEventListenerChannels(module.getDescriptor().getGroup()); for (Map.Entry channelEntry : eventListenerChannels.entrySet()) { messageBus.unbindProducers(channelEntry.getValue()); } diff --git a/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/rest/JobsController.java b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/rest/JobsController.java index 30e0ba96c..35a0cd26e 100644 --- a/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/rest/JobsController.java +++ b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/rest/JobsController.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Pageable; @@ -26,6 +27,7 @@ import org.springframework.hateoas.PagedResources; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; @@ -101,6 +103,18 @@ public PagedResources list(Pageable pageable, pagedResources.getLinks()); } + @ResponseBody + @RequestMapping(value = "/clean/rabbit/{job}", method = RequestMethod.DELETE) + @ResponseStatus(HttpStatus.OK) + public Map> clean(@PathVariable String job, + @RequestParam(required = false) String adminUri, + @RequestParam(required = false) String user, + @RequestParam(required = false) String pw, + @RequestParam(required = false) String vhost, + @RequestParam(required = false) String busPrefix) { + return cleanRabbitBus(job, adminUri, user, pw, vhost, busPrefix, true); + } + @Override protected JobDefinition createDefinition(String name, String definition) { return new JobDefinition(name, definition); diff --git a/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/rest/RestControllerAdvice.java b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/rest/RestControllerAdvice.java index 3396c1af6..2d24776f4 100644 --- a/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/rest/RestControllerAdvice.java +++ b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/rest/RestControllerAdvice.java @@ -31,6 +31,8 @@ import org.springframework.xd.dirt.cluster.ContainerShutdownException; import org.springframework.xd.dirt.cluster.ModuleMessageRateNotFoundException; import org.springframework.xd.dirt.cluster.NoSuchContainerException; +import org.springframework.xd.dirt.integration.bus.rabbit.NothingToDeleteException; +import org.springframework.xd.dirt.integration.bus.rabbit.RabbitAdminException; import org.springframework.xd.dirt.job.BatchJobAlreadyExistsException; import org.springframework.xd.dirt.job.JobExecutionAlreadyRunningException; import org.springframework.xd.dirt.job.JobExecutionNotRunningException; @@ -302,4 +304,20 @@ public VndErrors onNoSuchPageException(PageNotFoundException e) { return new VndErrors(logref, e.getMessage()); } + @ResponseBody + @ExceptionHandler + @ResponseStatus(HttpStatus.NO_CONTENT) + public VndErrors onNothingToDeleteException(NothingToDeleteException e) { + String logref = logDebug(e); + return new VndErrors(logref, e.getMessage()); + } + + @ResponseBody + @ExceptionHandler + @ResponseStatus(HttpStatus.BAD_REQUEST) + public VndErrors onRabbitAdminException(RabbitAdminException e) { + String logref = logDebug(e); + return new VndErrors(logref, e.getMessage()); + } + } diff --git a/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/rest/StreamsController.java b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/rest/StreamsController.java index a82a19d30..0e198dcee 100644 --- a/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/rest/StreamsController.java +++ b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/rest/StreamsController.java @@ -1,5 +1,5 @@ /* - * Copyright 2013 the original author or authors. + * Copyright 2013-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,9 @@ package org.springframework.xd.dirt.rest; +import java.util.List; +import java.util.Map; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Pageable; import org.springframework.data.web.PagedResourcesAssembler; @@ -23,8 +26,10 @@ import org.springframework.hateoas.PagedResources; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.xd.dirt.stream.StreamDefinition; @@ -34,11 +39,12 @@ /** * Handles all Stream related interaction. - * + * * @author Eric Bottard * @author Gunnar Hillert * @author David Turanski - * + * @author Gary Russell + * * @since 1.0 */ @Controller @@ -67,4 +73,17 @@ public PagedResources list(Pageable pageable, protected StreamDefinition createDefinition(String name, String definition) { return new StreamDefinition(name, definition); } + + @ResponseBody + @RequestMapping(value = "/clean/rabbit/{stream}", method = RequestMethod.DELETE) + @ResponseStatus(HttpStatus.OK) + public Map> clean(@PathVariable String stream, + @RequestParam(required = false) String adminUri, + @RequestParam(required = false) String user, + @RequestParam(required = false) String pw, + @RequestParam(required = false) String vhost, + @RequestParam(required = false) String busPrefix) { + return cleanRabbitBus(stream, adminUri, user, pw, vhost, busPrefix, false); + } + } diff --git a/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/rest/XDController.java b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/rest/XDController.java index 091fde009..c0f76fe70 100644 --- a/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/rest/XDController.java +++ b/spring-xd-dirt/src/main/java/org/springframework/xd/dirt/rest/XDController.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -38,6 +39,8 @@ import org.springframework.xd.dirt.core.BaseDefinition; import org.springframework.xd.dirt.core.DeploymentUnitStatus; import org.springframework.xd.dirt.core.ResourceDeployer; +import org.springframework.xd.dirt.integration.bus.rabbit.NothingToDeleteException; +import org.springframework.xd.dirt.integration.bus.rabbit.RabbitBusCleaner; import org.springframework.xd.dirt.stream.AbstractDeployer; import org.springframework.xd.dirt.stream.AbstractInstancePersistingDeployer; import org.springframework.xd.dirt.stream.BaseInstance; @@ -65,6 +68,8 @@ public abstract class XDController> cleanRabbitBus(String stream, String adminUri, String user, String pw, + String vhost, String busPrefix, boolean isJob) { + Map> results = busCleaner.clean(adminUri, user, pw, vhost, busPrefix, stream, isJob); + if (results == null || results.size() == 0) { + throw new NothingToDeleteException("Nothing to delete for stream " + stream); + } + return results; + } } diff --git a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/kafka/KafkaMessageBusTests.java b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/kafka/KafkaMessageBusTests.java similarity index 97% rename from spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/kafka/KafkaMessageBusTests.java rename to spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/kafka/KafkaMessageBusTests.java index 2d144ab3e..a083f7c5e 100644 --- a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/kafka/KafkaMessageBusTests.java +++ b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/kafka/KafkaMessageBusTests.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.xd.dirt.integration.kafka; +package org.springframework.xd.dirt.integration.bus.kafka; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertNotNull; @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import kafka.api.OffsetRequest; + import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -40,6 +41,7 @@ import org.springframework.xd.dirt.integration.bus.EmbeddedHeadersMessageConverter; import org.springframework.xd.dirt.integration.bus.MessageBus; import org.springframework.xd.dirt.integration.bus.PartitionCapableBusTests; +import org.springframework.xd.dirt.integration.kafka.KafkaMessageBus; import org.springframework.xd.test.kafka.KafkaTestSupport; diff --git a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/kafka/KafkaTestMessageBus.java b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/kafka/KafkaTestMessageBus.java similarity index 93% rename from spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/kafka/KafkaTestMessageBus.java rename to spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/kafka/KafkaTestMessageBus.java index 7faee1b8e..b20e9da05 100644 --- a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/kafka/KafkaTestMessageBus.java +++ b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/kafka/KafkaTestMessageBus.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.xd.dirt.integration.kafka; +package org.springframework.xd.dirt.integration.bus.kafka; import java.util.HashMap; import java.util.Map; @@ -27,6 +27,8 @@ import org.springframework.xd.dirt.integration.bus.serializer.MultiTypeCodec; import org.springframework.xd.dirt.integration.bus.serializer.kryo.PojoCodec; import org.springframework.xd.dirt.integration.bus.serializer.kryo.TupleCodec; +import org.springframework.xd.dirt.integration.kafka.KafkaMessageBus; +import org.springframework.xd.dirt.integration.kafka.TestKafkaCluster; import org.springframework.xd.test.kafka.KafkaTestSupport; import org.springframework.xd.tuple.Tuple; diff --git a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/rabbit/RabbitBusCleanerTests.java b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/rabbit/RabbitBusCleanerTests.java new file mode 100644 index 000000000..faadbdd84 --- /dev/null +++ b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/rabbit/RabbitBusCleanerTests.java @@ -0,0 +1,215 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.xd.dirt.integration.bus.rabbit; + + +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.net.URI; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.junit.Rule; +import org.junit.Test; + +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.core.ChannelCallback; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.web.client.RestTemplate; +import org.springframework.web.util.UriComponentsBuilder; +import org.springframework.xd.dirt.integration.bus.MessageBusSupport; +import org.springframework.xd.dirt.plugins.AbstractJobPlugin; +import org.springframework.xd.dirt.plugins.AbstractStreamPlugin; +import org.springframework.xd.dirt.plugins.job.JobEventsListenerPlugin; +import org.springframework.xd.test.rabbit.RabbitAdminTestSupport; +import org.springframework.xd.test.rabbit.RabbitTestSupport; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DefaultConsumer; + +/** + * @author Gary Russell + * @since 1.2 + */ +public class RabbitBusCleanerTests { + + @Rule + public RabbitAdminTestSupport adminTest = new RabbitAdminTestSupport(); + + @Rule + public RabbitTestSupport test = new RabbitTestSupport(); + + @Test + public void testCleanStream() { + final RabbitBusCleaner cleaner = new RabbitBusCleaner(); + final RestTemplate template = RabbitBusCleaner.buildRestTemplate("http://localhost:15672", "guest", "guest"); + final String uuid = UUID.randomUUID().toString(); + String firstQueue = null; + for (int i = 0; i < 5; i++) { + String queueName = MessageBusSupport.applyPrefix("xdbus.", + AbstractStreamPlugin.constructPipeName(uuid, i)); + if (firstQueue == null) { + firstQueue = queueName; + } + URI uri = UriComponentsBuilder.fromUriString("http://localhost:15672/api/queues") + .pathSegment("{vhost}", "{queue}") + .buildAndExpand("/", queueName) + .encode().toUri(); + template.put(uri, new AmqpQueue(false, true)); + uri = UriComponentsBuilder.fromUriString("http://localhost:15672/api/queues") + .pathSegment("{vhost}", "{queue}") + .buildAndExpand("/", MessageBusSupport.constructDLQName(queueName)).encode().toUri(); + template.put(uri, new AmqpQueue(false, true)); + } + CachingConnectionFactory connectionFactory = test.getResource(); + RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); + final FanoutExchange fanout = new FanoutExchange( + MessageBusSupport.applyPrefix("xdbus.", MessageBusSupport.applyPubSub( + AbstractStreamPlugin.constructTapPrefix(uuid) + ".foo.bar"))); + rabbitAdmin.declareExchange(fanout); + rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(firstQueue)).to(fanout)); + new RabbitTemplate(connectionFactory).execute(new ChannelCallback() { + + @Override + public Void doInRabbit(Channel channel) throws Exception { + String queueName = MessageBusSupport.applyPrefix("xdbus.", + AbstractStreamPlugin.constructPipeName(uuid, 4)); + String consumerTag = channel.basicConsume(queueName, new DefaultConsumer(channel)); + try { + waitForConsumerStateNot(queueName, 0); + cleaner.clean(uuid, false); + fail("Expected exception"); + } + catch (RabbitAdminException e) { + assertEquals("Queue " + queueName + " is in use", e.getMessage()); + } + channel.basicCancel(consumerTag); + waitForConsumerStateNot(queueName, 1); + try { + cleaner.clean(uuid, false); + fail("Expected exception"); + } + catch (RabbitAdminException e) { + assertThat(e.getMessage(), startsWith("Cannot delete exchange " + + fanout.getName() + "; it has bindings:")); + } + return null; + } + + private void waitForConsumerStateNot(String queueName, int state) throws InterruptedException { + int n = 0; + URI uri = UriComponentsBuilder.fromUriString("http://localhost:15672/api/queues").pathSegment( + "{vhost}", "{queue}") + .buildAndExpand("/", queueName).encode().toUri(); + while (n++ < 100) { + @SuppressWarnings("unchecked") + Map queueInfo = template.getForObject(uri, Map.class); + if (!queueInfo.get("consumers").equals(Integer.valueOf(state))) { + break; + } + Thread.sleep(100); + } + assertTrue("Consumer state remained at " + state + " after 10 seconds", n < 100); + } + + }); + rabbitAdmin.deleteExchange(fanout.getName()); // easier than deleting the binding + rabbitAdmin.declareExchange(fanout); + connectionFactory.destroy(); + Map> cleanedMap = cleaner.clean(uuid, false); + assertEquals(2, cleanedMap.size()); + List cleanedQueues = cleanedMap.get("queues"); + assertEquals(10, cleanedQueues.size()); + for (int i = 0; i < 5; i++) { + assertEquals("xdbus." + uuid + "." + i, cleanedQueues.get(i * 2)); + assertEquals("xdbus." + uuid + "." + i + ".dlq", cleanedQueues.get(i * 2 + 1)); + } + List cleanedExchanges = cleanedMap.get("exchanges"); + assertEquals(1, cleanedExchanges.size()); + assertEquals(fanout.getName(), cleanedExchanges.get(0)); + } + + @Test + public void testCleanJob() { + final RabbitBusCleaner cleaner = new RabbitBusCleaner(); + final String uuid = UUID.randomUUID().toString(); + Set jobExchanges = new HashSet<>(JobEventsListenerPlugin.getEventListenerChannels(uuid).values()); + jobExchanges.add(JobEventsListenerPlugin.getEventListenerChannelName(uuid)); + CachingConnectionFactory connectionFactory = test.getResource(); + RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); + for (String exchange : jobExchanges) { + FanoutExchange fanout = new FanoutExchange( + MessageBusSupport.applyPrefix("xdbus.", MessageBusSupport.applyPubSub(exchange))); + rabbitAdmin.declareExchange(fanout); + } + rabbitAdmin.declareQueue(new Queue(MessageBusSupport.applyPrefix("xdbus.", + AbstractJobPlugin.getJobChannelName(uuid)))); + Map> cleanedMap = cleaner.clean(uuid, true); + assertEquals(2, cleanedMap.size()); + List cleanedQueues = cleanedMap.get("queues"); + assertEquals(1, cleanedQueues.size()); + List cleanedExchanges = cleanedMap.get("exchanges"); + assertEquals(6, cleanedExchanges.size()); + } + + public static class AmqpQueue { + + private boolean autoDelete; + + private boolean durable; + + public AmqpQueue(boolean autoDelete, boolean durable) { + this.autoDelete = autoDelete; + this.durable = durable; + } + + + @JsonProperty("auto_delete") + protected boolean isAutoDelete() { + return autoDelete; + } + + + protected void setAutoDelete(boolean autoDelete) { + this.autoDelete = autoDelete; + } + + + protected boolean isDurable() { + return durable; + } + + + protected void setDurable(boolean durable) { + this.durable = durable; + } + + } + +} diff --git a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/rabbit/RabbitMessageBusTests.java b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/rabbit/RabbitMessageBusTests.java similarity index 99% rename from spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/rabbit/RabbitMessageBusTests.java rename to spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/rabbit/RabbitMessageBusTests.java index 7a2b6ef30..eb228e0d6 100644 --- a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/rabbit/RabbitMessageBusTests.java +++ b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/rabbit/RabbitMessageBusTests.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.xd.dirt.integration.rabbit; +package org.springframework.xd.dirt.integration.bus.rabbit; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; @@ -69,7 +69,6 @@ import org.springframework.xd.dirt.integration.bus.Binding; import org.springframework.xd.dirt.integration.bus.MessageBus; import org.springframework.xd.dirt.integration.bus.PartitionCapableBusTests; -import org.springframework.xd.dirt.integration.bus.RabbitTestMessageBus; import org.springframework.xd.test.rabbit.RabbitTestSupport; /** diff --git a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/RabbitTestMessageBus.java b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/rabbit/RabbitTestMessageBus.java similarity index 95% rename from spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/RabbitTestMessageBus.java rename to spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/rabbit/RabbitTestMessageBus.java index c2cc1fb20..20fd13188 100644 --- a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/RabbitTestMessageBus.java +++ b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/rabbit/RabbitTestMessageBus.java @@ -14,13 +14,14 @@ * limitations under the License. */ -package org.springframework.xd.dirt.integration.bus; +package org.springframework.xd.dirt.integration.bus.rabbit; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.xd.dirt.integration.bus.AbstractTestMessageBus; import org.springframework.xd.dirt.integration.bus.serializer.MultiTypeCodec; import org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus; diff --git a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/redis/RedisMessageBusTests.java b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/redis/RedisMessageBusTests.java similarity index 99% rename from spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/redis/RedisMessageBusTests.java rename to spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/redis/RedisMessageBusTests.java index a9c57d4b3..50c20c12e 100644 --- a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/redis/RedisMessageBusTests.java +++ b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/redis/RedisMessageBusTests.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.xd.dirt.integration.redis; +package org.springframework.xd.dirt.integration.bus.redis; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; @@ -51,7 +51,7 @@ import org.springframework.xd.dirt.integration.bus.EmbeddedHeadersMessageConverter; import org.springframework.xd.dirt.integration.bus.MessageBus; import org.springframework.xd.dirt.integration.bus.PartitionCapableBusTests; -import org.springframework.xd.dirt.integration.bus.RedisTestMessageBus; +import org.springframework.xd.dirt.integration.redis.RedisMessageBus; import org.springframework.xd.test.redis.RedisTestSupport; /** diff --git a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/RedisTestMessageBus.java b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/redis/RedisTestMessageBus.java similarity index 95% rename from spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/RedisTestMessageBus.java rename to spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/redis/RedisTestMessageBus.java index 299b499f0..cd6b87dba 100644 --- a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/RedisTestMessageBus.java +++ b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/bus/redis/RedisTestMessageBus.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.xd.dirt.integration.bus; +package org.springframework.xd.dirt.integration.bus.redis; import org.springframework.context.support.GenericApplicationContext; import org.springframework.data.redis.connection.RedisConnectionFactory; @@ -24,6 +24,7 @@ import org.springframework.integration.support.DefaultMessageBuilderFactory; import org.springframework.integration.support.utils.IntegrationUtils; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.xd.dirt.integration.bus.AbstractTestMessageBus; import org.springframework.xd.dirt.integration.bus.serializer.MultiTypeCodec; import org.springframework.xd.dirt.integration.redis.RedisMessageBus; diff --git a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/kafka/TestKafkaCluster.java b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/kafka/TestKafkaCluster.java index 891581751..a88647373 100644 --- a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/kafka/TestKafkaCluster.java +++ b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/integration/kafka/TestKafkaCluster.java @@ -23,6 +23,7 @@ import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; import kafka.utils.TestUtils; + import org.I0Itec.zkclient.ZkClient; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -34,6 +35,7 @@ import org.springframework.util.Assert; import org.springframework.util.SocketUtils; +import org.springframework.xd.dirt.integration.kafka.KafkaMessageBus; import java.io.IOException; import java.util.Collections; diff --git a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/plugins/job/RabbitJobPluginTests.java b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/plugins/job/RabbitJobPluginTests.java index fbd065835..ac7dd7f6e 100644 --- a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/plugins/job/RabbitJobPluginTests.java +++ b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/plugins/job/RabbitJobPluginTests.java @@ -26,7 +26,7 @@ import org.springframework.integration.test.util.TestUtils; import org.springframework.xd.dirt.integration.bus.MessageBus; -import org.springframework.xd.dirt.integration.bus.RabbitTestMessageBus; +import org.springframework.xd.dirt.integration.bus.rabbit.RabbitTestMessageBus; import org.springframework.xd.dirt.integration.bus.serializer.AbstractCodec; import org.springframework.xd.dirt.integration.bus.serializer.CompositeCodec; import org.springframework.xd.dirt.integration.bus.serializer.MultiTypeCodec; diff --git a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/plugins/job/RedisJobPluginTests.java b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/plugins/job/RedisJobPluginTests.java index 8827e7e47..f19aff9b1 100644 --- a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/plugins/job/RedisJobPluginTests.java +++ b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/plugins/job/RedisJobPluginTests.java @@ -27,7 +27,7 @@ import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.integration.test.util.TestUtils; import org.springframework.xd.dirt.integration.bus.MessageBus; -import org.springframework.xd.dirt.integration.bus.RedisTestMessageBus; +import org.springframework.xd.dirt.integration.bus.redis.RedisTestMessageBus; import org.springframework.xd.dirt.integration.bus.serializer.AbstractCodec; import org.springframework.xd.dirt.integration.bus.serializer.CompositeCodec; import org.springframework.xd.dirt.integration.bus.serializer.MultiTypeCodec; diff --git a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/stream/KafkaSingleNodeStreamDeploymentIntegrationTests.java b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/stream/KafkaSingleNodeStreamDeploymentIntegrationTests.java index 5b15d9069..0a98991fc 100644 --- a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/stream/KafkaSingleNodeStreamDeploymentIntegrationTests.java +++ b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/stream/KafkaSingleNodeStreamDeploymentIntegrationTests.java @@ -24,6 +24,7 @@ import java.util.Map; import kafka.api.OffsetRequest; + import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -42,8 +43,8 @@ import org.springframework.integration.kafka.support.ZookeeperConnect; import org.springframework.integration.kafka.util.MessageUtils; import org.springframework.messaging.MessageChannel; +import org.springframework.xd.dirt.integration.bus.kafka.KafkaTestMessageBus; import org.springframework.xd.dirt.integration.kafka.KafkaMessageBus; -import org.springframework.xd.dirt.integration.kafka.KafkaTestMessageBus; import org.springframework.xd.test.kafka.KafkaTestSupport; /** diff --git a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/stream/RabbitSingleNodeStreamDeploymentIntegrationTests.java b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/stream/RabbitSingleNodeStreamDeploymentIntegrationTests.java index bb879735d..fd3516bf7 100644 --- a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/stream/RabbitSingleNodeStreamDeploymentIntegrationTests.java +++ b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/stream/RabbitSingleNodeStreamDeploymentIntegrationTests.java @@ -33,7 +33,7 @@ import org.springframework.messaging.MessageChannel; import org.springframework.xd.dirt.integration.bus.Binding; import org.springframework.xd.dirt.integration.bus.MessageBus; -import org.springframework.xd.dirt.integration.bus.RabbitTestMessageBus; +import org.springframework.xd.dirt.integration.bus.rabbit.RabbitTestMessageBus; import org.springframework.xd.dirt.test.sink.NamedChannelSink; import org.springframework.xd.dirt.test.sink.SingleNodeNamedChannelSinkFactory; import org.springframework.xd.dirt.test.source.NamedChannelSource; diff --git a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/stream/RedisSingleNodeStreamDeploymentIntegrationTests.java b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/stream/RedisSingleNodeStreamDeploymentIntegrationTests.java index 203676910..918816075 100644 --- a/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/stream/RedisSingleNodeStreamDeploymentIntegrationTests.java +++ b/spring-xd-dirt/src/test/java/org/springframework/xd/dirt/stream/RedisSingleNodeStreamDeploymentIntegrationTests.java @@ -25,7 +25,7 @@ import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.messaging.MessageChannel; -import org.springframework.xd.dirt.integration.bus.RedisTestMessageBus; +import org.springframework.xd.dirt.integration.bus.redis.RedisTestMessageBus; import org.springframework.xd.test.redis.RedisTestSupport; /** diff --git a/spring-xd-messagebus-rabbit/src/main/java/org/springframework/xd/dirt/integration/rabbit/RabbitMessageBus.java b/spring-xd-messagebus-rabbit/src/main/java/org/springframework/xd/dirt/integration/rabbit/RabbitMessageBus.java index cf11b5885..e6e98843e 100644 --- a/spring-xd-messagebus-rabbit/src/main/java/org/springframework/xd/dirt/integration/rabbit/RabbitMessageBus.java +++ b/spring-xd-messagebus-rabbit/src/main/java/org/springframework/xd/dirt/integration/rabbit/RabbitMessageBus.java @@ -364,7 +364,7 @@ public void bindConsumer(final String name, MessageChannel moduleInputChannel, P validateConsumerProperties(name, properties, SUPPORTED_CONSUMER_PROPERTIES); } RabbitPropertiesAccessor accessor = new RabbitPropertiesAccessor(properties); - String queueName = accessor.getPrefix(this.defaultPrefix) + name; + String queueName = applyPrefix(accessor.getPrefix(this.defaultPrefix), name); int partitionIndex = accessor.getPartitionIndex(); if (partitionIndex >= 0) { queueName += "-" + partitionIndex; @@ -384,7 +384,7 @@ public void bindPubSubConsumer(String name, MessageChannel moduleInputChannel, P RabbitPropertiesAccessor accessor = new RabbitPropertiesAccessor(properties); validateConsumerProperties(name, properties, SUPPORTED_PUBSUB_CONSUMER_PROPERTIES); String prefix = accessor.getPrefix(this.defaultPrefix); - FanoutExchange exchange = new FanoutExchange(prefix + "topic." + name); + FanoutExchange exchange = new FanoutExchange(applyPrefix(prefix, applyPubSub(name))); declareExchangeIfNotPresent(exchange); String uniqueName = name + "." + UUID.randomUUID().toString(); Queue queue = new Queue(prefix + uniqueName, false, true, true); @@ -489,7 +489,7 @@ public void bindProducer(final String name, MessageChannel moduleOutputChannel, private AmqpOutboundEndpoint buildOutboundEndpoint(final String name, RabbitPropertiesAccessor properties, RabbitTemplate rabbitTemplate) { - String queueName = properties.getPrefix(this.defaultPrefix) + name; + String queueName = applyPrefix(properties.getPrefix(this.defaultPrefix), name); String partitionKeyExtractorClass = properties.getPartitionKeyExtractorClass(); Expression partitionKeyExpression = properties.getPartitionKeyExpression(); AmqpOutboundEndpoint queue = new AmqpOutboundEndpoint(rabbitTemplate); @@ -522,7 +522,7 @@ public void bindPubSubProducer(String name, MessageChannel moduleOutputChannel, Properties properties) { validateProducerProperties(name, properties, SUPPORTED_PUBSUB_PRODUCER_PROPERTIES); RabbitPropertiesAccessor accessor = new RabbitPropertiesAccessor(properties); - String exchangeName = accessor.getPrefix(this.defaultPrefix) + "topic." + name; + String exchangeName = applyPrefix(accessor.getPrefix(this.defaultPrefix), applyPubSub(name)); declareExchangeIfNotPresent(new FanoutExchange(exchangeName)); AmqpOutboundEndpoint fanout = new AmqpOutboundEndpoint(determineRabbitTemplate(accessor)); fanout.setExchangeName(exchangeName); @@ -657,13 +657,14 @@ public Void doInRabbit(Channel channel) throws Exception { private void autoBindDLQ(final String name, RabbitPropertiesAccessor properties) { if (properties.getAutoBindDLQ(this.defaultAutoBindDLQ)) { String prefix = properties.getPrefix(this.defaultPrefix); - String dlqName = prefix + name + ".dlq"; + String queueName = applyPrefix(prefix, name); + String dlqName = constructDLQName(queueName); Queue dlq = new Queue(dlqName); declareQueueIfNotPresent(dlq); - final String dlxName = properties.getPrefix(this.defaultPrefix) + "DLX"; + final String dlxName = prefix + "DLX"; final DirectExchange dlx = new DirectExchange(dlxName); declareExchangeIfNotPresent(dlx); - this.rabbitAdmin.declareBinding(BindingBuilder.bind(dlq).to(dlx).with(prefix + name)); + this.rabbitAdmin.declareBinding(BindingBuilder.bind(dlq).to(dlx).with(queueName)); } } diff --git a/spring-xd-messagebus-redis/src/main/java/org/springframework/xd/dirt/integration/redis/RedisMessageBus.java b/spring-xd-messagebus-redis/src/main/java/org/springframework/xd/dirt/integration/redis/RedisMessageBus.java index a053233e9..6e5d8e58f 100644 --- a/spring-xd-messagebus-redis/src/main/java/org/springframework/xd/dirt/integration/redis/RedisMessageBus.java +++ b/spring-xd-messagebus-redis/src/main/java/org/springframework/xd/dirt/integration/redis/RedisMessageBus.java @@ -156,8 +156,10 @@ public RedisMessageBus(RedisConnectionFactory connectionFactory, MultiTypeCodec< parser.parseExpression("headers['" + ERROR_HEADER + "']"), connectionFactory); if (headersToMap != null && headersToMap.length > 0) { String[] combinedHeadersToMap = - Arrays.copyOfRange(XdHeaders.STANDARD_HEADERS, 0, XdHeaders.STANDARD_HEADERS.length + headersToMap.length); - System.arraycopy(headersToMap, 0, combinedHeadersToMap, XdHeaders.STANDARD_HEADERS.length, headersToMap.length); + Arrays.copyOfRange(XdHeaders.STANDARD_HEADERS, 0, XdHeaders.STANDARD_HEADERS.length + + headersToMap.length); + System.arraycopy(headersToMap, 0, combinedHeadersToMap, XdHeaders.STANDARD_HEADERS.length, + headersToMap.length); this.headersToMap = combinedHeadersToMap; } else { @@ -217,7 +219,7 @@ public void bindPubSubConsumer(final String name, MessageChannel moduleInputChan RedisInboundChannelAdapter adapter = new RedisInboundChannelAdapter(this.connectionFactory); adapter.setBeanFactory(this.getBeanFactory()); adapter.setSerializer(null); - adapter.setTopics("topic." + name); + adapter.setTopics(applyPubSub(name)); doRegisterConsumer(name, name, moduleInputChannel, adapter, new RedisPropertiesAccessor(properties)); } @@ -333,7 +335,7 @@ public void bindPubSubProducer(final String name, MessageChannel moduleOutputCha validateProducerProperties(name, properties, SUPPORTED_PUBSUB_PRODUCER_PROPERTIES); RedisPublishingMessageHandler topic = new RedisPublishingMessageHandler(connectionFactory); topic.setBeanFactory(this.getBeanFactory()); - topic.setTopic("topic." + name); + topic.setTopic(applyPubSub(name)); topic.afterPropertiesSet(); doRegisterProducer(name, moduleOutputChannel, topic, new RedisPropertiesAccessor(properties)); } diff --git a/spring-xd-messagebus-spi/src/main/java/org/springframework/xd/dirt/integration/bus/MessageBusSupport.java b/spring-xd-messagebus-spi/src/main/java/org/springframework/xd/dirt/integration/bus/MessageBusSupport.java index e7e152a1a..d542f5841 100644 --- a/spring-xd-messagebus-spi/src/main/java/org/springframework/xd/dirt/integration/bus/MessageBusSupport.java +++ b/spring-xd-messagebus-spi/src/main/java/org/springframework/xd/dirt/integration/bus/MessageBusSupport.java @@ -197,6 +197,35 @@ protected DirectChannel createSharedChannel(String name) { protected volatile boolean defaultCompress = false; + /** + * For bus implementations that support a prefix, apply the prefix + * to the name. + * @param prefix the prefix. + * @param name the name. + */ + public static String applyPrefix(String prefix, String name) { + return prefix + name; + } + + /** + * For bus implementations that include a pub/sub component in identifiers, + * construct the name. + * @param prefix the prefix. + * @param name the name. + */ + public static String applyPubSub(String name) { + return "topic." + name; + } + + /** + * For bus implementations that support dead lettering, construct the name of the + * dead letter entity for the underlying pipe name. + * @param name the name. + */ + public static String constructDLQName(String name) { + return name + ".dlq"; + } + @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Assert.isInstanceOf(AbstractApplicationContext.class, applicationContext); @@ -333,6 +362,7 @@ public void afterPropertiesSet() throws Exception { protected void onInit() { } + /** * Dynamically create a producer for the named channel. * @param name The name. diff --git a/spring-xd-shell/src/test/java/org/springframework/xd/shell/AbstractShellIntegrationTest.java b/spring-xd-shell/src/test/java/org/springframework/xd/shell/AbstractShellIntegrationTest.java index 530691685..4ce18ff6b 100644 --- a/spring-xd-shell/src/test/java/org/springframework/xd/shell/AbstractShellIntegrationTest.java +++ b/spring-xd-shell/src/test/java/org/springframework/xd/shell/AbstractShellIntegrationTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2014 the original author or authors. + * Copyright 2013-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -59,6 +59,7 @@ * @author Kashyap Parikh * @author David Turanski * @author Ilayaperumal Gopinathan + * @author Gary Russell */ public abstract class AbstractShellIntegrationTest { @@ -81,6 +82,8 @@ public abstract class AbstractShellIntegrationTest { protected Random random = new Random(); + protected static int adminPort; + /** * Used to capture currently executing test method. */ @@ -94,7 +97,9 @@ public static synchronized void startUp() throws InterruptedException, IOExcepti application = new SingleNodeApplication().run("--transport", "local", "--analytics", "redis"); integrationTestSupport = new SingleNodeIntegrationTestSupport(application); integrationTestSupport.addModuleRegistry(new ArchiveModuleRegistry("classpath:/spring-xd/xd/modules")); - Bootstrap bootstrap = new Bootstrap(new String[] { "--port", randomConfigSupport.getAdminServerPort() }); + String adminServerPort = randomConfigSupport.getAdminServerPort(); + adminPort = Integer.valueOf(adminServerPort); + Bootstrap bootstrap = new Bootstrap(new String[] { "--port", adminServerPort }); shell = bootstrap.getJLineShellComponent(); } if (!shell.isRunning()) { @@ -185,4 +190,3 @@ protected CommandResult executeCommandExpectingFailure(String command) { } } - diff --git a/spring-xd-shell/src/test/java/org/springframework/xd/shell/bus/RabbitBusCleanerIntegrationTests.java b/spring-xd-shell/src/test/java/org/springframework/xd/shell/bus/RabbitBusCleanerIntegrationTests.java new file mode 100644 index 000000000..03911d368 --- /dev/null +++ b/spring-xd-shell/src/test/java/org/springframework/xd/shell/bus/RabbitBusCleanerIntegrationTests.java @@ -0,0 +1,91 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.xd.shell.bus; + +import static org.junit.Assert.assertEquals; + +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.junit.Rule; +import org.junit.Test; + +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.RequestEntity; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestTemplate; +import org.springframework.xd.dirt.integration.bus.MessageBusSupport; +import org.springframework.xd.dirt.plugins.AbstractStreamPlugin; +import org.springframework.xd.shell.AbstractShellIntegrationTest; +import org.springframework.xd.test.rabbit.RabbitAdminTestSupport; +import org.springframework.xd.test.rabbit.RabbitTestSupport; + + +/** + * + * @author Gary Russell + * @since 1.2 + */ +public class RabbitBusCleanerIntegrationTests extends AbstractShellIntegrationTest { + + @Rule + public RabbitAdminTestSupport adminTest = new RabbitAdminTestSupport(); + + @Rule + public RabbitTestSupport test = new RabbitTestSupport(); + + @SuppressWarnings("unchecked") + @Test + public void testClean() throws Exception { + RabbitAdmin admin = new RabbitAdmin(test.getResource()); + final String uuid = UUID.randomUUID().toString(); + String queueName = MessageBusSupport.applyPrefix("xdbus.", + AbstractStreamPlugin.constructPipeName(uuid, 0)); + admin.declareQueue(new Queue(queueName)); + final FanoutExchange fanout = new FanoutExchange( + MessageBusSupport.applyPrefix("xdbus.", MessageBusSupport.applyPubSub( + AbstractStreamPlugin.constructTapPrefix(uuid) + ".foo.bar"))); + admin.declareExchange(fanout); + RestTemplate template = new RestTemplate(); + URI uri = new URI("http://localhost:" + adminPort + "/streams/clean/rabbit/" + queueName.substring(6)); + RequestEntity request = new RequestEntity<>(HttpMethod.DELETE, uri); + HttpStatus status = HttpStatus.NO_CONTENT; + ResponseEntity reply = null; + int n = 0; + while (n++ < 100 && !status.equals(HttpStatus.OK)) { + reply = template.exchange(request, Map.class); + status = reply.getStatusCode(); + Thread.sleep(100); + } + assertEquals("Didn't get OK after 10 seconds", HttpStatus.OK, reply.getStatusCode()); + Map> body = (Map>) reply.getBody(); + assertEquals(2, body.size()); + List queues = body.get("queues"); + assertEquals(queueName, queues.get(0)); + List exchanges = body.get("exchanges"); + assertEquals(fanout.getName(), exchanges.get(0)); + reply = template.exchange(request, Map.class); + assertEquals(HttpStatus.NO_CONTENT, reply.getStatusCode()); + } + +} diff --git a/spring-xd-test/src/main/java/org/springframework/xd/test/rabbit/RabbitAdminTestSupport.java b/spring-xd-test/src/main/java/org/springframework/xd/test/rabbit/RabbitAdminTestSupport.java new file mode 100644 index 000000000..2ec34a4bf --- /dev/null +++ b/spring-xd-test/src/main/java/org/springframework/xd/test/rabbit/RabbitAdminTestSupport.java @@ -0,0 +1,59 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.xd.test.rabbit; + + +import org.junit.Rule; + +import org.springframework.http.HttpStatus; +import org.springframework.web.client.HttpClientErrorException; +import org.springframework.web.client.RestTemplate; +import org.springframework.xd.test.AbstractExternalResourceTestSupport; + +import com.sun.javafx.collections.MappingChange.Map; + +/** + * JUnit {@link Rule} that detects the fact that RabbitMQ is available on localhost with + * the management plugin enabled. + * + * @author Gary Russell + * @since 1.2 + */ +public class RabbitAdminTestSupport extends AbstractExternalResourceTestSupport { + + public RabbitAdminTestSupport() { + super("RABBITADMIN"); + } + + @Override + protected void obtainResource() throws Exception { + resource = new RestTemplate(); + try { + resource.getForObject("http://localhost:15672/api/overview", Map.class); + } + catch (HttpClientErrorException e) { + if (e.getStatusCode() != HttpStatus.UNAUTHORIZED) { + throw e; + } + } + } + + @Override + protected void cleanupResource() throws Exception { + } + +}