Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions src/main/java/ch/powerunit/extensions/async/impl/FilePool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package ch.powerunit.extensions.async.impl;

import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.function.Predicate;

public final class FilePool implements Callable<Collection<WatchEvent<Path>>> {

private static final Predicate<WatchEvent<?>> IGNORE_OVERFLOW = e -> !Objects.equals(e.kind(), OVERFLOW);

@SuppressWarnings("unchecked")
private static final Function<WatchEvent<?>, WatchEvent<Path>> COERCE_TO_PATH = e -> (WatchEvent<Path>) e;

private final Path directory;
private final WatchEvent.Kind<?> events[];

private WatchService watcher; // Late init
private WatchKey key; // Late init

public FilePool(Path directory, WatchEvent.Kind<?>... events) {
this.directory = directory;
this.events = events;
}

@Override
public Collection<WatchEvent<Path>> call() throws Exception {
initIfNeeded();
try {
return key.pollEvents().stream().filter(IGNORE_OVERFLOW).map(COERCE_TO_PATH)
.collect(collectingAndThen(toList(), Collections::unmodifiableList));
} finally {
key.reset();
}
}

public void close() {
Optional.ofNullable(watcher).ifPresent(FilePool::safeCloseWatchService);
}

private void initIfNeeded() throws IOException {
if (watcher == null) {
watcher = directory.getFileSystem().newWatchService();
key = directory.register(watcher, events);
}
}

private static void safeCloseWatchService(WatchService watcher) {
try {
watcher.close();
} catch (IOException e) {
// ignore
}
}

}
124 changes: 124 additions & 0 deletions src/main/java/ch/powerunit/extensions/async/lang/WaitFile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/**
* Powerunit - A JDK1.8 test framework
* Copyright (C) 2014 Mathieu Boretti.
*
* This file is part of Powerunit
*
* Powerunit is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Powerunit is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Powerunit. If not, see <http://www.gnu.org/licenses/>.
*/
package ch.powerunit.extensions.async.lang;

import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;

import java.nio.file.Path;
import java.nio.file.WatchEvent;
import java.nio.file.WatchEvent.Kind;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.Callable;

import ch.powerunit.extensions.async.impl.FilePool;

