Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,13 +34,16 @@
*
*/
public abstract class WhileLockedProcessor {

private final Object key;

private final LockRegistry lockRegistry;

public WhileLockedProcessor(LockRegistry lockRegistry, Object key) {
this.key = key;
this.lockRegistry = lockRegistry;
}

public final void doWhileLocked() throws IOException {
Lock lock = this.lockRegistry.obtain(this.key);
try {
Expand All @@ -65,4 +68,5 @@ public final void doWhileLocked() throws IOException {
* @throws IOException Any IOException.
*/
protected abstract void whileLocked() throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.Lock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -813,10 +814,12 @@ private synchronized FileState getFileState(final File fileToWriteTo, boolean is
}
if (state == null) {
if (isString) {
state = new FileState(createWriter(fileToWriteTo, true));
state = new FileState(createWriter(fileToWriteTo, true),
this.lockRegistry.obtain(fileToWriteTo.getAbsolutePath()));
}
else {
state = new FileState(createOutputStream(fileToWriteTo, true));
state = new FileState(createOutputStream(fileToWriteTo, true),
this.lockRegistry.obtain(fileToWriteTo.getAbsolutePath()));
}
this.fileStates.put(absolutePath, state);
}
Expand All @@ -828,12 +831,27 @@ private synchronized FileState getFileState(final File fileToWriteTo, boolean is
return state;
}

private BufferedWriter createWriter(final File fileToWriteTo, final boolean append) throws FileNotFoundException {
/**
* Create a buffered writer for the file, for String payloads.
* @param fileToWriteTo the file.
* @param append true if we are appending.
* @return the writer.
* @throws FileNotFoundException if the file does not exist.
* @since 4.3.8
*/
protected BufferedWriter createWriter(final File fileToWriteTo, final boolean append) throws FileNotFoundException {
return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(fileToWriteTo, append), this.charset),
this.bufferSize);
}

private BufferedOutputStream createOutputStream(File fileToWriteTo, final boolean append)
/**
* Create a buffered output stream for the file.
* @param fileToWriteTo the file.
* @param append true if we are appending.
* @return the stream.
* @since 4.3.8
*/
protected BufferedOutputStream createOutputStream(File fileToWriteTo, final boolean append)
throws FileNotFoundException {
return new BufferedOutputStream(new FileOutputStream(fileToWriteTo, append), this.bufferSize);
}
Expand Down Expand Up @@ -908,31 +926,44 @@ private static final class FileState {

private final BufferedOutputStream stream;

private final Lock lock;

private final long firstWrite = System.currentTimeMillis();

private volatile long lastWrite;

FileState(BufferedWriter writer) {
FileState(BufferedWriter writer, Lock lock) {
this.writer = writer;
this.stream = null;
this.lock = lock;
}

FileState(BufferedOutputStream stream) {
FileState(BufferedOutputStream stream, Lock lock) {
this.writer = null;
this.stream = stream;
this.lock = lock;
}

private void close() {
try {
if (this.writer != null) {
this.writer.close();
this.lock.lockInterruptibly();
try {
if (this.writer != null) {
this.writer.close();
}
else {
this.stream.close();
}
}
else {
this.stream.close();
catch (IOException e) {
// ignore
}
}
catch (IOException e) {
// ignore
catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
finally {
this.lock.unlock();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,17 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.startsWith;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
Expand All @@ -46,6 +51,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;

import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.commons.logging.Log;
import org.junit.Before;
import org.junit.Ignore;
Expand Down Expand Up @@ -518,6 +524,49 @@ public void noFlushAppend() throws Exception {
handler.stop();
}

@Test
public void lockForFlush() throws Exception {
File tempFolder = this.temp.newFolder();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
final BufferedOutputStream out = spy(new BufferedOutputStream(baos));
FileWritingMessageHandler handler = new FileWritingMessageHandler(tempFolder) {

@Override
protected BufferedOutputStream createOutputStream(File fileToWriteTo, boolean append) {
return out;
}

};
handler.setFileExistsMode(FileExistsMode.APPEND_NO_FLUSH);
handler.setFileNameGenerator(message -> "foo.txt");
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
handler.setTaskScheduler(taskScheduler);
handler.setOutputChannel(new NullChannel());
handler.setBeanFactory(mock(BeanFactory.class));
handler.setFlushInterval(10);
handler.setFlushWhenIdle(false);
handler.afterPropertiesSet();
handler.start();

final AtomicBoolean writing = new AtomicBoolean();
final AtomicBoolean closeWhileWriting = new AtomicBoolean();
willAnswer(i -> {
writing.set(true);
Thread.sleep(500);
writing.set(false);
return null;
}).given(out).write(any(byte[].class), anyInt(), anyInt());
willAnswer(i -> {
closeWhileWriting.compareAndSet(false, writing.get());
return null;
}).given(out).close();
handler.handleMessage(new GenericMessage<>("foo".getBytes()));
verify(out).write(any(byte[].class), anyInt(), anyInt());
assertFalse(closeWhileWriting.get());
handler.stop();
}

@Test
public void replaceIfDifferent() throws IOException {
QueueChannel output = new QueueChannel();
Expand Down