Skip to content

Commit

Permalink
fix(workflow): Fix case where activity or timer would try to be cance…
Browse files Browse the repository at this point in the history
…lled without being scheduled (#621)
  • Loading branch information
bergundy committed Apr 30, 2022
1 parent 83bb5f3 commit fd77ced
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 42 deletions.
10 changes: 1 addition & 9 deletions packages/internal-workflow-common/src/time.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ export function msNumberToTs(millis: number): Timestamp {
return { seconds: Long.fromNumber(seconds), nanos };
}

export function falsyMsToTs(str: string | number | undefined): Timestamp | undefined {
return str ? msToTs(str) : undefined;
}

export function msToTs(str: string | number): Timestamp {
if (typeof str === 'number') {
return msNumberToTs(str);
Expand All @@ -56,11 +52,7 @@ export function msToTs(str: string | number): Timestamp {
}

export function msOptionalToTs(str: string | number | undefined): Timestamp | undefined {
if (str === undefined) return undefined;
if (typeof str === 'number') {
return msNumberToTs(str);
}
return msNumberToTs(ms(str));
return str ? msToTs(str) : undefined;
}

export function msOptionalToNumber(val: string | number | undefined): number | undefined {
Expand Down
8 changes: 4 additions & 4 deletions packages/internal-workflow-common/src/workflow-options.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { coresdk, google } from '@temporalio/proto';
import { Workflow } from './interfaces';
import { RetryPolicy } from './retry-policy';
import { falsyMsToTs } from './time';
import { msOptionalToTs } from './time';
import { checkExtends, Replace } from './type-helpers';

// Avoid importing the proto implementation to reduce workflow bundle size
Expand Down Expand Up @@ -122,8 +122,8 @@ export function compileWorkflowOptions<T extends WorkflowDurationOptions>(

return {
...rest,
workflowExecutionTimeout: falsyMsToTs(workflowExecutionTimeout),
workflowRunTimeout: falsyMsToTs(workflowRunTimeout),
workflowTaskTimeout: falsyMsToTs(workflowTaskTimeout),
workflowExecutionTimeout: msOptionalToTs(workflowExecutionTimeout),
workflowRunTimeout: msOptionalToTs(workflowRunTimeout),
workflowTaskTimeout: msOptionalToTs(workflowTaskTimeout),
};
}
10 changes: 10 additions & 0 deletions packages/test/src/integration-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -993,4 +993,14 @@ export function runIntegrationTests(codec?: PayloadCodec): void {
await client.getHandle(workflowId).terminate();
}
});

test('Runtime does not issue cancellations for activities and timers that throw during validation', async (t) => {
const { client } = t.context;
const workflowId = uuid4();
await client.execute(workflows.cancelScopeOnFailedValidation, {
taskQueue: 'test',
workflowId,
});
t.pass();
});
}
14 changes: 7 additions & 7 deletions packages/test/src/test-local-activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,13 @@ if (RUN_INTEGRATION_TESTS) {
namespace: 'default',
execution: { workflowId: handle.workflowId },
});
// WorkflowExecutionStarted
// WorkflowTaskScheduled
// WorkflowTaskStarted
// MarkerRecorded x 3
// WorkflowTaskCompleted
// WorkflowExecutionCompleted
t.is(history?.events?.length, 8);
if (history?.events == null) {
throw new Error('Expected non null events');
}
// Last 3 events before completing the workflow should be MarkerRecorded
t.truthy(history.events[history.events.length - 2].markerRecordedEventAttributes);
t.truthy(history.events[history.events.length - 3].markerRecordedEventAttributes);
t.truthy(history.events[history.events.length - 4].markerRecordedEventAttributes);
});
});

Expand Down
60 changes: 60 additions & 0 deletions packages/test/src/workflows/cancel-scope-on-failed-validation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Workflow used in integration-test:
* 'Runtime does not issue cancellations for activities and timers that throw during validation'
* @module
*/
import * as wf from '@temporalio/workflow';

const { someActivity } = wf.proxyActivities({
startToCloseTimeout: '1 minute',
});

const { someLocalActivity } = wf.proxyLocalActivities({
startToCloseTimeout: '1 minute',
});

