Skip to content

Commit aa5411f

Browse files
committed
Support sub-orchestration default version
Sub orchestrations run through Durable Functions (as opposed to isolated with DTS) have a slightly different path for sub orchestration versioning. This change loads the default version provided by the properties field to use as the default sub orchestration version. Signed-off-by: Hal Spang <halspang@microsoft.com>
1 parent f457df9 commit aa5411f

File tree

6 files changed

+171
-2
lines changed

6 files changed

+171
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
## Unreleased
2+
* Add support for default versions in Durable Function sub-orchestrations ([#241](https://github.com/microsoft/durabletask-java/pull/241))
23

34
## v1.6.0
45
* Add support for tags when creating new orchestrations ([#231](https://github.com/microsoft/durabletask-java/pull/230))

client/src/main/java/com/microsoft/durabletask/NewSubOrchestrationInstanceOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,14 @@ public class NewSubOrchestrationInstanceOptions extends TaskOptions {
88
private String instanceId;
99
private String version;
1010

11+
/**
12+
* Creates default options for the sub-orchestration. Useful for chaining
13+
* when a RetryPolicy or RetryHandler is not needed.
14+
*/
15+
public NewSubOrchestrationInstanceOptions() {
16+
super((RetryPolicy) null);
17+
}
18+
1119
/**
1220
* Creates options with a retry policy for the sub-orchestration.
1321
* @param retryPolicy The retry policy to use for the sub-orchestration.

client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import com.google.protobuf.InvalidProtocolBufferException;
66
import com.google.protobuf.StringValue;
7+
import com.microsoft.durabletask.DurableTaskGrpcWorkerVersioningOptions.VersionFailureStrategy;
8+
import com.microsoft.durabletask.DurableTaskGrpcWorkerVersioningOptions.VersionMatchStrategy;
79
import com.microsoft.durabletask.implementation.protobuf.OrchestratorService;
810

911
import java.time.Duration;
@@ -125,12 +127,24 @@ public TaskOrchestration create() {
125127
}
126128
});
127129

130+
DurableTaskGrpcWorkerVersioningOptions versioningOptions = null;
131+
if (orchestratorRequest.getPropertiesMap().containsKey("defaultVersion")) {
132+
// If a default version is found, add it to the versioning options so it can be used in the execution flow.
133+
// It is safe to construct this here as we do not provide a client version nor a match/failure strategy that
134+
// would take effect. This is only used in the creation of sub-orchestrations.
135+
versioningOptions = new DurableTaskGrpcWorkerVersioningOptions(
136+
null,
137+
orchestratorRequest.getPropertiesMap().get("defaultVersion").getStringValue(),
138+
VersionMatchStrategy.NONE,
139+
VersionFailureStrategy.REJECT);
140+
}
141+
128142
TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(
129143
orchestrationFactories,
130144
new JacksonDataConverter(),
131145
DEFAULT_MAXIMUM_TIMER_INTERVAL,
132146
logger,
133-
null);
147+
versioningOptions);
134148

135149
// TODO: Error handling
136150
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(

endtoendtests/host.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
},
1414
"extensions": {
1515
"durableTask": {
16-
"hubName": "DFJavaSmokeTest"
16+
"hubName": "DFJavaSmokeTest",
17+
"defaultVersion": "1.0"
1718
}
1819
},
1920
"extensionBundle": {
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package com.functions;
2+
3+
import java.util.Optional;
4+
5+
import com.microsoft.azure.functions.ExecutionContext;
6+
import com.microsoft.azure.functions.HttpMethod;
7+
import com.microsoft.azure.functions.HttpRequestMessage;
8+
import com.microsoft.azure.functions.HttpResponseMessage;
9+
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
10+
import com.microsoft.azure.functions.annotation.FunctionName;
11+
import com.microsoft.azure.functions.annotation.HttpTrigger;
12+
import com.microsoft.durabletask.DurableTaskClient;
13+
import com.microsoft.durabletask.NewOrchestrationInstanceOptions;
14+
import com.microsoft.durabletask.NewSubOrchestrationInstanceOptions;
15+
import com.microsoft.durabletask.TaskOrchestrationContext;
16+
import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger;
17+
import com.microsoft.durabletask.azurefunctions.DurableClientContext;
18+
import com.microsoft.durabletask.azurefunctions.DurableClientInput;
19+
import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger;
20+
21+
public class Versioning {
22+
/**
23+
* This HTTP-triggered function starts the orchestration.
24+
*/
25+
@FunctionName("StartVersionedOrchestration")
26+
public HttpResponseMessage startOrchestration(
27+
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
28+
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
29+
final ExecutionContext context) {
30+
context.getLogger().info("Java HTTP trigger processed a request.");
31+
32+
DurableTaskClient client = durableContext.getClient();
33+
String queryVersion = request.getQueryParameters().get("version");
34+
String instanceId = null;
35+
if (queryVersion == "default") {
36+
instanceId = client.scheduleNewOrchestrationInstance("VersionedOrchestrator");
37+
} else {
38+
instanceId = client.scheduleNewOrchestrationInstance("VersionedOrchestrator", new NewOrchestrationInstanceOptions().setVersion(queryVersion));
39+
}
40+
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
41+
return durableContext.createCheckStatusResponse(request, instanceId);
42+
}
43+
44+
/**
45+
* This HTTP-triggered function starts the orchestration.
46+
*/
47+
@FunctionName("StartVersionedSubOrchestration")
48+
public HttpResponseMessage startSubOrchestration(
49+
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
50+
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
51+
final ExecutionContext context) {
52+
context.getLogger().info("Java HTTP trigger processed a request.");
53+
54+
DurableTaskClient client = durableContext.getClient();
55+
String queryVersion = request.getQueryParameters().get("version");
56+
String instanceId = null;
57+
if (queryVersion == "default") {
58+
instanceId = client.scheduleNewOrchestrationInstance("VersionedSubOrchestrator");
59+
} else {
60+
instanceId = client.scheduleNewOrchestrationInstance("VersionedSubOrchestrator", queryVersion);
61+
}
62+
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
63+
return durableContext.createCheckStatusResponse(request, instanceId);
64+
}
65+
66+
@FunctionName("VersionedOrchestrator")
67+
public String versionedOrchestrator(
68+
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
69+
return ctx.callActivity("SayVersion", ctx.getVersion(), String.class).await();
70+
}
71+
72+
@FunctionName("VersionedSubOrchestrator")
73+
public String versionedSubOrchestrator(
74+
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
75+
String subVersion = ctx.getInput(String.class);
76+
NewSubOrchestrationInstanceOptions options = new NewSubOrchestrationInstanceOptions();
77+
if (subVersion != null) {
78+
options.setVersion(subVersion);
79+
}
80+
return ctx.callSubOrchestrator("SayVersion", null, null, options, String.class).await();
81+
}
82+
83+
public String subOrchestrator(
84+
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
85+
return ctx.callActivity("SayVersion", ctx.getVersion(), String.class).await();
86+
}
87+
88+
@FunctionName("SayVersion")
89+
public String sayVersion(
90+
@DurableActivityTrigger(name = "version") String version,
91+
final ExecutionContext context) {
92+
context.getLogger().info("Called with version: " + version);
93+
return "Version: " + version;
94+
}
95+
}

endtoendtests/src/test/java/com/functions/EndToEndTests.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,56 @@ public void DeserializeFail(String functionName) throws InterruptedException {
262262
assertTrue(errorMessage.contains("Failed to deserialize the JSON text"));
263263
}
264264

265+
@ParameterizedTest
266+
@ValueSource(strings = {
267+
"default",
268+
"",
269+
"0.9",
270+
"1.0"
271+
})
272+
public void VersionedOrchestrationTests(String version) throws InterruptedException {
273+
Response response = post("api/StartVersionedOrchestration?version=" + version);
274+
JsonPath jsonPath = response.jsonPath();
275+
// assert orchestration status
276+
String statusQueryGetUri = jsonPath.get("statusQueryGetUri");
277+
boolean completed = pollingCheck(statusQueryGetUri, "Completed", null, Duration.ofSeconds(10));
278+
assertTrue(completed);
279+
280+
// assert exception message
281+
Response resp = get(statusQueryGetUri);
282+
String output = resp.jsonPath().get("output");
283+
if (version.equals("default")) {
284+
assertTrue(output.contains("Version: 1.0"), "Expected default version (1.0), got: " + output);
285+
} else {
286+
assertTrue(output.contains("Version: " + version), "Expected version (" + version + "), got: " + output);
287+
}
288+
}
289+
290+
@ParameterizedTest
291+
@ValueSource(strings = {
292+
"default",
293+
"",
294+
"0.9",
295+
"1.0"
296+
})
297+
public void VersionedSubOrchestrationTests(String version) throws InterruptedException {
298+
Response response = post("api/StartVersionedSubOrchestration?version=" + version);
299+
JsonPath jsonPath = response.jsonPath();
300+
// assert orchestration status
301+
String statusQueryGetUri = jsonPath.get("statusQueryGetUri");
302+
boolean completed = pollingCheck(statusQueryGetUri, "Completed", null, Duration.ofSeconds(10));
303+
assertTrue(completed);
304+
305+
// assert exception message
306+
Response resp = get(statusQueryGetUri);
307+
String output = resp.jsonPath().get("output");
308+
if (version.equals("default")) {
309+
assertTrue(output.contains("Version: 1.0"), "Expected default version (1.0), got: " + output);
310+
} else {
311+
assertTrue(output.contains("Version: " + version), "Expected version (" + version + "), got: " + output);
312+
}
313+
}
314+
265315
private boolean pollingCheck(String statusQueryGetUri,
266316
String expectedState,
267317
Set<String> continueStates,

0 commit comments

Comments
 (0)