diff --git a/temporalio/client.py b/temporalio/client.py index 2b52eb3c..252fb064 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -1277,6 +1277,7 @@ async def query( arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], + result_type: Optional[Type] = None, reject_condition: Optional[temporalio.common.QueryRejectCondition] = None, rpc_metadata: Mapping[str, str] = {}, rpc_timeout: Optional[timedelta] = None, @@ -1289,6 +1290,7 @@ async def query( arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], + result_type: Optional[Type] = None, reject_condition: Optional[temporalio.common.QueryRejectCondition] = None, rpc_metadata: Mapping[str, str] = {}, rpc_timeout: Optional[timedelta] = None, @@ -1308,6 +1310,8 @@ async def query( query: Query function or name on the workflow. arg: Single argument to the query. args: Multiple arguments to the query. Cannot be set if arg is. + result_type: For string queries, this can set the specific result + type hint to deserialize into. reject_condition: Condition for rejecting the query. If unset/None, defaults to the client's default (which is defaulted to None). rpc_metadata: Headers used on the RPC call. Keys here override @@ -1322,7 +1326,7 @@ async def query( RPCError: Workflow details could not be fetched. """ query_name: str - ret_type: Optional[Type] = None + ret_type = result_type if callable(query): defn = temporalio.workflow._QueryDefinition.from_fn(query) if not defn: diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 2dc065fe..16953fab 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -812,6 +812,7 @@ def workflow_start_activity( activity: Any, *args: Any, task_queue: Optional[str], + result_type: Optional[Type], schedule_to_close_timeout: Optional[timedelta], schedule_to_start_timeout: Optional[timedelta], start_to_close_timeout: Optional[timedelta], @@ -823,7 +824,7 @@ def workflow_start_activity( # Get activity definition if it's callable name: str arg_types: Optional[List[Type]] = None - ret_type: Optional[Type] = None + ret_type = result_type if isinstance(activity, str): name = activity elif callable(activity): @@ -859,6 +860,7 @@ async def workflow_start_child_workflow( *args: Any, id: str, task_queue: Optional[str], + result_type: Optional[Type], cancellation_type: temporalio.workflow.ChildWorkflowCancellationType, parent_close_policy: temporalio.workflow.ParentClosePolicy, execution_timeout: Optional[timedelta], @@ -873,7 +875,7 @@ async def workflow_start_child_workflow( # Use definition if callable name: str arg_types: Optional[List[Type]] = None - ret_type: Optional[Type] = None + ret_type = result_type if isinstance(workflow, str): name = workflow elif callable(workflow): @@ -910,6 +912,7 @@ def workflow_start_local_activity( self, activity: Any, *args: Any, + result_type: Optional[Type], schedule_to_close_timeout: Optional[timedelta], schedule_to_start_timeout: Optional[timedelta], start_to_close_timeout: Optional[timedelta], @@ -921,7 +924,7 @@ def workflow_start_local_activity( # Get activity definition if it's callable name: str arg_types: Optional[List[Type]] = None - ret_type: Optional[Type] = None + ret_type = result_type if isinstance(activity, str): name = activity elif callable(activity): diff --git a/temporalio/workflow.py b/temporalio/workflow.py index 4e7c2df1..624eab15 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -446,6 +446,7 @@ def workflow_start_activity( activity: Any, *args: Any, task_queue: Optional[str], + result_type: Optional[Type], schedule_to_close_timeout: Optional[timedelta], schedule_to_start_timeout: Optional[timedelta], start_to_close_timeout: Optional[timedelta], @@ -463,6 +464,7 @@ async def workflow_start_child_workflow( *args: Any, id: str, task_queue: Optional[str], + result_type: Optional[Type], cancellation_type: ChildWorkflowCancellationType, parent_close_policy: ParentClosePolicy, execution_timeout: Optional[timedelta], @@ -481,6 +483,7 @@ def workflow_start_local_activity( self, activity: Any, *args: Any, + result_type: Optional[Type], schedule_to_close_timeout: Optional[timedelta], schedule_to_start_timeout: Optional[timedelta], start_to_close_timeout: Optional[timedelta], @@ -1265,6 +1268,7 @@ def start_activity( *, args: Sequence[Any] = [], task_queue: Optional[str] = None, + result_type: Optional[Type] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, @@ -1282,6 +1286,7 @@ def start_activity( *, args: Sequence[Any] = [], task_queue: Optional[str] = None, + result_type: Optional[Type] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, @@ -1301,6 +1306,8 @@ def start_activity( args: Multiple arguments to the activity. Cannot be set if arg is. task_queue: Task queue to run the activity on. Defaults to the current workflow's task queue. + result_type: For string activities, this can set the specific result + type hint to deserialize into. schedule_to_close_timeout: Max amount of time the activity can take from first being scheduled to being completed before it times out. This is inclusive of all retries. @@ -1326,6 +1333,7 @@ def start_activity( activity, *temporalio.common._arg_or_args(arg, args), task_queue=task_queue, + result_type=result_type, schedule_to_close_timeout=schedule_to_close_timeout, schedule_to_start_timeout=schedule_to_start_timeout, start_to_close_timeout=start_to_close_timeout, @@ -1450,6 +1458,7 @@ async def execute_activity( *, args: Sequence[Any] = [], task_queue: Optional[str] = None, + result_type: Optional[Type] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, @@ -1467,6 +1476,7 @@ async def execute_activity( *, args: Sequence[Any] = [], task_queue: Optional[str] = None, + result_type: Optional[Type] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, @@ -1485,6 +1495,7 @@ async def execute_activity( activity, *temporalio.common._arg_or_args(arg, args), task_queue=task_queue, + result_type=result_type, schedule_to_close_timeout=schedule_to_close_timeout, schedule_to_start_timeout=schedule_to_start_timeout, start_to_close_timeout=start_to_close_timeout, @@ -1623,6 +1634,7 @@ def start_activity_class( activity, *temporalio.common._arg_or_args(arg, args), task_queue=task_queue, + result_type=None, schedule_to_close_timeout=schedule_to_close_timeout, schedule_to_start_timeout=schedule_to_start_timeout, start_to_close_timeout=start_to_close_timeout, @@ -1761,6 +1773,7 @@ async def execute_activity_class( activity, *temporalio.common._arg_or_args(arg, args), task_queue=task_queue, + result_type=None, schedule_to_close_timeout=schedule_to_close_timeout, schedule_to_start_timeout=schedule_to_start_timeout, start_to_close_timeout=start_to_close_timeout, @@ -1899,6 +1912,7 @@ def start_activity_method( activity, *temporalio.common._arg_or_args(arg, args), task_queue=task_queue, + result_type=None, schedule_to_close_timeout=schedule_to_close_timeout, schedule_to_start_timeout=schedule_to_start_timeout, start_to_close_timeout=start_to_close_timeout, @@ -2039,6 +2053,7 @@ async def execute_activity_method( activity, *temporalio.common._arg_or_args(arg, args), task_queue=task_queue, + result_type=None, schedule_to_close_timeout=schedule_to_close_timeout, schedule_to_start_timeout=schedule_to_start_timeout, start_to_close_timeout=start_to_close_timeout, @@ -2170,6 +2185,7 @@ def start_local_activity( arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], + result_type: Optional[Type] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, @@ -2186,6 +2202,7 @@ def start_local_activity( arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], + result_type: Optional[Type] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, @@ -2206,6 +2223,8 @@ def start_local_activity( activity: Activity name or function reference. arg: Single argument to the activity. args: Multiple arguments to the activity. Cannot be set if arg is. + result_type: For string activities, this can set the specific result + type hint to deserialize into. activity_id: Optional unique identifier for the activity. schedule_to_close_timeout: Max amount of time the activity can take from first being scheduled to being completed before it times out. This @@ -2229,6 +2248,7 @@ def start_local_activity( return _Runtime.current().workflow_start_local_activity( activity, *temporalio.common._arg_or_args(arg, args), + result_type=result_type, schedule_to_close_timeout=schedule_to_close_timeout, schedule_to_start_timeout=schedule_to_start_timeout, start_to_close_timeout=start_to_close_timeout, @@ -2346,6 +2366,7 @@ async def execute_local_activity( arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], + result_type: Optional[Type] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, @@ -2362,6 +2383,7 @@ async def execute_local_activity( arg: Any = temporalio.common._arg_unset, *, args: Sequence[Any] = [], + result_type: Optional[Type] = None, schedule_to_close_timeout: Optional[timedelta] = None, schedule_to_start_timeout: Optional[timedelta] = None, start_to_close_timeout: Optional[timedelta] = None, @@ -2382,6 +2404,7 @@ async def execute_local_activity( return await _Runtime.current().workflow_start_local_activity( activity, *temporalio.common._arg_or_args(arg, args), + result_type=result_type, schedule_to_close_timeout=schedule_to_close_timeout, schedule_to_start_timeout=schedule_to_start_timeout, start_to_close_timeout=start_to_close_timeout, @@ -2515,6 +2538,7 @@ def start_local_activity_class( return _Runtime.current().workflow_start_local_activity( activity, *temporalio.common._arg_or_args(arg, args), + result_type=None, schedule_to_close_timeout=schedule_to_close_timeout, schedule_to_start_timeout=schedule_to_start_timeout, start_to_close_timeout=start_to_close_timeout, @@ -2650,6 +2674,7 @@ async def execute_local_activity_class( return await _Runtime.current().workflow_start_local_activity( activity, *temporalio.common._arg_or_args(arg, args), + result_type=None, schedule_to_close_timeout=schedule_to_close_timeout, schedule_to_start_timeout=schedule_to_start_timeout, start_to_close_timeout=start_to_close_timeout, @@ -2783,6 +2808,7 @@ def start_local_activity_method( return _Runtime.current().workflow_start_local_activity( activity, *temporalio.common._arg_or_args(arg, args), + result_type=None, schedule_to_close_timeout=schedule_to_close_timeout, schedule_to_start_timeout=schedule_to_start_timeout, start_to_close_timeout=start_to_close_timeout, @@ -2918,6 +2944,7 @@ async def execute_local_activity_method( return await _Runtime.current().workflow_start_local_activity( activity, *temporalio.common._arg_or_args(arg, args), + result_type=None, schedule_to_close_timeout=schedule_to_close_timeout, schedule_to_start_timeout=schedule_to_start_timeout, start_to_close_timeout=start_to_close_timeout, @@ -3127,6 +3154,7 @@ async def start_child_workflow( args: Sequence[Any] = [], id: Optional[str] = None, task_queue: Optional[str] = None, + result_type: Optional[Type] = None, cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE, execution_timeout: Optional[timedelta] = None, @@ -3148,6 +3176,7 @@ async def start_child_workflow( args: Sequence[Any] = [], id: Optional[str] = None, task_queue: Optional[str] = None, + result_type: Optional[Type] = None, cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE, execution_timeout: Optional[timedelta] = None, @@ -3170,6 +3199,8 @@ async def start_child_workflow( defaults to :py:func:`uuid4`. task_queue: Task queue to run the workflow on. Defaults to the current workflow's task queue. + result_type: For string workflows, this can set the specific result type + hint to deserialize into. cancellation_type: How the child workflow will react to cancellation. parent_close_policy: How to handle the child workflow when the parent workflow closes. @@ -3191,6 +3222,7 @@ async def start_child_workflow( *temporalio.common._arg_or_args(arg, args), id=id or str(uuid4()), task_queue=task_queue, + result_type=result_type, cancellation_type=cancellation_type, parent_close_policy=parent_close_policy, execution_timeout=execution_timeout, @@ -3278,6 +3310,7 @@ async def execute_child_workflow( args: Sequence[Any] = [], id: Optional[str] = None, task_queue: Optional[str] = None, + result_type: Optional[Type] = None, cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE, execution_timeout: Optional[timedelta] = None, @@ -3299,6 +3332,7 @@ async def execute_child_workflow( args: Sequence[Any] = [], id: Optional[str] = None, task_queue: Optional[str] = None, + result_type: Optional[Type] = None, cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE, execution_timeout: Optional[timedelta] = None, @@ -3321,6 +3355,7 @@ async def execute_child_workflow( *temporalio.common._arg_or_args(arg, args), id=id or str(uuid4()), task_queue=task_queue, + result_type=result_type, cancellation_type=cancellation_type, parent_close_policy=parent_close_policy, execution_timeout=execution_timeout, diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index ff86fa44..23e3c8da 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -2763,3 +2763,74 @@ async def test_exception_raising_converter_param(client: Client): ) assert isinstance(err.value.cause, ApplicationError) assert "Intentional converter failure" in str(err.value.cause) + + +@dataclass +class ManualResultType: + some_string: str + + +@activity.defn +async def manual_result_type_activity() -> ManualResultType: + return ManualResultType(some_string="from-activity") + + +@workflow.defn +class ManualResultTypeWorkflow: + @workflow.run + async def run(self) -> ManualResultType: + # Only check activity and child if not a child ourselves + if not workflow.info().parent: + # Activity without result type and with + res1 = await workflow.execute_activity( + "manual_result_type_activity", + schedule_to_close_timeout=timedelta(minutes=2), + ) + assert res1 == {"some_string": "from-activity"} + res2 = await workflow.execute_activity( + "manual_result_type_activity", + result_type=ManualResultType, + schedule_to_close_timeout=timedelta(minutes=2), + ) + assert res2 == ManualResultType(some_string="from-activity") + # Child without result type and with + res3 = await workflow.execute_child_workflow( + "ManualResultTypeWorkflow", + ) + assert res3 == {"some_string": "from-workflow"} + res4 = await workflow.execute_child_workflow( + "ManualResultTypeWorkflow", + result_type=ManualResultType, + ) + assert res4 == ManualResultType(some_string="from-workflow") + return ManualResultType(some_string="from-workflow") + + @workflow.query + def some_query(self) -> ManualResultType: + return ManualResultType(some_string="from-query") + + +async def test_manual_result_type(client: Client): + async with new_worker( + client, ManualResultTypeWorkflow, activities=[manual_result_type_activity] + ) as worker: + # Workflow without result type and with + res1 = await client.execute_workflow( + "ManualResultTypeWorkflow", + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + assert res1 == {"some_string": "from-workflow"} + handle = await client.start_workflow( + "ManualResultTypeWorkflow", + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + result_type=ManualResultType, + ) + res2 = await handle.result() + assert res2 == ManualResultType(some_string="from-workflow") + # Query without result type and with + res3 = await handle.query("some_query") + assert res3 == {"some_string": "from-query"} + res4 = await handle.query("some_query", result_type=ManualResultType) + assert res4 == ManualResultType(some_string="from-query")