Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,24 @@
*/
package io.serverlessworkflow.impl.executors.openapi;

import static io.serverlessworkflow.impl.executors.http.HttpExecutor.getTargetSupplier;

import io.serverlessworkflow.api.types.CallHTTP;
import io.serverlessworkflow.api.types.CallOpenAPI;
import io.serverlessworkflow.api.types.OpenAPIArguments;
import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowException;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.executors.CallableTask;
import io.serverlessworkflow.impl.executors.http.HttpExecutor;
import io.serverlessworkflow.impl.executors.http.TargetSupplier;
import io.serverlessworkflow.impl.resources.ResourceLoader;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class OpenAPIExecutor implements CallableTask<CallOpenAPI> {

private CallOpenAPI task;
private Workflow workflow;
private WorkflowDefinition definition;
private WorkflowApplication application;
private TargetSupplier targetSupplier;
private ResourceLoader resourceLoader;
private OperationDefinitionSupplier operationDefinitionSupplier;
private OpenAPIArguments with;

@Override
public boolean accept(Class<? extends TaskBase> clazz) {
Expand All @@ -55,43 +45,39 @@ public CompletableFuture<WorkflowModel> apply(

OperationDefinition operation =
operationDefinitionSupplier.get(workflowContext, taskContext, input);
HttpCallAdapter httpCallAdapter =
getHttpCallAdapter(operation, workflowContext, taskContext, input);

return CompletableFuture.supplyAsync(
() -> {
HttpCallAdapter httpCallAdapter =
getHttpCallAdapter(operation, workflowContext, taskContext, input);

WorkflowException workflowException = null;

for (var server : operation.getServers()) {
CallHTTP callHTTP = httpCallAdapter.server(server).build();
HttpExecutor executor = new HttpExecutor();
executor.init(callHTTP, definition);
Iterator<String> iter = operation.getServers().iterator();
if (!iter.hasNext()) {
throw new IllegalArgumentException(
"List of servers is empty for operation " + operation.getOperation());
}
CompletableFuture<WorkflowModel> future =
executeServer(iter.next(), httpCallAdapter, workflowContext, taskContext, input);
while (iter.hasNext()) {
future.exceptionallyCompose(
i -> executeServer(iter.next(), httpCallAdapter, workflowContext, taskContext, input));
}
return future;
}

try {
return executor.apply(workflowContext, taskContext, input).get();
} catch (WorkflowException e) {
workflowException = e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
throw workflowException; // if we there, we failed all servers and ex is not null
},
workflowContext.definition().application().executorService());
private CompletableFuture<WorkflowModel> executeServer(
String server,
HttpCallAdapter callAdapter,
WorkflowContext workflowContext,
TaskContext taskContext,
WorkflowModel input) {
CallHTTP callHTTP = callAdapter.server(server).build();
HttpExecutor executor = new HttpExecutor();
executor.init(callHTTP, workflowContext.definition());
return executor.apply(workflowContext, taskContext, input);
}

@Override
public void init(CallOpenAPI task, WorkflowDefinition definition) {
this.task = task;
this.definition = definition;
this.workflow = definition.workflow();
this.application = definition.application();
this.resourceLoader = definition.resourceLoader();
this.operationDefinitionSupplier = new OperationDefinitionSupplier(application, task);
this.targetSupplier =
getTargetSupplier(
task.getWith().getDocument().getEndpoint(), application.expressionFactory());
with = task.getWith();
operationDefinitionSupplier = new OperationDefinitionSupplier(definition.application(), with);
}

private HttpCallAdapter getHttpCallAdapter(
Expand All @@ -102,11 +88,11 @@ private HttpCallAdapter getHttpCallAdapter(
OperationPathResolver pathResolver =
new OperationPathResolver(
operation.getPath(),
application,
task.getWith().getParameters().getAdditionalProperties());
workflowContext.definition().application(),
with.getParameters().getAdditionalProperties());

return new HttpCallAdapter()
.auth(task.getWith().getAuthentication())
.auth(with.getAuthentication())
.body(operation.getBody())
.contentType(operation.getContentType())
.headers(
Expand All @@ -118,8 +104,8 @@ private HttpCallAdapter getHttpCallAdapter(
operation.getParameters().stream()
.filter(p -> "query".equals(p.getIn()))
.collect(Collectors.toUnmodifiableSet()))
.redirect(task.getWith().isRedirect())
.redirect(with.isRedirect())
.target(pathResolver.resolve(workflowContext, taskContext, input))
.workflowParams(task.getWith().getParameters().getAdditionalProperties());
.workflowParams(with.getParameters().getAdditionalProperties());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import static io.serverlessworkflow.impl.executors.http.HttpExecutor.getTargetSupplier;

import io.serverlessworkflow.api.types.CallOpenAPI;
import io.serverlessworkflow.api.types.OpenAPIArguments;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowContext;
Expand All @@ -28,20 +28,19 @@
class OperationDefinitionSupplier {

private final WorkflowApplication application;
private final CallOpenAPI task;
private final OpenAPIArguments with;

OperationDefinitionSupplier(WorkflowApplication application, CallOpenAPI task) {
this.task = task;
OperationDefinitionSupplier(WorkflowApplication application, OpenAPIArguments with) {
this.with = with;
this.application = application;
}

OperationDefinition get(
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
TargetSupplier targetSupplier =
getTargetSupplier(
task.getWith().getDocument().getEndpoint(), application.expressionFactory());
getTargetSupplier(with.getDocument().getEndpoint(), application.expressionFactory());

String operationId = task.getWith().getOperationId();
String operationId = with.getOperationId();
WebTarget webTarget = targetSupplier.apply(workflowContext, taskContext, input);
OpenAPIProcessor processor = new OpenAPIProcessor(operationId, webTarget.getUri());
return processor.parse();
Expand Down