Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## Unreleased
* Add support for default versions in Durable Function sub-orchestrations ([#241](https://github.com/microsoft/durabletask-java/pull/241))

## v1.6.0
* Add support for tags when creating new orchestrations ([#231](https://github.com/microsoft/durabletask-java/pull/230))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ public class NewSubOrchestrationInstanceOptions extends TaskOptions {
private String instanceId;
private String version;

/**
* Creates default options for the sub-orchestration. Useful for chaining
* when a RetryPolicy or RetryHandler is not needed.
*/
public NewSubOrchestrationInstanceOptions() {
super((RetryPolicy) null);
}

/**
* Creates options with a retry policy for the sub-orchestration.
* @param retryPolicy The retry policy to use for the sub-orchestration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.StringValue;
import com.microsoft.durabletask.DurableTaskGrpcWorkerVersioningOptions.VersionFailureStrategy;
import com.microsoft.durabletask.DurableTaskGrpcWorkerVersioningOptions.VersionMatchStrategy;
import com.microsoft.durabletask.implementation.protobuf.OrchestratorService;

import java.time.Duration;
Expand Down Expand Up @@ -125,12 +127,24 @@ public TaskOrchestration create() {
}
});

DurableTaskGrpcWorkerVersioningOptions versioningOptions = null;
if (orchestratorRequest.getPropertiesMap().containsKey("defaultVersion")) {
// If a default version is found, add it to the versioning options so it can be used in the execution flow.
// It is safe to construct this here as we do not provide a client version nor a match/failure strategy that
// would take effect. This is only used in the creation of sub-orchestrations.
versioningOptions = new DurableTaskGrpcWorkerVersioningOptions(
null,
orchestratorRequest.getPropertiesMap().get("defaultVersion").getStringValue(),
VersionMatchStrategy.NONE,
VersionFailureStrategy.REJECT);
}

TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(
orchestrationFactories,
new JacksonDataConverter(),
DEFAULT_MAXIMUM_TIMER_INTERVAL,
logger,
null);
versioningOptions);

// TODO: Error handling
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
Expand Down
2 changes: 1 addition & 1 deletion endtoendtests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ dependencies {
implementation project(':client')
implementation project(':azurefunctions')

implementation 'com.microsoft.azure.functions:azure-functions-java-library:3.0.0'
implementation 'com.microsoft.azure.functions:azure-functions-java-library:3.1.0'
testImplementation 'org.junit.jupiter:junit-jupiter:5.6.2'
testImplementation 'io.rest-assured:rest-assured:5.3.0'
testImplementation 'io.rest-assured:json-path:5.3.0'
Expand Down
1 change: 1 addition & 0 deletions endtoendtests/e2e-test-setup.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ if ($NoSetup -eq $false) {
docker run --name $ContainerName -p 8080:80 -it --add-host=host.docker.internal:host-gateway -d `
--env 'AzureWebJobsStorage=UseDevelopmentStorage=true;DevelopmentStorageProxyUri=http://host.docker.internal' `
--env 'WEBSITE_HOSTNAME=localhost:8080' `
--env 'FUNCTIONS_EXTENSIONBUNDLE_SOURCE_URI=https://functionscdnstaging.azureedge.net/public' `
$ImageName
}

Expand Down
5 changes: 3 additions & 2 deletions endtoendtests/host.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
},
"extensions": {
"durableTask": {
"hubName": "DFJavaSmokeTest"
"hubName": "DFJavaSmokeTest",
"defaultVersion": "1.0"
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
"version": "[4.26.0, 5.0.0)"
}
}
98 changes: 98 additions & 0 deletions endtoendtests/src/main/java/com/functions/Versioning.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.functions;

import java.util.Optional;

import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.HttpMethod;
import com.microsoft.azure.functions.HttpRequestMessage;
import com.microsoft.azure.functions.HttpResponseMessage;
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.annotation.HttpTrigger;
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.NewOrchestrationInstanceOptions;
import com.microsoft.durabletask.NewSubOrchestrationInstanceOptions;
import com.microsoft.durabletask.TaskOrchestrationContext;
import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger;
import com.microsoft.durabletask.azurefunctions.DurableClientContext;
import com.microsoft.durabletask.azurefunctions.DurableClientInput;
import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger;

public class Versioning {
/**
* This HTTP-triggered function starts the orchestration.
*/
@FunctionName("StartVersionedOrchestration")
public HttpResponseMessage startOrchestration(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");

DurableTaskClient client = durableContext.getClient();
String queryVersion = request.getQueryParameters().getOrDefault("version", "");
context.getLogger().info(String.format("Received version '%s' from the query string", queryVersion));
String instanceId = null;
if (queryVersion.equals("default")) {
instanceId = client.scheduleNewOrchestrationInstance("VersionedOrchestrator");
} else {
instanceId = client.scheduleNewOrchestrationInstance("VersionedOrchestrator", new NewOrchestrationInstanceOptions().setVersion(queryVersion));
}
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
return durableContext.createCheckStatusResponse(request, instanceId);
}

/**
* This HTTP-triggered function starts the orchestration.
*/
@FunctionName("StartVersionedSubOrchestration")
public HttpResponseMessage startSubOrchestration(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");

DurableTaskClient client = durableContext.getClient();
String queryVersion = request.getQueryParameters().getOrDefault("version", "");
String instanceId = null;
if (queryVersion.equals("default")) {
instanceId = client.scheduleNewOrchestrationInstance("VersionedSubOrchestrator");
} else {
instanceId = client.scheduleNewOrchestrationInstance("VersionedSubOrchestrator", queryVersion);
}
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
return durableContext.createCheckStatusResponse(request, instanceId);
}

@FunctionName("VersionedOrchestrator")
public String versionedOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
return ctx.callActivity("SayVersion", ctx.getVersion(), String.class).await();
}

@FunctionName("VersionedSubOrchestrator")
public String versionedSubOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String subVersion = ctx.getInput(String.class);
NewSubOrchestrationInstanceOptions options = new NewSubOrchestrationInstanceOptions();
if (subVersion != null) {
options.setVersion(subVersion);
}
return ctx.callSubOrchestrator("SubOrchestrator", null, null, options, String.class).await();
}

@FunctionName("SubOrchestrator")
public String subOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
return ctx.callActivity("SayVersion", ctx.getVersion(), String.class).await();
}

@FunctionName("SayVersion")
public String sayVersion(
@DurableActivityTrigger(name = "version") String version,
final ExecutionContext context) {
version = version.replaceAll("\"", "");
context.getLogger().info(String.format("Called with version: '%s'", version));
return String.format("Version: '%s'", version);
}
}
63 changes: 63 additions & 0 deletions endtoendtests/src/test/java/com/functions/EndToEndTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

