diff --git a/scenarios/throughput_stress.go b/scenarios/throughput_stress.go index 6f4c91ea..bebf1231 100644 --- a/scenarios/throughput_stress.go +++ b/scenarios/throughput_stress.go @@ -507,7 +507,7 @@ func (t *tpsExecutor) createChildWorkflowAction(run *loadgen.Run, childID int) * WorkflowId: fmt.Sprintf("%s/child-%d", run.DefaultStartWorkflowOptions().ID, childID), SearchAttributes: map[string]*common.Payload{ ThroughputStressScenarioIdSearchAttribute: &common.Payload{ - Metadata: map[string][]byte{"encoding": []byte("json/plain")}, + Metadata: map[string][]byte{"encoding": []byte("json/plain"), "type": []byte("Keyword")}, Data: []byte(fmt.Sprintf("%q", t.config.ScenarioRunID)), // quoted to be valid JSON string }, }, diff --git a/workers/python/kitchen_sink.py b/workers/python/kitchen_sink.py index c9ab2f92..92e94f9e 100644 --- a/workers/python/kitchen_sink.py +++ b/workers/python/kitchen_sink.py @@ -6,7 +6,7 @@ import temporalio.workflow from temporalio import exceptions, workflow -from temporalio.api.common.v1 import Payload +from temporalio.api.common.v1 import Payload, SearchAttributes from temporalio.common import ( Priority, RawValue, @@ -14,7 +14,6 @@ SearchAttributeKey, SearchAttributeUpdate, ) -from temporalio.converter import DefaultPayloadConverter from temporalio.workflow import ActivityHandle, ChildWorkflowHandle from protos.kitchen_sink_pb2 import ( @@ -130,15 +129,14 @@ async def handle_action(self, action: Action) -> Optional[Payload]: child_action = action.exec_child_workflow child = child_action.workflow_type or "kitchenSink" args = [RawValue(i) for i in child_action.input] - + proto_sa = SearchAttributes(indexed_fields=child_action.search_attributes) + typed_attrs = temporalio.converter.decode_typed_search_attributes(proto_sa) await handle_awaitable_choice( workflow.start_child_workflow( child, id=child_action.workflow_id, args=args, - search_attributes=decode_search_attrs( - child_action.search_attributes, DefaultPayloadConverter() - ), + search_attributes=typed_attrs, ), child_action.awaitable_choice, after_started_fn=wait_task_complete, @@ -324,10 +322,3 @@ def convert_act_cancel_type( return temporalio.workflow.ActivityCancellationType.ABANDON else: raise NotImplementedError("Unknown cancellation type " + str(ctype)) - - -def decode_search_attrs(msg_map, converter): - return { - k: v if isinstance(v := converter.from_payload(p), list) else [v] - for k, p in msg_map.items() - } diff --git a/workers/typescript/src/payload-converter.ts b/workers/typescript/src/payload-converter.ts index 29d6d483..ba8a4d7e 100644 --- a/workers/typescript/src/payload-converter.ts +++ b/workers/typescript/src/payload-converter.ts @@ -9,39 +9,28 @@ import { JsonPayloadConverter, PayloadConverterWithEncoding, UndefinedPayloadConverter, - ValueError, } from '@temporalio/common'; -import { decode, encode } from '@temporalio/common/lib/encoding'; import Payload = temporal.api.common.v1.Payload; +// TODO(thomas): can remove this file entirely (and usage of custom payload converter for worker) +// once RawValue.fromPayload(p) is released. export class PassThroughPayload implements PayloadConverterWithEncoding { public toPayload(value: any): Payload | undefined { - if (!value || value.metadata === undefined || value.data === undefined) { + if ( + !value || + value.metadata == null || + value.data == null || + value?.metadata?.encoding == null + ) { return undefined; } - let asPayload; - try { - asPayload = Payload.fromObject(value as any); - } catch (e) { - throw new ValueError('PassThroughPayload can only convert Payloads'); - } - const asBytes = Payload.encode(asPayload).finish(); - return Payload.create({ - metadata: { - encoding: encode(this.encodingType), - }, - data: asBytes, - }); + // If it looks like a Payload, return it as-is + return value as Payload; } - public fromPayload(content: Payload): T { - if (decode(content.metadata?.encoding) === '__passthrough') { - const innerPayload = Payload.decode(new Uint8Array(content.data)); - return payloadConverter.fromPayload(innerPayload); - } - throw new ValueError( - 'PassThroughPayload can only decode passthrough Payloads, got ' + JSON.stringify(content) - ); + public fromPayload(_: Payload): T { + // This should never be called since we don't modify the encoding + throw new Error('PassThroughPayload.fromPayload should not be called'); } public get encodingType(): string { diff --git a/workers/typescript/src/proto_help.ts b/workers/typescript/src/proto_help.ts index d5b24b14..f828abff 100644 --- a/workers/typescript/src/proto_help.ts +++ b/workers/typescript/src/proto_help.ts @@ -3,6 +3,12 @@ import { google } from './protos/root'; import IDuration = google.protobuf.IDuration; import Long from 'long'; +export function durationConvertMaybeUndefined(d: IDuration | null | undefined): number | undefined { + if (!d) { + return undefined; + } + return durationConvert(d); +} export function durationConvert(d: IDuration | null | undefined): number { if (!d) { return 0; diff --git a/workers/typescript/src/workflows/kitchen_sink.ts b/workers/typescript/src/workflows/kitchen_sink.ts index 50ab5f55..c75a0548 100644 --- a/workers/typescript/src/workflows/kitchen_sink.ts +++ b/workers/typescript/src/workflows/kitchen_sink.ts @@ -4,7 +4,6 @@ import { ApplicationFailure, CancellationScope, ChildWorkflowHandle, - ChildWorkflowOptions, condition, continueAsNew, defineQuery, @@ -28,7 +27,8 @@ import { LocalActivityOptions, SearchAttributes, } from '@temporalio/common'; -import { durationConvert, numify } from '../proto_help'; +import { decodeTypedSearchAttributes } from '@temporalio/common/lib/converter/payload-search-attributes'; +import { durationConvert, durationConvertMaybeUndefined, numify } from '../proto_help'; import WorkflowInput = temporal.omes.kitchen_sink.WorkflowInput; import WorkflowState = temporal.omes.kitchen_sink.WorkflowState; import Payload = temporal.api.common.v1.Payload; @@ -139,19 +139,17 @@ export async function kitchenSink(input: WorkflowInput | undefined): Promise { - return startChild(execChild.workflowType ?? 'kitchenSink', { - args: execChild.input ?? [], - ...opts, - }); - }; await handleAwaitableChoice( - childStarter, + () => { + return startChild(execChild.workflowType || 'kitchenSink', { + args: execChild.input ?? [], + workflowId: execChild.workflowId ?? undefined, + typedSearchAttributes: decodeTypedSearchAttributes( + action?.execChildWorkflow?.searchAttributes + ), + }); + }, action.execChildWorkflow.awaitableChoice, async (task) => { await task; @@ -277,11 +275,10 @@ function launchActivity(execActivity: IExecuteActivityAction): Promise actType = 'client'; args.push(execActivity.client); } - const actArgs: ActivityOptions | LocalActivityOptions = { - scheduleToCloseTimeout: durationConvert(execActivity.scheduleToCloseTimeout), - startToCloseTimeout: durationConvert(execActivity.startToCloseTimeout), - scheduleToStartTimeout: durationConvert(execActivity.scheduleToStartTimeout), + scheduleToCloseTimeout: durationConvertMaybeUndefined(execActivity.scheduleToCloseTimeout), + startToCloseTimeout: durationConvertMaybeUndefined(execActivity.startToCloseTimeout), + scheduleToStartTimeout: durationConvertMaybeUndefined(execActivity.scheduleToStartTimeout), retry: decompileRetryPolicy(execActivity.retryPolicy), priority: decodePriority(execActivity.priority), };