Skip to content

Commit

Permalink
Wait operator (#1684)
Browse files Browse the repository at this point in the history
* Add 'wait' operator

* Add test for `wait` operator

* Add description about `wait` operator

* Refactoring

* Add `blocking` and `poll_interval` options

to `wait` operator

* Small refactoring

* Replace nullable value with Optional

* Make the key name clearer

* Fix typo

* Add missing test case

* Refactoring

* Use a safer expected value

* Merge pull request #1520 from treasure-data/wait-oprator

Add `wait` operator by porting https://github.com/yoyama/digdag-wait-op

Co-authored-by: Mitsunori Komatsu <komamitsu@gmail.com>
  • Loading branch information
yoyama and komamitsu committed Jan 13, 2022
1 parent 5f08ded commit dd4b18f
Show file tree
Hide file tree
Showing 11 changed files with 328 additions and 0 deletions.
35 changes: 35 additions & 0 deletions digdag-docs/src/operators/wait.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# wait>: Wait for a specific duration

**wait>** operator waits a specific duration in the workflow.

This operator seems similar to `sh>: sleep 5`, but this works in both blocking and non-blocking modes and should be always available even in security-restricted environment.

+wait_10s:
wait>: 10s

## Options

* **wait>**: DURATION

Duration to wait.

* **blocking**: BOOLEAN

Digdag agent internally executes this operator in blocking mode and the agent keeps waiting if this option is set to true (default: false)

Examples:

```
blocking: true
```

* **poll_interval**: DURATION

This option is used only with non-blocking mode. If it's set, digdag agent internally gets awake and checks at a specific interval if the duration has passed. If not set, digdag agent gets awake only when a specific duration passes.

Examples:

```
poll_interval: 5s
```

1 change: 1 addition & 0 deletions digdag-docs/src/operators/workflow_control.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ Workflow control operators
if.md
fail.md
echo.md
wait.md

Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public void configure(Binder binder)
addStandardOperatorFactory(binder, EchoOperatorFactory.class);
addStandardOperatorFactory(binder, IfOperatorFactory.class);
addStandardOperatorFactory(binder, FailOperatorFactory.class);
addStandardOperatorFactory(binder, WaitOperatorFactory.class);
addStandardOperatorFactory(binder, NotifyOperatorFactory.class);
addStandardOperatorFactory(binder, PgOperatorFactory.class);
addStandardOperatorFactory(binder, RedshiftOperatorFactory.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package io.digdag.standards.operator;

import com.google.common.base.Optional;
import com.google.inject.Inject;
import io.digdag.client.config.Config;
import io.digdag.client.config.ConfigElement;
import io.digdag.client.config.ConfigException;
import io.digdag.spi.Operator;
import io.digdag.spi.OperatorContext;
import io.digdag.spi.OperatorFactory;
import io.digdag.spi.TaskRequest;
import io.digdag.spi.TaskResult;
import io.digdag.spi.TaskExecutionException;

import io.digdag.util.Durations;
import io.digdag.util.Workspace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;


public class WaitOperatorFactory
implements OperatorFactory {

@Inject
public WaitOperatorFactory()
{
}

@Override
public String getType()
{
return "wait";
}

@Override
public Operator newOperator(OperatorContext context)
{
return new WaitOperator(context);
}

private static class WaitOperator
implements Operator
{
private static final String WAIT_START_TIME_PARAM = "wait_start_time_millis";

private static final Logger logger = LoggerFactory.getLogger(WaitOperator.class);

private final OperatorContext context;
private final TaskRequest request;
private final Workspace workspace;


private WaitOperator(OperatorContext context)
{
this.context = context;
this.request = context.getTaskRequest();
this.workspace = Workspace.ofTaskRequest(context.getProjectPath(), request);
}

private Duration duration(Config config)
{
Duration duration;
try {
duration = Durations.parseDuration(config.get("_command", String.class));
}
catch (RuntimeException re) {
throw new ConfigException("Invalid configuration", re);
}
logger.debug("wait duration: {}", duration);
return duration;
}

private boolean blocking(Config config)
{
boolean blocking = config.get("blocking", boolean.class, false);
logger.debug("wait blocking mode: {}", blocking);
return blocking;
}

private Optional<Duration> pollInterval(Config config)
{
try {
Optional<Duration> pollInterval = config
.getOptional("poll_interval", String.class)
.transform(Durations::parseDuration);
if (pollInterval.isPresent()) {
logger.debug("wait poll_interval: {}", pollInterval.get());
}
return pollInterval;
}
catch (RuntimeException re) {
throw new ConfigException("Invalid configuration", re);
}
}

public TaskResult run()
{
Config config = request.getConfig();

Duration duration = duration(config);
boolean blocking = blocking(config);
Optional<Duration> pollInterval = pollInterval(config);
if (blocking && pollInterval.isPresent()) {
throw new ConfigException("poll_interval can't be specified with blocking:true");
}

Instant now = Instant.now();
Instant start = request.getLastStateParams()
.getOptional(WAIT_START_TIME_PARAM, Long.class)
.transform(Instant::ofEpochMilli)
.or(now);

if (now.isAfter(start.plusMillis(duration.toMillis()))) {
logger.info("wait finished. start:{}", start);
return TaskResult.empty(request);
}

// Wait at least 1 second
long waitDurationSeconds = Math.max(
Duration.between(now, start.plusMillis(duration.toMillis())).getSeconds(),
1);

if (blocking) {
logger.debug("waiting for {}s", waitDurationSeconds);
try {
TimeUnit.SECONDS.sleep(waitDurationSeconds);
return TaskResult.empty(request);
}
catch (InterruptedException e) {
// The blocking wait task will be restarted from the beginning when interrupted.
//
// There is room to improve this behavior by making the task resume from when interrupted.
// But this operator, especially blocking mode, is for development use,
// so we'll go with this simple implementation for now.
throw new RuntimeException("`wait` operator with blocking mode is interrupted and this will be restarted from the beginning of the wait");
}
}
else {
if (pollInterval.isPresent()) {
waitDurationSeconds = pollInterval.get().getSeconds();
}
logger.debug("polling after {}s", waitDurationSeconds);
throw TaskExecutionException.ofNextPolling(
(int) waitDurationSeconds,
ConfigElement.copyOf(
request.getLastStateParams().set(WAIT_START_TIME_PARAM, start.toEpochMilli())));
}
}
}
}
120 changes: 120 additions & 0 deletions digdag-tests/src/test/java/acceptance/WaitIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package acceptance;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import utils.CommandStatus;

import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertThat;
import static utils.TestUtils.copyResource;
import static utils.TestUtils.main;

public class WaitIT
{
@Rule
public TemporaryFolder folder = new TemporaryFolder();

private Path root()
{
return folder.getRoot().toPath().toAbsolutePath();
}

private static class ExecResult
{
CommandStatus commandStatus;
Duration duration;

public ExecResult(CommandStatus commandStatus, Duration duration)
{
this.commandStatus = commandStatus;
this.duration = duration;
}
}

private ExecResult runAndMonitorDuration(Supplier<CommandStatus> task)
{
Instant start = Instant.now();
CommandStatus commandStatus = task.get();
Duration duration = Duration.between(start, Instant.now());
return new ExecResult(commandStatus, duration);
}

private void testWorkflow(String workflowName, int expectedDuration)
throws Exception
{
String nowaitResourcePath = "acceptance/wait/nowait.dig";
String targetResourcePath = "acceptance/wait/" + workflowName;

Duration baselineDuration;
{
copyResource(nowaitResourcePath, root().resolve("wait.dig"));
ExecResult result = runAndMonitorDuration(() ->
main("run", "-o", root().toString(), "--project", root().toString(), "wait.dig"));
CommandStatus status = result.commandStatus;
assertThat(status.errUtf8(), status.code(), is(0));
baselineDuration = result.duration;
}

{
copyResource(targetResourcePath, root().resolve("wait.dig"));
ExecResult result = runAndMonitorDuration(() ->
main("run", "-o", root().toString(), "--project", root().toString(), "wait.dig"));
CommandStatus status = result.commandStatus;
assertThat(status.errUtf8(), status.code(), is(0));
assertThat(result.duration, greaterThan(baselineDuration));
assertThat(result.duration, lessThan(
// Actual wait duration can be longer than the specified 10 seconds for some reason
baselineDuration.plusSeconds(expectedDuration * 3)));
}
}

@Test
public void testSimpleVersion()
throws Exception
{
testWorkflow("wait.dig", 10);
}

@Test
public void testBlockingMode()
throws Exception
{
testWorkflow("wait_blocking.dig", 10);
}

@Test
public void testNonBlockingMode()
throws Exception
{
testWorkflow("wait_nonblocking.dig", 10);
}

@Test
public void testPollInterval()
throws Exception
{
testWorkflow("wait_poll_interval.dig", 10);
}

@Test
public void testInvalidConfig()
throws Exception
{
String targetResourcePath = "acceptance/wait/wait_invalid_config.dig";

copyResource(targetResourcePath, root().resolve("wait.dig"));
ExecResult result = runAndMonitorDuration(() ->
main("run", "-o", root().toString(), "--project", root().toString(), "wait.dig"));
CommandStatus status = result.commandStatus;
// The workflow contains a conflict configuration and it should fail.
assertThat(status.errUtf8(), status.code(), is(1));
}
}
2 changes: 2 additions & 0 deletions digdag-tests/src/test/resources/acceptance/wait/nowait.dig
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
+nowait:
echo>: nowait
2 changes: 2 additions & 0 deletions digdag-tests/src/test/resources/acceptance/wait/wait.dig
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
+wait:
wait>: 10s
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
+wait:
wait>: 10s
blocking: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
+wait:
wait>: 10s
blocking: true
poll_interval: 5s
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
+wait:
wait>: 10s
blocking: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
+wait:
wait>: 10s
poll_interval: 5s

0 comments on commit dd4b18f

Please sign in to comment.