Skip to content

Commit

Permalink
fix: wait for logs in task runners
Browse files Browse the repository at this point in the history
Logs can still arrive in the log stream after the job ends because everything is asynchronous, so we have no option other than to wait a little for them to arrive. This wait time is configurable and defaults to a conservative value of 5s.

Note that calling cancel() on the stream makes the stream iterator return false from hasNext(), effectively ending the stream, so there is no need for a boolean flag for that.

Fixes kestra-io/kestra#3695
  • Loading branch information
loicmathieu committed May 17, 2024
1 parent 37dcc5d commit 58c6910
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 11 deletions.
10 changes: 8 additions & 2 deletions src/main/java/io/kestra/plugin/gcp/runner/Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ public class Batch extends TaskRunner implements GcpInterface, RemoteRunnerInter
@PluginProperty
private final Duration completionCheckInterval = Duration.ofSeconds(1);

@Schema(
title = "Additional time after the job ends to wait for late logs."
)
@Builder.Default
@PluginProperty
private final Duration waitForLogInterval = Duration.ofSeconds(5);

@Override
public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToUpload, List<String> filesToDownload) throws Exception {
String renderedBucket = runContext.render(this.bucket);
Expand All @@ -211,7 +218,6 @@ public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<S
Logging logging = LoggingOptions.getDefaultInstance().toBuilder().setCredentials(credentials).build().getService()) {
Duration waitDuration = Optional.ofNullable(taskCommands.getTimeout()).orElse(this.waitUntilCompletion);
Map<String, String> labels = LabelUtils.labels(runContext);
System.out.println(labels);

Job result = null;

Expand Down Expand Up @@ -342,7 +348,7 @@ public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<S
result.getUid()
);
LogEntryServerStream stream = logging.tailLogEntries(Logging.TailOption.filter(logFilter));
try (LogTail ignored = new LogTail(stream, taskCommands.getLogConsumer())) {
try (LogTail ignored = new LogTail(stream, taskCommands.getLogConsumer(), this.waitForLogInterval)) {
// Wait for the job termination
result = waitForTerminated(batchServiceClient, result, waitDuration);
if (result == null) {
Expand Down
9 changes: 8 additions & 1 deletion src/main/java/io/kestra/plugin/gcp/runner/CloudRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,13 @@ public class CloudRun extends TaskRunner implements GcpInterface, RemoteRunnerIn
@PluginProperty
private final Duration completionCheckInterval = Duration.ofSeconds(1);

@Schema(
title = "Additional time after the job ends to wait for late logs."
)
@Builder.Default
@PluginProperty
private final Duration waitForLogInterval = Duration.ofSeconds(5);

@Override
public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToUpload, List<String> filesToDownload) throws Exception {

Expand Down Expand Up @@ -320,7 +327,7 @@ public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<S
);

LogEntryServerStream stream = logging.tailLogEntries(Logging.TailOption.filter(logFilter));
try (LogTail ignored = new LogTail(stream, taskCommands.getLogConsumer())) {
try (LogTail ignored = new LogTail(stream, taskCommands.getLogConsumer(), this.waitForLogInterval)) {
if (!isTerminated(execution)) {
runContext.logger().info("Waiting for execution completion: {}.", executionName);
execution = awaitJobExecutionTermination(executionsClient, executionName, timeout);
Expand Down
22 changes: 14 additions & 8 deletions src/main/java/io/kestra/plugin/gcp/runner/LogTail.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,24 @@
import com.google.cloud.logging.Severity;
import io.kestra.core.models.tasks.runners.AbstractLogConsumer;

import java.time.Duration;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class LogTail implements AutoCloseable {
private final LogEntryServerStream stream;
private final ExecutorService executorService;
private final Duration waitForLogInterval;

private volatile boolean stopped = false;

LogTail(LogEntryServerStream stream, AbstractLogConsumer logConsumer) {
LogTail(LogEntryServerStream stream, AbstractLogConsumer logConsumer, Duration waitForLogInterval) {
this.stream = stream;
this.waitForLogInterval = waitForLogInterval;
this.executorService = Executors.newSingleThreadExecutor();

this.executorService.submit(
() -> {
Iterator<LogEntry> it = this.stream.iterator();
while (it.hasNext() && !stopped) {
LogEntry entry = it.next();
for (LogEntry entry : this.stream) {
logConsumer.accept(entry.<Payload.StringPayload>getPayload().getData(), isError(entry.getSeverity()));
}
}
Expand All @@ -38,8 +37,15 @@ private boolean isError(Severity severity) {

@Override
public void close() {
this.stopped = true;
this.stream.cancel();
this.executorService.shutdown();

// sleep for waitForLogInterval before cancelling the stream to wait for late logs
try {
Thread.sleep(waitForLogInterval.toMillis());
} catch (InterruptedException e) {
// if we are interrupted, do nothing.
}

this.stream.cancel();
}
}

0 comments on commit 58c6910

Please sign in to comment.