Skip to content

Commit ae23323

Browse files
committed
[Fix #934] Implementing function catalog (alt)
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 3ab612d commit ae23323

File tree

17 files changed

+308
-138
lines changed

17 files changed

+308
-138
lines changed
Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.serverlessworkflow.impl.catalogs;
16+
package io.serverlessworkflow.impl;
1717

18-
import io.serverlessworkflow.impl.NamedObject;
18+
import io.serverlessworkflow.api.types.Task;
19+
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
20+
import java.util.function.Function;
1921

20-
public interface NamedCatalogNavigator extends CatalogNavigator, NamedObject {}
22+
public interface FunctionReader extends Function<ExternalResourceHandler, Task> {}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import io.serverlessworkflow.api.types.Workflow;
2222
import io.serverlessworkflow.impl.additional.NamedWorkflowAdditionalObject;
2323
import io.serverlessworkflow.impl.additional.WorkflowAdditionalObject;
24-
import io.serverlessworkflow.impl.catalogs.CatalogNavigator;
25-
import io.serverlessworkflow.impl.catalogs.NamedCatalogNavigator;
2624
import io.serverlessworkflow.impl.config.ConfigManager;
2725
import io.serverlessworkflow.impl.config.ConfigSecretManager;
2826
import io.serverlessworkflow.impl.config.SecretManager;
@@ -78,7 +76,7 @@ public class WorkflowApplication implements AutoCloseable {
7876
private final SecretManager secretManager;
7977
private final SchedulerListener schedulerListener;
8078
private final Optional<URITemplateResolver> templateResolver;
81-
private final Map<String, CatalogNavigator> catalogRegistry;
79+
private final Optional<FunctionReader> functionReader;
8280

8381
private WorkflowApplication(Builder builder) {
8482
this.taskFactory = builder.taskFactory;
@@ -101,7 +99,7 @@ private WorkflowApplication(Builder builder) {
10199
this.configManager = builder.configManager;
102100
this.secretManager = builder.secretManager;
103101
this.templateResolver = builder.templateResolver;
104-
this.catalogRegistry = builder.catalogRegistry;
102+
this.functionReader = builder.functionReader;
105103
}
106104

107105
public TaskExecutorFactory taskFactory() {
@@ -182,13 +180,11 @@ public SchemaValidator getValidator(SchemaInline inline) {
182180
private ConfigManager configManager;
183181
private SchedulerListener schedulerListener;
184182
private Optional<URITemplateResolver> templateResolver;
185-
private Map<String, CatalogNavigator> catalogRegistry = new HashMap<>();
183+
private Optional<FunctionReader> functionReader;
186184

187185
private Builder() {
188186
ServiceLoader.load(NamedWorkflowAdditionalObject.class)
189187
.forEach(a -> additionalObjects.put(a.name(), a));
190-
ServiceLoader.load(NamedCatalogNavigator.class)
191-
.forEach(a -> catalogRegistry.put(a.name(), a));
192188
}
193189

194190
public Builder withListener(WorkflowExecutionListener listener) {
@@ -277,11 +273,6 @@ public Builder withModelFactory(WorkflowModelFactory modelFactory) {
277273
return this;
278274
}
279275

280-
public Builder withCatalogRegistry(String name, CatalogNavigator catalog) {
281-
catalogRegistry.put(name, catalog);
282-
return this;
283-
}
284-
285276
public WorkflowApplication build() {
286277
if (modelFactory == null) {
287278
modelFactory =
@@ -340,8 +331,8 @@ public WorkflowApplication build() {
340331
.findFirst()
341332
.orElseGet(() -> new ConfigSecretManager(configManager));
342333
}
343-
344334
templateResolver = ServiceLoader.load(URITemplateResolver.class).findFirst();
335+
functionReader = ServiceLoader.load(FunctionReader.class).findFirst();
345336
return new WorkflowApplication(this);
346337
}
347338
}
@@ -419,8 +410,8 @@ public Optional<URITemplateResolver> templateResolver() {
419410
return templateResolver;
420411
}
421412

422-
public Map<String, CatalogNavigator> catalogRegistry() {
423-
return catalogRegistry;
413+
public Optional<FunctionReader> functionReader() {
414+
return functionReader;
424415
}
425416

426417
public <T> Optional<T> additionalObject(

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,8 @@ public static Optional<SchemaValidator> getSchemaValidator(
5151
return Optional.of(validatorFactory.getValidator(schema.getSchemaInline()));
5252
} else if (schema.getSchemaExternal() != null) {
5353
return Optional.of(
54-
resourceLoader.load(
55-
schema.getSchemaExternal().getResource(),
56-
validatorFactory::getValidator,
57-
null,
58-
null,
59-
null));
54+
resourceLoader.loadStatic(
55+
schema.getSchemaExternal().getResource(), validatorFactory::getValidator));
6056
}
6157
}
6258
return Optional.empty();

impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallFunctionExecutor.java

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -27,23 +27,21 @@
2727
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2828
import io.serverlessworkflow.impl.WorkflowUtils;
2929
import io.serverlessworkflow.impl.WorkflowValueResolver;
30-
import io.serverlessworkflow.impl.catalogs.CatalogNavigator;
30+
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
31+
import io.serverlessworkflow.impl.resources.ResourceLoader;
32+
import java.net.URI;
3133
import java.util.Map;
3234
import java.util.Optional;
33-
import org.slf4j.Logger;
34-
import org.slf4j.LoggerFactory;
3535

3636
public class CallFunctionExecutor implements CallableTaskBuilder<CallFunction> {
3737

38-
private static final Logger logger = LoggerFactory.getLogger(CallFunctionExecutor.class);
3938
private TaskExecutorBuilder<? extends TaskBase> executorBuilder;
4039
private WorkflowValueResolver<Map<String, Object>> args;
4140

4241
@Override
4342
public void init(
4443
CallFunction task, WorkflowDefinition definition, WorkflowMutablePosition position) {
4544
String functionName = task.getCall();
46-
4745
Use use = definition.workflow().getUse();
4846
Task function = null;
4947
if (use != null) {
@@ -58,34 +56,22 @@ public void init(
5856
UseCatalogs catalogs = use.getCatalogs();
5957
if (catalogs != null) {
6058
Catalog catalog = catalogs.getAdditionalProperties().get(catalogName);
61-
CatalogNavigator catalogNavigator =
62-
definition.application().catalogRegistry().get(catalogName);
63-
if (catalogNavigator == null) {
64-
throw new IllegalArgumentException(
65-
"There is not catalog registered for name " + catalogName);
66-
}
59+
ResourceLoader loader = definition.resourceLoader();
6760
function =
68-
catalogNavigator
69-
.resolveTask(catalog, functionName.substring(0, indexOf))
70-
.orElseThrow(
71-
() ->
72-
new IllegalArgumentException(
73-
"Cannot find function "
74-
+ functionName
75-
+ " in catalog "
76-
+ catalogName));
61+
definition
62+
.resourceLoader()
63+
.loadURI(
64+
WorkflowUtils.concatURI(
65+
loader.uri(catalog.getEndpoint()),
66+
pathFromFunctionName(functionName.substring(0, indexOf))),
67+
h -> from(definition, h));
7768
}
7869
}
7970
}
8071
}
8172
if (function == null) {
82-
// asume name is direct pointer to catalog file
8373
function =
84-
definition.application().catalogRegistry().values().stream()
85-
.flatMap(c -> c.resolveTask(functionName).stream())
86-
.findFirst()
87-
.orElseThrow(
88-
() -> new IllegalArgumentException("Cannot find function name " + functionName));
74+
definition.resourceLoader().loadURI(URI.create(functionName), h -> from(definition, h));
8975
}
9076
executorBuilder =
9177
definition.application().taskFactory().getTaskExecutor(position, function, definition);
@@ -97,6 +83,32 @@ public void init(
9783
: (w, t, m) -> Map.of();
9884
}
9985

86+
private String pathFromFunctionName(String functionName) {
87+
int sep = functionName.indexOf(":");
88+
if (sep < 0) {
89+
throw new IllegalArgumentException(
90+
"Invalid function name "
91+
+ functionName
92+
+ ". It has to be of the format <function name>:<function version>");
93+
}
94+
StringBuilder sb = new StringBuilder(functionName);
95+
sb.setCharAt(sep, '/');
96+
sb.insert(0, "main/functions/");
97+
sb.append("/function.yaml");
98+
return sb.toString();
99+
}
100+
101+
private Task from(WorkflowDefinition definition, ExternalResourceHandler handler) {
102+
return definition
103+
.application()
104+
.functionReader()
105+
.map(v -> v.apply(handler))
106+
.orElseThrow(
107+
() ->
108+
new IllegalStateException(
109+
"No converter from external resource to function found. Make sure a dependency that includes an implementation of FunctionReader is included"));
110+
}
111+
100112
@Override
101113
public boolean accept(Class<? extends TaskBase> clazz) {
102114
return clazz.equals(CallFunction.class);

impl/core/src/main/java/io/serverlessworkflow/impl/resources/DefaultResourceLoader.java

Lines changed: 16 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,7 @@
1515
*/
1616
package io.serverlessworkflow.impl.resources;
1717

18-
import static io.serverlessworkflow.impl.WorkflowUtils.getURISupplier;
19-
20-
import io.serverlessworkflow.api.types.Endpoint;
21-
import io.serverlessworkflow.api.types.EndpointUri;
22-
import io.serverlessworkflow.api.types.ExternalResource;
23-
import io.serverlessworkflow.impl.TaskContext;
2418
import io.serverlessworkflow.impl.WorkflowApplication;
25-
import io.serverlessworkflow.impl.WorkflowContext;
26-
import io.serverlessworkflow.impl.WorkflowModel;
27-
import io.serverlessworkflow.impl.WorkflowValueResolver;
28-
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
2919
import java.net.MalformedURLException;
3020
import java.net.URI;
3121
import java.nio.file.Path;
@@ -35,18 +25,31 @@
3525
import java.util.concurrent.ConcurrentHashMap;
3626
import java.util.function.Function;
3727

38-
public class DefaultResourceLoader implements ResourceLoader {
28+
public class DefaultResourceLoader extends ResourceLoader {
3929

4030
private final Optional<Path> workflowPath;
41-
private final WorkflowApplication application;
4231

4332
private Map<ExternalResourceHandler, CachedResource> resourceCache = new ConcurrentHashMap<>();
4433

4534
protected DefaultResourceLoader(WorkflowApplication application, Path workflowPath) {
46-
this.application = application;
35+
super(application);
4736
this.workflowPath = Optional.ofNullable(workflowPath);
4837
}
4938

39+
@Override
40+
public <T> T loadURI(URI uri, Function<ExternalResourceHandler, T> function) {
41+
ExternalResourceHandler resourceHandler = buildFromURI(uri);
42+
return (T)
43+
resourceCache
44+
.compute(
45+
resourceHandler,
46+
(k, v) ->
47+
v == null || k.shouldReload(v.lastReload())
48+
? new CachedResource(Instant.now(), function.apply(k))
49+
: v)
50+
.content();
51+
}
52+
5053
private ExternalResourceHandler fileResource(String pathStr) {
5154
Path path = Path.of(pathStr);
5255
if (path.isAbsolute()) {
@@ -74,66 +77,6 @@ private ExternalResourceHandler buildFromURI(URI uri) {
7477
}
7578

7679
@Override
77-
public <T> T load(
78-
ExternalResource resource,
79-
Function<ExternalResourceHandler, T> function,
80-
WorkflowContext workflowContext,
81-
TaskContext taskContext,
82-
WorkflowModel model) {
83-
ExternalResourceHandler resourceHandler =
84-
buildFromURI(
85-
uriSupplier(resource.getEndpoint())
86-
.apply(
87-
workflowContext,
88-
taskContext,
89-
model == null ? application.modelFactory().fromNull() : model));
90-
return (T)
91-
resourceCache
92-
.compute(
93-
resourceHandler,
94-
(k, v) ->
95-
v == null || k.shouldReload(v.lastReload())
96-
? new CachedResource(Instant.now(), function.apply(k))
97-
: v)
98-
.content();
99-
}
100-
101-
@Override
102-
public WorkflowValueResolver<URI> uriSupplier(Endpoint endpoint) {
103-
if (endpoint.getEndpointConfiguration() != null) {
104-
EndpointUri uri = endpoint.getEndpointConfiguration().getUri();
105-
if (uri.getLiteralEndpointURI() != null) {
106-
return getURISupplier(application, uri.getLiteralEndpointURI());
107-
} else if (uri.getExpressionEndpointURI() != null) {
108-
return new ExpressionURISupplier(
109-
application
110-
.expressionFactory()
111-
.resolveString(ExpressionDescriptor.from(uri.getExpressionEndpointURI())));
112-
}
113-
} else if (endpoint.getRuntimeExpression() != null) {
114-
return new ExpressionURISupplier(
115-
application
116-
.expressionFactory()
117-
.resolveString(ExpressionDescriptor.from(endpoint.getRuntimeExpression())));
118-
} else if (endpoint.getUriTemplate() != null) {
119-
return getURISupplier(application, endpoint.getUriTemplate());
120-
}
121-
throw new IllegalArgumentException("Invalid endpoint definition " + endpoint);
122-
}
123-
124-
private class ExpressionURISupplier implements WorkflowValueResolver<URI> {
125-
private WorkflowValueResolver<String> expr;
126-
127-
public ExpressionURISupplier(WorkflowValueResolver<String> expr) {
128-
this.expr = expr;
129-
}
130-
131-
@Override
132-
public URI apply(WorkflowContext workflow, TaskContext task, WorkflowModel node) {
133-
return URI.create(expr.apply(workflow, task, node));
134-
}
135-
}
136-
13780
public void close() {
13881
resourceCache.clear();
13982
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.resources;
17+
18+
import java.io.UncheckedIOException;
19+
import java.net.MalformedURLException;
20+
import java.net.URL;
21+
22+
public class GitHubHelper {
23+
24+
private GitHubHelper() {}
25+
26+
private static final String BLOB = "blob/";
27+
28+
public static URL handleURL(URL url) {
29+
if (url.getHost().equals("github.com")) {
30+
try {
31+
String path = url.getPath();
32+
if (path.startsWith(BLOB)) {
33+
path = path.substring(BLOB.length());
34+
}
35+
return new URL(url.getProtocol(), "raw.githubusercontent.com", url.getPort(), path);
36+
} catch (MalformedURLException e) {
37+
throw new UncheckedIOException(e);
38+
}
39+
} else {
40+
return url;
41+
}
42+
}
43+
}

impl/core/src/main/java/io/serverlessworkflow/impl/resources/HttpResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public class HttpResource implements ExternalResourceHandler {
2828
private URL url;
2929

3030
public HttpResource(URL url) {
31-
this.url = url;
31+
this.url = GitHubHelper.handleURL(url);
3232
}
3333

3434
@Override

0 commit comments

Comments
 (0)