Log and drop signals whose params can't be deserialized (and other error handling improvements) #349

Merged: 4 commits, Jul 18, 2023

68 changes: 34 additions & 34 deletions poetry.lock

(Generated lockfile; diff not rendered by default.)

5 changes: 4 additions & 1 deletion temporalio/common.py
@@ -260,7 +260,10 @@ def _type_hints_from_func(
         if (
             index == 0
             and value.name == "self"
-            and value.annotation is inspect.Parameter.empty
+            and (
+                value.annotation is inspect.Parameter.empty
+                or str(value.annotation) == "typing.Self"
+            )
         ):
             continue
         # Stop if non-positional or not a class
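For reference, a minimal standalone sketch (my own illustration, not SDK code) of what the widened condition accepts: an unannotated `self` and a `self: typing.Self` are both skipped when collecting type hints. Comparing `str(value.annotation)` rather than the object itself presumably keeps the check working on interpreters older than 3.11, where `typing.Self` does not exist and cannot be imported for an identity comparison.

```python
# Standalone sketch, not SDK code: mirror the self-parameter check above.
import inspect
import sys


def is_self_param(param: inspect.Parameter) -> bool:
    # Matches an unannotated self or a self annotated with typing.Self.
    return param.name == "self" and (
        param.annotation is inspect.Parameter.empty
        or str(param.annotation) == "typing.Self"
    )


class Unannotated:
    def run(self, x: int) -> int:
        return x


first = next(iter(inspect.signature(Unannotated.run).parameters.values()))
print(is_self_param(first))  # True

if sys.version_info >= (3, 11):
    import typing

    class Annotated:
        def run(self: typing.Self, x: int) -> int:
            return x

    first = next(iter(inspect.signature(Annotated.run).parameters.values()))
    print(is_self_param(first))  # True: str(typing.Self) == "typing.Self"
```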
7 changes: 1 addition & 6 deletions temporalio/worker/_activity.py
@@ -436,12 +436,7 @@ async def _run_activity(
             completion.result.completed.result.CopyFrom(
                 (await self._data_converter.encode([result]))[0]
             )
-        except (
-            Exception,
-            asyncio.CancelledError,
-            temporalio.exceptions.CancelledError,
-            temporalio.activity._CompleteAsyncError,
-        ) as err:
+        except BaseException as err:
             try:
                 if isinstance(err, temporalio.activity._CompleteAsyncError):
                     temporalio.activity.logger.debug("Completing asynchronously")
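The collapsed `except BaseException` clause still distinguishes the specific cases inside the handler (the `isinstance` check for `_CompleteAsyncError` is visible right below), and it also covers anything that does not derive from `Exception`. Notably, since Python 3.8 `asyncio.CancelledError` subclasses `BaseException` directly, so a plain `except Exception` would miss cancellation. A quick illustration (my own, not SDK code):

```python
# Why catching BaseException also covers cancellation: on Python 3.8+,
# asyncio.CancelledError derives from BaseException, not Exception.
import asyncio

print(issubclass(asyncio.CancelledError, Exception))      # False on Python 3.8+
print(issubclass(asyncio.CancelledError, BaseException))  # True

try:
    raise asyncio.CancelledError()
except Exception:
    print("not reached on Python 3.8+")
except BaseException as err:
    print(f"caught {type(err).__name__} via the BaseException clause")
```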
16 changes: 13 additions & 3 deletions temporalio/worker/_workflow_instance.py
@@ -1231,9 +1231,19 @@ def _process_signal_job(
         defn: temporalio.workflow._SignalDefinition,
         job: temporalio.bridge.proto.workflow_activation.SignalWorkflow,
     ) -> None:
-        args = self._process_handler_args(
-            job.signal_name, job.input, defn.name, defn.arg_types, defn.dynamic_vararg
-        )
+        try:
+            args = self._process_handler_args(
+                job.signal_name,
+                job.input,
+                defn.name,
+                defn.arg_types,
+                defn.dynamic_vararg,
+            )
+        except Exception:
+            logger.exception(
+                f"Failed deserializing signal input for {job.signal_name}, dropping the signal"
+            )
+            return
         input = HandleSignalInput(
             signal=job.signal_name, args=args, headers=job.headers
         )

Review thread on the new "except Exception:" line:

Reviewer (Member): I see that the only things that could get raised from the call are decoding problems... but maybe it'd be a good protection to catch something a little more specific here, if that can be done w/o any incompatible change.

cretz (Member, Author), Jul 17, 2023: I think we want the change technically incompatible. This would fail the task before and I think the requirement is "any deserialization failure during signals causes log-and-drop". There is nothing more specific I can think of here. I guess I could skip Temporal failures, but I don't think we want to let any deserialization exceptions escape. If we do, I think I'd want to make it an option (failing the workflow from the signal arg converter is a bit rough, but may be wanted).
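To make the resulting behavior concrete, here is a small self-contained sketch of the log-and-drop pattern discussed above. The name `dispatch_signal` and the use of `json.loads` are illustrative stand-ins (not SDK APIs) for the worker's signal dispatch and payload conversion: any decoding failure is logged with a traceback and the signal is discarded instead of failing the workflow task.

```python
# Illustrative sketch of log-and-drop; json.loads stands in for payload
# conversion and dispatch_signal for the worker's dispatch (hypothetical names).
import json
import logging

logger = logging.getLogger(__name__)


def dispatch_signal(handler, signal_name: str, raw_payload: str) -> None:
    try:
        args = json.loads(raw_payload)
    except Exception:
        # Mirror the PR: log with traceback, then drop the signal.
        logger.exception(
            f"Failed deserializing signal input for {signal_name}, dropping the signal"
        )
        return
    handler(*args)


logging.basicConfig(level=logging.INFO)
dispatch_signal(print, "some_signal", '["good"]')  # handler receives "good"
dispatch_signal(print, "some_signal", "not json")  # logged and dropped
```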
66 changes: 65 additions & 1 deletion tests/worker/test_workflow.py
Expand Up @@ -7,8 +7,8 @@
import queue
import sys
import threading
import typing
import uuid
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
@@ -493,6 +493,46 @@ async def test_workflow_signal_qnd_query_handlers_old_dynamic_style(client: Clie
     )


+@dataclass
+class BadSignalParam:
+    some_str: str
+
+
+@workflow.defn
+class BadSignalParamWorkflow:
+    def __init__(self) -> None:
+        self._signals: List[BadSignalParam] = []
+
+    @workflow.run
+    async def run(self) -> List[BadSignalParam]:
+        await workflow.wait_condition(
+            lambda: bool(self._signals) and self._signals[-1].some_str == "finish"
+        )
+        return self._signals
+
+    @workflow.signal
+    async def some_signal(self, param: BadSignalParam) -> None:
+        self._signals.append(param)
+
+
+async def test_workflow_bad_signal_param(client: Client):
+    async with new_worker(client, BadSignalParamWorkflow) as worker:
+        handle = await client.start_workflow(
+            BadSignalParamWorkflow.run,
+            id=f"workflow-{uuid.uuid4()}",
+            task_queue=worker.task_queue,
+        )
+        # Send 4 signals, first and third are bad
+        await handle.signal("some_signal", "bad")
+        await handle.signal("some_signal", BadSignalParam(some_str="good"))
+        await handle.signal("some_signal", 123)
+        await handle.signal("some_signal", BadSignalParam(some_str="finish"))
+        assert [
+            BadSignalParam(some_str="good"),
+            BadSignalParam(some_str="finish"),
+        ] == await handle.result()
+
+
 @workflow.defn
 class AsyncUtilWorkflow:
     def __init__(self) -> None:
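A note on why the two "bad" sends in this test succeed while handling them fails: a bare string and an integer serialize to perfectly valid payloads, but on the worker side they cannot be converted into the `BadSignalParam` dataclass that the handler's type hint requires, so those two signals are logged and dropped and only the well-formed ones reach the handler. A rough sketch of that conversion step under those assumptions (my own illustration, not the SDK's converter):

```python
# Illustrative only: attempt to rebuild the handler's dataclass from each of the
# four signal payloads sent in the test above.
import dataclasses
import json


@dataclasses.dataclass
class BadSignalParam:
    some_str: str


def try_decode(payload: str) -> BadSignalParam:
    value = json.loads(payload)
    if not isinstance(value, dict):
        raise TypeError(f"cannot convert {type(value).__name__} to BadSignalParam")
    return BadSignalParam(**value)


for payload in ['"bad"', '{"some_str": "good"}', "123", '{"some_str": "finish"}']:
    try:
        print("handled:", try_decode(payload))
    except Exception as err:
        print("dropped:", err)
```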
@@ -3055,3 +3095,27 @@ async def test_workflow_dynamic(client: Client):
     )
     assert isinstance(result, DynamicWorkflowValue)
     assert result == DynamicWorkflowValue("some-workflow - val1 - val2")
+
+
+# typing.Self only in 3.11+
+if sys.version_info >= (3, 11):
+
+    @dataclass
+    class AnnotatedWithSelfParam:
+        some_str: str
+
+    @workflow.defn
+    class WorkflowAnnotatedWithSelf:
+        @workflow.run
+        async def run(self: typing.Self, some_arg: AnnotatedWithSelfParam) -> str:
+            assert isinstance(some_arg, AnnotatedWithSelfParam)
+            return some_arg.some_str
+
+    async def test_workflow_annotated_with_self(client: Client):
+        async with new_worker(client, WorkflowAnnotatedWithSelf) as worker:
+            assert "foo" == await client.execute_workflow(
+                WorkflowAnnotatedWithSelf.run,
+                AnnotatedWithSelfParam(some_str="foo"),
+                id=f"wf-{uuid.uuid4()}",
+                task_queue=worker.task_queue,
+            )