Skip to content

Commit

Permalink
Fail activity worker on broken executor (#253)
Browse files Browse the repository at this point in the history
Fixes #245
  • Loading branch information
cretz committed Jan 13, 2023
1 parent ddbb92f commit fdf8b65
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 3 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,9 @@ Note, all calls from an activity to functions in the `temporalio.activity` packa
activities must `copy_context()` and then `.run()` manually to ensure `temporalio.activity` calls like `heartbeat` still
function in the new threads.

If any activity ever throws a `concurrent.futures.BrokenExecutor`, the failure is considered unrecoverable and the worker
will fail and shut down.

###### Synchronous Multithreaded Activities

If `activity_executor` is set to an instance of `concurrent.futures.ThreadPoolExecutor` then the synchronous activities
Expand Down
37 changes: 35 additions & 2 deletions temporalio/worker/_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,18 @@
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict, Iterator, Optional, Sequence, Tuple, Type, Union
from typing import (
Any,
Callable,
Dict,
Iterator,
NoReturn,
Optional,
Sequence,
Tuple,
Type,
Union,
)

import google.protobuf.duration_pb2
import google.protobuf.timestamp_pb2
Expand Down Expand Up @@ -64,6 +75,7 @@ def __init__(
self._running_activities: Dict[bytes, _RunningActivity] = {}
self._data_converter = data_converter
self._interceptors = interceptors
self._fail_worker_exception_queue: asyncio.Queue[Exception] = asyncio.Queue()
# Lazily created on first activity
self._worker_shutdown_event: Optional[
temporalio.activity._CompositeEvent
Expand Down Expand Up @@ -111,11 +123,26 @@ def __init__(
self._activities[defn.name] = defn

async def run(self) -> None:
# Create a task that fails when we get a failure on the queue
async def raise_from_queue() -> NoReturn:
raise await self._fail_worker_exception_queue.get()

exception_task = asyncio.create_task(raise_from_queue())

# Continually poll for activity work
while True:
try:
# Poll for a task
task = await self._bridge_worker().poll_activity_task()
poll_task = asyncio.create_task(
self._bridge_worker().poll_activity_task()
)
await asyncio.wait([poll_task, exception_task], return_when=asyncio.FIRST_COMPLETED) # type: ignore
# If exception for failing the worker happened, raise it.
# Otherwise, the poll succeeded.
if exception_task.done():
poll_task.cancel()
await exception_task
task = await poll_task

if task.HasField("start"):
# Cancelled event and sync field will be updated inside
Expand All @@ -131,8 +158,10 @@ async def run(self) -> None:
else:
raise RuntimeError(f"Unrecognized activity task: {task}")
except temporalio.bridge.worker.PollShutdownError:
exception_task.cancel()
return
except Exception as err:
exception_task.cancel()
raise RuntimeError("Activity worker failed") from err

async def shutdown(self, after_graceful_timeout: timedelta) -> None:
Expand Down Expand Up @@ -465,6 +494,10 @@ async def _run_activity(
await self._data_converter.encode_failure(
err, completion.result.failed.failure
)

# For broken executors, we have to fail the entire worker
if isinstance(err, concurrent.futures.BrokenExecutor):
self._fail_worker_exception_queue.put_nowait(err)
except Exception as inner_err:
temporalio.activity.logger.exception(
f"Exception handling failed, original error: {err}"
Expand Down
4 changes: 3 additions & 1 deletion temporalio/worker/_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ def __init__(
activity_executor: Concurrent executor to use for non-async
activities. This is required if any activities are non-async. If
this is a :py:class:`concurrent.futures.ProcessPoolExecutor`,
all non-async activities must be picklable.
all non-async activities must be picklable. Note, a broken
executor failure from this executor will cause the worker to
fail and shut down.
workflow_task_executor: Thread pool executor for workflow tasks. If
this is not present, a new
:py:class:`concurrent.futures.ThreadPoolExecutor` will be
Expand Down
53 changes: 53 additions & 0 deletions tests/worker/test_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
import logging
import logging.handlers
import multiprocessing
import os
import queue
import signal
import threading
import time
import uuid
from concurrent.futures.process import BrokenProcessPool
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, List, NoReturn, Optional, Sequence
Expand Down Expand Up @@ -919,6 +922,56 @@ async def test_sync_activity_process_worker_shutdown_graceful(
assert "Worker graceful shutdown" == await handle.result()


@activity.defn
def kill_my_process() -> str:
    """Forcibly kill the process this activity is running in.

    Sends ``signal.SIGKILL`` to the current process, falling back to the raw
    value ``-9`` when the ``signal`` module has no ``SIGKILL`` attribute.
    """
    own_pid = os.getpid()
    kill_signal = getattr(signal, "SIGKILL", -9)
    os.kill(own_pid, kill_signal)
    # Only reached if the kill above did not terminate the process
    return "does not get here"


async def test_sync_activity_process_executor_crash(
    client: Client, worker: ExternalWorker
):
    """Check that a crashed process-pool executor fails the workflow and worker.

    Runs ``kill_my_process`` on a dedicated worker backed by a
    ``ProcessPoolExecutor``, then asserts that the workflow fails with an
    application error of type ``"BrokenProcessPool"`` and that the worker's
    run task raises ``RuntimeError("Activity worker failed")`` whose cause is
    a ``BrokenProcessPool``.
    """
    crash_task_queue = str(uuid.uuid4())
    with concurrent.futures.ProcessPoolExecutor() as process_pool:
        crash_worker = Worker(
            client,
            task_queue=crash_task_queue,
            activities=[kill_my_process],
            activity_executor=process_pool,
            graceful_shutdown_timeout=timedelta(seconds=2),
            shared_state_manager=_default_shared_state_manager,
        )
        crash_worker_run = asyncio.create_task(crash_worker.run())

        # The workflow must fail because the activity broke the pool
        with pytest.raises(WorkflowFailureError) as workflow_err:
            kill_action = KSExecuteActivityAction(
                name="kill_my_process",
                task_queue=crash_task_queue,
                heartbeat_timeout_ms=30000,
            )
            workflow_params = KSWorkflowParams(
                actions=[KSAction(execute_activity=kill_action)]
            )
            await client.execute_workflow(
                "kitchen_sink",
                workflow_params,
                id=str(uuid.uuid4()),
                task_queue=worker.task_queue,
            )
        activity_failure = workflow_err.value.cause
        assert isinstance(activity_failure, ActivityError)
        application_failure = activity_failure.cause
        assert isinstance(application_failure, ApplicationError)
        assert application_failure.type == "BrokenProcessPool"

        # The activity worker itself must also fail unrecoverably
        with pytest.raises(RuntimeError) as worker_err:
            await asyncio.wait_for(crash_worker_run, 10)
        worker_failure = worker_err.value
        assert str(worker_failure) == "Activity worker failed"
        assert isinstance(worker_failure.__cause__, BrokenProcessPool)


class AsyncActivityWrapper:
def __init__(self) -> None:
self._info: Optional[activity.Info] = None
Expand Down

0 comments on commit fdf8b65

Please sign in to comment.