From e7423d08f33b3ce6ad9d08071362dcfc1c42e251 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 1 Mar 2017 09:26:38 -0500 Subject: [PATCH] INT-4237: FWMH: Acquire Lock Before Flushing JIRA: https://jira.spring.io/browse/INT-4237 It was possible to flush (close) the file while a write was in process; more likely when `flushWhenIdle` is false. This probably would not occur in the real world, just tests with short flush intervals, but certainly possible. There is already a lock used to prevent concurrent writes while appending; use the same lock when flushing. --- .../util/WhileLockedProcessor.java | 6 +- .../file/FileWritingMessageHandler.java | 55 +++++++++++++++---- .../file/FileWritingMessageHandlerTests.java | 49 +++++++++++++++++ 3 files changed, 97 insertions(+), 13 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/util/WhileLockedProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/util/WhileLockedProcessor.java index 93cfb727906..9a75006c840 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/util/WhileLockedProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/util/WhileLockedProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,13 +34,16 @@ * */ public abstract class WhileLockedProcessor { + private final Object key; + private final LockRegistry lockRegistry; public WhileLockedProcessor(LockRegistry lockRegistry, Object key) { this.key = key; this.lockRegistry = lockRegistry; } + public final void doWhileLocked() throws IOException { Lock lock = this.lockRegistry.obtain(this.key); try { @@ -65,4 +68,5 @@ public final void doWhileLocked() throws IOException { * @throws IOException Any IOException. */ protected abstract void whileLocked() throws IOException; + } diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/FileWritingMessageHandler.java b/spring-integration-file/src/main/java/org/springframework/integration/file/FileWritingMessageHandler.java index 8f5f0df1ce9..f61be27cf3f 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/FileWritingMessageHandler.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/FileWritingMessageHandler.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.locks.Lock; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -813,10 +814,12 @@ private synchronized FileState getFileState(final File fileToWriteTo, boolean is } if (state == null) { if (isString) { - state = new FileState(createWriter(fileToWriteTo, true)); + state = new FileState(createWriter(fileToWriteTo, true), + this.lockRegistry.obtain(fileToWriteTo.getAbsolutePath())); } else { - state = new FileState(createOutputStream(fileToWriteTo, true)); + state = new FileState(createOutputStream(fileToWriteTo, true), + this.lockRegistry.obtain(fileToWriteTo.getAbsolutePath())); } this.fileStates.put(absolutePath, state); } @@ -828,12 +831,27 @@ private synchronized FileState getFileState(final File fileToWriteTo, boolean is return state; } - private BufferedWriter createWriter(final File fileToWriteTo, final boolean append) throws FileNotFoundException { + /** + * Create a buffered writer for the file, for String payloads. + * @param fileToWriteTo the file. + * @param append true if we are appending. + * @return the writer. + * @throws FileNotFoundException if the file does not exist. + * @since 4.3.8 + */ + protected BufferedWriter createWriter(final File fileToWriteTo, final boolean append) throws FileNotFoundException { return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(fileToWriteTo, append), this.charset), this.bufferSize); } - private BufferedOutputStream createOutputStream(File fileToWriteTo, final boolean append) + /** + * Create a buffered output stream for the file. + * @param fileToWriteTo the file. + * @param append true if we are appending. + * @return the stream. + * @since 4.3.8 + */ + protected BufferedOutputStream createOutputStream(File fileToWriteTo, final boolean append) throws FileNotFoundException { return new BufferedOutputStream(new FileOutputStream(fileToWriteTo, append), this.bufferSize); } @@ -908,31 +926,44 @@ private static final class FileState { private final BufferedOutputStream stream; + private final Lock lock; + private final long firstWrite = System.currentTimeMillis(); private volatile long lastWrite; - FileState(BufferedWriter writer) { + FileState(BufferedWriter writer, Lock lock) { this.writer = writer; this.stream = null; + this.lock = lock; } - FileState(BufferedOutputStream stream) { + FileState(BufferedOutputStream stream, Lock lock) { this.writer = null; this.stream = stream; + this.lock = lock; } private void close() { try { - if (this.writer != null) { - this.writer.close(); + this.lock.lockInterruptibly(); + try { + if (this.writer != null) { + this.writer.close(); + } + else { + this.stream.close(); + } } - else { - this.stream.close(); + catch (IOException e) { + // ignore } } - catch (IOException e) { - // ignore + catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + } + finally { + this.lock.unlock(); } } } diff --git a/spring-integration-file/src/test/java/org/springframework/integration/file/FileWritingMessageHandlerTests.java b/spring-integration-file/src/test/java/org/springframework/integration/file/FileWritingMessageHandlerTests.java index c10a2082619..9bd073c2318 100644 --- a/spring-integration-file/src/test/java/org/springframework/integration/file/FileWritingMessageHandlerTests.java +++ b/spring-integration-file/src/test/java/org/springframework/integration/file/FileWritingMessageHandlerTests.java @@ -29,12 +29,17 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.startsWith; +import static org.mockito.BDDMockito.willAnswer; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.BufferedOutputStream; import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; @@ -46,6 +51,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; +import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.commons.logging.Log; import org.junit.Before; import org.junit.Ignore; @@ -518,6 +524,49 @@ public void noFlushAppend() throws Exception { handler.stop(); } + @Test + public void lockForFlush() throws Exception { + File tempFolder = this.temp.newFolder(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final BufferedOutputStream out = spy(new BufferedOutputStream(baos)); + FileWritingMessageHandler handler = new FileWritingMessageHandler(tempFolder) { + + @Override + protected BufferedOutputStream createOutputStream(File fileToWriteTo, boolean append) { + return out; + } + + }; + handler.setFileExistsMode(FileExistsMode.APPEND_NO_FLUSH); + handler.setFileNameGenerator(message -> "foo.txt"); + ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); + taskScheduler.afterPropertiesSet(); + handler.setTaskScheduler(taskScheduler); + handler.setOutputChannel(new NullChannel()); + handler.setBeanFactory(mock(BeanFactory.class)); + handler.setFlushInterval(10); + handler.setFlushWhenIdle(false); + handler.afterPropertiesSet(); + handler.start(); + + final AtomicBoolean writing = new AtomicBoolean(); + final AtomicBoolean closeWhileWriting = new AtomicBoolean(); + willAnswer(i -> { + writing.set(true); + Thread.sleep(500); + writing.set(false); + return null; + }).given(out).write(any(byte[].class), anyInt(), anyInt()); + willAnswer(i -> { + closeWhileWriting.compareAndSet(false, writing.get()); + return null; + }).given(out).close(); + handler.handleMessage(new GenericMessage<>("foo".getBytes())); + verify(out).write(any(byte[].class), anyInt(), anyInt()); + assertFalse(closeWhileWriting.get()); + handler.stop(); + } + @Test public void replaceIfDifferent() throws IOException { QueueChannel output = new QueueChannel();