Skip to content

Commit

Permalink
Improve logging. (#1680)
Browse files Browse the repository at this point in the history
* Improve logging.

WM-1345 Improve logging.
Reduce logging on task retry for polling.

Co-authored-by: Shota Suzuki <aqr.aqua@gmail.com>

* Fix build failures

* Remove a test which is not related.

Co-authored-by: Shota Suzuki <aqr.aqua@gmail.com>
  • Loading branch information
yoyama and szyn committed Jan 13, 2022
1 parent eafffaf commit 5f08ded
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import java.io.IOException;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -254,7 +255,12 @@ protected void runWithWorkspace(Path projectPath, TaskRequest request)
String type;
if (config.has("_type")) {
type = config.get("_type", String.class);
logger.info("type: {}", type);
if (checkTaskLogPrintable(request)) { // Reduce task log output for polling
logger.info("type: {}", type);
}
else {
logger.debug("(retry: {}) type: {}", request.getRetryCount(), type);
}
shouldBeUsedKeys.remove("_type");
}
else {
Expand All @@ -273,7 +279,12 @@ protected void runWithWorkspace(Path projectPath, TaskRequest request)
Object command = config.getOptional(operatorKey.get(), Object.class).orNull();
config.set("_type", type);
config.set("_command", command);
logger.info("{}>: {}", type, Optional.fromNullable(command).or(""));
if (checkTaskLogPrintable(request)) { // Reduce task log output for polling
logger.info("{}>: {}", type, Optional.fromNullable(command).or(""));
}
else {
logger.debug("(retry: {}) {}>: {}", request.getRetryCount(), type, Optional.fromNullable(command).or(""));
}
shouldBeUsedKeys.remove(operatorKey.get());
}

Expand All @@ -288,7 +299,6 @@ protected void runWithWorkspace(Path projectPath, TaskRequest request)

// Track accessed keys using UsedKeysSet class
CheckedConfig.UsedKeysSet usedKeys = new CheckedConfig.UsedKeysSet();

TaskRequest mergedRequest = TaskRequest.builder()
.from(request)
.localConfig(new CheckedConfig(localConfig, usedKeys))
Expand All @@ -307,6 +317,30 @@ protected void runWithWorkspace(Path projectPath, TaskRequest request)
callback.taskSucceeded(request, agentId, result);
}

/**
 * Decides whether the per-run task log line should be printed for this request.
 *
 * <p>Callers in this class log at INFO when this returns {@code true} and at DEBUG
 * otherwise, so that a task which is re-run many times while polling does not repeat
 * the same line on every run.
 *
 * <p>The result is {@code true} when either of the following holds:
 * <ul>
 *   <li>the retry count is 0, i.e. this is the first run of the task;</li>
 *   <li>the request carries a start time, the current time is later than that start
 *       time plus 30 minutes, and the retry count is a multiple of 10.</li>
 * </ul>
 *
 * @param request the task request whose retry count and start time are inspected
 * @return {@code true} if the log line should be printed, {@code false} otherwise
 */
@VisibleForTesting
static boolean checkTaskLogPrintable(TaskRequest request)
{
    final int retries = request.getRetryCount();

    // The first run is always reported.
    if (retries == 0) {
        return true;
    }

    // Without a known start time the task age cannot be determined.
    if (!request.getStartedAt().isPresent()) {
        return false;
    }

    // Only tasks that have been running for more than 30 minutes are reported again.
    final Instant reportAfter = request.getStartedAt().get().plusSeconds(30 * 60);
    if (!Instant.now().isAfter(reportAfter)) {
        return false;
    }

    // ...and then only on every 10th retry.
    return retries % 10 == 0;
}

private void warnUnusedKeys(TaskRequest request, Set<String> shouldBeUsedButNotUsedKeys, Collection<String> candidateKeys)
{
for (String key : shouldBeUsedButNotUsedKeys) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1158,6 +1158,8 @@ private Optional<TaskRequest> getTaskRequest(long taskId, String lockId)
.config(params)
.lastStateParams(task.getStateParams())
.workflowDefinitionId(attempt.getWorkflowDefinitionId())
.retryCount(task.getRetryCount())
.startedAt(task.getStartedAt())
.build();

return Optional.of(request);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.digdag.core.agent;

import com.google.common.base.Optional;
import com.google.common.io.Resources;
import io.digdag.client.DigdagClient;
import io.digdag.client.config.Config;
Expand All @@ -8,6 +9,7 @@
import io.digdag.client.config.ConfigUtils;
import io.digdag.core.Limits;
import io.digdag.core.workflow.OperatorTestingUtils;
import io.digdag.spi.ImmutableTaskRequest;
import io.digdag.spi.Operator;
import io.digdag.spi.OperatorFactory;
import io.digdag.spi.SecretStoreManager;
Expand All @@ -21,12 +23,24 @@
import org.mockito.runners.MockitoJUnitRunner;

import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -176,4 +190,24 @@ public void testRunWithHeartbeatWithFailedTaskWithUnexpectedErrorButTheTaskShoul
verify(callback, times(1)).taskFailed(eq(taskRequest), any(), any());
verify(callback, times(0)).retryTask(any(), any(), anyInt(), any(), any());
}

/**
 * Verifies the rules of {@code OperatorManager.checkTaskLogPrintable}:
 * the first run is always printable, and later runs are printable only when the
 * task started more than 30 minutes ago and the retry count is a multiple of 10.
 */
