Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scenarios/throughput_stress.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func (t *tpsExecutor) createChildWorkflowAction(run *loadgen.Run, childID int) *
WorkflowId: fmt.Sprintf("%s/child-%d", run.DefaultStartWorkflowOptions().ID, childID),
SearchAttributes: map[string]*common.Payload{
ThroughputStressScenarioIdSearchAttribute: &common.Payload{
Metadata: map[string][]byte{"encoding": []byte("json/plain")},
Metadata: map[string][]byte{"encoding": []byte("json/plain"), "type": []byte("Keyword")},
Data: []byte(fmt.Sprintf("%q", t.config.ScenarioRunID)), // quoted to be valid JSON string
},
},
Expand Down
17 changes: 4 additions & 13 deletions workers/python/kitchen_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@

import temporalio.workflow
from temporalio import exceptions, workflow
from temporalio.api.common.v1 import Payload
from temporalio.api.common.v1 import Payload, SearchAttributes
from temporalio.common import (
Priority,
RawValue,
RetryPolicy,
SearchAttributeKey,
SearchAttributeUpdate,
)
from temporalio.converter import DefaultPayloadConverter
from temporalio.workflow import ActivityHandle, ChildWorkflowHandle

from protos.kitchen_sink_pb2 import (
Expand Down Expand Up @@ -130,15 +129,14 @@ async def handle_action(self, action: Action) -> Optional[Payload]:
child_action = action.exec_child_workflow
child = child_action.workflow_type or "kitchenSink"
args = [RawValue(i) for i in child_action.input]

proto_sa = SearchAttributes(indexed_fields=child_action.search_attributes)
typed_attrs = temporalio.converter.decode_typed_search_attributes(proto_sa)
await handle_awaitable_choice(
workflow.start_child_workflow(
child,
id=child_action.workflow_id,
args=args,
search_attributes=decode_search_attrs(
child_action.search_attributes, DefaultPayloadConverter()
),
search_attributes=typed_attrs,
),
child_action.awaitable_choice,
after_started_fn=wait_task_complete,
Expand Down Expand Up @@ -324,10 +322,3 @@ def convert_act_cancel_type(
return temporalio.workflow.ActivityCancellationType.ABANDON
else:
raise NotImplementedError("Unknown cancellation type " + str(ctype))


def decode_search_attrs(msg_map, converter):
return {
k: v if isinstance(v := converter.from_payload(p), list) else [v]
for k, p in msg_map.items()
}
37 changes: 13 additions & 24 deletions workers/typescript/src/payload-converter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,39 +9,28 @@ import {
JsonPayloadConverter,
PayloadConverterWithEncoding,
UndefinedPayloadConverter,
ValueError,
} from '@temporalio/common';
import { decode, encode } from '@temporalio/common/lib/encoding';
import Payload = temporal.api.common.v1.Payload;

// TODO(thomas): can remove this file entirely (and usage of custom payload converter for worker)
// once RawValue.fromPayload(p) is released.
export class PassThroughPayload implements PayloadConverterWithEncoding {
public toPayload(value: any): Payload | undefined {
if (!value || value.metadata === undefined || value.data === undefined) {
if (
!value ||
value.metadata == null ||
value.data == null ||
value?.metadata?.encoding == null
) {
return undefined;
}
let asPayload;
try {
asPayload = Payload.fromObject(value as any);
} catch (e) {
throw new ValueError('PassThroughPayload can only convert Payloads');
}
const asBytes = Payload.encode(asPayload).finish();
return Payload.create({
metadata: {
encoding: encode(this.encodingType),
},
data: asBytes,
});
// If it looks like a Payload, return it as-is
return value as Payload;
}

public fromPayload<T>(content: Payload): T {
if (decode(content.metadata?.encoding) === '__passthrough') {
const innerPayload = Payload.decode(new Uint8Array(content.data));
return payloadConverter.fromPayload<T>(innerPayload);
}
throw new ValueError(
'PassThroughPayload can only decode passthrough Payloads, got ' + JSON.stringify(content)
);
public fromPayload<T>(_: Payload): T {
// This should never be called since we don't modify the encoding
throw new Error('PassThroughPayload.fromPayload should not be called');
}

public get encodingType(): string {
Expand Down
6 changes: 6 additions & 0 deletions workers/typescript/src/proto_help.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ import { google } from './protos/root';
import IDuration = google.protobuf.IDuration;
import Long from 'long';

export function durationConvertMaybeUndefined(d: IDuration | null | undefined): number | undefined {
if (!d) {
return undefined;
}
return durationConvert(d);
}
export function durationConvert(d: IDuration | null | undefined): number {
if (!d) {
return 0;
Expand Down
31 changes: 14 additions & 17 deletions workers/typescript/src/workflows/kitchen_sink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import {
ApplicationFailure,
CancellationScope,
ChildWorkflowHandle,
ChildWorkflowOptions,
condition,
continueAsNew,
defineQuery,
Expand All @@ -28,7 +27,8 @@ import {
LocalActivityOptions,
SearchAttributes,
} from '@temporalio/common';
import { durationConvert, numify } from '../proto_help';
import { decodeTypedSearchAttributes } from '@temporalio/common/lib/converter/payload-search-attributes';
import { durationConvert, durationConvertMaybeUndefined, numify } from '../proto_help';
import WorkflowInput = temporal.omes.kitchen_sink.WorkflowInput;
import WorkflowState = temporal.omes.kitchen_sink.WorkflowState;
import Payload = temporal.api.common.v1.Payload;
Expand Down Expand Up @@ -139,19 +139,17 @@ export async function kitchenSink(input: WorkflowInput | undefined): Promise<IPa
action.execActivity.awaitableChoice
);
} else if (action.execChildWorkflow) {
const opts: ChildWorkflowOptions = {};
if (action.execChildWorkflow.workflowId) {
opts.workflowId = action.execChildWorkflow.workflowId;
}
const execChild = action.execChildWorkflow;
const childStarter = () => {
return startChild(execChild.workflowType ?? 'kitchenSink', {
args: execChild.input ?? [],
...opts,
});
};
await handleAwaitableChoice(
childStarter,
() => {
return startChild(execChild.workflowType || 'kitchenSink', {
args: execChild.input ?? [],
workflowId: execChild.workflowId ?? undefined,
typedSearchAttributes: decodeTypedSearchAttributes(
action?.execChildWorkflow?.searchAttributes
),
});
},
action.execChildWorkflow.awaitableChoice,
async (task) => {
await task;
Expand Down Expand Up @@ -277,11 +275,10 @@ function launchActivity(execActivity: IExecuteActivityAction): Promise<unknown>
actType = 'client';
args.push(execActivity.client);
}

const actArgs: ActivityOptions | LocalActivityOptions = {
scheduleToCloseTimeout: durationConvert(execActivity.scheduleToCloseTimeout),
startToCloseTimeout: durationConvert(execActivity.startToCloseTimeout),
scheduleToStartTimeout: durationConvert(execActivity.scheduleToStartTimeout),
scheduleToCloseTimeout: durationConvertMaybeUndefined(execActivity.scheduleToCloseTimeout),
startToCloseTimeout: durationConvertMaybeUndefined(execActivity.startToCloseTimeout),
scheduleToStartTimeout: durationConvertMaybeUndefined(execActivity.scheduleToStartTimeout),
retry: decompileRetryPolicy(execActivity.retryPolicy),
priority: decodePriority(execActivity.priority),
};
Expand Down
Loading