-
Notifications
You must be signed in to change notification settings - Fork 6
Open
Description
Description
- Schedule a pipeline and wait for its result.
- The first task in the pipeline raises an exception and goes to retry.
- Retrieve the pipeline result as an error.
- The pipeline then successfully completes with the correct result.
Expected
Await the pipeline’s result while a task is retrying.
Actual
The first exception is returned as the result regardless of retries.
Code
import asyncio
import math
from taskiq import Context
from taskiq import SimpleRetryMiddleware
from taskiq import TaskiqDepends
from taskiq import InMemoryBroker
from taskiq_pipelines import Pipeline
from taskiq_pipelines import PipelineMiddleware
from taskiq_redis import RedisAsyncResultBackend
from taskiq_redis import RedisStreamBroker
result_backend = RedisAsyncResultBackend(
redis_url="redis://localhost:6379",
)
broker = (
RedisStreamBroker(
url="redis://localhost:6379",
)
.with_middlewares(
PipelineMiddleware(),
SimpleRetryMiddleware(default_retry_count=3),
)
.with_result_backend(result_backend)
)
check_interval = 0.2
@broker.task("power", retry_on_error=True)
async def power(x: int, y: int = 2, context: "Context" = TaskiqDepends()) -> int:
if context.message.labels.get("_retries", 0) == 0:
raise ValueError()
await asyncio.sleep(check_interval * 10)
result = x**y
print(f"{x} ** {y} = {result}")
return result
@broker.task("sqrt", retry_on_error=True)
async def sqrt(value: int) -> float:
result = math.sqrt(value)
print(f"sqrt({value}) = {result}")
return result #
pipeline = Pipeline(broker).call_next(power).call_next(sqrt)
async def main():
task = await pipeline.kiq(2)
result = await task.wait_result(check_interval=check_interval)
print("result:", result)
if __name__ == "__main__":
asyncio.run(main())
Metadata
Metadata
Assignees
Labels
No labels