In PrefectDaskFuture.wait, it's assumed (per this comment) that either future.result() returns a State or times out. But there are other possible failure states described here that are not handled, and lead to a fairly cryptic failure message:
File ~/model/.venv/lib/python3.10/site-packages/prefect/states.py:509, in get_state_exception(state)
    507     default_message = "Run cancelled."
    508 else:
--> 509     raise ValueError(f"Expected failed or crashed state got {state!r}.")
    511 if isinstance(state.data, ResultRecord):
    512     result = state.data.result
ValueError: Expected failed or crashed state got Running(message='', type=RUNNING, result=None).
To reproduce, I am running a simple flow like the one below on a local Dask cluster:
from time import sleep

from prefect import flow, task
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(address="localhost:8786"))
def wait_flow():
    @task
    def wait():
        sleep(30)
        return True

    result = wait.submit()
    return result

if __name__ == "__main__":
    wait_flow()
and killing the dask worker until the scheduler declares the task suspicious and gives up:
distributed.scheduler.KilledWorker: Attempted to run task slow_task-a25511e480b7951003007c0155e3f56c on 1 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:60949. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.
Because of the except: return block linked above, we end up not returning any kind of State, leading to the Expected failed or crashed state got Running failure.
Not super familiar with the new State ontology yet but it seems like KilledWorker or CommError should probably result in a Crashed state?
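One possible shape for that behavior, sketched with stand-in classes rather than the real Prefect or Dask types. Everything here is an assumption made for illustration: `SketchFuture`, the simplified `State` dataclass, and the local `KilledWorker` exception are hypothetical and only mimic the pattern described above (a timeout is fine, but any other exception should still produce a state).

```python
from concurrent.futures import Future, TimeoutError as FutureTimeout
from dataclasses import dataclass
from typing import Optional


class KilledWorker(Exception):
    """Hypothetical stand-in for distributed.scheduler.KilledWorker."""


@dataclass
class State:
    """Simplified stand-in for a Prefect State (not the real class)."""
    type: str
    message: str = ""


class SketchFuture:
    """Hypothetical wrapper that mimics the wait() pattern discussed above."""

    def __init__(self, wrapped: Future):
        self._wrapped = wrapped
        self.final_state: Optional[State] = None

    def wait(self, timeout: Optional[float] = None) -> None:
        try:
            result = self._wrapped.result(timeout=timeout)
        except FutureTimeout:
            # Still running: a timeout is expected, so leave final_state unset.
            return
        except Exception as exc:
            # Infrastructure failure (e.g. worker killed): record a crashed
            # state instead of returning with no state at all.
            self.final_state = State("CRASHED", f"{type(exc).__name__}: {exc}")
            return
        self.final_state = result


# Simulate a task whose workers all died.
failed = Future()
failed.set_exception(KilledWorker("all workers died"))
wrapped = SketchFuture(failed)
wrapped.wait(timeout=1)
print(wrapped.final_state.type)
```

With this shape, a caller that inspects the final state sees a crashed state carrying the original exception name, rather than a stale Running state. Whether Crashed is the right mapping for every non-timeout exception is the open question here.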
cc Coiled folks @mrocklin @ntabris @jrbourbeau in case anyone has a stronger and better informed opinion than I on the proper behavior here 🙂
Version info
Additional context
No response