From ce3cd2a434cf46c66073940450208cacc6f8d641 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Tue, 25 Nov 2025 19:23:05 +0100 Subject: [PATCH] [Fix #934] Implementing function catalog Signed-off-by: fjtirado [Fix #934] Implementing function catalog (alt) Signed-off-by: fjtirado --- .../impl/FunctionReader.java | 22 ++++ .../serverlessworkflow/impl/NamedObject.java | 20 ++++ .../impl/WorkflowApplication.java | 8 ++ .../impl/WorkflowUtils.java | 8 +- .../NamedWorkflowAdditionalObject.java | 7 +- .../impl/executors/CallFunctionExecutor.java | 77 +++++++++++--- .../impl/resources/DefaultResourceLoader.java | 89 +++------------- .../impl/resources/GitHubHelper.java | 43 ++++++++ .../impl/resources/HttpResource.java | 2 +- .../impl/resources/ResourceLoader.java | 100 +++++++++++++++++- impl/function/pom.xml | 24 +++++ .../function/JacksonFunctionReader.java | 36 +++++++ .../io.serverlessworkflow.impl.FunctionReader | 1 + impl/jackson/pom.xml | 4 + impl/pom.xml | 10 +- .../impl/test/RetryTimeoutTest.java | 17 +++ ...call-custom-function-cataloged-global.yaml | 17 +++ .../call-custom-function-cataloged.yaml | 13 +++ impl/validation/pom.xml | 2 +- 19 files changed, 398 insertions(+), 102 deletions(-) create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/FunctionReader.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/NamedObject.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/resources/GitHubHelper.java create mode 100644 impl/function/pom.xml create mode 100644 impl/function/src/main/java/io/serverlessworkflow/impl/jackson/function/JacksonFunctionReader.java create mode 100644 impl/function/src/main/resources/META-INF/services/io.serverlessworkflow.impl.FunctionReader create mode 100644 impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged-global.yaml create mode 100644 impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged.yaml diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/FunctionReader.java b/impl/core/src/main/java/io/serverlessworkflow/impl/FunctionReader.java new file mode 100644 index 00000000..755a98c1 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/FunctionReader.java @@ -0,0 +1,22 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import io.serverlessworkflow.api.types.Task; +import io.serverlessworkflow.impl.resources.ExternalResourceHandler; +import java.util.function.Function; + +public interface FunctionReader extends Function {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/NamedObject.java b/impl/core/src/main/java/io/serverlessworkflow/impl/NamedObject.java new file mode 100644 index 00000000..5ac5f80e --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/NamedObject.java @@ -0,0 +1,20 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +public interface NamedObject { + String name(); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index c354325c..b121c332 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -76,6 +76,7 @@ public class WorkflowApplication implements AutoCloseable { private final SecretManager secretManager; private final SchedulerListener schedulerListener; private final Optional templateResolver; + private final Optional functionReader; private WorkflowApplication(Builder builder) { this.taskFactory = builder.taskFactory; @@ -98,6 +99,7 @@ private WorkflowApplication(Builder builder) { this.configManager = builder.configManager; this.secretManager = builder.secretManager; this.templateResolver = builder.templateResolver; + this.functionReader = builder.functionReader; } public TaskExecutorFactory taskFactory() { @@ -178,6 +180,7 @@ public SchemaValidator getValidator(SchemaInline inline) { private ConfigManager configManager; private SchedulerListener schedulerListener; private Optional templateResolver; + private Optional functionReader; private Builder() { ServiceLoader.load(NamedWorkflowAdditionalObject.class) @@ -329,6 +332,7 @@ public WorkflowApplication build() { .orElseGet(() -> new ConfigSecretManager(configManager)); } templateResolver = ServiceLoader.load(URITemplateResolver.class).findFirst(); + functionReader = ServiceLoader.load(FunctionReader.class).findFirst(); return new WorkflowApplication(this); } } @@ -406,6 +410,10 @@ public Optional templateResolver() { return templateResolver; } + public Optional functionReader() { + return functionReader; + } + public Optional additionalObject( String name, WorkflowContext workflowContext, TaskContext taskContext) { return Optional.ofNullable(additionalObjects.get(name)) diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java index bedae900..46ee3d24 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java @@ -51,12 +51,8 @@ public static Optional getSchemaValidator( return Optional.of(validatorFactory.getValidator(schema.getSchemaInline())); } else if (schema.getSchemaExternal() != null) { return Optional.of( - resourceLoader.load( - schema.getSchemaExternal().getResource(), - validatorFactory::getValidator, - null, - null, - null)); + resourceLoader.loadStatic( + schema.getSchemaExternal().getResource(), validatorFactory::getValidator)); } } return Optional.empty(); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/additional/NamedWorkflowAdditionalObject.java b/impl/core/src/main/java/io/serverlessworkflow/impl/additional/NamedWorkflowAdditionalObject.java index 7991bade..af8c36a0 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/additional/NamedWorkflowAdditionalObject.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/additional/NamedWorkflowAdditionalObject.java @@ -15,6 +15,7 @@ */ package io.serverlessworkflow.impl.additional; -public interface NamedWorkflowAdditionalObject extends WorkflowAdditionalObject { - String name(); -} +import io.serverlessworkflow.impl.NamedObject; + +public interface NamedWorkflowAdditionalObject + extends WorkflowAdditionalObject, NamedObject {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallFunctionExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallFunctionExecutor.java index 416cad30..0b60486b 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallFunctionExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallFunctionExecutor.java @@ -16,13 +16,20 @@ package io.serverlessworkflow.impl.executors; import io.serverlessworkflow.api.types.CallFunction; +import io.serverlessworkflow.api.types.Catalog; import io.serverlessworkflow.api.types.FunctionArguments; import io.serverlessworkflow.api.types.Task; import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.Use; +import io.serverlessworkflow.api.types.UseCatalogs; +import io.serverlessworkflow.api.types.UseFunctions; import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; +import io.serverlessworkflow.impl.resources.ExternalResourceHandler; +import io.serverlessworkflow.impl.resources.ResourceLoader; +import java.net.URI; import java.util.Map; import java.util.Optional; @@ -35,25 +42,71 @@ public class CallFunctionExecutor implements CallableTaskBuilder { public void init( CallFunction task, WorkflowDefinition definition, WorkflowMutablePosition position) { String functionName = task.getCall(); + Use use = definition.workflow().getUse(); + Task function = null; + if (use != null) { + UseFunctions functions = use.getFunctions(); + if (functions != null) { + function = functions.getAdditionalProperties().get(functionName); + } + if (function == null) { + int indexOf = functionName.indexOf('@'); + if (indexOf > 0) { + String catalogName = functionName.substring(indexOf + 1); + UseCatalogs catalogs = use.getCatalogs(); + if (catalogs != null) { + Catalog catalog = catalogs.getAdditionalProperties().get(catalogName); + ResourceLoader loader = definition.resourceLoader(); + function = + definition + .resourceLoader() + .loadURI( + WorkflowUtils.concatURI( + loader.uri(catalog.getEndpoint()), + pathFromFunctionName(functionName.substring(0, indexOf))), + h -> from(definition, h)); + } + } + } + } + if (function == null) { + function = + definition.resourceLoader().loadURI(URI.create(functionName), h -> from(definition, h)); + } + executorBuilder = + definition.application().taskFactory().getTaskExecutor(position, function, definition); FunctionArguments functionArgs = task.getWith(); args = functionArgs != null ? WorkflowUtils.buildMapResolver( definition.application(), functionArgs.getAdditionalProperties()) : (w, t, m) -> Map.of(); - Task function = null; - if (definition.workflow().getUse() != null - && definition.workflow().getUse().getFunctions() != null - && definition.workflow().getUse().getFunctions().getAdditionalProperties() != null) { - function = - definition.workflow().getUse().getFunctions().getAdditionalProperties().get(functionName); - } - if (function == null) { - // TODO search in catalog - throw new UnsupportedOperationException("Function Catalog not supported yet"); + } + + private String pathFromFunctionName(String functionName) { + int sep = functionName.indexOf(":"); + if (sep < 0) { + throw new IllegalArgumentException( + "Invalid function name " + + functionName + + ". It has to be of the format :"); } - executorBuilder = - definition.application().taskFactory().getTaskExecutor(position, function, definition); + StringBuilder sb = new StringBuilder(functionName); + sb.setCharAt(sep, '/'); + sb.insert(0, "main/functions/"); + sb.append("/function.yaml"); + return sb.toString(); + } + + private Task from(WorkflowDefinition definition, ExternalResourceHandler handler) { + return definition + .application() + .functionReader() + .map(v -> v.apply(handler)) + .orElseThrow( + () -> + new IllegalStateException( + "No converter from external resource to function found. Make sure a dependency that includes an implementation of FunctionReader is included")); } @Override diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/resources/DefaultResourceLoader.java b/impl/core/src/main/java/io/serverlessworkflow/impl/resources/DefaultResourceLoader.java index e3018717..cdadf8fb 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/resources/DefaultResourceLoader.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/resources/DefaultResourceLoader.java @@ -15,17 +15,7 @@ */ package io.serverlessworkflow.impl.resources; -import static io.serverlessworkflow.impl.WorkflowUtils.getURISupplier; - -import io.serverlessworkflow.api.types.Endpoint; -import io.serverlessworkflow.api.types.EndpointUri; -import io.serverlessworkflow.api.types.ExternalResource; -import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowApplication; -import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowValueResolver; -import io.serverlessworkflow.impl.expressions.ExpressionDescriptor; import java.net.MalformedURLException; import java.net.URI; import java.nio.file.Path; @@ -35,18 +25,31 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; -public class DefaultResourceLoader implements ResourceLoader { +public class DefaultResourceLoader extends ResourceLoader { private final Optional workflowPath; - private final WorkflowApplication application; private Map resourceCache = new ConcurrentHashMap<>(); protected DefaultResourceLoader(WorkflowApplication application, Path workflowPath) { - this.application = application; + super(application); this.workflowPath = Optional.ofNullable(workflowPath); } + @Override + public T loadURI(URI uri, Function function) { + ExternalResourceHandler resourceHandler = buildFromURI(uri); + return (T) + resourceCache + .compute( + resourceHandler, + (k, v) -> + v == null || k.shouldReload(v.lastReload()) + ? new CachedResource(Instant.now(), function.apply(k)) + : v) + .content(); + } + private ExternalResourceHandler fileResource(String pathStr) { Path path = Path.of(pathStr); if (path.isAbsolute()) { @@ -74,66 +77,6 @@ private ExternalResourceHandler buildFromURI(URI uri) { } @Override - public T load( - ExternalResource resource, - Function function, - WorkflowContext workflowContext, - TaskContext taskContext, - WorkflowModel model) { - ExternalResourceHandler resourceHandler = - buildFromURI( - uriSupplier(resource.getEndpoint()) - .apply( - workflowContext, - taskContext, - model == null ? application.modelFactory().fromNull() : model)); - return (T) - resourceCache - .compute( - resourceHandler, - (k, v) -> - v == null || k.shouldReload(v.lastReload()) - ? new CachedResource(Instant.now(), function.apply(k)) - : v) - .content(); - } - - @Override - public WorkflowValueResolver uriSupplier(Endpoint endpoint) { - if (endpoint.getEndpointConfiguration() != null) { - EndpointUri uri = endpoint.getEndpointConfiguration().getUri(); - if (uri.getLiteralEndpointURI() != null) { - return getURISupplier(application, uri.getLiteralEndpointURI()); - } else if (uri.getExpressionEndpointURI() != null) { - return new ExpressionURISupplier( - application - .expressionFactory() - .resolveString(ExpressionDescriptor.from(uri.getExpressionEndpointURI()))); - } - } else if (endpoint.getRuntimeExpression() != null) { - return new ExpressionURISupplier( - application - .expressionFactory() - .resolveString(ExpressionDescriptor.from(endpoint.getRuntimeExpression()))); - } else if (endpoint.getUriTemplate() != null) { - return getURISupplier(application, endpoint.getUriTemplate()); - } - throw new IllegalArgumentException("Invalid endpoint definition " + endpoint); - } - - private class ExpressionURISupplier implements WorkflowValueResolver { - private WorkflowValueResolver expr; - - public ExpressionURISupplier(WorkflowValueResolver expr) { - this.expr = expr; - } - - @Override - public URI apply(WorkflowContext workflow, TaskContext task, WorkflowModel node) { - return URI.create(expr.apply(workflow, task, node)); - } - } - public void close() { resourceCache.clear(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/resources/GitHubHelper.java b/impl/core/src/main/java/io/serverlessworkflow/impl/resources/GitHubHelper.java new file mode 100644 index 00000000..e0e52b84 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/resources/GitHubHelper.java @@ -0,0 +1,43 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.resources; + +import java.io.UncheckedIOException; +import java.net.MalformedURLException; +import java.net.URL; + +public class GitHubHelper { + + private GitHubHelper() {} + + private static final String BLOB = "blob/"; + + public static URL handleURL(URL url) { + if (url.getHost().equals("github.com")) { + try { + String path = url.getPath(); + if (path.startsWith(BLOB)) { + path = path.substring(BLOB.length()); + } + return new URL(url.getProtocol(), "raw.githubusercontent.com", url.getPort(), path); + } catch (MalformedURLException e) { + throw new UncheckedIOException(e); + } + } else { + return url; + } + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/resources/HttpResource.java b/impl/core/src/main/java/io/serverlessworkflow/impl/resources/HttpResource.java index 7e2cd70d..c312b58b 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/resources/HttpResource.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/resources/HttpResource.java @@ -28,7 +28,7 @@ public class HttpResource implements ExternalResourceHandler { private URL url; public HttpResource(URL url) { - this.url = url; + this.url = GitHubHelper.handleURL(url); } @Override diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/resources/ResourceLoader.java b/impl/core/src/main/java/io/serverlessworkflow/impl/resources/ResourceLoader.java index 0c17077e..86eb6678 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/resources/ResourceLoader.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/resources/ResourceLoader.java @@ -15,23 +15,115 @@ */ package io.serverlessworkflow.impl.resources; +import static io.serverlessworkflow.impl.WorkflowUtils.getURISupplier; + import io.serverlessworkflow.api.types.Endpoint; +import io.serverlessworkflow.api.types.EndpointUri; import io.serverlessworkflow.api.types.ExternalResource; +import io.serverlessworkflow.api.types.UriTemplate; import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowValueResolver; +import io.serverlessworkflow.impl.expressions.ExpressionDescriptor; import java.net.URI; import java.util.function.Function; -public interface ResourceLoader extends AutoCloseable { +public abstract class ResourceLoader implements AutoCloseable { + + protected final WorkflowApplication application; + + public ResourceLoader(WorkflowApplication application) { + this.application = application; + } + + public URI uri(Endpoint endpoint) { + if (endpoint.getEndpointConfiguration() != null) { + EndpointUri uri = endpoint.getEndpointConfiguration().getUri(); + if (uri.getLiteralEndpointURI() != null) { + return uri(uri.getLiteralEndpointURI()); + } + } else if (endpoint.getUriTemplate() != null) { + return uri(endpoint.getUriTemplate()); + } + throw new IllegalArgumentException("Endpoint definition is not static " + endpoint); + } - WorkflowValueResolver uriSupplier(Endpoint endpoint); + public WorkflowValueResolver uriSupplier(Endpoint endpoint) { + if (endpoint.getEndpointConfiguration() != null) { + EndpointUri uri = endpoint.getEndpointConfiguration().getUri(); + if (uri.getLiteralEndpointURI() != null) { + return getURISupplier(application, uri.getLiteralEndpointURI()); + } else if (uri.getExpressionEndpointURI() != null) { + return new ExpressionURISupplier( + application + .expressionFactory() + .resolveString(ExpressionDescriptor.from(uri.getExpressionEndpointURI()))); + } + } else if (endpoint.getRuntimeExpression() != null) { + return new ExpressionURISupplier( + application + .expressionFactory() + .resolveString(ExpressionDescriptor.from(endpoint.getRuntimeExpression()))); + } else if (endpoint.getUriTemplate() != null) { + return getURISupplier(application, endpoint.getUriTemplate()); + } + throw new IllegalArgumentException("Invalid endpoint definition " + endpoint); + } - T load( + public T loadStatic( + ExternalResource resource, Function function) { + return loadStatic(resource.getEndpoint(), function); + } + + public T loadStatic(Endpoint endpoint, Function function) { + return loadURI(uri(endpoint), function); + } + + public T load( ExternalResource resource, Function function, WorkflowContext workflowContext, TaskContext taskContext, - WorkflowModel model); + WorkflowModel model) { + return load(resource.getEndpoint(), function, workflowContext, taskContext, model); + } + + public T load( + Endpoint endPoint, + Function function, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model) { + return loadURI( + uriSupplier(endPoint) + .apply( + workflowContext, + taskContext, + model == null ? application.modelFactory().fromNull() : model), + function); + } + + public abstract T loadURI(URI uri, Function function); + + private class ExpressionURISupplier implements WorkflowValueResolver { + private WorkflowValueResolver expr; + + public ExpressionURISupplier(WorkflowValueResolver expr) { + this.expr = expr; + } + + @Override + public URI apply(WorkflowContext workflow, TaskContext task, WorkflowModel node) { + return URI.create(expr.apply(workflow, task, node)); + } + } + + private URI uri(UriTemplate template) { + if (template.getLiteralUri() != null) { + return template.getLiteralUri(); + } + throw new IllegalArgumentException("Template definition is not static " + template); + } } diff --git a/impl/function/pom.xml b/impl/function/pom.xml new file mode 100644 index 00000000..3ea4b79b --- /dev/null +++ b/impl/function/pom.xml @@ -0,0 +1,24 @@ + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-impl + 8.0.0-SNAPSHOT + + serverlessworkflow-impl-function + Serverless Workflow :: Impl :: Jackson:: Validation + + + io.serverlessworkflow + serverlessworkflow-api + + + io.serverlessworkflow + serverlessworkflow-impl-core + + + io.serverlessworkflow + serverlessworkflow-impl-json + + + \ No newline at end of file diff --git a/impl/function/src/main/java/io/serverlessworkflow/impl/jackson/function/JacksonFunctionReader.java b/impl/function/src/main/java/io/serverlessworkflow/impl/jackson/function/JacksonFunctionReader.java new file mode 100644 index 00000000..425fec26 --- /dev/null +++ b/impl/function/src/main/java/io/serverlessworkflow/impl/jackson/function/JacksonFunctionReader.java @@ -0,0 +1,36 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.jackson.function; + +import io.serverlessworkflow.api.WorkflowFormat; +import io.serverlessworkflow.api.types.Task; +import io.serverlessworkflow.impl.FunctionReader; +import io.serverlessworkflow.impl.resources.ExternalResourceHandler; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; + +public class JacksonFunctionReader implements FunctionReader { + + @Override + public Task apply(ExternalResourceHandler handler) { + try (InputStream in = handler.open()) { + return WorkflowFormat.fromFileName(handler.name()).mapper().readValue(in, Task.class); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/impl/function/src/main/resources/META-INF/services/io.serverlessworkflow.impl.FunctionReader b/impl/function/src/main/resources/META-INF/services/io.serverlessworkflow.impl.FunctionReader new file mode 100644 index 00000000..0c8ebd73 --- /dev/null +++ b/impl/function/src/main/resources/META-INF/services/io.serverlessworkflow.impl.FunctionReader @@ -0,0 +1 @@ +io.serverlessworkflow.impl.jackson.function.JacksonFunctionReader \ No newline at end of file diff --git a/impl/jackson/pom.xml b/impl/jackson/pom.xml index be0c7336..1f3bae03 100644 --- a/impl/jackson/pom.xml +++ b/impl/jackson/pom.xml @@ -28,6 +28,10 @@ io.serverlessworkflow serverlessworkflow-impl-validation + + io.serverlessworkflow + serverlessworkflow-impl-function + io.serverlessworkflow serverlessworkflow-impl-lifecycle-events diff --git a/impl/pom.xml b/impl/pom.xml index 6843e0f4..b052dd52 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -50,12 +50,17 @@ serverlessworkflow-impl-model ${project.version} - + io.serverlessworkflow serverlessworkflow-impl-validation ${project.version} - + + io.serverlessworkflow + serverlessworkflow-impl-function + ${project.version} + + io.serverlessworkflow serverlessworkflow-impl-lifecycle-events ${project.version} @@ -178,6 +183,7 @@ model lifecycleevent validation + function container test script-js diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java index 565cc24d..c60e5b73 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java @@ -136,4 +136,21 @@ void testCustomFunction() { .extracting(w -> ((WorkflowException) w.getCause()).getWorkflowError().status()) .isEqualTo(404); } + + @ParameterizedTest + @ValueSource( + strings = { + "workflows-samples/call-custom-function-cataloged.yaml", + "workflows-samples/call-custom-function-cataloged-global.yaml" + }) + void testCustomCatalogFunction(String fileName) throws IOException { + assertThatThrownBy( + () -> + app.workflowDefinition(readWorkflowFromClasspath(fileName)) + .instance(Map.of()) + .start() + .join()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("No script runner implementation found for language PYTHON"); + } } diff --git a/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged-global.yaml b/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged-global.yaml new file mode 100644 index 00000000..534465d4 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged-global.yaml @@ -0,0 +1,17 @@ +document: + dsl: '1.0.2' + namespace: test + name: call-custom-function-cataloged-global + version: '0.1.0' +use: + catalogs: + global: + endpoint: https://github.com/serverlessworkflow/catalog +do: + - log: + call: log:1.0.0@global + with: + message: Hello, world! + level: information + timestamp: true + format: '{TIMESTAMP} [{LEVEL}] ({CONTEXT}): {MESSAGE}' \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged.yaml b/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged.yaml new file mode 100644 index 00000000..43fb634e --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/call-custom-function-cataloged.yaml @@ -0,0 +1,13 @@ +document: + dsl: '1.0.2' + namespace: test + name: call-custom-function-cataloged + version: '0.1.0' +do: + - log: + call: https://raw.githubusercontent.com/serverlessworkflow/catalog/main/functions/log/1.0.0/function.yaml + with: + message: Hello, world! + level: information + timestamp: true + format: '{TIMESTAMP} [{LEVEL}] ({CONTEXT}): {MESSAGE}' \ No newline at end of file diff --git a/impl/validation/pom.xml b/impl/validation/pom.xml index b8b77ed6..32d830b2 100644 --- a/impl/validation/pom.xml +++ b/impl/validation/pom.xml @@ -12,7 +12,7 @@ com.networknt json-schema-validator - + io.serverlessworkflow serverlessworkflow-api