diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e6ff54d..7e670ac6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## Unreleased +* Add support for default versions in Durable Function sub-orchestrations ([#241](https://github.com/microsoft/durabletask-java/pull/241)) ## v1.6.0 * Add support for tags when creating new orchestrations ([#231](https://github.com/microsoft/durabletask-java/pull/230)) diff --git a/client/src/main/java/com/microsoft/durabletask/NewSubOrchestrationInstanceOptions.java b/client/src/main/java/com/microsoft/durabletask/NewSubOrchestrationInstanceOptions.java index d959fc3b..8f30ac33 100644 --- a/client/src/main/java/com/microsoft/durabletask/NewSubOrchestrationInstanceOptions.java +++ b/client/src/main/java/com/microsoft/durabletask/NewSubOrchestrationInstanceOptions.java @@ -8,6 +8,14 @@ public class NewSubOrchestrationInstanceOptions extends TaskOptions { private String instanceId; private String version; + /** + * Creates default options for the sub-orchestration. Useful for chaining + * when a RetryPolicy or RetryHandler is not needed. + */ + public NewSubOrchestrationInstanceOptions() { + super((RetryPolicy) null); + } + /** * Creates options with a retry policy for the sub-orchestration. * @param retryPolicy The retry policy to use for the sub-orchestration. diff --git a/client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java b/client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java index 7ae9e2ae..b85f24a7 100644 --- a/client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java +++ b/client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java @@ -4,6 +4,8 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.StringValue; +import com.microsoft.durabletask.DurableTaskGrpcWorkerVersioningOptions.VersionFailureStrategy; +import com.microsoft.durabletask.DurableTaskGrpcWorkerVersioningOptions.VersionMatchStrategy; import com.microsoft.durabletask.implementation.protobuf.OrchestratorService; import java.time.Duration; @@ -125,12 +127,24 @@ public TaskOrchestration create() { } }); + DurableTaskGrpcWorkerVersioningOptions versioningOptions = null; + if (orchestratorRequest.getPropertiesMap().containsKey("defaultVersion")) { + // If a default version is found, add it to the versioning options so it can be used in the execution flow. + // It is safe to construct this here as we do not provide a client version nor a match/failure strategy that + // would take effect. This is only used in the creation of sub-orchestrations. + versioningOptions = new DurableTaskGrpcWorkerVersioningOptions( + null, + orchestratorRequest.getPropertiesMap().get("defaultVersion").getStringValue(), + VersionMatchStrategy.NONE, + VersionFailureStrategy.REJECT); + } + TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor( orchestrationFactories, new JacksonDataConverter(), DEFAULT_MAXIMUM_TIMER_INTERVAL, logger, - null); + versioningOptions); // TODO: Error handling TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute( diff --git a/endtoendtests/build.gradle b/endtoendtests/build.gradle index 155f725a..658e777f 100644 --- a/endtoendtests/build.gradle +++ b/endtoendtests/build.gradle @@ -19,7 +19,7 @@ dependencies { implementation project(':client') implementation project(':azurefunctions') - implementation 'com.microsoft.azure.functions:azure-functions-java-library:3.0.0' + implementation 'com.microsoft.azure.functions:azure-functions-java-library:3.1.0' testImplementation 'org.junit.jupiter:junit-jupiter:5.6.2' testImplementation 'io.rest-assured:rest-assured:5.3.0' testImplementation 'io.rest-assured:json-path:5.3.0' diff --git a/endtoendtests/e2e-test-setup.ps1 b/endtoendtests/e2e-test-setup.ps1 index 6e57d763..8d122a2d 100644 --- a/endtoendtests/e2e-test-setup.ps1 +++ b/endtoendtests/e2e-test-setup.ps1 @@ -29,6 +29,7 @@ if ($NoSetup -eq $false) { docker run --name $ContainerName -p 8080:80 -it --add-host=host.docker.internal:host-gateway -d ` --env 'AzureWebJobsStorage=UseDevelopmentStorage=true;DevelopmentStorageProxyUri=http://host.docker.internal' ` --env 'WEBSITE_HOSTNAME=localhost:8080' ` + --env 'FUNCTIONS_EXTENSIONBUNDLE_SOURCE_URI=https://functionscdnstaging.azureedge.net/public' ` $ImageName } diff --git a/endtoendtests/host.json b/endtoendtests/host.json index 9d59145e..c3533435 100644 --- a/endtoendtests/host.json +++ b/endtoendtests/host.json @@ -13,11 +13,12 @@ }, "extensions": { "durableTask": { - "hubName": "DFJavaSmokeTest" + "hubName": "DFJavaSmokeTest", + "defaultVersion": "1.0" } }, "extensionBundle": { "id": "Microsoft.Azure.Functions.ExtensionBundle", - "version": "[4.*, 5.0.0)" + "version": "[4.26.0, 5.0.0)" } } \ No newline at end of file diff --git a/endtoendtests/src/main/java/com/functions/Versioning.java b/endtoendtests/src/main/java/com/functions/Versioning.java new file mode 100644 index 00000000..aa72bab1 --- /dev/null +++ b/endtoendtests/src/main/java/com/functions/Versioning.java @@ -0,0 +1,98 @@ +package com.functions; + +import java.util.Optional; + +import com.microsoft.azure.functions.ExecutionContext; +import com.microsoft.azure.functions.HttpMethod; +import com.microsoft.azure.functions.HttpRequestMessage; +import com.microsoft.azure.functions.HttpResponseMessage; +import com.microsoft.azure.functions.annotation.AuthorizationLevel; +import com.microsoft.azure.functions.annotation.FunctionName; +import com.microsoft.azure.functions.annotation.HttpTrigger; +import com.microsoft.durabletask.DurableTaskClient; +import com.microsoft.durabletask.NewOrchestrationInstanceOptions; +import com.microsoft.durabletask.NewSubOrchestrationInstanceOptions; +import com.microsoft.durabletask.TaskOrchestrationContext; +import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger; +import com.microsoft.durabletask.azurefunctions.DurableClientContext; +import com.microsoft.durabletask.azurefunctions.DurableClientInput; +import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger; + +public class Versioning { + /** + * This HTTP-triggered function starts the orchestration. + */ + @FunctionName("StartVersionedOrchestration") + public HttpResponseMessage startOrchestration( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) { + context.getLogger().info("Java HTTP trigger processed a request."); + + DurableTaskClient client = durableContext.getClient(); + String queryVersion = request.getQueryParameters().getOrDefault("version", ""); + context.getLogger().info(String.format("Received version '%s' from the query string", queryVersion)); + String instanceId = null; + if (queryVersion.equals("default")) { + instanceId = client.scheduleNewOrchestrationInstance("VersionedOrchestrator"); + } else { + instanceId = client.scheduleNewOrchestrationInstance("VersionedOrchestrator", new NewOrchestrationInstanceOptions().setVersion(queryVersion)); + } + context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); + } + + /** + * This HTTP-triggered function starts the orchestration. + */ + @FunctionName("StartVersionedSubOrchestration") + public HttpResponseMessage startSubOrchestration( + @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) { + context.getLogger().info("Java HTTP trigger processed a request."); + + DurableTaskClient client = durableContext.getClient(); + String queryVersion = request.getQueryParameters().getOrDefault("version", ""); + String instanceId = null; + if (queryVersion.equals("default")) { + instanceId = client.scheduleNewOrchestrationInstance("VersionedSubOrchestrator"); + } else { + instanceId = client.scheduleNewOrchestrationInstance("VersionedSubOrchestrator", queryVersion); + } + context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); + } + + @FunctionName("VersionedOrchestrator") + public String versionedOrchestrator( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + return ctx.callActivity("SayVersion", ctx.getVersion(), String.class).await(); + } + + @FunctionName("VersionedSubOrchestrator") + public String versionedSubOrchestrator( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + String subVersion = ctx.getInput(String.class); + NewSubOrchestrationInstanceOptions options = new NewSubOrchestrationInstanceOptions(); + if (subVersion != null) { + options.setVersion(subVersion); + } + return ctx.callSubOrchestrator("SubOrchestrator", null, null, options, String.class).await(); + } + + @FunctionName("SubOrchestrator") + public String subOrchestrator( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + return ctx.callActivity("SayVersion", ctx.getVersion(), String.class).await(); + } + + @FunctionName("SayVersion") + public String sayVersion( + @DurableActivityTrigger(name = "version") String version, + final ExecutionContext context) { + version = version.replaceAll("\"", ""); + context.getLogger().info(String.format("Called with version: '%s'", version)); + return String.format("Version: '%s'", version); + } +} diff --git a/endtoendtests/src/test/java/com/functions/EndToEndTests.java b/endtoendtests/src/test/java/com/functions/EndToEndTests.java index 65b9fb70..c2d0be02 100644 --- a/endtoendtests/src/test/java/com/functions/EndToEndTests.java +++ b/endtoendtests/src/test/java/com/functions/EndToEndTests.java @@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; @Tag("e2e") public class EndToEndTests { @@ -262,6 +263,68 @@ public void DeserializeFail(String functionName) throws InterruptedException { assertTrue(errorMessage.contains("Failed to deserialize the JSON text")); } + @ParameterizedTest + @ValueSource(strings = { + "default", + "", + "0.9", + "1.0" + }) + public void VersionedOrchestrationTests(String version) throws InterruptedException { + Response response = post("api/StartVersionedOrchestration?version=" + version); + String statusQueryGetUri = null; + try { + + JsonPath jsonPath = response.jsonPath(); + // assert orchestration status + statusQueryGetUri = jsonPath.get("statusQueryGetUri"); + } catch (Exception e) { + fail("Failed to parse response: " + response.asString()); + } + boolean completed = pollingCheck(statusQueryGetUri, "Completed", null, Duration.ofSeconds(10)); + assertTrue(completed); + + // assert exception message + Response resp = get(statusQueryGetUri); + String output = resp.jsonPath().get("output"); + if (version.equals("default")) { + assertTrue(output.contains("Version: '1.0'"), "Expected default version (1.0), got: " + output); + } else { + assertTrue(output.contains(String.format("Version: '%s'", version)), "Expected version (" + version + "), got: " + output); + } + } + + @ParameterizedTest + @ValueSource(strings = { + "default", + "", + "0.9", + "1.0" + }) + public void VersionedSubOrchestrationTests(String version) throws InterruptedException { + Response response = post("api/StartVersionedSubOrchestration?version=" + version); + String statusQueryGetUri = null; + try { + + JsonPath jsonPath = response.jsonPath(); + // assert orchestration status + statusQueryGetUri = jsonPath.get("statusQueryGetUri"); + } catch (Exception e) { + fail("Failed to parse response: " + response.asString()); + } + boolean completed = pollingCheck(statusQueryGetUri, "Completed", null, Duration.ofSeconds(10)); + assertTrue(completed); + + // assert exception message + Response resp = get(statusQueryGetUri); + String output = resp.jsonPath().get("output"); + if (version.equals("default")) { + assertTrue(output.contains(String.format("Version: '%s'", "1.0")), "Expected default version (1.0), got: " + output); + } else { + assertTrue(output.contains(String.format("Version: '%s'", version)), "Expected version (" + version + "), got: " + output); + } + } + private boolean pollingCheck(String statusQueryGetUri, String expectedState, Set continueStates,