/**
* Cancel scopes for activities and timers that failed validation and result in
* no command generated.
*
* Used to test that the SDK doesn't issue the cancellation command without
* scheduling first.
*/
export async function cancelScopeOnFailedValidation(): Promise<void> {
await wf.CancellationScope.cancellable(async () => {
try {
await someActivity(1n as any);
throw new Error('Expected an error but none was thrown');
} catch (err) {
if (wf.isCancellation(err)) {
throw err;
} else {
wf.CancellationScope.current().cancel();
}
}
});
await wf.CancellationScope.cancellable(async () => {
try {
await someLocalActivity(1n as any);
throw new Error('Expected an error but none was thrown');
} catch (err) {
if (wf.isCancellation(err)) {
throw err;
} else {
wf.CancellationScope.current().cancel();
}
}
});
await wf.CancellationScope.cancellable(async () => {
try {
await wf.sleep(NaN);
throw new Error('Expected an error but none was thrown');
} catch (err) {
if (wf.isCancellation(err)) {
throw err;
} else {
wf.CancellationScope.current().cancel();
}
}
});
}
1 change: 1 addition & 0 deletions packages/test/src/workflows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export * from './cancel-activity-after-first-completion';
export * from './cancel-fake-progress';
export * from './cancel-http-request';
export * from './cancel-requested-with-non-cancellable';
export * from './cancel-scope-on-failed-validation';
export * from './cancel-timer-immediately';
export * from './cancel-timer-immediately-alternative-impl';
export * from './cancel-timer-with-delay';
Expand Down
2 changes: 1 addition & 1 deletion packages/workflow/src/worker-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* @module
*/
import { errorToFailure as _errorToFailure, ProtoFailure } from '@temporalio/common';
import { composeInterceptors, IllegalStateError, msToTs, tsToMs, Workflow } from '@temporalio/internal-workflow-common';
import { composeInterceptors, IllegalStateError, msToTs, tsToMs } from '@temporalio/internal-workflow-common';
import type { coresdk } from '@temporalio/proto';
import { alea } from './alea';
import { storage } from './cancellation-scope';
Expand Down
41 changes: 20 additions & 21 deletions packages/workflow/src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ function timerNextHandler(input: TimerInput) {
if (scope.cancellable) {
scope.cancelRequested.catch((err) => {
if (!state.completions.timer.delete(input.seq)) {
return; // Already resolved
return; // Already resolved or never scheduled
}
state.pushCommand({
cancelTimer: {
Expand All @@ -79,16 +79,16 @@ function timerNextHandler(input: TimerInput) {
reject(err);
});
}
state.completions.timer.set(input.seq, {
resolve,
reject,
});
state.pushCommand({
startTimer: {
seq: input.seq,
startToFireTimeout: msToTs(input.durationMs),
},
});
state.completions.timer.set(input.seq, {
resolve,
reject,
});
});
}

Expand Down Expand Up @@ -156,7 +156,7 @@ async function scheduleActivityNextHandler({
if (scope.cancellable) {
scope.cancelRequested.catch(() => {
if (!state.completions.activity.has(seq)) {
return; // Already resolved
return; // Already resolved or never scheduled
}
state.pushCommand({
requestCancelActivity: {
Expand All @@ -165,10 +165,6 @@ async function scheduleActivityNextHandler({
});
});
}
state.completions.activity.set(seq, {
resolve,
reject,
});
state.pushCommand({
scheduleActivity: {
seq,
Expand All @@ -185,6 +181,10 @@ async function scheduleActivityNextHandler({
cancellationType: options.cancellationType,
},
});
state.completions.activity.set(seq, {
resolve,
reject,
});
});
}

Expand All @@ -211,7 +211,7 @@ async function scheduleLocalActivityNextHandler({
if (scope.cancellable) {
scope.cancelRequested.catch(() => {
if (!state.completions.activity.has(seq)) {
return; // Already resolved
return; // Already resolved or never scheduled
}
state.pushCommand({
requestCancelLocalActivity: {
Expand All @@ -220,11 +220,6 @@ async function scheduleLocalActivityNextHandler({
});
});
}
state.completions.activity.set(seq, {
resolve,
reject,
});

state.pushCommand({
scheduleLocalActivity: {
seq,
Expand All @@ -243,6 +238,10 @@ async function scheduleLocalActivityNextHandler({
cancellationType: options.cancellationType,
},
});
state.completions.activity.set(seq, {
resolve,
reject,
});
});
}

Expand Down Expand Up @@ -351,10 +350,6 @@ async function startChildWorkflowExecutionNextHandler({
// Nothing to cancel otherwise
});
}
state.completions.childWorkflowStart.set(seq, {
resolve,
reject,
});
state.pushCommand({
startChildWorkflowExecution: {
seq,
Expand All @@ -378,6 +373,10 @@ async function startChildWorkflowExecutionNextHandler({
memo: options.memo && mapToPayloads(state.payloadConverter, options.memo),
},
});
state.completions.childWorkflowStart.set(seq, {
resolve,
reject,
});
});

// We construct a Promise for the completion of the child Workflow before we know
Expand Down Expand Up @@ -1076,7 +1075,7 @@ export type Handler<
* If this function is called multiple times for a given signal or query name the last handler will overwrite any previous calls.
*
* @param def a {@link SignalDefinition} or {@link QueryDefinition} as returned by {@link defineSignal} or {@link defineQuery} respectively.
* @param handler a compatible handler function for the given definition.
* @param handler a compatible handler function for the given definition or `undefined` to unset the handler.
*/
export function setHandler<Ret, Args extends any[], T extends SignalDefinition<Args> | QueryDefinition<Ret, Args>>(
def: T,
Expand Down

0 comments on commit fd77ced

Please sign in to comment.