
serialization error on canceling a flow run when using ProcessPoolExecutor #17848

Open
rcash opened this issue Apr 17, 2025 · 0 comments
Labels
bug Something isn't working

@rcash
Contributor

rcash commented Apr 17, 2025

Bug summary

Canceling a flow run whose task submits work to a ProcessPoolExecutor surfaces the serialization error below on the flow run's pod and leaves the run stuck in the "Cancelling" state in the Prefect UI. The error appears to be raised by pydantic while Prefect serializes the new state (state_create.model_dump in set_flow_run_state): a MockValSer cannot be converted to a SchemaSerializer. This is observed with a prefect-kubernetes work pool.

Stack trace from the pod's logs, captured right after canceling from the UI:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/runner/runner.py", line 586, in execute_flow_run
    async with anyio.create_task_group() as tg:
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 740, in __aexit__
    raise exc_val
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 714, in __aexit__
    await asyncio.wait(self._tasks)
  File "/usr/local/lib/python3.10/asyncio/tasks.py", line 384, in wait
    return await _wait(fs, timeout, return_when, loop)
  File "/usr/local/lib/python3.10/asyncio/tasks.py", line 491, in _wait
    await waiter
asyncio.exceptions.CancelledError: Cancelled by cancel scope ffffa422bfa0

During handling of the above exception, another exception occurred:

  + Exception Group Traceback (most recent call last):
  |   File "/usr/local/lib/python3.10/site-packages/prefect/cli/_utilities.py", line 44, in wrapper
  |     return fn(*args, **kwargs)
  |   File "/usr/local/lib/python3.10/site-packages/prefect/cli/_types.py", line 156, in sync_fn
  |     return asyncio.run(async_fn(*args, **kwargs))
  |   File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
  |     return loop.run_until_complete(main)
  |   File "/usr/local/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
  |     return future.result()
  |   File "/usr/local/lib/python3.10/site-packages/prefect/cli/flow_run.py", line 375, in execute
  |     await runner.execute_flow_run(id)
  |   File "/usr/local/lib/python3.10/site-packages/prefect/runner/runner.py", line 582, in execute_flow_run
  |     async with context:
  |   File "/usr/local/lib/python3.10/site-packages/prefect/runner/runner.py", line 1676, in __aexit__
  |     await self._runs_task_group.__aexit__(*exc_info)
  |   File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 736, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/usr/local/lib/python3.10/site-packages/prefect/runner/runner.py", line 1111, in _cancel_run
    |     await self._mark_flow_run_as_cancelled(
    |   File "/usr/local/lib/python3.10/site-packages/prefect/runner/runner.py", line 1539, in _mark_flow_run_as_cancelled
    |     await self._client.set_flow_run_state(flow_run.id, state, force=True)
    |   File "/usr/local/lib/python3.10/site-packages/prefect/client/orchestration/_flow_runs/client.py", line 804, in set_flow_run_state
    |     state=state_create.model_dump(mode="json", serialize_as_any=True),
    |   File "/usr/local/lib/python3.10/site-packages/pydantic/main.py", line 390, in model_dump
    |     return self.__pydantic_serializer__.to_python(
    | TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer'
    +------------------------------------
An exception occurred.

MRE (minimal reproducible example):

import asyncio
import random
import time
from concurrent.futures import ProcessPoolExecutor
from typing import List

from prefect import flow, task
from prefect.cache_policies import NO_CACHE


def cpu_intensive_function(batch_id: int, items: List[int]) -> List[int]:
    print(f"Processing batch {batch_id} with {len(items)} items")

    results = []
    for item in items:
        start = time.time()
        processing_time = random.uniform(0.1, 0.5)
        time.sleep(processing_time)  # Simulate variable per-item work time (sleeps, not CPU-bound)

        result = sum(i * i for i in range(item, item + 10000))
        results.append(result)

        elapsed = time.time() - start
        print(f"Batch {batch_id}: Item {item} processed in {elapsed:.2f}s")

    return results

@task(cache_policy=NO_CACHE)
async def process_batch(
    executor: ProcessPoolExecutor,
    semaphore: asyncio.Semaphore,
    batch_id: int,
    batch_data: List[int],
) -> List[int]:
    print(f"Task started for batch {batch_id}")

    async with semaphore:
        loop = asyncio.get_running_loop()
        results = await loop.run_in_executor(
            executor, cpu_intensive_function, batch_id, batch_data
        )

    print(f"Task completed for batch {batch_id}")
    return results


async def intermediary_function(data: List[int]) -> List[List[int]]:
    print("Starting intermediary function")

    batch_size = max(1, len(data) // 10)
    batches = [data[i : i + batch_size] for i in range(0, len(data), batch_size)][:10]

    semaphore = asyncio.Semaphore(3)

    # Create ProcessPoolExecutor
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = []
        for batch_id, batch_data in enumerate(batches):
            task_result = await process_batch(executor, semaphore, batch_id, batch_data)
            tasks.append(task_result)

    print("Completed intermediary function")
    return tasks


@flow(name="Process Data With Pool")
async def entrypoint():
    print("Starting main flow")

    data = list(range(100))

    results = await intermediary_function(data)

    total_results = sum(len(batch) for batch in results)
    print(f"Processed {total_results} items across {len(results)} batches")

    return results


if __name__ == "__main__":
    asyncio.run(entrypoint())
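To take Prefect out of the picture, the executor half of the MRE reduces to an asyncio task awaiting work submitted to a pool. The stdlib-only sketch below shows what cancelling that awaiting task does to the pool: the awaiting coroutine gets CancelledError immediately and a call that has not started is cancelled, but a call that is already running keeps going, because concurrent.futures cannot interrupt running calls. It uses a ThreadPoolExecutor with one worker so the timing is deterministic; a ProcessPoolExecutor cannot interrupt a call already running in a worker process either. blocking_job and demo are illustrative names, not part of the report.

```python
import asyncio
import threading
from concurrent.futures import Future, ThreadPoolExecutor


def blocking_job(started: threading.Event, release: threading.Event) -> str:
    # Stand-in for cpu_intensive_function: report that it is running,
    # then block until released.
    started.set()
    release.wait()
    return "done"


async def demo() -> tuple[Future, Future]:
    started = threading.Event()
    release = threading.Event()
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        # loop.run_in_executor is executor.submit(...) wrapped with
        # asyncio.wrap_future; doing the two steps by hand keeps a handle
        # on the pool-side futures.
        running = executor.submit(blocking_job, started, release)
        await asyncio.to_thread(started.wait)  # the only worker is now busy
        queued = executor.submit(blocking_job, started, release)

        async def await_both() -> None:
            await asyncio.gather(asyncio.wrap_future(running), asyncio.wrap_future(queued))

        waiter = asyncio.create_task(await_both())
        await asyncio.sleep(0)  # let the waiter start awaiting
        waiter.cancel()  # stands in for cancelling the task that awaits the pool
        try:
            await waiter
        except asyncio.CancelledError:
            pass
        return running, queued
    finally:
        release.set()
        executor.shutdown(wait=True)  # returns quickly once the job is released


if __name__ == "__main__":
    running, queued = asyncio.run(demo())
    print(running.cancelled(), running.result())  # False done
    print(queued.cancelled())  # True
```

Cancellation on the asyncio side is propagated to the pool only as Future.cancel(), which succeeds for queued work and is a no-op for work already running.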

Version info

Version:             3.1.14
API version:         0.8.4
Python version:      3.12.9
Git commit:          5f1ebb57
Built:               Thu, Jan 23, 2025 1:22 PM
OS/Arch:             darwin/arm64
Profile:             sbx-rowdy-cloud
Server type:         cloud
Pydantic version:    2.9.2
Integrations:
  prefect-docker:    0.6.2
  prefect-kubernetes: 0.5.3

Additional context

ProcessPoolExecutor farms work out to separate processes using the multiprocessing library (see the concurrent.futures docs). Note that the pod stops as expected when the run is canceled, but the flow run state does not appear to be set successfully because of the error above, so the run stays in "Cancelling".
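A separate detail of the MRE that may matter on cancellation, independent of the serialization error: exiting a with ProcessPoolExecutor(...) block calls executor.shutdown(wait=True), which blocks the event loop thread until every running call finishes, even when the block is being exited because of CancelledError. The sketch below is one possible alternative, assuming Python 3.9+ (for cancel_futures and asyncio.to_thread). non_blocking_pool is a hypothetical helper, and whether it changes the stuck "Cancelling" state in this report is untested; it cancels queued work but cannot stop calls already running in worker processes.

```python
import asyncio
import contextlib
from concurrent.futures import Executor
from typing import AsyncIterator, TypeVar

E = TypeVar("E", bound=Executor)


@contextlib.asynccontextmanager
async def non_blocking_pool(executor: E) -> AsyncIterator[E]:
    """Hypothetical helper: yield the executor; on an exceptional exit
    (including asyncio.CancelledError) drop queued work without blocking
    the event loop; on a normal exit, wait for the pool off the loop."""
    try:
        yield executor
    except BaseException:
        # cancel_futures=True cancels work items that have not started;
        # wait=False returns immediately instead of joining the workers.
        executor.shutdown(wait=False, cancel_futures=True)
        raise
    else:
        # Normal exit: join the pool in a helper thread so the loop stays free.
        await asyncio.to_thread(executor.shutdown, wait=True)
```

In the MRE it would replace the with statement as async with non_blocking_pool(ProcessPoolExecutor(max_workers=4)) as executor:, leaving the rest of intermediary_function unchanged.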

This could be related to #12075 and to expanding support for multiprocessing in Prefect.

rcash added the bug label on Apr 17, 2025