Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,21 @@
import com.github.dockerjava.core.DockerClientImpl;
import com.github.dockerjava.core.NameParser;
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
import io.serverlessworkflow.api.types.Container;
import io.serverlessworkflow.api.types.ContainerLifetime;
import io.serverlessworkflow.api.types.TimeoutAfter;
import io.serverlessworkflow.api.types.ContainerLifetime.ContainerCleanupPolicy;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowUtils;
import io.serverlessworkflow.impl.WorkflowValueResolver;
import io.serverlessworkflow.impl.executors.CallableTask;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class ContainerRunner {
class ContainerRunner implements CallableTask {

private static final DefaultDockerClientConfig DEFAULT_CONFIG =
DefaultDockerClientConfig.createDefaultConfigBuilder().build();
Expand All @@ -64,14 +61,19 @@ private static class DockerClientHolder {
private final ContainerCleanupPolicy policy;
private final String containerImage;

private ContainerRunner(ContainerRunnerBuilder builder) {
this.propertySetters = builder.propertySetters;
this.timeout = Optional.ofNullable(builder.timeout);
this.policy = builder.policy;
this.containerImage = builder.containerImage;
public ContainerRunner(
Collection<ContainerPropertySetter> propertySetters,
Optional<WorkflowValueResolver<Duration>> timeout,
ContainerCleanupPolicy policy,
String containerImage) {
this.propertySetters = propertySetters;
this.timeout = timeout;
this.policy = policy;
this.containerImage = containerImage;
}

CompletableFuture<WorkflowModel> start(
@Override
public CompletableFuture<WorkflowModel> apply(
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
return CompletableFuture.supplyAsync(
() -> startSync(workflowContext, taskContext, input),
Expand Down Expand Up @@ -215,52 +217,4 @@ private static RuntimeException mapExitCode(int exit) {
private static RuntimeException failed(String message) {
return new RuntimeException(message);
}

static ContainerRunnerBuilder builder() {
return new ContainerRunnerBuilder();
}

public static class ContainerRunnerBuilder {
private Container container;
private WorkflowDefinition definition;
private WorkflowValueResolver<Duration> timeout;
private ContainerCleanupPolicy policy;
private String containerImage;
private Collection<ContainerPropertySetter> propertySetters = new ArrayList<>();

private ContainerRunnerBuilder() {}

ContainerRunnerBuilder withContainer(Container container) {
this.container = container;
return this;
}

public ContainerRunnerBuilder withWorkflowDefinition(WorkflowDefinition definition) {
this.definition = definition;
return this;
}

ContainerRunner build() {
propertySetters.add(new NamePropertySetter(definition, container));
propertySetters.add(new CommandPropertySetter(definition, container));
propertySetters.add(new ContainerEnvironmentPropertySetter(definition, container));
propertySetters.add(new LifetimePropertySetter(container));
propertySetters.add(new PortsPropertySetter(container));
propertySetters.add(new VolumesPropertySetter(definition, container));

containerImage = container.getImage();
if (containerImage == null || container.getImage().isBlank()) {
throw new IllegalArgumentException("Container image must be provided");
}
ContainerLifetime lifetime = container.getLifetime();
if (lifetime != null) {
policy = lifetime.getCleanup();
TimeoutAfter afterTimeout = lifetime.getAfter();
if (afterTimeout != null)
timeout = WorkflowUtils.fromTimeoutAfter(definition.application(), afterTimeout);
}

return new ContainerRunner(this);
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.container.executors;

import io.serverlessworkflow.api.types.Container;
import io.serverlessworkflow.api.types.ContainerLifetime;
import io.serverlessworkflow.api.types.ContainerLifetime.ContainerCleanupPolicy;
import io.serverlessworkflow.api.types.RunContainer;
import io.serverlessworkflow.api.types.RunTaskConfiguration;
import io.serverlessworkflow.api.types.TimeoutAfter;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowUtils;
import io.serverlessworkflow.impl.WorkflowValueResolver;
import io.serverlessworkflow.impl.executors.CallableTask;
import io.serverlessworkflow.impl.executors.RunnableTaskBuilder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;

public class RunContainerExecutorBuilder implements RunnableTaskBuilder<RunContainer> {

@Override
public CallableTask build(RunContainer taskConfiguration, WorkflowDefinition definition) {
Collection<ContainerPropertySetter> propertySetters = new ArrayList<>();
Container container = taskConfiguration.getContainer();
propertySetters.add(new NamePropertySetter(definition, container));
propertySetters.add(new CommandPropertySetter(definition, container));
propertySetters.add(new ContainerEnvironmentPropertySetter(definition, container));
propertySetters.add(new LifetimePropertySetter(container));
propertySetters.add(new PortsPropertySetter(container));
propertySetters.add(new VolumesPropertySetter(definition, container));

ContainerCleanupPolicy policy = null;
WorkflowValueResolver<Duration> timeout = null;
ContainerLifetime lifetime = container.getLifetime();
if (lifetime != null) {
policy = lifetime.getCleanup();
TimeoutAfter afterTimeout = lifetime.getAfter();
if (afterTimeout != null)
timeout = WorkflowUtils.fromTimeoutAfter(definition.application(), afterTimeout);
}
return new ContainerRunner(
propertySetters, Optional.ofNullable(timeout), policy, container.getImage());
}

@Override
public boolean accept(Class<? extends RunTaskConfiguration> clazz) {
return RunContainer.class.equals(clazz);
}
}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
io.serverlessworkflow.impl.container.executors.RunContainerExecutor
io.serverlessworkflow.impl.container.executors.RunContainerExecutorBuilder
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class WorkflowMutableInstance implements WorkflowInstance {
protected AtomicReference<CompletableFuture<WorkflowModel>> futureRef = new AtomicReference<>();
protected Instant completedAt;

protected final Map<String, Object> additionalObjects = new ConcurrentHashMap<String, Object>();
protected final Map<String, Object> additionalObjects = new ConcurrentHashMap<>();

private Lock statusLock = new ReentrantLock();
private Map<CompletableFuture<TaskContext>, TaskContext> suspended;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;

public abstract class AbstractConfigManager implements ConfigManager {
Expand Down Expand Up @@ -56,5 +59,19 @@ protected <T> T convert(String value, Class<T> propClass) {
return propClass.cast(result);
}

@Override
public <T> Collection<T> multiConfig(String propName, Class<T> propClass) {
String multiValue = get(propName);
if (multiValue != null) {
Collection<T> result = new ArrayList<>();
for (String value : multiValue.split(",")) {
result.add(convert(value, propClass));
}
return result;
} else {
return Collections.emptyList();
}
}

protected abstract <T> T convertComplex(String value, Class<T> propClass);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@
package io.serverlessworkflow.impl.config;

import io.serverlessworkflow.impl.ServicePriority;
import java.util.Collection;
import java.util.List;
import java.util.Optional;

public interface ConfigManager extends ServicePriority {

<T> Optional<T> config(String propName, Class<T> propClass);

default <T> Collection<T> multiConfig(String propName, Class<T> propClass) {
return List.of();
}

Iterable<String> names();
}
Loading