diff --git a/client/src/main/java/org/jboss/fuse/mvnd/client/DaemonClientConnection.java b/client/src/main/java/org/jboss/fuse/mvnd/client/DaemonClientConnection.java index fea05bc86..c4aae5bd7 100644 --- a/client/src/main/java/org/jboss/fuse/mvnd/client/DaemonClientConnection.java +++ b/client/src/main/java/org/jboss/fuse/mvnd/client/DaemonClientConnection.java @@ -32,7 +32,7 @@ import org.jboss.fuse.mvnd.common.DaemonException.StaleAddressException; import org.jboss.fuse.mvnd.common.DaemonInfo; import org.jboss.fuse.mvnd.common.Message; -import org.jboss.fuse.mvnd.common.Message.Prompt; +import org.jboss.fuse.mvnd.common.Message.BuildStarted; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -130,9 +130,9 @@ protected void doReceive() { if (m == null) { break; } - if (m.getType() == Message.PROMPT) { - final Prompt prompt = (Prompt) m; - m = prompt.withCallback(response -> dispatch(prompt.response(response))); + if (m.getType() == Message.BUILD_STARTED) { + final BuildStarted bs = (BuildStarted) m; + m = bs.withDaemonDispatch(this::dispatch); } queue.put(m); } diff --git a/client/src/main/java/org/jboss/fuse/mvnd/client/DefaultClient.java b/client/src/main/java/org/jboss/fuse/mvnd/client/DefaultClient.java index 3ab8cd172..87ba0bd45 100644 --- a/client/src/main/java/org/jboss/fuse/mvnd/client/DefaultClient.java +++ b/client/src/main/java/org/jboss/fuse/mvnd/client/DefaultClient.java @@ -24,6 +24,7 @@ import java.util.List; import org.fusesource.jansi.Ansi; import org.jboss.fuse.mvnd.common.BuildProperties; +import org.jboss.fuse.mvnd.common.DaemonException; import org.jboss.fuse.mvnd.common.DaemonInfo; import org.jboss.fuse.mvnd.common.DaemonRegistry; import org.jboss.fuse.mvnd.common.Environment; @@ -37,8 +38,6 @@ public class DefaultClient implements Client { - public static final int CANCEL_TIMEOUT = 10 * 1000; - private static final Logger LOGGER = LoggerFactory.getLogger(DefaultClient.class); private final DaemonParameters parameters; @@ -62,7 +61,11 @@ public static void main(String[] argv) throws Exception { } try (TerminalOutput output = new TerminalOutput(logFile)) { - new DefaultClient(new DaemonParameters()).execute(output, args); + try { + new DefaultClient(new DaemonParameters()).execute(output, args); + } catch (DaemonException.InterruptedException e) { + output.accept(Message.log(System.lineSeparator() + "The build was canceled")); + } } } diff --git a/common/src/main/java/org/jboss/fuse/mvnd/common/Message.java b/common/src/main/java/org/jboss/fuse/mvnd/common/Message.java index 8a89feecb..d3c5c6b4b 100644 --- a/common/src/main/java/org/jboss/fuse/mvnd/common/Message.java +++ b/common/src/main/java/org/jboss/fuse/mvnd/common/Message.java @@ -44,10 +44,12 @@ public abstract class Message { public static final int PROMPT_RESPONSE = 12; public static final int BUILD_STATUS = 13; public static final int KEYBOARD_INPUT = 14; + public static final int CANCEL_BUILD = 15; public static final SimpleMessage KEEP_ALIVE_SINGLETON = new SimpleMessage(KEEP_ALIVE); public static final SimpleMessage STOP_SINGLETON = new SimpleMessage(STOP); public static final SimpleMessage BUILD_STOPPED_SINGLETON = new SimpleMessage(BUILD_STOPPED); + public static final SimpleMessage CANCEL_BUILD_SINGLETON = new SimpleMessage(CANCEL_BUILD); final int type; @@ -87,6 +89,8 @@ public static Message read(DataInputStream input) throws IOException { return PromptResponse.read(input); case BUILD_STATUS: return StringMessage.read(BUILD_STATUS, input); + case CANCEL_BUILD: + return SimpleMessage.CANCEL_BUILD_SINGLETON; } throw new IllegalStateException("Unexpected message type: " + type); } @@ -413,6 +417,7 @@ public static class BuildStarted extends Message { final String projectId; final int projectCount; final int maxThreads; + final Consumer daemonDispatch; public static BuildStarted read(DataInputStream input) throws IOException { final String projectId = readUTF(input); @@ -422,10 +427,15 @@ public static BuildStarted read(DataInputStream input) throws IOException { } public BuildStarted(String projectId, int projectCount, int maxThreads) { + this(projectId, projectCount, maxThreads, null); + } + + public BuildStarted(String projectId, int projectCount, int maxThreads, Consumer daemonDispatch) { super(BUILD_STARTED); this.projectId = projectId; this.projectCount = projectCount; this.maxThreads = maxThreads; + this.daemonDispatch = daemonDispatch; } public String getProjectId() { @@ -454,6 +464,14 @@ public void write(DataOutputStream output) throws IOException { output.writeInt(projectCount); output.writeInt(maxThreads); } + + public BuildStarted withDaemonDispatch(Consumer daemonDispatch) { + return new BuildStarted(projectId, projectCount, maxThreads, daemonDispatch); + } + + public Consumer getDaemonDispatch() { + return daemonDispatch; + } } public static class BuildMessage extends Message { @@ -607,7 +625,6 @@ public static class Prompt extends Message { final String uid; final String message; final boolean password; - final Consumer callback; public static Prompt read(DataInputStream input) throws IOException { String projectId = Message.readUTF(input); @@ -618,16 +635,11 @@ public static Prompt read(DataInputStream input) throws IOException { } public Prompt(String projectId, String uid, String message, boolean password) { - this(projectId, uid, message, password, null); - } - - public Prompt(String projectId, String uid, String message, boolean password, Consumer callback) { super(PROMPT); this.projectId = projectId; this.uid = uid; this.message = message; this.password = password; - this.callback = callback; } public String getProjectId() { @@ -665,18 +677,10 @@ public void write(DataOutputStream output) throws IOException { output.writeBoolean(password); } - public Prompt withCallback(Consumer callback) { - return new Prompt(projectId, uid, message, password, callback); - } - public PromptResponse response(String message) { return new PromptResponse(projectId, uid, message); } - public Consumer getCallback() { - return callback; - } - } public static class PromptResponse extends Message { diff --git a/common/src/main/java/org/jboss/fuse/mvnd/common/logging/TerminalOutput.java b/common/src/main/java/org/jboss/fuse/mvnd/common/logging/TerminalOutput.java index 76dfa2e8e..0702fd26e 100644 --- a/common/src/main/java/org/jboss/fuse/mvnd/common/logging/TerminalOutput.java +++ b/common/src/main/java/org/jboss/fuse/mvnd/common/logging/TerminalOutput.java @@ -60,6 +60,7 @@ public class TerminalOutput implements ClientOutput { public static final int CTRL_M = 'M' & 0x1f; private final Terminal terminal; + private final Terminal.SignalHandler previousIntHandler; private final Display display; private final LinkedHashMap projects = new LinkedHashMap<>(); private final ClientLog log; @@ -69,6 +70,7 @@ public class TerminalOutput implements ClientOutput { private final CountDownLatch closed = new CountDownLatch(1); private final long start; private final ReadWriteLock readInput = new ReentrantReadWriteLock(); + private final DaemonDispatch dispatch = new DaemonDispatch(); private volatile String name; private volatile int totalProjects; @@ -97,6 +99,11 @@ public TerminalOutput(Path logFile) throws IOException { this.start = System.currentTimeMillis(); this.terminal = TerminalBuilder.terminal(); terminal.enterRawMode(); + this.previousIntHandler = terminal.handle(Terminal.Signal.INT, + sig -> { + accept(Message.buildStatus("Cancelling...")); + dispatch.dispatch(Message.CANCEL_BUILD_SINGLETON); + }); this.display = new Display(terminal, false); this.log = logFile == null ? new MessageCollector() : new FileLog(logFile); final Thread r = new Thread(this::readInputLoop); @@ -131,6 +138,7 @@ private boolean doAccept(Message entry) { this.name = bs.getProjectId(); this.totalProjects = bs.getProjectCount(); this.maxThreads = bs.getMaxThreads(); + this.dispatch.setSink(bs.getDaemonDispatch()); break; } case Message.BUILD_EXCEPTION: { @@ -208,8 +216,8 @@ private boolean doAccept(Message entry) { if (c < 0) { break; } else if (c == '\n' || c == '\r') { - prompt.getCallback().accept(sb.toString()); terminal.writer().println(); + dispatch.dispatch(prompt.response(sb.toString())); break; } else if (c == 127) { if (sb.length() > 0) { @@ -249,7 +257,7 @@ private boolean doAccept(Message entry) { } break; } - case Message.KEYBOARD_INPUT: + case Message.KEYBOARD_INPUT: { char keyStroke = ((StringMessage) entry).getPayload().charAt(0); switch (keyStroke) { case '+': @@ -268,6 +276,10 @@ private boolean doAccept(Message entry) { } break; } + default: + throw new IllegalStateException("Unexpected message " + entry); + } + return true; } @@ -324,6 +336,7 @@ public void close() throws Exception { reader.interrupt(); accept(SimpleMessage.BUILD_STOPPED_SINGLETON); reader.join(); + terminal.handle(Terminal.Signal.INT, previousIntHandler); terminal.close(); closed.countDown(); if (exception != null) { @@ -509,6 +522,41 @@ public void close() throws IOException { } + public static class DaemonDispatch { + private final Object lock = new Object(); + private List queue; + private Consumer sink; + + public void dispatch(Message m) { + synchronized (lock) { + if (sink != null) { + if (queue != null) { + for (Message msg : queue) { + sink.accept(msg); + } + } + sink.accept(m); + } else { + if (queue == null) { + queue = new ArrayList(); + } + queue.add(m); + } + } + } + + public void setSink(Consumer sink) { + synchronized (lock) { + this.sink = sink; + if (queue != null) { + for (Message msg : queue) { + sink.accept(msg); + } + } + } + } + } + /** * A {@link ClientLog} that first collects all incoming messages in a {@link List} and outputs them to a JLine * {@link Terminal} upon {@link #close()}. diff --git a/daemon/src/main/java/org/jboss/fuse/mvnd/builder/ProjectExecutorService.java b/daemon/src/main/java/org/jboss/fuse/mvnd/builder/ProjectExecutorService.java index 7e5f6f1d9..a013ad3c8 100644 --- a/daemon/src/main/java/org/jboss/fuse/mvnd/builder/ProjectExecutorService.java +++ b/daemon/src/main/java/org/jboss/fuse/mvnd/builder/ProjectExecutorService.java @@ -88,6 +88,10 @@ public void shutdown() { executor.shutdown(); } + public void cancel() { + executor.shutdownNow(); + } + // hook to allow pausing executor during unit tests protected void beforeExecute(Thread t, Runnable r) { } diff --git a/daemon/src/main/java/org/jboss/fuse/mvnd/builder/SmartBuilder.java b/daemon/src/main/java/org/jboss/fuse/mvnd/builder/SmartBuilder.java index 9f4f789df..3a1b61a6c 100644 --- a/daemon/src/main/java/org/jboss/fuse/mvnd/builder/SmartBuilder.java +++ b/daemon/src/main/java/org/jboss/fuse/mvnd/builder/SmartBuilder.java @@ -70,16 +70,41 @@ public class SmartBuilder implements Builder { private final LifecycleModuleBuilder moduleBuilder; + private volatile SmartBuilderImpl builder; + private volatile boolean canceled; + + private static SmartBuilder INSTANCE; + + public static SmartBuilder cancel() { + SmartBuilder builder = INSTANCE; + if (builder != null) { + builder.doCancel(); + } + return builder; + } + @Inject public SmartBuilder(LifecycleModuleBuilder moduleBuilder) { this.moduleBuilder = moduleBuilder; + INSTANCE = this; + } + + void doCancel() { + canceled = true; + SmartBuilderImpl b = builder; + if (b != null) { + b.cancel(); + } + } + + public void doneCancel() { + canceled = false; } @Override - public void build(final MavenSession session, final ReactorContext reactorContext, + public synchronized void build(final MavenSession session, final ReactorContext reactorContext, ProjectBuildList projectBuilds, final List taskSegments, ReactorBuildStatus reactorBuildStatus) throws ExecutionException, InterruptedException { - List list = new ArrayList<>(); String providerScript = null; @@ -165,9 +190,16 @@ public void build(final MavenSession session, final ReactorContext reactorContex List> allstats = new ArrayList<>(); for (TaskSegment taskSegment : taskSegments) { Set projects = projectBuilds.getByTaskSegment(taskSegment).getProjects(); - ReactorBuildStats stats = new SmartBuilderImpl(moduleBuilder, session, reactorContext, taskSegment, projects, graph) - .build(); - allstats.add(new AbstractMap.SimpleEntry<>(taskSegment, stats)); + if (canceled) { + return; + } + builder = new SmartBuilderImpl(moduleBuilder, session, reactorContext, taskSegment, projects, graph); + try { + ReactorBuildStats stats = builder.build(); + allstats.add(new AbstractMap.SimpleEntry<>(taskSegment, stats)); + } finally { + builder = null; + } } if (session.getResult().hasExceptions()) { diff --git a/daemon/src/main/java/org/jboss/fuse/mvnd/builder/SmartBuilderImpl.java b/daemon/src/main/java/org/jboss/fuse/mvnd/builder/SmartBuilderImpl.java index 008b79128..2faf93e42 100644 --- a/daemon/src/main/java/org/jboss/fuse/mvnd/builder/SmartBuilderImpl.java +++ b/daemon/src/main/java/org/jboss/fuse/mvnd/builder/SmartBuilderImpl.java @@ -157,6 +157,10 @@ private void shutdown() { executor.shutdown(); } + public void cancel() { + executor.cancel(); + } + private void submitAll(Set readyProjects) { List tasks = new ArrayList<>(); for (MavenProject project : readyProjects) { diff --git a/daemon/src/main/java/org/jboss/fuse/mvnd/daemon/Server.java b/daemon/src/main/java/org/jboss/fuse/mvnd/daemon/Server.java index 75ad674d0..264e3eb60 100644 --- a/daemon/src/main/java/org/jboss/fuse/mvnd/daemon/Server.java +++ b/daemon/src/main/java/org/jboss/fuse/mvnd/daemon/Server.java @@ -41,6 +41,7 @@ import java.util.stream.Collectors; import org.apache.maven.cli.DaemonMavenCli; import org.apache.maven.execution.MavenSession; +import org.jboss.fuse.mvnd.builder.SmartBuilder; import org.jboss.fuse.mvnd.common.DaemonConnection; import org.jboss.fuse.mvnd.common.DaemonException; import org.jboss.fuse.mvnd.common.DaemonExpirationStatus; @@ -61,7 +62,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.jboss.fuse.mvnd.common.DaemonState.Broken; import static org.jboss.fuse.mvnd.common.DaemonState.Busy; +import static org.jboss.fuse.mvnd.common.DaemonState.Canceled; import static org.jboss.fuse.mvnd.common.DaemonState.StopRequested; import static org.jboss.fuse.mvnd.common.DaemonState.Stopped; @@ -267,7 +270,6 @@ boolean awaitStop() { condition.await(); break; case Canceled: - LOGGER.debug("cancel requested."); cancelNow(); break; case Broken: @@ -355,17 +357,12 @@ private void stopNow(String reason) { private void cancelNow() { long time = System.currentTimeMillis() + CANCEL_TIMEOUT; - // LOGGER.debug("Cancel requested: will wait for daemon to become idle."); - // try { - // cancellationToken.cancel(); - // } catch (Exception ex) { - // LOGGER.error("Cancel processing failed. Will continue.", ex); - // } - + LOGGER.debug("Cancel requested: will wait for daemon to become idle."); + final SmartBuilder builder = SmartBuilder.cancel(); stateLock.lock(); try { long rem; - while ((rem = System.currentTimeMillis() - time) > 0) { + while ((rem = time - System.currentTimeMillis()) > 0) { try { switch (getState()) { case Idle: @@ -391,6 +388,9 @@ private void cancelNow() { stopNow("cancel requested but timed out"); } finally { stateLock.unlock(); + if (builder != null) { + builder.doneCancel(); + } } } @@ -447,12 +447,20 @@ private void handle(DaemonConnection connection, BuildRequest buildRequest) { break; } LOGGER.info("Received message: {}", message); - synchronized (recvQueue) { - recvQueue.put(message); - recvQueue.notifyAll(); + if (message == Message.CANCEL_BUILD_SINGLETON) { + updateState(DaemonState.Canceled); + return; + } else { + synchronized (recvQueue) { + recvQueue.put(message); + recvQueue.notifyAll(); + } } } + } catch (DaemonException.RecoverableMessageIOException t) { + updateState(Canceled); } catch (Throwable t) { + updateState(Broken); LOGGER.error("Error receiving events", t); } }); diff --git a/integration-tests/src/test/java/org/jboss/fuse/mvnd/assertj/TestClientOutput.java b/integration-tests/src/test/java/org/jboss/fuse/mvnd/assertj/TestClientOutput.java index 43f1eb6d7..44d33cbb5 100644 --- a/integration-tests/src/test/java/org/jboss/fuse/mvnd/assertj/TestClientOutput.java +++ b/integration-tests/src/test/java/org/jboss/fuse/mvnd/assertj/TestClientOutput.java @@ -20,10 +20,13 @@ import java.util.stream.Collectors; import org.assertj.core.api.Assertions; import org.jboss.fuse.mvnd.common.Message; +import org.jboss.fuse.mvnd.common.Message.BuildStarted; import org.jboss.fuse.mvnd.common.logging.ClientOutput; +import org.jboss.fuse.mvnd.common.logging.TerminalOutput.DaemonDispatch; public class TestClientOutput implements ClientOutput { private final List messages = new ArrayList<>(); + protected final DaemonDispatch dispatch = new DaemonDispatch(); @Override public void close() throws Exception { @@ -31,6 +34,10 @@ public void close() throws Exception { @Override public void accept(Message message) { + if (message.getType() == Message.BUILD_STARTED) { + BuildStarted bs = (BuildStarted) message; + this.dispatch.setSink(bs.getDaemonDispatch()); + } messages.add(message); } diff --git a/integration-tests/src/test/java/org/jboss/fuse/mvnd/it/InteractiveTest.java b/integration-tests/src/test/java/org/jboss/fuse/mvnd/it/InteractiveTest.java index 2182f6574..9163749f0 100644 --- a/integration-tests/src/test/java/org/jboss/fuse/mvnd/it/InteractiveTest.java +++ b/integration-tests/src/test/java/org/jboss/fuse/mvnd/it/InteractiveTest.java @@ -44,7 +44,7 @@ void versionsSet() throws IOException, InterruptedException { @Override public void accept(Message m) { if (m instanceof Prompt) { - ((Prompt) m).getCallback().accept("0.1.0-SNAPSHOT"); + dispatch.dispatch(((Prompt) m).response("0.1.0-SNAPSHOT")); } super.accept(m); }