
DaskTaskRunner does not handle Dask exceptions #17384

Open
@bnaul

Description


Bug summary

In PrefectDaskFuture.wait, it's assumed (per this comment) that future.result() either returns a State or times out. But there are other possible outcomes, described here, that are not handled and that lead to a fairly cryptic error message:

File ~/model/.venv/lib/python3.10/site-packages/prefect/states.py:509, in get_state_exception(state)
    507     default_message = "Run cancelled."
    508 else:
--> 509     raise ValueError(f"Expected failed or crashed state got {state!r}.")
    511 if isinstance(state.data, ResultRecord):
    512     result = state.data.result

ValueError: Expected failed or crashed state got Running(message='', type=RUNNING, result=None).
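The assumption that result() either returns a State or times out can be checked with a minimal stdlib sketch. It uses concurrent.futures.Future as a stand-in for a Dask future (an assumption; the real distributed.Future is not used here) and a hypothetical KilledWorkerStandIn exception in place of distributed.scheduler.KilledWorker. It shows that result(timeout=...) has three outcomes, not two: a value, a timeout, or the re-raised failure.

```python
from concurrent.futures import Future, TimeoutError as FutureTimeout


class KilledWorkerStandIn(Exception):
    """Hypothetical stand-in for distributed.scheduler.KilledWorker."""


def classify(future: Future, timeout: float) -> str:
    """Report which of the three possible outcomes result() produced."""
    try:
        future.result(timeout=timeout)
    except FutureTimeout:
        # The future is still pending: the only case the current code expects.
        return "timeout"
    except Exception as exc:
        # The failure stored on the future is re-raised by result().
        return f"raised {type(exc).__name__}"
    return "value"


done = Future()
done.set_result("Completed()")

pending = Future()  # never resolved, so result() times out

failed = Future()
failed.set_exception(KilledWorkerStandIn("all workers died"))

for name, fut in [("done", done), ("pending", pending), ("failed", failed)]:
    print(name, "->", classify(fut, timeout=0.01))
```

A bare except that simply returns treats the third outcome exactly like the second.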

To reproduce, I run a simple flow like the one below on a local Dask cluster:

from time import sleep
from prefect import flow, task
from prefect_dask import DaskTaskRunner

# Connect to an already-running local Dask scheduler.
@flow(task_runner=DaskTaskRunner(address="localhost:8786"))
def wait_flow():
    @task
    def wait():
        # Long enough to kill the worker while the task is still running.
        sleep(30)
        return True
    result = wait.submit()
    return result

if __name__ == "__main__":
    wait_flow()

and then killing the Dask worker while the task is running, repeatedly, until the scheduler marks the task as suspicious and gives up:

distributed.scheduler.KilledWorker: Attempted to run task slow_task-a25511e480b7951003007c0155e3f56c on 1 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:60949. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

Because of the except: return block linked above, wait() swallows the exception without producing any State, so the run is still reported as Running and we hit the "Expected failed or crashed state got Running" error.
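That failure path can be modelled with the standard library alone. In the sketch below, wait_swallowing mimics a bare except: return, and final_state_exception is a simplified, hypothetical copy of the guard in get_state_exception from the traceback above; neither is the actual Prefect code, and plain strings stand in for Prefect State objects. Because the swallowed exception leaves the caller with only the last known state, the guard raises the same kind of ValueError.

```python
from concurrent.futures import Future
from typing import Optional


class KilledWorkerStandIn(Exception):
    """Hypothetical stand-in for distributed.scheduler.KilledWorker."""


def wait_swallowing(future: Future, timeout: Optional[float]) -> Optional[str]:
    """Hypothetical model of the current wait(): every exception is swallowed."""
    try:
        return future.result(timeout=timeout)
    except Exception:
        # Timeouts and KilledWorker both land here, and no final state is recorded.
        return None


def final_state_exception(state: str) -> Exception:
    """Simplified model of the guard in prefect.states.get_state_exception."""
    if state not in ("Failed", "Crashed", "Cancelled"):
        raise ValueError(f"Expected failed or crashed state got {state!r}.")
    return RuntimeError(f"Run ended in state {state}.")


last_known_state = "Running"
fut = Future()
fut.set_exception(KilledWorkerStandIn("all workers died"))

# wait() produced nothing, so the caller falls back to the stale Running state.
state = wait_swallowing(fut, timeout=1.0) or last_known_state
try:
    final_state_exception(state)
except ValueError as err:
    print(err)  # Expected failed or crashed state got 'Running'.
```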

I'm not very familiar with the new State ontology yet, but it seems like KilledWorker or CommError should probably result in a Crashed state?
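One possible shape for that suggestion is sketched below, under the same stand-in assumptions as before: a timeout still means "no final state yet", but any other exception from result() becomes a crashed state instead of being dropped. The reasoning is that the Dask future is expected to resolve to a State, so an exception from result() suggests the task never got to report one. SketchState, wait_with_crash_handling, and KilledWorkerStandIn are hypothetical names, not Prefect or Dask APIs; a real fix would need to build an actual Prefect Crashed state and decide whether some exceptions deserve different handling.

```python
from concurrent.futures import Future, TimeoutError as FutureTimeout
from dataclasses import dataclass
from typing import Optional


class KilledWorkerStandIn(Exception):
    """Hypothetical stand-in for distributed.scheduler.KilledWorker."""


@dataclass
class SketchState:
    """Hypothetical minimal state record; not prefect.states.State."""
    kind: str
    message: str = ""


def wait_with_crash_handling(future: Future, timeout: Optional[float]) -> Optional[SketchState]:
    """Return the final state, None if still running, or a CRASHED state on failure."""
    try:
        return future.result(timeout=timeout)
    except FutureTimeout:
        # Still running: no final state yet, same as the current behaviour.
        return None
    except Exception as exc:
        # The future should have resolved to a state, so any other exception
        # means the task never reported one (e.g. every worker died).
        return SketchState(kind="CRASHED", message=f"{type(exc).__name__}: {exc}")


ok = Future()
ok.set_result(SketchState(kind="COMPLETED"))
pending = Future()
killed = Future()
killed.set_exception(KilledWorkerStandIn("all workers died"))

print(wait_with_crash_handling(ok, 0.01))
print(wait_with_crash_handling(pending, 0.01))
print(wait_with_crash_handling(killed, 0.01))
```

With this shape, the killed-worker case yields a state that get_state_exception can turn into a meaningful error, rather than leaving the run in Running.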

cc Coiled folks @mrocklin @ntabris @jrbourbeau in case anyone has a stronger and better informed opinion than I on the proper behavior here 🙂 .

Version info

Version:             3.2.7
API version:         0.8.4
Python version:      3.10.16
Git commit:          d4d9001e
Built:               Fri, Feb 21, 2025 7:39 PM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         cloud
Pydantic version:    2.9.2
Integrations:
  prefect-dask:      0.3.2.dev1046+gbe1ba636e4.d20250305
  prefect-gcp:       0.6.2
  prefect-kubernetes: 0.5.3


Metadata

Assignees: No one assigned
Labels: bug (Something isn't working)