From 39fef2c70a3de0e631cd83aab7bdcec8b60ccb95 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 28 Nov 2018 14:03:02 -0500 Subject: [PATCH] INT-4560: Fix Race in FileSystemPersistentAOFLF JIRA: https://jira.spring.io/browse/INT-4560 Reproduced and tested with ```java @SpringBootApplication public class So53521593Application { private static final Logger logger = LoggerFactory.getLogger(So53521593Application.class); public static void main(String[] args) { SpringApplication.run(So53521593Application.class, args); } @Bean public IntegrationFlow flow() { ExecutorService exec = Executors.newFixedThreadPool(10); return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter( new MyFilter(new SimpleMetadataStore(), "foo")), e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS) .maxMessagesPerPoll(10))) .channel(MessageChannels.executor(exec)) .handle((p, h) -> { try { p.delete(); logger.info(p.toString()); Thread.sleep(10_000); } catch (InterruptedException e1) { Thread.currentThread().interrupt(); } return null; }) .get(); } } class MyFilter extends FileSystemPersistentAcceptOnceFileListFilter { public MyFilter(ConcurrentMetadataStore store, String prefix) { super(store, prefix); } @Override protected long modified(File file) { long modified = super.modified(file); System.out.println(modified); return modified; } } ``` **cherry-pick to 5.0.x, 4.3.x** --- ...AbstractPersistentAcceptOnceFileListFilter.java | 14 ++++++++++++-- ...leSystemPersistentAcceptOnceFileListFilter.java | 11 +++++++++++ .../PersistentAcceptOnceFileListFilterTests.java | 10 +++++++++- 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/filters/AbstractPersistentAcceptOnceFileListFilter.java b/spring-integration-file/src/main/java/org/springframework/integration/file/filters/AbstractPersistentAcceptOnceFileListFilter.java index 406c20d2347..a3027ef83a6 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/filters/AbstractPersistentAcceptOnceFileListFilter.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/filters/AbstractPersistentAcceptOnceFileListFilter.java @@ -77,17 +77,27 @@ public boolean accept(F file) { String oldValue = this.store.putIfAbsent(key, newValue); if (oldValue == null) { // not in store flushIfNeeded(); - return true; + return fileStillExists(file); } // same value in store if (!isEqual(file, oldValue) && this.store.replace(key, oldValue, newValue)) { flushIfNeeded(); - return true; + return fileStillExists(file); } return false; } } + /** + * Check if the file still exists; default implementation returns true. + * @param file the file. + * @return true if the filter should return true. + * @since 4.3.19 + */ + protected boolean fileStillExists(F file) { + return true; + } + /** * {@inheritDoc} * @since 4.0.4 diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/filters/FileSystemPersistentAcceptOnceFileListFilter.java b/spring-integration-file/src/main/java/org/springframework/integration/file/filters/FileSystemPersistentAcceptOnceFileListFilter.java index e0b68ca6cb5..c8a1eb4cd87 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/filters/FileSystemPersistentAcceptOnceFileListFilter.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/filters/FileSystemPersistentAcceptOnceFileListFilter.java @@ -42,4 +42,15 @@ protected String fileName(File file) { return file.getAbsolutePath(); } + /** + * Check that the file still exists, to avoid a race condition when multi-threaded and + * another thread removed the file while we were waiting for the lock. + * @since 4.3.19 + */ + @Override + protected boolean fileStillExists(File file) { + return file.exists(); + } + + } diff --git a/spring-integration-file/src/test/java/org/springframework/integration/file/filters/PersistentAcceptOnceFileListFilterTests.java b/spring-integration-file/src/test/java/org/springframework/integration/file/filters/PersistentAcceptOnceFileListFilterTests.java index ccc1f18a2c5..83bc9e5caf8 100644 --- a/spring-integration-file/src/test/java/org/springframework/integration/file/filters/PersistentAcceptOnceFileListFilterTests.java +++ b/spring-integration-file/src/test/java/org/springframework/integration/file/filters/PersistentAcceptOnceFileListFilterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2015 the original author or authors. + * Copyright 2013-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -126,6 +126,11 @@ public void testRollbackFileSystem() throws Exception { new SimpleMetadataStore(), "rollback:"); File[] files = new File[] {new File("foo"), new File("bar"), new File("baz")}; List passed = filter.filterFiles(files); + assertEquals(0, passed.size()); + for (File file : files) { + file.createNewFile(); + } + passed = filter.filterFiles(files); assertTrue(Arrays.equals(files, passed.toArray())); List now = filter.filterFiles(files); assertEquals(0, now.size()); @@ -137,6 +142,9 @@ public void testRollbackFileSystem() throws Exception { now = filter.filterFiles(files); assertEquals(0, now.size()); filter.close(); + for (File file : files) { + file.delete(); + } } @Test