Skip to content

Commit

Permalink
INT-4560: Fix Race in FileSystemPersistentAOFLF
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-4560

Reproduced and tested with

```java
@SpringBootApplication
public class So53521593Application {

	private static final Logger logger = LoggerFactory.getLogger(So53521593Application.class);

	public static void main(String[] args) {
		SpringApplication.run(So53521593Application.class, args);
	}

	@bean
	public IntegrationFlow flow() {
		ExecutorService exec = Executors.newFixedThreadPool(10);
		return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
					new MyFilter(new SimpleMetadataStore(), "foo")),
						e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
								.maxMessagesPerPoll(10)))
				.channel(MessageChannels.executor(exec))
				.<File>handle((p, h) -> {
					try {
						p.delete();
						logger.info(p.toString());
						Thread.sleep(10_000);
					}
					catch (InterruptedException e1) {
						Thread.currentThread().interrupt();
					}
					return null;
				})
				.get();
	}
}

class MyFilter extends FileSystemPersistentAcceptOnceFileListFilter {

	public MyFilter(ConcurrentMetadataStore store, String prefix) {
		super(store, prefix);
	}

	@OverRide
	protected long modified(File file) {
		long modified = super.modified(file);
		System.out.println(modified);
		return modified;
	}

}
```

**cherry-pick to 5.0.x, 4.3.x**
  • Loading branch information
garyrussell authored and artembilan committed Nov 28, 2018
1 parent a4746f7 commit 39fef2c
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 3 deletions.
Expand Up @@ -77,17 +77,27 @@ public boolean accept(F file) {
String oldValue = this.store.putIfAbsent(key, newValue);
if (oldValue == null) { // not in store
flushIfNeeded();
return true;
return fileStillExists(file);
}
// same value in store
if (!isEqual(file, oldValue) && this.store.replace(key, oldValue, newValue)) {
flushIfNeeded();
return true;
return fileStillExists(file);
}
return false;
}
}

/**
* Check if the file still exists; default implementation returns true.
* @param file the file.
* @return true if the filter should return true.
* @since 4.3.19
*/
protected boolean fileStillExists(F file) {
return true;
}

/**
* {@inheritDoc}
* @since 4.0.4
Expand Down
Expand Up @@ -42,4 +42,15 @@ protected String fileName(File file) {
return file.getAbsolutePath();
}

/**
* Check that the file still exists, to avoid a race condition when multi-threaded and
* another thread removed the file while we were waiting for the lock.
* @since 4.3.19
*/
@Override
protected boolean fileStillExists(File file) {
return file.exists();
}


}
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2015 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -126,6 +126,11 @@ public void testRollbackFileSystem() throws Exception {
new SimpleMetadataStore(), "rollback:");
File[] files = new File[] {new File("foo"), new File("bar"), new File("baz")};
List<File> passed = filter.filterFiles(files);
assertEquals(0, passed.size());
for (File file : files) {
file.createNewFile();
}
passed = filter.filterFiles(files);
assertTrue(Arrays.equals(files, passed.toArray()));
List<File> now = filter.filterFiles(files);
assertEquals(0, now.size());
Expand All @@ -137,6 +142,9 @@ public void testRollbackFileSystem() throws Exception {
now = filter.filterFiles(files);
assertEquals(0, now.size());
filter.close();
for (File file : files) {
file.delete();
}
}

@Test
Expand Down

3 comments on commit 39fef2c

@muhdkhokhar
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When this change will be available in the Maven central?

@artembilan
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now we are in release process for 5.0.10 and 5.1.1 versions.
Hopefully there are going to appear in Maven Central in a couple hours.

If you talk about 4.3.19, then it's not going to happen today.
You will need to copy/paste a fix into your own local FileSystemPersistentAcceptOnceFileListFilter class and use it accordingly in the channel adapters.

@muhdkhokhar
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks

Please sign in to comment.