Skip to content

Commit

Permalink
Implement build cancelation, fixes apache#127
Browse files Browse the repository at this point in the history
  • Loading branch information
gnodet authored and ppalaga committed Nov 10, 2020
1 parent 02ef4a2 commit 5c81bce
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 41 deletions.
Expand Up @@ -32,7 +32,7 @@
import org.jboss.fuse.mvnd.common.DaemonException.StaleAddressException;
import org.jboss.fuse.mvnd.common.DaemonInfo;
import org.jboss.fuse.mvnd.common.Message;
import org.jboss.fuse.mvnd.common.Message.Prompt;
import org.jboss.fuse.mvnd.common.Message.BuildStarted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -130,9 +130,9 @@ protected void doReceive() {
if (m == null) {
break;
}
if (m.getType() == Message.PROMPT) {
final Prompt prompt = (Prompt) m;
m = prompt.withCallback(response -> dispatch(prompt.response(response)));
if (m.getType() == Message.BUILD_STARTED) {
final BuildStarted bs = (BuildStarted) m;
m = bs.withDaemonDispatch(this::dispatch);
}
queue.put(m);
}
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import org.fusesource.jansi.Ansi;
import org.jboss.fuse.mvnd.common.BuildProperties;
import org.jboss.fuse.mvnd.common.DaemonException;
import org.jboss.fuse.mvnd.common.DaemonInfo;
import org.jboss.fuse.mvnd.common.DaemonRegistry;
import org.jboss.fuse.mvnd.common.Environment;
Expand All @@ -37,8 +38,6 @@

public class DefaultClient implements Client {

public static final int CANCEL_TIMEOUT = 10 * 1000;

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultClient.class);

private final DaemonParameters parameters;
Expand All @@ -62,7 +61,11 @@ public static void main(String[] argv) throws Exception {
}

try (TerminalOutput output = new TerminalOutput(logFile)) {
new DefaultClient(new DaemonParameters()).execute(output, args);
try {
new DefaultClient(new DaemonParameters()).execute(output, args);
} catch (DaemonException.InterruptedException e) {
output.accept(Message.log(System.lineSeparator() + "The build was canceled"));
}
}
}