@Test
public void testCheckTaskLogPrintable()
{
    // First polling must be true
    ImmutableTaskRequest request = OperatorTestingUtils.newTaskRequest(simpleConfig)
            .withRetryCount(0);
    assertTrue(OperatorManager.checkTaskLogPrintable(request));
    request = request.withRetryCount(1);
    assertFalse(OperatorManager.checkTaskLogPrintable(request));

    // Without a start time, even a multiple-of-10 retry must be false
    ImmutableTaskRequest noStart = request.withStartedAt(Optional.<Instant>absent());
    assertFalse(OperatorManager.checkTaskLogPrintable(noStart.withRetryCount(10)));

    // Before 30 min have passed since start, a multiple-of-10 retry must still be false
    ImmutableTaskRequest justStarted = request.withStartedAt(Optional.of(Instant.now()));
    assertFalse(OperatorManager.checkTaskLogPrintable(justStarted.withRetryCount(10)));

    // After 30 min from start, every 10 polling must be true
    request = request.withStartedAt(Optional.of(Instant.now().minusSeconds(60*30+1)));
    assertFalse(OperatorManager.checkTaskLogPrintable(request.withRetryCount(5)));
    assertFalse(OperatorManager.checkTaskLogPrintable(request.withRetryCount(6)));
    assertFalse(OperatorManager.checkTaskLogPrintable(request.withRetryCount(7)));
    assertFalse(OperatorManager.checkTaskLogPrintable(request.withRetryCount(8)));
    assertFalse(OperatorManager.checkTaskLogPrintable(request.withRetryCount(9)));
    assertTrue(OperatorManager.checkTaskLogPrintable(request.withRetryCount(10)));

    // The pattern repeats: non-multiples stay false, the next multiple is true again
    assertFalse(OperatorManager.checkTaskLogPrintable(request.withRetryCount(11)));
    assertTrue(OperatorManager.checkTaskLogPrintable(request.withRetryCount(20)));

    // The first polling is true regardless of the start time
    assertTrue(OperatorManager.checkTaskLogPrintable(request.withRetryCount(0)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public static ImmutableTaskRequest newTaskRequest(Config config)
.config(newConfig())
.localConfig(config)
.lastStateParams(newConfig())
.retryCount(0)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public Object invoke(MethodInvocation invocation)
return invokeMain(invocation);
}
catch (Exception e) {
logger.debug("invocationMain Failed. {}", e.toString());
logger.trace("invocationMain Failed. {}", e.toString());
throw e;
}
}
Expand Down
12 changes: 12 additions & 0 deletions digdag-spi/src/main/java/io/digdag/spi/TaskRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ public interface TaskRequest

Config getLastStateParams();

/**
 * Returns the retry count carried by this task request.
 *
 * <p>Declared with {@code @Value.Default}; this default implementation supplies 0.
 *
 * @return the retry count, 0 by default
 */
@Value.Default
default int getRetryCount()
{
return 0;
}

/**
 * Returns the time at which the task started, if known.
 *
 * <p>Declared with {@code @Value.Default}; this default implementation supplies an
 * absent value.
 *
 * @return the start time, or {@code Optional.absent()} by default
 */
@Value.Default
default Optional<Instant> getStartedAt()
{
return Optional.absent();
}

static ImmutableTaskRequest.Builder builder()
{
return ImmutableTaskRequest.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public <T> T await(Operation<Optional<T>> f)
pollState.params().set(ITERATION, iteration + 1);
int interval = calculateNextInterval(now, startTime.get(), iteration);
String formattedErrorMessage = String.format(waitMessage, waitMessageParameters);
logger.info("{}: checking again in {}", formattedErrorMessage, Durations.formatDuration(Duration.ofSeconds(interval)));
logger.debug("{}: checking again in {}", formattedErrorMessage, Durations.formatDuration(Duration.ofSeconds(interval)));
throw pollState.pollingTaskExecutionException(interval);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ public TaskResult runTask()
throw state.pollingTaskExecutionException(queryPollInterval);
}

logger.info("Finished. Last job id={}", job.getJobId());

// The query condition was fulfilled, we're done.
return TaskResult.empty(request);
}
Expand Down Expand Up @@ -155,7 +157,13 @@ String startJob(TDOperator op, String domainKey)
.createTDJobRequest();

String jobId = op.submitNewJobWithRetry(req);
logger.info("Started {} job id={}:\n{}", engine, jobId, query);
if (request.getLastStateParams().getOptional("__last_job_id", String.class).isPresent()) {
logger.debug("Started {} job id={}:\n{}", engine, jobId, query);
}
else {
logger.info("Started {} job id={}:\n{}", engine, jobId, query);
}
state.params().set("__last_job_id", jobId);

return jobId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public TaskResult runTask()
if (!done) {
throw state.pollingTaskExecutionException(tablePollInterval);
}

logger.info("Finished. Last job id={}", job.getJobId());
// The row count condition was fulfilled. We're done.
return TaskResult.empty(request);
}
Expand Down Expand Up @@ -220,7 +220,13 @@ String startJob(TDOperator op, String domainKey)
.createTDJobRequest();

String jobId = op.submitNewJobWithRetry(req);
logger.info("Started {} job id={}:\n{}", engine, jobId, query);
if (request.getLastStateParams().getOptional("__last_job_id", String.class).isPresent()) {
logger.debug("Started {} job id={}:\n{}", engine, jobId, query);
}
else {
logger.info("Started {} job id={}:\n{}", engine, jobId, query);
}
state.params().set("__last_job_id", jobId);
return jobId;
}

Expand Down

0 comments on commit 5f08ded

Please sign in to comment.