Skip to content

Commit 7d74e17

Browse files
committed
[Fix #936] Alternative approach
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 6fe63bd commit 7d74e17

File tree

28 files changed

+582
-466
lines changed

28 files changed

+582
-466
lines changed

impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/ContainerRunner.java

Lines changed: 14 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -28,24 +28,21 @@
2828
import com.github.dockerjava.core.DockerClientImpl;
2929
import com.github.dockerjava.core.NameParser;
3030
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
31-
import io.serverlessworkflow.api.types.Container;
32-
import io.serverlessworkflow.api.types.ContainerLifetime;
33-
import io.serverlessworkflow.api.types.TimeoutAfter;
31+
import io.serverlessworkflow.api.types.ContainerLifetime.ContainerCleanupPolicy;
3432
import io.serverlessworkflow.impl.TaskContext;
3533
import io.serverlessworkflow.impl.WorkflowContext;
36-
import io.serverlessworkflow.impl.WorkflowDefinition;
3734
import io.serverlessworkflow.impl.WorkflowModel;
3835
import io.serverlessworkflow.impl.WorkflowUtils;
3936
import io.serverlessworkflow.impl.WorkflowValueResolver;
37+
import io.serverlessworkflow.impl.executors.CallableTask;
4038
import java.io.IOException;
4139
import java.time.Duration;
42-
import java.util.ArrayList;
4340
import java.util.Collection;
4441
import java.util.Optional;
4542
import java.util.concurrent.CompletableFuture;
4643
import java.util.concurrent.TimeUnit;
4744

48-
class ContainerRunner {
45+
class ContainerRunner implements CallableTask {
4946

5047
private static final DefaultDockerClientConfig DEFAULT_CONFIG =
5148
DefaultDockerClientConfig.createDefaultConfigBuilder().build();
@@ -64,14 +61,19 @@ private static class DockerClientHolder {
6461
private final ContainerCleanupPolicy policy;
6562
private final String containerImage;
6663

67-
private ContainerRunner(ContainerRunnerBuilder builder) {
68-
this.propertySetters = builder.propertySetters;
69-
this.timeout = Optional.ofNullable(builder.timeout);
70-
this.policy = builder.policy;
71-
this.containerImage = builder.containerImage;
64+
public ContainerRunner(
65+
Collection<ContainerPropertySetter> propertySetters,
66+
Optional<WorkflowValueResolver<Duration>> timeout,
67+
ContainerCleanupPolicy policy,
68+
String containerImage) {
69+
this.propertySetters = propertySetters;
70+
this.timeout = timeout;
71+
this.policy = policy;
72+
this.containerImage = containerImage;
7273
}
7374

74-
CompletableFuture<WorkflowModel> start(
75+
@Override
76+
public CompletableFuture<WorkflowModel> apply(
7577
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
7678
return CompletableFuture.supplyAsync(
7779
() -> startSync(workflowContext, taskContext, input),
@@ -215,52 +217,4 @@ private static RuntimeException mapExitCode(int exit) {
215217
private static RuntimeException failed(String message) {
216218
return new RuntimeException(message);
217219
}
218-
219-
static ContainerRunnerBuilder builder() {
220-
return new ContainerRunnerBuilder();
221-
}
222-
223-
public static class ContainerRunnerBuilder {
224-
private Container container;
225-
private WorkflowDefinition definition;
226-
private WorkflowValueResolver<Duration> timeout;
227-
private ContainerCleanupPolicy policy;
228-
private String containerImage;
229-
private Collection<ContainerPropertySetter> propertySetters = new ArrayList<>();
230-
231-
private ContainerRunnerBuilder() {}
232-
233-
ContainerRunnerBuilder withContainer(Container container) {
234-
this.container = container;
235-
return this;
236-
}
237-
238-
public ContainerRunnerBuilder withWorkflowDefinition(WorkflowDefinition definition) {
239-
this.definition = definition;
240-
return this;
241-
}
242-
243-
ContainerRunner build() {
244-
propertySetters.add(new NamePropertySetter(definition, container));
245-
propertySetters.add(new CommandPropertySetter(definition, container));
246-
propertySetters.add(new ContainerEnvironmentPropertySetter(definition, container));
247-
propertySetters.add(new LifetimePropertySetter(container));
248-
propertySetters.add(new PortsPropertySetter(container));
249-
propertySetters.add(new VolumesPropertySetter(definition, container));
250-
251-
containerImage = container.getImage();
252-
if (containerImage == null || container.getImage().isBlank()) {
253-
throw new IllegalArgumentException("Container image must be provided");
254-
}
255-
ContainerLifetime lifetime = container.getLifetime();
256-
if (lifetime != null) {
257-
policy = lifetime.getCleanup();
258-
TimeoutAfter afterTimeout = lifetime.getAfter();
259-
if (afterTimeout != null)
260-
timeout = WorkflowUtils.fromTimeoutAfter(definition.application(), afterTimeout);
261-
}
262-
263-
return new ContainerRunner(this);
264-
}
265-
}
266220
}

impl/container/src/main/java/io/serverlessworkflow/impl/container/executors/RunContainerExecutor.java

Lines changed: 0 additions & 51 deletions
This file was deleted.
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.container.executors;
17+
18+
import io.serverlessworkflow.api.types.Container;
19+
import io.serverlessworkflow.api.types.ContainerLifetime;
20+
import io.serverlessworkflow.api.types.ContainerLifetime.ContainerCleanupPolicy;
21+
import io.serverlessworkflow.api.types.RunContainer;
22+
import io.serverlessworkflow.api.types.RunTaskConfiguration;
23+
import io.serverlessworkflow.api.types.TimeoutAfter;
24+
import io.serverlessworkflow.impl.WorkflowDefinition;
25+
import io.serverlessworkflow.impl.WorkflowUtils;
26+
import io.serverlessworkflow.impl.WorkflowValueResolver;
27+
import io.serverlessworkflow.impl.executors.CallableTask;
28+
import io.serverlessworkflow.impl.executors.RunnableTaskBuilder;
29+
import java.time.Duration;
30+
import java.util.ArrayList;
31+
import java.util.Collection;
32+
import java.util.Objects;
33+
import java.util.Optional;
34+
35+
public class RunContainerExecutorBuilder implements RunnableTaskBuilder<RunContainer> {
36+
37+
@Override
38+
public CallableTask build(RunContainer taskConfiguration, WorkflowDefinition definition) {
39+
Collection<ContainerPropertySetter> propertySetters = new ArrayList<>();
40+
Container container = taskConfiguration.getContainer();
41+
propertySetters.add(new NamePropertySetter(definition, container));
42+
propertySetters.add(new CommandPropertySetter(definition, container));
43+
propertySetters.add(new ContainerEnvironmentPropertySetter(definition, container));
44+
propertySetters.add(new LifetimePropertySetter(container));
45+
propertySetters.add(new PortsPropertySetter(container));
46+
propertySetters.add(new VolumesPropertySetter(definition, container));
47+
48+
if (WorkflowUtils.isValid(container.getImage())) {
49+
throw new IllegalArgumentException();
50+
}
51+
ContainerCleanupPolicy policy = null;
52+
WorkflowValueResolver<Duration> timeout = null;
53+
ContainerLifetime lifetime = container.getLifetime();
54+
if (lifetime != null) {
55+
policy = lifetime.getCleanup();
56+
TimeoutAfter afterTimeout = lifetime.getAfter();
57+
if (afterTimeout != null)
58+
timeout = WorkflowUtils.fromTimeoutAfter(definition.application(), afterTimeout);
59+
}
60+
return new ContainerRunner(
61+
propertySetters,
62+
Optional.ofNullable(timeout),
63+
policy,
64+
Objects.requireNonNull(container.getImage(), "Container image must be provided"));
65+
}
66+
67+
@Override
68+
public boolean accept(Class<? extends RunTaskConfiguration> clazz) {
69+
return RunContainer.class.equals(clazz);
70+
}
71+
}
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
io.serverlessworkflow.impl.container.executors.RunContainerExecutor
1+
io.serverlessworkflow.impl.container.executors.RunContainerExecutorBuilder

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,10 @@ public class WorkflowMutableInstance implements WorkflowInstance {
4545
protected final WorkflowContext workflowContext;
4646
protected Instant startedAt;
4747

48-
protected final Map<String, Object> additionalObjects = new ConcurrentHashMap<String, Object>();
49-
5048
protected AtomicReference<CompletableFuture<WorkflowModel>> futureRef = new AtomicReference<>();
5149
protected Instant completedAt;
5250

53-
protected final Map<String, Object> additionalObjects = new ConcurrentHashMap<String, Object>();
51+
protected final Map<String, Object> additionalObjects = new ConcurrentHashMap<>();
5452

5553
private Lock statusLock = new ReentrantLock();
5654
private Map<CompletableFuture<TaskContext>, TaskContext> suspended;
@@ -292,8 +290,4 @@ public <T> T additionalObject(String key, Supplier<T> supplier) {
292290
}
293291

294292
public void restoreContext(WorkflowContext workflow, TaskContext context) {}
295-
296-
public <T> T additionalObject(String key, Supplier<T> supplier) {
297-
return (T) additionalObjects.computeIfAbsent(key, k -> supplier.get());
298-
}
299293
}

0 commit comments

Comments
 (0)