Skip to content

Commit

Permalink
INT-4108: Fix idempotency for some Lifecycles
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-4108

Some `Lifecycle.start()/stop()` usage doesn't ensure robustness for components causing unexpected and difficulty tracing issues

* Fix `Lifecycle.start()/stop()` for `FileReadingMessageSource`, `FileWritingMessageHandler`, `AbstractMqttMessageHandler`
* In the `DefaultHeaderChannelRegistry`, `LockRegistryLeaderInitiator`, `MqttPahoMessageHandler` rework logic for shared variables to avoid `NPE`
* Increase receive timeouts in the `PayloadDeserializingTransformerParserTests` and `UdpChannelAdapterTests`
* Prove with the `WatchServiceDirectoryScannerTests` changes that several invocation for `FileReadingMessageSource.start()` are idempotent

**Cherry-pick to 4.3.x**
  • Loading branch information
artembilan committed Sep 13, 2016
1 parent 6790dbd commit 6c6e1d3
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 32 deletions.
Expand Up @@ -41,6 +41,7 @@
* The actual average expiry time will be 1.5x the delay.
*
* @author Gary Russell
* @author Artem Bilan
* @since 3.0
*
*/
Expand Down Expand Up @@ -135,6 +136,7 @@ public synchronized void stop() {
this.running = false;
if (this.reaperScheduledFuture != null) {
this.reaperScheduledFuture.cancel(true);
this.reaperScheduledFuture = null;
}
this.explicitlyStopped = true;
}
Expand Down Expand Up @@ -198,6 +200,7 @@ public MessageChannel channelNameToChannel(String name) {
public synchronized void runReaper() {
if (this.reaperScheduledFuture != null) {
this.reaperScheduledFuture.cancel(true);
this.reaperScheduledFuture = null;
}
this.run();
}
Expand Down
Expand Up @@ -54,6 +54,7 @@
* be useful.
*
* @author Dave Syer
* @author Artem Bilan
* @since 4.3.1
*/
public class LockRegistryLeaderInitiator implements SmartLifecycle, DisposableBean, ApplicationEventPublisherAware {
Expand Down Expand Up @@ -277,7 +278,10 @@ public void stop() {
synchronized (this.lifecycleMonitor) {
if (this.running) {
this.running = false;
this.future.cancel(true);
if (this.future != null) {
this.future.cancel(true);
}
this.future = null;
logger.debug("Stopped LeaderInitiator");
}
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -65,7 +65,7 @@ public class PayloadDeserializingTransformerParserTests {
public void directChannelWithSerializedStringMessage() throws Exception {
byte[] bytes = serialize("foo");
directInput.send(new GenericMessage<byte[]>(bytes));
Message<?> result = output.receive(0);
Message<?> result = output.receive(10000);
assertNotNull(result);
assertTrue(result.getPayload() instanceof String);
assertEquals("foo", result.getPayload());
Expand All @@ -75,7 +75,7 @@ public void directChannelWithSerializedStringMessage() throws Exception {
public void queueChannelWithSerializedStringMessage() throws Exception {
byte[] bytes = serialize("foo");
queueInput.send(new GenericMessage<byte[]>(bytes));
Message<?> result = output.receive(3000);
Message<?> result = output.receive(10000);
assertNotNull(result);
assertTrue(result.getPayload() instanceof String);
assertEquals("foo", result.getPayload());
Expand All @@ -85,7 +85,7 @@ public void queueChannelWithSerializedStringMessage() throws Exception {
public void directChannelWithSerializedObjectMessage() throws Exception {
byte[] bytes = serialize(new TestBean());
directInput.send(new GenericMessage<byte[]>(bytes));
Message<?> result = output.receive(0);
Message<?> result = output.receive(10000);
assertNotNull(result);
assertEquals(TestBean.class, result.getPayload().getClass());
assertEquals("test", ((TestBean) result.getPayload()).name);
Expand All @@ -95,7 +95,7 @@ public void directChannelWithSerializedObjectMessage() throws Exception {
public void queueChannelWithSerializedObjectMessage() throws Exception {
byte[] bytes = serialize(new TestBean());
queueInput.send(new GenericMessage<byte[]>(bytes));
Message<?> result = output.receive(3000);
Message<?> result = output.receive(10000);
assertNotNull(result);
assertEquals(TestBean.class, result.getPayload().getClass());
assertEquals("test", ((TestBean) result.getPayload()).name);
Expand All @@ -110,7 +110,7 @@ public void invalidPayload() {
@Test
public void customDeserializer() throws Exception {
customDeserializerInput.send(new GenericMessage<byte[]>("test".getBytes("UTF-8")));
Message<?> result = output.receive(3000);
Message<?> result = output.receive(10000);
assertNotNull(result);
assertEquals(String.class, result.getPayload().getClass());
assertEquals("TEST", result.getPayload());
Expand Down Expand Up @@ -138,6 +138,7 @@ public static class TestDeserializer implements Deserializer<Object> {
public Object deserialize(InputStream source) throws IOException {
return FileCopyUtils.copyToString(new InputStreamReader(source, "UTF-8")).toUpperCase();
}

}

}
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -87,6 +88,8 @@ public class FileReadingMessageSource extends IntegrationObjectSupport implement

private static final Log logger = LogFactory.getLog(FileReadingMessageSource.class);

private final AtomicBoolean running = new AtomicBoolean();

/*
* {@link PriorityBlockingQueue#iterator()} throws
* {@link java.util.ConcurrentModificationException} in Java 5.
Expand All @@ -104,8 +107,6 @@ public class FileReadingMessageSource extends IntegrationObjectSupport implement

private volatile boolean scanEachPoll = false;

private volatile boolean running;

private FileListFilter<File> filter;

private FileLocker locker;
Expand Down Expand Up @@ -281,7 +282,7 @@ public void setUseWatchService(boolean useWatchService) {
public void setWatchEvents(WatchEventType... watchEvents) {
Assert.notEmpty(watchEvents, "'watchEvents' must not be empty.");
Assert.noNullElements(watchEvents, "'watchEvents' must not contain null elements.");
Assert.state(!this.running, "Cannot change watch events while running.");
Assert.state(!this.running.get(), "Cannot change watch events while running.");

this.watchEvents = Arrays.copyOf(watchEvents, watchEvents.length);
}
Expand All @@ -293,23 +294,21 @@ public String getComponentType() {

@Override
public void start() {
if (this.scanner instanceof Lifecycle) {
if (!this.running.getAndSet(true) && this.scanner instanceof Lifecycle) {
((Lifecycle) this.scanner).start();
}
this.running = true;
}

@Override
public void stop() {
if (this.scanner instanceof Lifecycle) {
((Lifecycle) this.scanner).start();
if (this.running.getAndSet(false) && this.scanner instanceof Lifecycle) {
((Lifecycle) this.scanner).stop();
}
this.running = false;
}

@Override
public boolean isRunning() {
return this.running;
return this.running.get();
}

@Override
Expand Down Expand Up @@ -444,6 +443,7 @@ public void stop() {
try {
this.watcher.close();
this.watcher = null;
this.pathKeys.clear();
}
catch (IOException e) {
logger.error("Failed to close watcher for " + FileReadingMessageSource.this.directory, e);
Expand Down
Expand Up @@ -364,7 +364,7 @@ protected void doInit() {

@Override
public void start() {
if (FileExistsMode.APPEND_NO_FLUSH.equals(this.fileExistsMode)) {
if (this.flushTask == null && FileExistsMode.APPEND_NO_FLUSH.equals(this.fileExistsMode)) {
TaskScheduler taskScheduler = getTaskScheduler();
Assert.state(taskScheduler != null, "'taskScheduler' is required for FileExistsMode.APPEND_NO_FLUSH");
this.flushTask = taskScheduler.scheduleAtFixedRate(new Flusher(), this.flushInterval / 3);
Expand Down
Expand Up @@ -105,6 +105,7 @@ public boolean remove(File fileToRemove) {
assertTrue(files.contains(top1));
assertTrue(files.contains(foo1));
assertTrue(files.contains(bar1));
fileReadingMessageSource.start();
File top2 = this.folder.newFile();
File foo2 = File.createTempFile("foo", ".txt", this.foo);
File bar2 = File.createTempFile("bar", ".txt", this.bar);
Expand All @@ -130,6 +131,7 @@ public boolean remove(File fileToRemove) {
var1 = StandardWatchEventKinds.OVERFLOW;
}
*/
fileReadingMessageSource.start();
List<File> filesForOverflow = new ArrayList<File>(600);

for (int i = 0; i < 600; i++) {
Expand Down
Expand Up @@ -197,7 +197,7 @@ public void testUnicastReceiverWithReply() throws Exception {
e.printStackTrace();
}
});
Message<byte[]> receivedMessage = (Message<byte[]>) channel.receive(2000);
Message<byte[]> receivedMessage = (Message<byte[]>) channel.receive(10000);
assertEquals(new String(message.getPayload()), new String(receivedMessage.getPayload()));
String replyString = "reply:" + System.currentTimeMillis();
byte[] replyBytes = replyString.getBytes();
Expand Down Expand Up @@ -241,7 +241,7 @@ public void testUnicastSender() throws Exception {
handler.start();
Message<byte[]> message = MessageBuilder.withPayload("ABCD".getBytes()).build();
handler.handleMessage(message);
Message<byte[]> receivedMessage = (Message<byte[]>) channel.receive(2000);
Message<byte[]> receivedMessage = (Message<byte[]>) channel.receive(10000);
assertEquals(new String(message.getPayload()), new String(receivedMessage.getPayload()));
adapter.stop();
handler.stop();
Expand All @@ -268,7 +268,7 @@ public void testMulticastReceiver() throws Exception {
datagramSocket.send(packet);
datagramSocket.close();

Message<byte[]> receivedMessage = (Message<byte[]>) channel.receive(2000);
Message<byte[]> receivedMessage = (Message<byte[]>) channel.receive(10000);
assertNotNull(receivedMessage);
assertEquals(new String(message.getPayload()), new String(receivedMessage.getPayload()));
adapter.stop();
Expand All @@ -292,7 +292,7 @@ public void testMulticastSender() throws Exception {
Message<byte[]> message = MessageBuilder.withPayload("ABCD".getBytes()).build();
handler.handleMessage(message);

Message<byte[]> receivedMessage = (Message<byte[]>) channel.receive(2000);
Message<byte[]> receivedMessage = (Message<byte[]>) channel.receive(10000);
assertNotNull(receivedMessage);
assertEquals(new String(message.getPayload()), new String(receivedMessage.getPayload()));
adapter.stop();
Expand Down Expand Up @@ -320,7 +320,7 @@ public void testUnicastReceiverException() throws Exception {
DatagramSocket datagramSocket = new DatagramSocket(0);
datagramSocket.send(packet);
datagramSocket.close();
Message<?> receivedMessage = errorChannel.receive(2000);
Message<?> receivedMessage = errorChannel.receive(10000);
assertNotNull(receivedMessage);
assertEquals("Failed", ((Exception) receivedMessage.getPayload()).getCause().getMessage());
adapter.stop();
Expand Down
Expand Up @@ -16,6 +16,8 @@

package org.springframework.integration.mqtt.outbound;

import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.context.Lifecycle;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
Expand All @@ -29,11 +31,14 @@
* Abstract class for MQTT outbound channel adapters.
*
* @author Gary Russell
* @author Artem Bilan
* @since 4.0
*
*/
public abstract class AbstractMqttMessageHandler extends AbstractMessageHandler implements Lifecycle {

private final AtomicBoolean running = new AtomicBoolean();

private final String url;

private final String clientId;
Expand All @@ -46,8 +51,6 @@ public abstract class AbstractMqttMessageHandler extends AbstractMessageHandler

private volatile MessageConverter converter;

private boolean running;

private volatile int clientInstance;

public AbstractMqttMessageHandler(String url, String clientId) {
Expand Down Expand Up @@ -113,23 +116,25 @@ protected void onInit() throws Exception {

@Override
public final void start() {
this.doStart();
this.running = true;
if (!this.running.getAndSet(true)) {
doStart();
}
}

protected abstract void doStart();

@Override
public final void stop() {
this.doStop();
this.running = false;
if (this.running.getAndSet(false)) {
doStop();
}
}

protected abstract void doStop();

@Override
public boolean isRunning() {
return this.running;
return this.running.get();
}

@Override
Expand Down
Expand Up @@ -38,6 +38,7 @@
* Eclipse Paho implementation.
*
* @author Gary Russell
* @author Artem Bilan
* @since 4.0
*
*/
Expand Down Expand Up @@ -144,9 +145,10 @@ protected void doStart() {
@Override
protected void doStop() {
try {
if (this.client != null) {
this.client.disconnect().waitForCompletion(this.completionTimeout);
this.client.close();
IMqttAsyncClient client = this.client;
if (client != null) {
client.disconnect().waitForCompletion(this.completionTimeout);
client.close();
this.client = null;
}
}
Expand Down

0 comments on commit 6c6e1d3

Please sign in to comment.