@Tag("e2e")
public class EndToEndTests {
Expand Down Expand Up @@ -262,6 +263,68 @@ public void DeserializeFail(String functionName) throws InterruptedException {
assertTrue(errorMessage.contains("Failed to deserialize the JSON text"));
}

@ParameterizedTest
@ValueSource(strings = {
"default",
"",
"0.9",
"1.0"
})
public void VersionedOrchestrationTests(String version) throws InterruptedException {
Response response = post("api/StartVersionedOrchestration?version=" + version);
String statusQueryGetUri = null;
try {

JsonPath jsonPath = response.jsonPath();
// assert orchestration status
statusQueryGetUri = jsonPath.get("statusQueryGetUri");
} catch (Exception e) {
fail("Failed to parse response: " + response.asString());
}
boolean completed = pollingCheck(statusQueryGetUri, "Completed", null, Duration.ofSeconds(10));
assertTrue(completed);

// assert exception message
Response resp = get(statusQueryGetUri);
String output = resp.jsonPath().get("output");
if (version.equals("default")) {
assertTrue(output.contains("Version: '1.0'"), "Expected default version (1.0), got: " + output);
} else {
assertTrue(output.contains(String.format("Version: '%s'", version)), "Expected version (" + version + "), got: " + output);
}
}

@ParameterizedTest
@ValueSource(strings = {
"default",
"",
"0.9",
"1.0"
})
public void VersionedSubOrchestrationTests(String version) throws InterruptedException {
Response response = post("api/StartVersionedSubOrchestration?version=" + version);
String statusQueryGetUri = null;
try {

JsonPath jsonPath = response.jsonPath();
// assert orchestration status
statusQueryGetUri = jsonPath.get("statusQueryGetUri");
} catch (Exception e) {
fail("Failed to parse response: " + response.asString());
}
boolean completed = pollingCheck(statusQueryGetUri, "Completed", null, Duration.ofSeconds(10));
assertTrue(completed);

// assert exception message
Response resp = get(statusQueryGetUri);
String output = resp.jsonPath().get("output");
if (version.equals("default")) {
assertTrue(output.contains(String.format("Version: '%s'", "1.0")), "Expected default version (1.0), got: " + output);
} else {
assertTrue(output.contains(String.format("Version: '%s'", version)), "Expected version (" + version + "), got: " + output);
}
}

private boolean pollingCheck(String statusQueryGetUri,
String expectedState,
Set<String> continueStates,
Expand Down
Loading