/**
* This class provides methods to wait for fileSystem events.
*
* @since 1.1.0
*
*/
public final class WaitFile {
private WaitFile() {
}

/**
* Wait for a folder to have some event.
* <p>
* The wait starts at the first try to get the result.
*
* @param directory
* the directory to be verified.
* @param events
* the events to wait for.
* @return {@link WaitResultBuilder1} the next step of the builder.
*/
@SafeVarargs
public static WaitResultBuilder1<Collection<WatchEvent<Path>>> eventIn(Path directory, Kind<Path>... events) {
requireNonNull(directory, "directory can't be null");
FilePool filePool = new FilePool(directory, events);
return WaitResult.of(filePool, filePool::close);
}

/**
* Wait for a folder to contains new entry.
* <p>
* The wait starts at the first try to get the result.
*
* @param directory
* the directory to be verified.
* @return {@link WaitResultBuilder1} the next step of the builder.
*/
public static WaitResultBuilder1<Collection<Path>> newFileIn(Path directory) {
requireNonNull(directory, "directory can't be null");
FilePool filePool = new FilePool(directory, ENTRY_CREATE);
return WaitResult.of(toPathCollection(filePool), filePool::close);
}

/**
* Wait for a folder to contains new entry based on his name.
* <p>
* The wait starts at the first try to get the result.
*
* @param directory
* the directory to be verified.
* @param name
* the expected name
* @return {@link WaitResultBuilder1} the next step of the builder.
*/
public static WaitResultBuilder1<Path> newFileNamedIn(Path directory, String name) {
requireNonNull(directory, "directory can't be null");
requireNonNull(name, "name can't be null");
FilePool filePool = new FilePool(directory, ENTRY_CREATE);
return WaitResult.of(toPathByName(toPathCollection(filePool), name), filePool::close);
}

/**
* Wait for a folder to have entry removed.
* <p>
* The wait starts at the first try to get the result.
*
* @param directory
* the directory to be verified.
* @return {@link WaitResultBuilder1} the next step of the builder.
*/
public static WaitResultBuilder1<Collection<Path>> removeFileFrom(Path directory) {
requireNonNull(directory, "directory can't be null");
FilePool filePool = new FilePool(directory, ENTRY_DELETE);
return WaitResult.of(toPathCollection(filePool), filePool::close);
}

private static Callable<Collection<Path>> toPathCollection(Callable<Collection<WatchEvent<Path>>> callable) {
return () -> callable.call().stream().map(WatchEvent::context)
.collect(collectingAndThen(toList(), Collections::unmodifiableList));
}

private static Callable<Path> toPathByName(Callable<Collection<Path>> callable, String name) {
return () -> callable.call().stream()
.filter(p -> Objects.equals(p.getName(p.getNameCount() - 1).toString(), name)).findFirst().orElse(null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,9 @@ default Optional<T> finish() {
try {
return asyncExec().get();
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof AssertionError) {
throw (AssertionError) e.getCause();
}
throw new AssertionError("Unexpected error " + e.getMessage(), e);
throw Optional.ofNullable(e.getCause()).filter(c -> c instanceof AssertionError)
.map(AssertionError.class::cast)
.orElseGet(() -> new AssertionError("Unexpected error " + e.getMessage(), e));
}
}

Expand Down
119 changes: 119 additions & 0 deletions src/test/java/ch/powerunit/extensions/async/lang/WaitFileTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/**
* Powerunit - A JDK1.8 test framework
* Copyright (C) 2014 Mathieu Boretti.
*
* This file is part of Powerunit
*
* Powerunit is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Powerunit is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Powerunit. If not, see <http://www.gnu.org/licenses/>.
*/
package ch.powerunit.extensions.async.lang;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import ch.powerunit.Rule;
import ch.powerunit.Test;
import ch.powerunit.TestSuite;
import ch.powerunit.rules.TemporaryFolder;

public class WaitFileTest implements TestSuite {

@Rule
public final TemporaryFolder folder = temporaryFolder();

@Test
public void testFound() throws IOException, InterruptedException {
Path test = folder.newFolder();
CompletableFuture<Optional<Collection<Path>>> wait = WaitFile.newFileIn(test).expecting(l -> !l.isEmpty())
.repeat(5).everySecond().usingDefaultExecutor().asyncExec();
Thread.sleep(1010);
new File(test.toFile(), "test").mkdir();
assertThat(wait.join()).is(optionalIsPresent());
}

@Test
public void testFoundNamed() throws IOException, InterruptedException {
Path test = folder.newFolder();
CompletableFuture<Optional<Path>> wait = WaitFile.newFileNamedIn(test, "test2").expecting(l -> true).repeat(5)
.everySecond().usingDefaultExecutor().asyncExec();
Thread.sleep(500);
new File(test.toFile(), "test").mkdir();
Thread.sleep(700);
new File(test.toFile(), "test2").mkdir();
assertThat(wait.join()).is(optionalIsPresent());
}

@Test
public void testNeverFound() throws IOException, InterruptedException {
Path test = folder.newFolder();
CompletableFuture<Optional<Collection<Path>>> wait = WaitFile.newFileIn(test).expecting(l -> !l.isEmpty())
.repeat(2).everySecond().usingDefaultExecutor().asyncExec();
assertThat(wait.join()).is(optionalIsNotPresent());
}

@Test
public void testNeverFoundButCreateBeforeStartOfExecution() throws IOException, InterruptedException {
Path test = folder.newFolder();
WaitResultBuilder6<Collection<Path>> wait = WaitFile.newFileIn(test).expecting(l -> !l.isEmpty()).repeat(2)
.everySecond().usingDefaultExecutor();
new File(test.toFile(), "test").mkdir();
assertThat(wait.join()).is(optionalIsNotPresent());
}

@Test
public void testRemoved() throws IOException, InterruptedException {
Path test = folder.newFolder();
new File(test.toFile(), "test").mkdir();
CompletableFuture<Optional<Collection<Path>>> wait = WaitFile.removeFileFrom(test).expecting(l -> !l.isEmpty())
.repeat(3).everySecond().usingDefaultExecutor().asyncExec();
Thread.sleep(1010);
new File(test.toFile(), "test").delete();
assertThat(wait.join()).is(optionalIsPresent());
}

@Test
public void testNeverRemoved() throws IOException, InterruptedException {
Path test = folder.newFolder();
CompletableFuture<Optional<Collection<Path>>> wait = WaitFile.removeFileFrom(test).expecting(l -> !l.isEmpty())
.repeat(2).everySecond().usingDefaultExecutor().asyncExec();
assertThat(wait.join()).is(optionalIsNotPresent());
}

@Test
public void testEventFound() throws IOException, InterruptedException {
Path test = folder.newFolder();
CompletableFuture<Optional<Collection<WatchEvent<Path>>>> wait = WaitFile
.eventIn(test, StandardWatchEventKinds.ENTRY_CREATE).expecting(l -> !l.isEmpty()).repeat(3)
.everySecond().usingDefaultExecutor().asyncExec();
Thread.sleep(1010);
new File(test.toFile(), "test").mkdir();
assertThat(wait.join()).is(optionalIsPresent());
}

@Test
public void testEventNeverFound() throws IOException, InterruptedException {
Path test = folder.newFolder();
CompletableFuture<Optional<Collection<WatchEvent<Path>>>> wait = WaitFile
.eventIn(test, StandardWatchEventKinds.ENTRY_CREATE).expecting(l -> !l.isEmpty()).repeat(2)
.everySecond().usingDefaultExecutor().asyncExec();
assertThat(wait.join()).is(optionalIsNotPresent());
}

}