Description
Bug summary
In `PrefectDaskFuture.wait`, it's assumed (per this comment) that `future.result()` either returns a `State` or times out. But `result()` can also fail in other ways, described here. These failures are not handled and lead to a fairly cryptic failure message:
```
File ~/model/.venv/lib/python3.10/site-packages/prefect/states.py:509, in get_state_exception(state)
    507         default_message = "Run cancelled."
    508     else:
--> 509         raise ValueError(f"Expected failed or crashed state got {state!r}.")
    511     if isinstance(state.data, ResultRecord):
    512         result = state.data.result

ValueError: Expected failed or crashed state got Running(message='', type=RUNNING, result=None).
```
To reproduce, I ran a simple flow like the one below on a local Dask cluster:
```python
from time import sleep

from prefect import flow, task
from prefect_dask import DaskTaskRunner


@flow(task_runner=DaskTaskRunner(address="localhost:8786"))
def wait_flow():
    @task
    def wait():
        sleep(30)  # long enough to kill the worker while the task is running
        return True

    result = wait.submit()
    return result


if __name__ == "__main__":
    wait_flow()
```
Then I killed the Dask worker until the scheduler declared the task suspicious and gave up:
```
distributed.scheduler.KilledWorker: Attempted to run task slow_task-a25511e480b7951003007c0155e3f56c on 1 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:60949. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.
```
Because of the `except: return` block linked above, we never get a final `State` back. The future is therefore still `Running` when its state is checked, which leads to the `Expected failed or crashed state got Running` failure.
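The failure mode can be reproduced without Prefect or Dask. The sketch below is a simplified, hypothetical model of the pattern described above, not the actual `PrefectDaskFuture` code. `FakeFuture`, `FakeKilledWorker`, `SketchPrefectFuture`, `StateType` and `get_state_exception_sketch` are stand-ins invented for illustration. A catch-all `except` in `wait` swallows the worker failure, so the recorded state stays `RUNNING`. The later state check then raises the same kind of `ValueError` seen in the traceback.

```python
from enum import Enum
from typing import Optional


class StateType(Enum):
    # Hypothetical subset of state types, for illustration only
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    CRASHED = "CRASHED"


class FakeKilledWorker(Exception):
    """Stand-in for the scheduler's KilledWorker exception."""


class FakeFuture:
    """Stand-in for a Dask future whose result() raises instead of returning."""

    def result(self, timeout: Optional[float] = None):
        raise FakeKilledWorker("all workers died while running the task")


class SketchPrefectFuture:
    """Hypothetical model of the wait() pattern described in this issue."""

    def __init__(self, wrapped: FakeFuture):
        self.wrapped = wrapped
        self.state = StateType.RUNNING  # last known state before waiting

    def wait(self, timeout: Optional[float] = None) -> None:
        try:
            self.state = self.wrapped.result(timeout=timeout)
        except Exception:
            # Stand-in for `except: return`: meant for timeouts, but it also
            # swallows the worker failure, leaving self.state as RUNNING
            return


def get_state_exception_sketch(state: StateType) -> Exception:
    # Mirrors the check in the traceback: only failed/crashed states are expected
    if state is StateType.CRASHED:
        return RuntimeError("Run crashed.")
    raise ValueError(f"Expected failed or crashed state got {state!r}.")


future = SketchPrefectFuture(FakeFuture())
future.wait()  # returns silently even though the "worker" died
try:
    get_state_exception_sketch(future.state)
except ValueError as exc:
    print(exc)
```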
I'm not super familiar with the new `State` ontology yet, but it seems like `KilledWorker` or `CommError` should probably result in a `Crashed` state?
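One possible direction, sketched under assumptions and not Prefect's implementation: keep the early return only for a timeout, and record a crashed state when the wait fails because a worker or connection was lost. `KilledWorkerStandIn`, `CommErrorStandIn`, `StateType` and `wait_sketch` are hypothetical placeholders. A real fix would presumably catch the `KilledWorker` and `CommError` exceptions mentioned above, plus whatever exception `result()` raises on timeout.

```python
from enum import Enum
from typing import Callable, Optional


class StateType(Enum):
    # Hypothetical subset of state types, for illustration only
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    CRASHED = "CRASHED"


class KilledWorkerStandIn(Exception):
    """Placeholder for the scheduler's KilledWorker exception."""


class CommErrorStandIn(Exception):
    """Placeholder for a Dask communication error."""


# Failures of the infrastructure rather than of the user's task code
INFRASTRUCTURE_ERRORS = (KilledWorkerStandIn, CommErrorStandIn)


def wait_sketch(
    result: Callable[[Optional[float]], StateType],
    current: StateType,
    timeout: Optional[float] = None,
) -> StateType:
    """Return the state to record after waiting on a future (hypothetical helper)."""
    try:
        return result(timeout)
    except TimeoutError:
        # Still running: keep the last known state, as the existing code intends
        return current
    except INFRASTRUCTURE_ERRORS:
        # Worker or connection lost: report a terminal state instead of RUNNING
        return StateType.CRASHED
    # Any other exception propagates to the caller unchanged


def killed(timeout: Optional[float]) -> StateType:
    raise KilledWorkerStandIn("all workers died while running the task")


print(wait_sketch(killed, StateType.RUNNING))  # StateType.CRASHED
```

The key design choice is replacing the catch-all with narrow `except` clauses. A timeout still means "not done yet", while a lost worker becomes a terminal state that the later state check can handle.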
cc Coiled folks @mrocklin @ntabris @jrbourbeau in case anyone has a stronger or better-informed opinion than mine on the proper behavior here 🙂.
Version info
```
Version:             3.2.7
API version:         0.8.4
Python version:      3.10.16
Git commit:          d4d9001e
Built:               Fri, Feb 21, 2025 7:39 PM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         cloud
Pydantic version:    2.9.2
Integrations:
  prefect-dask:        0.3.2.dev1046+gbe1ba636e4.d20250305
  prefect-gcp:         0.6.2
  prefect-kubernetes:  0.5.3
```