Expand Down
32 changes: 18 additions & 14 deletions common/src/main/java/org/jboss/fuse/mvnd/common/Message.java
Expand Up @@ -44,10 +44,12 @@ public abstract class Message {
public static final int PROMPT_RESPONSE = 12;
public static final int BUILD_STATUS = 13;
public static final int KEYBOARD_INPUT = 14;
public static final int CANCEL_BUILD = 15;

public static final SimpleMessage KEEP_ALIVE_SINGLETON = new SimpleMessage(KEEP_ALIVE);
public static final SimpleMessage STOP_SINGLETON = new SimpleMessage(STOP);
public static final SimpleMessage BUILD_STOPPED_SINGLETON = new SimpleMessage(BUILD_STOPPED);
public static final SimpleMessage CANCEL_BUILD_SINGLETON = new SimpleMessage(CANCEL_BUILD);

final int type;

Expand Down Expand Up @@ -87,6 +89,8 @@ public static Message read(DataInputStream input) throws IOException {
return PromptResponse.read(input);
case BUILD_STATUS:
return StringMessage.read(BUILD_STATUS, input);
case CANCEL_BUILD:
return SimpleMessage.CANCEL_BUILD_SINGLETON;
}
throw new IllegalStateException("Unexpected message type: " + type);
}
Expand Down Expand Up @@ -413,6 +417,7 @@ public static class BuildStarted extends Message {
final String projectId;
final int projectCount;
final int maxThreads;
final Consumer<Message> daemonDispatch;

public static BuildStarted read(DataInputStream input) throws IOException {
final String projectId = readUTF(input);
Expand All @@ -422,10 +427,15 @@ public static BuildStarted read(DataInputStream input) throws IOException {
}

public BuildStarted(String projectId, int projectCount, int maxThreads) {
this(projectId, projectCount, maxThreads, null);
}

public BuildStarted(String projectId, int projectCount, int maxThreads, Consumer<Message> daemonDispatch) {
super(BUILD_STARTED);
this.projectId = projectId;
this.projectCount = projectCount;
this.maxThreads = maxThreads;
this.daemonDispatch = daemonDispatch;
}

public String getProjectId() {
Expand Down Expand Up @@ -454,6 +464,14 @@ public void write(DataOutputStream output) throws IOException {
output.writeInt(projectCount);
output.writeInt(maxThreads);
}

public BuildStarted withDaemonDispatch(Consumer<Message> daemonDispatch) {
return new BuildStarted(projectId, projectCount, maxThreads, daemonDispatch);
}

public Consumer<Message> getDaemonDispatch() {
return daemonDispatch;
}
}

public static class BuildMessage extends Message {
Expand Down Expand Up @@ -607,7 +625,6 @@ public static class Prompt extends Message {
final String uid;
final String message;
final boolean password;
final Consumer<String> callback;

public static Prompt read(DataInputStream input) throws IOException {
String projectId = Message.readUTF(input);
Expand All @@ -618,16 +635,11 @@ public static Prompt read(DataInputStream input) throws IOException {
}

public Prompt(String projectId, String uid, String message, boolean password) {
this(projectId, uid, message, password, null);
}

public Prompt(String projectId, String uid, String message, boolean password, Consumer<String> callback) {
super(PROMPT);
this.projectId = projectId;
this.uid = uid;
this.message = message;
this.password = password;
this.callback = callback;
}

public String getProjectId() {
Expand Down Expand Up @@ -665,18 +677,10 @@ public void write(DataOutputStream output) throws IOException {
output.writeBoolean(password);
}

public Prompt withCallback(Consumer<String> callback) {
return new Prompt(projectId, uid, message, password, callback);
}

public PromptResponse response(String message) {
return new PromptResponse(projectId, uid, message);
}

public Consumer<String> getCallback() {
return callback;
}

}

public static class PromptResponse extends Message {
Expand Down
Expand Up @@ -60,6 +60,7 @@ public class TerminalOutput implements ClientOutput {
public static final int CTRL_M = 'M' & 0x1f;

private final Terminal terminal;
private final Terminal.SignalHandler previousIntHandler;
private final Display display;
private final LinkedHashMap<String, Project> projects = new LinkedHashMap<>();
private final ClientLog log;
Expand All @@ -69,6 +70,7 @@ public class TerminalOutput implements ClientOutput {
private final CountDownLatch closed = new CountDownLatch(1);
private final long start;
private final ReadWriteLock readInput = new ReentrantReadWriteLock();
private final DaemonDispatch dispatch = new DaemonDispatch();

private volatile String name;
private volatile int totalProjects;
Expand Down Expand Up @@ -97,6 +99,11 @@ public TerminalOutput(Path logFile) throws IOException {
this.start = System.currentTimeMillis();
this.terminal = TerminalBuilder.terminal();
terminal.enterRawMode();
this.previousIntHandler = terminal.handle(Terminal.Signal.INT,
sig -> {
accept(Message.buildStatus("Cancelling..."));
dispatch.dispatch(Message.CANCEL_BUILD_SINGLETON);
});
this.display = new Display(terminal, false);
this.log = logFile == null ? new MessageCollector() : new FileLog(logFile);
final Thread r = new Thread(this::readInputLoop);
Expand Down Expand Up @@ -131,6 +138,7 @@ private boolean doAccept(Message entry) {
this.name = bs.getProjectId();
this.totalProjects = bs.getProjectCount();
this.maxThreads = bs.getMaxThreads();
this.dispatch.setSink(bs.getDaemonDispatch());
break;
}
case Message.BUILD_EXCEPTION: {
Expand Down Expand Up @@ -208,8 +216,8 @@ private boolean doAccept(Message entry) {
if (c < 0) {
break;
} else if (c == '\n' || c == '\r') {
prompt.getCallback().accept(sb.toString());
terminal.writer().println();
dispatch.dispatch(prompt.response(sb.toString()));
break;
} else if (c == 127) {
if (sb.length() > 0) {
Expand Down Expand Up @@ -249,7 +257,7 @@ private boolean doAccept(Message entry) {
}
break;
}
case Message.KEYBOARD_INPUT:
case Message.KEYBOARD_INPUT: {
char keyStroke = ((StringMessage) entry).getPayload().charAt(0);
switch (keyStroke) {
case '+':
Expand All @@ -268,6 +276,10 @@ private boolean doAccept(Message entry) {
}
break;
}
default:
throw new IllegalStateException("Unexpected message " + entry);
}

return true;
}

Expand Down Expand Up @@ -324,6 +336,7 @@ public void close() throws Exception {
reader.interrupt();
accept(SimpleMessage.BUILD_STOPPED_SINGLETON);
reader.join();
terminal.handle(Terminal.Signal.INT, previousIntHandler);
terminal.close();
closed.countDown();
if (exception != null) {
Expand Down Expand Up @@ -509,6 +522,41 @@ public void close() throws IOException {

}

public static class DaemonDispatch {
private final Object lock = new Object();
private List<Message> queue;
private Consumer<Message> sink;

public void dispatch(Message m) {
synchronized (lock) {
if (sink != null) {
if (queue != null) {
for (Message msg : queue) {
sink.accept(msg);
}
}
sink.accept(m);
} else {
if (queue == null) {
queue = new ArrayList<Message>();
}
queue.add(m);
}
}
}

public void setSink(Consumer<Message> sink) {
synchronized (lock) {
this.sink = sink;
if (queue != null) {
for (Message msg : queue) {
sink.accept(msg);
}
}
}
}
}

/**
* A {@link ClientLog} that first collects all incoming messages in a {@link List} and outputs them to a JLine
* {@link Terminal} upon {@link #close()}.
Expand Down
Expand Up @@ -88,6 +88,10 @@ public void shutdown() {
executor.shutdown();
}

public void cancel() {
executor.shutdownNow();
}

// hook to allow pausing executor during unit tests
protected void beforeExecute(Thread t, Runnable r) {
}
Expand Down
42 changes: 37 additions & 5 deletions daemon/src/main/java/org/jboss/fuse/mvnd/builder/SmartBuilder.java
Expand Up @@ -70,16 +70,41 @@ public class SmartBuilder implements Builder {

private final LifecycleModuleBuilder moduleBuilder;

private volatile SmartBuilderImpl builder;
private volatile boolean canceled;

private static SmartBuilder INSTANCE;

public static SmartBuilder cancel() {
SmartBuilder builder = INSTANCE;
if (builder != null) {
builder.doCancel();
}
return builder;
}

@Inject
public SmartBuilder(LifecycleModuleBuilder moduleBuilder) {
this.moduleBuilder = moduleBuilder;
INSTANCE = this;
}

void doCancel() {
canceled = true;
SmartBuilderImpl b = builder;
if (b != null) {
b.cancel();
}
}

public void doneCancel() {
canceled = false;
}

@Override
public void build(final MavenSession session, final ReactorContext reactorContext,
public synchronized void build(final MavenSession session, final ReactorContext reactorContext,
ProjectBuildList projectBuilds, final List<TaskSegment> taskSegments,
ReactorBuildStatus reactorBuildStatus) throws ExecutionException, InterruptedException {

List<String> list = new ArrayList<>();

String providerScript = null;
Expand Down Expand Up @@ -165,9 +190,16 @@ public void build(final MavenSession session, final ReactorContext reactorContex
List<Map.Entry<TaskSegment, ReactorBuildStats>> allstats = new ArrayList<>();
for (TaskSegment taskSegment : taskSegments) {
Set<MavenProject> projects = projectBuilds.getByTaskSegment(taskSegment).getProjects();
ReactorBuildStats stats = new SmartBuilderImpl(moduleBuilder, session, reactorContext, taskSegment, projects, graph)
.build();
allstats.add(new AbstractMap.SimpleEntry<>(taskSegment, stats));
if (canceled) {
return;
}
builder = new SmartBuilderImpl(moduleBuilder, session, reactorContext, taskSegment, projects, graph);
try {
ReactorBuildStats stats = builder.build();
allstats.add(new AbstractMap.SimpleEntry<>(taskSegment, stats));
} finally {
builder = null;
}
}

if (session.getResult().hasExceptions()) {
Expand Down
Expand Up @@ -157,6 +157,10 @@ private void shutdown() {
executor.shutdown();
}

public void cancel() {
executor.cancel();
}

private void submitAll(Set<MavenProject> readyProjects) {
List<ProjectBuildTask> tasks = new ArrayList<>();
for (MavenProject project : readyProjects) {
Expand Down

0 comments on commit 5c81bce

Please sign in to comment.