Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Result type setter for string-based activity, child workflow, and query calls #334

Merged
merged 2 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1277,6 +1277,7 @@ async def query(
arg: Any = temporalio.common._arg_unset,
*,
args: Sequence[Any] = [],
result_type: Optional[Type] = None,
reject_condition: Optional[temporalio.common.QueryRejectCondition] = None,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
Expand All @@ -1289,6 +1290,7 @@ async def query(
arg: Any = temporalio.common._arg_unset,
*,
args: Sequence[Any] = [],
result_type: Optional[Type] = None,
reject_condition: Optional[temporalio.common.QueryRejectCondition] = None,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
Expand All @@ -1308,6 +1310,8 @@ async def query(
query: Query function or name on the workflow.
arg: Single argument to the query.
args: Multiple arguments to the query. Cannot be set if arg is.
result_type: For string queries, this can set the specific result
type hint to deserialize into.
reject_condition: Condition for rejecting the query. If unset/None,
defaults to the client's default (which is defaulted to None).
rpc_metadata: Headers used on the RPC call. Keys here override
Expand All @@ -1322,7 +1326,7 @@ async def query(
RPCError: Workflow details could not be fetched.
"""
query_name: str
ret_type: Optional[Type] = None
ret_type = result_type
if callable(query):
defn = temporalio.workflow._QueryDefinition.from_fn(query)
if not defn:
Expand Down
9 changes: 6 additions & 3 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,7 @@ def workflow_start_activity(
activity: Any,
*args: Any,
task_queue: Optional[str],
result_type: Optional[Type],
schedule_to_close_timeout: Optional[timedelta],
schedule_to_start_timeout: Optional[timedelta],
start_to_close_timeout: Optional[timedelta],
Expand All @@ -823,7 +824,7 @@ def workflow_start_activity(
# Get activity definition if it's callable
name: str
arg_types: Optional[List[Type]] = None
ret_type: Optional[Type] = None
ret_type = result_type
if isinstance(activity, str):
name = activity
elif callable(activity):
Expand Down Expand Up @@ -859,6 +860,7 @@ async def workflow_start_child_workflow(
*args: Any,
id: str,
task_queue: Optional[str],
result_type: Optional[Type],
cancellation_type: temporalio.workflow.ChildWorkflowCancellationType,
parent_close_policy: temporalio.workflow.ParentClosePolicy,
execution_timeout: Optional[timedelta],
Expand All @@ -873,7 +875,7 @@ async def workflow_start_child_workflow(
# Use definition if callable
name: str
arg_types: Optional[List[Type]] = None
ret_type: Optional[Type] = None
ret_type = result_type
if isinstance(workflow, str):
name = workflow
elif callable(workflow):
Expand Down Expand Up @@ -910,6 +912,7 @@ def workflow_start_local_activity(
self,
activity: Any,
*args: Any,
result_type: Optional[Type],
schedule_to_close_timeout: Optional[timedelta],
schedule_to_start_timeout: Optional[timedelta],
start_to_close_timeout: Optional[timedelta],
Expand All @@ -921,7 +924,7 @@ def workflow_start_local_activity(
# Get activity definition if it's callable
name: str
arg_types: Optional[List[Type]] = None
ret_type: Optional[Type] = None
ret_type = result_type
if isinstance(activity, str):
name = activity
elif callable(activity):
Expand Down
35 changes: 35 additions & 0 deletions temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ def workflow_start_activity(
activity: Any,
*args: Any,
task_queue: Optional[str],
result_type: Optional[Type],
schedule_to_close_timeout: Optional[timedelta],
schedule_to_start_timeout: Optional[timedelta],
start_to_close_timeout: Optional[timedelta],
Expand All @@ -463,6 +464,7 @@ async def workflow_start_child_workflow(
*args: Any,
id: str,
task_queue: Optional[str],
result_type: Optional[Type],
cancellation_type: ChildWorkflowCancellationType,
parent_close_policy: ParentClosePolicy,
execution_timeout: Optional[timedelta],
Expand All @@ -481,6 +483,7 @@ def workflow_start_local_activity(
self,
activity: Any,
*args: Any,
result_type: Optional[Type],
schedule_to_close_timeout: Optional[timedelta],
schedule_to_start_timeout: Optional[timedelta],
start_to_close_timeout: Optional[timedelta],
Expand Down Expand Up @@ -1265,6 +1268,7 @@ def start_activity(
*,
args: Sequence[Any] = [],
task_queue: Optional[str] = None,
result_type: Optional[Type] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
schedule_to_start_timeout: Optional[timedelta] = None,
start_to_close_timeout: Optional[timedelta] = None,
Expand All @@ -1282,6 +1286,7 @@ def start_activity(
*,
args: Sequence[Any] = [],
task_queue: Optional[str] = None,
result_type: Optional[Type] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
schedule_to_start_timeout: Optional[timedelta] = None,
start_to_close_timeout: Optional[timedelta] = None,
Expand All @@ -1301,6 +1306,8 @@ def start_activity(
args: Multiple arguments to the activity. Cannot be set if arg is.
task_queue: Task queue to run the activity on. Defaults to the current
workflow's task queue.
result_type: For string activities, this can set the specific result
type hint to deserialize into.
schedule_to_close_timeout: Max amount of time the activity can take from
first being scheduled to being completed before it times out. This
is inclusive of all retries.
Expand All @@ -1326,6 +1333,7 @@ def start_activity(
activity,
*temporalio.common._arg_or_args(arg, args),
task_queue=task_queue,
result_type=result_type,
schedule_to_close_timeout=schedule_to_close_timeout,
schedule_to_start_timeout=schedule_to_start_timeout,
start_to_close_timeout=start_to_close_timeout,
Expand Down Expand Up @@ -1450,6 +1458,7 @@ async def execute_activity(
*,
args: Sequence[Any] = [],
task_queue: Optional[str] = None,
result_type: Optional[Type] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
schedule_to_start_timeout: Optional[timedelta] = None,
start_to_close_timeout: Optional[timedelta] = None,
Expand All @@ -1467,6 +1476,7 @@ async def execute_activity(
*,
args: Sequence[Any] = [],
task_queue: Optional[str] = None,
result_type: Optional[Type] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
schedule_to_start_timeout: Optional[timedelta] = None,
start_to_close_timeout: Optional[timedelta] = None,
Expand All @@ -1485,6 +1495,7 @@ async def execute_activity(
activity,
*temporalio.common._arg_or_args(arg, args),
task_queue=task_queue,
result_type=result_type,
schedule_to_close_timeout=schedule_to_close_timeout,
schedule_to_start_timeout=schedule_to_start_timeout,
start_to_close_timeout=start_to_close_timeout,
Expand Down Expand Up @@ -1623,6 +1634,7 @@ def start_activity_class(
activity,
*temporalio.common._arg_or_args(arg, args),
task_queue=task_queue,
result_type=None,
schedule_to_close_timeout=schedule_to_close_timeout,
schedule_to_start_timeout=schedule_to_start_timeout,
start_to_close_timeout=start_to_close_timeout,
Expand Down Expand Up @@ -1761,6 +1773,7 @@ async def execute_activity_class(
activity,
*temporalio.common._arg_or_args(arg, args),
task_queue=task_queue,
result_type=None,
schedule_to_close_timeout=schedule_to_close_timeout,
schedule_to_start_timeout=schedule_to_start_timeout,
start_to_close_timeout=start_to_close_timeout,
Expand Down Expand Up @@ -1899,6 +1912,7 @@ def start_activity_method(
activity,
*temporalio.common._arg_or_args(arg, args),
task_queue=task_queue,
result_type=None,
schedule_to_close_timeout=schedule_to_close_timeout,
schedule_to_start_timeout=schedule_to_start_timeout,
start_to_close_timeout=start_to_close_timeout,
Expand Down Expand Up @@ -2039,6 +2053,7 @@ async def execute_activity_method(
activity,
*temporalio.common._arg_or_args(arg, args),
task_queue=task_queue,
result_type=None,
schedule_to_close_timeout=schedule_to_close_timeout,
schedule_to_start_timeout=schedule_to_start_timeout,
start_to_close_timeout=start_to_close_timeout,
Expand Down Expand Up @@ -2170,6 +2185,7 @@ def start_local_activity(
arg: Any = temporalio.common._arg_unset,
*,
args: Sequence[Any] = [],
result_type: Optional[Type] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
schedule_to_start_timeout: Optional[timedelta] = None,
start_to_close_timeout: Optional[timedelta] = None,
Expand All @@ -2186,6 +2202,7 @@ def start_local_activity(
arg: Any = temporalio.common._arg_unset,
*,
args: Sequence[Any] = [],
result_type: Optional[Type] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
schedule_to_start_timeout: Optional[timedelta] = None,
start_to_close_timeout: Optional[timedelta] = None,
Expand All @@ -2206,6 +2223,8 @@ def start_local_activity(
activity: Activity name or function reference.
arg: Single argument to the activity.
args: Multiple arguments to the activity. Cannot be set if arg is.
result_type: For string activities, this can set the specific result
type hint to deserialize into.
activity_id: Optional unique identifier for the activity.
schedule_to_close_timeout: Max amount of time the activity can take from
first being scheduled to being completed before it times out. This
Expand All @@ -2229,6 +2248,7 @@ def start_local_activity(
return _Runtime.current().workflow_start_local_activity(
activity,
*temporalio.common._arg_or_args(arg, args),
result_type=result_type,
schedule_to_close_timeout=schedule_to_close_timeout,
schedule_to_start_timeout=schedule_to_start_timeout,
start_to_close_timeout=start_to_close_timeout,
Expand Down Expand Up @@ -2346,6 +2366,7 @@ async def execute_local_activity(
arg: Any = temporalio.common._arg_unset,
*,
args: Sequence[Any] = [],
result_type: Optional[Type] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
schedule_to_start_timeout: Optional[timedelta] = None,
start_to_close_timeout: Optional[timedelta] = None,
Expand All @@ -2362,6 +2383,7 @@ async def execute_local_activity(
arg: Any = temporalio.common._arg_unset,
*,
args: Sequence[Any] = [],
result_type: Optional[Type] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
schedule_to_start_timeout: Optional[timedelta] = None,
start_to_close_timeout: Optional[timedelta] = None,
Expand All @@ -2382,6 +2404,7 @@ async def execute_local_activity(
return await _Runtime.current().workflow_start_local_activity(
activity,
*temporalio.common._arg_or_args(arg, args),
result_type=result_type,
schedule_to_close_timeout=schedule_to_close_timeout,
schedule_to_start_timeout=schedule_to_start_timeout,
start_to_close_timeout=start_to_close_timeout,
Expand Down Expand Up @@ -2515,6 +2538,7 @@ def start_local_activity_class(
return _Runtime.current().workflow_start_local_activity(
activity,
*temporalio.common._arg_or_args(arg, args),
result_type=None,
schedule_to_close_timeout=schedule_to_close_timeout,
schedule_to_start_timeout=schedule_to_start_timeout,
start_to_close_timeout=start_to_close_timeout,
Expand Down Expand Up @@ -2650,6 +2674,7 @@ async def execute_local_activity_class(
return await _Runtime.current().workflow_start_local_activity(
activity,
*temporalio.common._arg_or_args(arg, args),
result_type=None,
schedule_to_close_timeout=schedule_to_close_timeout,
schedule_to_start_timeout=schedule_to_start_timeout,
start_to_close_timeout=start_to_close_timeout,
Expand Down Expand Up @@ -2783,6 +2808,7 @@ def start_local_activity_method(
return _Runtime.current().workflow_start_local_activity(
activity,
*temporalio.common._arg_or_args(arg, args),
result_type=None,
schedule_to_close_timeout=schedule_to_close_timeout,
schedule_to_start_timeout=schedule_to_start_timeout,
start_to_close_timeout=start_to_close_timeout,
Expand Down Expand Up @@ -2918,6 +2944,7 @@ async def execute_local_activity_method(
return await _Runtime.current().workflow_start_local_activity(
activity,
*temporalio.common._arg_or_args(arg, args),
result_type=None,
schedule_to_close_timeout=schedule_to_close_timeout,
schedule_to_start_timeout=schedule_to_start_timeout,
start_to_close_timeout=start_to_close_timeout,
Expand Down Expand Up @@ -3127,6 +3154,7 @@ async def start_child_workflow(
args: Sequence[Any] = [],
id: Optional[str] = None,
task_queue: Optional[str] = None,
result_type: Optional[Type] = None,
cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE,
execution_timeout: Optional[timedelta] = None,
Expand All @@ -3148,6 +3176,7 @@ async def start_child_workflow(
args: Sequence[Any] = [],
id: Optional[str] = None,
task_queue: Optional[str] = None,
result_type: Optional[Type] = None,
cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE,
execution_timeout: Optional[timedelta] = None,
Expand All @@ -3170,6 +3199,8 @@ async def start_child_workflow(
defaults to :py:func:`uuid4`.
task_queue: Task queue to run the workflow on. Defaults to the current
workflow's task queue.
result_type: For string workflows, this can set the specific result type
hint to deserialize into.
cancellation_type: How the child workflow will react to cancellation.
parent_close_policy: How to handle the child workflow when the parent
workflow closes.
Expand All @@ -3191,6 +3222,7 @@ async def start_child_workflow(
*temporalio.common._arg_or_args(arg, args),
id=id or str(uuid4()),
task_queue=task_queue,
result_type=result_type,
cancellation_type=cancellation_type,
parent_close_policy=parent_close_policy,
execution_timeout=execution_timeout,
Expand Down Expand Up @@ -3278,6 +3310,7 @@ async def execute_child_workflow(
args: Sequence[Any] = [],
id: Optional[str] = None,
task_queue: Optional[str] = None,
result_type: Optional[Type] = None,
cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE,
execution_timeout: Optional[timedelta] = None,
Expand All @@ -3299,6 +3332,7 @@ async def execute_child_workflow(
args: Sequence[Any] = [],
id: Optional[str] = None,
task_queue: Optional[str] = None,
result_type: Optional[Type] = None,
cancellation_type: ChildWorkflowCancellationType = ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
parent_close_policy: ParentClosePolicy = ParentClosePolicy.TERMINATE,
execution_timeout: Optional[timedelta] = None,
Expand All @@ -3321,6 +3355,7 @@ async def execute_child_workflow(
*temporalio.common._arg_or_args(arg, args),
id=id or str(uuid4()),
task_queue=task_queue,
result_type=result_type,
cancellation_type=cancellation_type,
parent_close_policy=parent_close_policy,
execution_timeout=execution_timeout,
Expand Down
Loading
Loading