Bug summary
Canceling a flow run with a task that submits work to a ProcessPoolExecutor surfaces the serialization error below (where MockValSer cannot be converted to SchemaSerializer while dumping a model inside Prefect?) on the flow run's pod, and leaves the Prefect UI stuck in a "canceling" state. This is seen when using a prefect-kubernetes work pool.
Stack trace from the pod's logs, captured right after canceling from the UI:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/runner/runner.py", line 586, in execute_flow_run
async with anyio.create_task_group() as tg:
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 740, in __aexit__
raise exc_val
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 714, in __aexit__
await asyncio.wait(self._tasks)
File "/usr/local/lib/python3.10/asyncio/tasks.py", line 384, in wait
return await _wait(fs, timeout, return_when, loop)
File "/usr/local/lib/python3.10/asyncio/tasks.py", line 491, in _wait
await waiter
asyncio.exceptions.CancelledError: Cancelled by cancel scope ffffa422bfa0
During handling of the above exception, another exception occurred:
+ Exception Group Traceback (most recent call last):
| File "/usr/local/lib/python3.10/site-packages/prefect/cli/_utilities.py", line 44, in wrapper
| return fn(*args, **kwargs)
| File "/usr/local/lib/python3.10/site-packages/prefect/cli/_types.py", line 156, in sync_fn
| return asyncio.run(async_fn(*args, **kwargs))
| File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
| return loop.run_until_complete(main)
| File "/usr/local/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
| return future.result()
| File "/usr/local/lib/python3.10/site-packages/prefect/cli/flow_run.py", line 375, in execute
| await runner.execute_flow_run(id)
| File "/usr/local/lib/python3.10/site-packages/prefect/runner/runner.py", line 582, in execute_flow_run
| async with context:
| File "/usr/local/lib/python3.10/site-packages/prefect/runner/runner.py", line 1676, in __aexit__
| await self._runs_task_group.__aexit__(*exc_info)
| File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 736, in __aexit__
| raise BaseExceptionGroup(
| exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/usr/local/lib/python3.10/site-packages/prefect/runner/runner.py", line 1111, in _cancel_run
| await self._mark_flow_run_as_cancelled(
| File "/usr/local/lib/python3.10/site-packages/prefect/runner/runner.py", line 1539, in _mark_flow_run_as_cancelled
| await self._client.set_flow_run_state(flow_run.id, state, force=True)
| File "/usr/local/lib/python3.10/site-packages/prefect/client/orchestration/_flow_runs/client.py", line 804, in set_flow_run_state
| state=state_create.model_dump(mode="json", serialize_as_any=True),
| File "/usr/local/lib/python3.10/site-packages/pydantic/main.py", line 390, in model_dump
| return self.__pydantic_serializer__.to_python(
| TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer'
+------------------------------------
An exception occurred.
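For reference, the call that fails at the bottom of the trace can be isolated against a plain model (a minimal sketch, assuming pydantic v2.7+ for the serialize_as_any argument; StateLike is a hypothetical stand-in for Prefect's StateCreate, not its real schema):

from pydantic import BaseModel

class StateLike(BaseModel):
    # hypothetical stand-in for Prefect's StateCreate schema
    type: str
    name: str | None = None

state_create = StateLike(type="CANCELLED", name="Cancelled")
# This succeeds on a fully built model; the MockValSer TypeError above suggests
# the real model's serializer was still a mock/placeholder in the process that
# handled the cancellation.
print(state_create.model_dump(mode="json", serialize_as_any=True))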
MRE:
import asyncio
import random
import time
from concurrent.futures import ProcessPoolExecutor
from typing import List

from prefect import flow, task
from prefect.cache_policies import NO_CACHE


def cpu_intensive_function(batch_id: int, items: List[int]) -> List[int]:
    print(f"Processing batch {batch_id} with {len(items)} items")
    results = []
    for item in items:
        start = time.time()
        processing_time = random.uniform(0.1, 0.5)
        time.sleep(processing_time)  # Simulate CPU work
        result = sum(i * i for i in range(item, item + 10000))
        results.append(result)
        elapsed = time.time() - start
        print(f"Batch {batch_id}: Item {item} processed in {elapsed:.2f}s")
    return results


@task(cache_policy=NO_CACHE)
async def process_batch(
    executor: ProcessPoolExecutor,
    semaphore: asyncio.Semaphore,
    batch_id: int,
    batch_data: List[int],
) -> List[int]:
    print(f"Task started for batch {batch_id}")
    async with semaphore:
        loop = asyncio.get_event_loop()
        results = await loop.run_in_executor(
            executor, cpu_intensive_function, batch_id, batch_data
        )
    print(f"Task completed for batch {batch_id}")
    return results


async def intermediary_function(data: List[int]) -> List[List[int]]:
    print("Starting intermediary function")
    batch_size = max(1, len(data) // 10)
    batches = [data[i : i + batch_size] for i in range(0, len(data), batch_size)][:10]
    semaphore = asyncio.Semaphore(3)
    # Create ProcessPoolExecutor
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = []
        for batch_id, batch_data in enumerate(batches):
            task_result = await process_batch(executor, semaphore, batch_id, batch_data)
            tasks.append(task_result)
    print("Completed intermediary function")
    return tasks


@flow(name="Process Data With Pool")
async def entrypoint():
    print("Starting main flow")
    data = list(range(100))
    results = await intermediary_function(data)
    total_results = sum(len(batch) for batch in results)
    print(f"Processed {total_results} items across {len(results)} batches")
    return results


if __name__ == "__main__":
    asyncio.run(entrypoint())
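Side note on the MRE (an assumption about intent, not part of the original report): the loop in intermediary_function awaits each process_batch call before submitting the next, so batches run one at a time and Semaphore(3) never actually limits anything. If concurrent submission is what's wanted, the loop body could be replaced with an asyncio.gather call, reusing the names already defined in the MRE:

    with ProcessPoolExecutor(max_workers=4) as executor:
        # submit every batch at once; the semaphore caps how many hit the pool
        tasks = await asyncio.gather(
            *(
                process_batch(executor, semaphore, batch_id, batch_data)
                for batch_id, batch_data in enumerate(batches)
            )
        )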
Additional context
ProcessPoolExecutor farms work out via the multiprocessing library (see the concurrent.futures docs). Note that the pod backs off as expected when canceled, but the flow run state does not appear to get set successfully because of the error above.
This could be related to #12075 and expanding support for multiprocessing in Prefect.
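One untested mitigation idea (an assumption, not something confirmed against this report, and it does not address the MockValSer/model_dump failure itself): shut the executor down eagerly when the surrounding coroutine is cancelled, so queued batches are dropped instead of keeping worker processes busy while the runner handles the cancellation. A hypothetical variant of the MRE's intermediary_function:

async def intermediary_function(data: List[int]) -> List[List[int]]:
    batch_size = max(1, len(data) // 10)
    batches = [data[i : i + batch_size] for i in range(0, len(data), batch_size)][:10]
    semaphore = asyncio.Semaphore(3)
    executor = ProcessPoolExecutor(max_workers=4)
    try:
        results = []
        for batch_id, batch_data in enumerate(batches):
            results.append(
                await process_batch(executor, semaphore, batch_id, batch_data)
            )
        executor.shutdown(wait=True)  # normal path: wait for workers to exit
        return results
    except asyncio.CancelledError:
        # cancellation path: drop queued work, don't block on running children
        # (cancel_futures requires Python >= 3.9)
        executor.shutdown(wait=False, cancel_futures=True)
        raise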