DaskTaskRunner does not handle Dask exceptions #17384

Open
bnaul opened this issue Mar 5, 2025 · 0 comments · May be fixed by #17725
Labels
bug Something isn't working

bnaul commented Mar 5, 2025

Bug summary

In PrefectDaskFuture.wait, it's assumed (per this comment) that future.result() either returns a State or times out. But there are other possible failure modes, described here, that are not handled, and they lead to a fairly cryptic failure message:

File ~/model/.venv/lib/python3.10/site-packages/prefect/states.py:509, in get_state_exception(state)
    507     default_message = "Run cancelled."
    508 else:
--> 509     raise ValueError(f"Expected failed or crashed state got {state!r}.")
    511 if isinstance(state.data, ResultRecord):
    512     result = state.data.result

ValueError: Expected failed or crashed state got Running(message='', type=RUNNING, result=None).

To reproduce, I am running a simple flow like the one below against a local Dask cluster:

from time import sleep
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(address="localhost:8786"))
def wait_flow():
    @task
    def wait():
        sleep(30)
        return True
    result = wait.submit()
    return result

if __name__ == "__main__":
    wait_flow()

and repeatedly killing the Dask worker until the scheduler declares the task suspicious and gives up:

distributed.scheduler.KilledWorker: Attempted to run task slow_task-a25511e480b7951003007c0155e3f56c on 1 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:60949. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

Because of the except: return block linked above, we end up not returning any kind of State, leading to the Expected failed or crashed state got Running failure.

I'm not super familiar with the new State ontology yet, but it seems like KilledWorker or CommError should probably result in a Crashed state?
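
For illustration only, here is a rough sketch of the kind of handling I have in mind. The helper name and the exact exception set are assumptions on my part, not Prefect's actual code; the point is just that Dask-level failures get translated into a Crashed state instead of being swallowed:

from distributed import Future as DaskFuture
from distributed.comm import CommClosedError
from distributed.scheduler import KilledWorker

from prefect.client.schemas.objects import State
from prefect.states import Crashed


def state_from_dask_future(dask_future: DaskFuture) -> State | None:
    """Hypothetical helper: resolve a Dask future into a Prefect State."""
    try:
        result = dask_future.result()
    except (KilledWorker, CommClosedError) as exc:
        # Dask itself failed (worker killed, connection lost): surface this as a
        # Crashed state rather than returning nothing.
        return Crashed(message=f"Dask failure while running the task: {exc!r}")
    # Normal path: tasks submitted through the DaskTaskRunner resolve to a State.
    return result if isinstance(result, State) else None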

cc Coiled folks @mrocklin @ntabris @jrbourbeau in case anyone has a stronger and better-informed opinion than I do on the proper behavior here 🙂.

Version info

Version:             3.2.7
API version:         0.8.4
Python version:      3.10.16
Git commit:          d4d9001e
Built:               Fri, Feb 21, 2025 7:39 PM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         cloud
Pydantic version:    2.9.2
Integrations:
  prefect-dask:      0.3.2.dev1046+gbe1ba636e4.d20250305
  prefect-gcp:       0.6.2
  prefect-kubernetes: 0.5.3

Additional context

No response

@bnaul bnaul added the bug Something isn't working label Mar 5, 2025
@cicdw cicdw linked a pull request Apr 4, 2025 that will close this issue