Human-friendly orchestration for background work: cooperative pause/resume/cancel, priorities, dependencies, scheduling (with repeat), progress, and supervision all in a small .NET 8+ library.
- Cooperative tasks: each
FlowTaskreceives(CancellationToken ct, Func<Task> wait)to pause/resume cleanly. - Orchestration primitives: priorities, dependencies, scheduling, optional repeating, and a singleton manager (
FlowTaskManager.Shared). - Observability: per-task and global events (
TaskStateChangedEvent/TaskStateTransitionEvent), progress reporting, timing, and snapshots. - Reliability: supervisor jobs with restart/stop/ignore strategies and retry limits.
- Persistence-friendly:
TaskSnapshotcaptures state/priority/progress for storage.
- .NET 8.0+ (targets net8.0, net9.0, net10.0).
dotnet add package FlowPilotusing FlowPilot;
using FlowPilot.Tasks;
var download = FlowTask.NewTask("Download", async (ct, wait) =>
{
for (int i = 0; i <= 100; i += 10)
{
await wait(); // cooperatively pause if requested
ct.ThrowIfCancellationRequested();
await Task.Delay(100, ct);
Console.WriteLine($"Downloading... {i}%");
}
})
.Managed()
.WithPriority(TaskPriority.High)
.Schedule(TimeSpan.FromSeconds(5)); // start 5s later
var process = FlowTask.NewTask("Process", async (ct, wait) =>
{
await Task.Delay(500, ct);
Console.WriteLine("Processing finished.");
})
.Managed()
.After(download); // run after Download completes
FlowTask.NewTask("Cleanup", async (ct, wait) =>
{
await wait();
await Task.Delay(250, ct);
Console.WriteLine("Cleanup done.");
})
.WithPriority(TaskPriority.Low)
.After(process)
.Managed();
FlowTaskManager.Shared.TaskChanged += e =>
Console.WriteLine($"[{e.Timestamp:T}] {e.TaskName} -> {e.NewState} ({e.Progress:0}%){(e.ErrorMessage != null ? " | Error: " + e.ErrorMessage : "")}");
await FlowTaskManager.Shared.RunAllAsync();await FlowTaskManager.Shared.PauseAllAsync();
await FlowTaskManager.Shared.ResumeAllAsync();
await FlowTaskManager.Shared.CancelAsync("Download");var export = FlowTask.NewTask("Export", async (ct, wait) =>
{
var progress = (IProgress<double>)export;
for (int i = 0; i <= 100; i += 5)
{
await wait();
ct.ThrowIfCancellationRequested();
await Task.Delay(50, ct);
progress.Report(i);
}
})
.Managed();
FlowTaskManager.Shared.TaskChanged += e =>
{
if (e.TaskName == "Export")
Console.WriteLine($"Export progress: {e.Progress:0}%");
};using FlowPilot.Supervision;
var job = DefaultSupervisorJobBuilder.Create()
.RestartFailed(maxRetries: 3, retryDelay: TimeSpan.FromSeconds(2))
.AddTasks(download, process)
.Build();
await job.StartAllAsync();Strategies (SupervisionStrategy):
RestartFailed(default): retry failed tasks up to a limit with delay.StopAll: cancel siblings when any task fails.Ignore: do nothing on failure.
FlowTask.NewTask(string name, Func<CancellationToken, Func<Task>, Task> action)Managed()register inFlowTaskManager.SharedWithPriority(TaskPriority priority)After(params FlowTask[] dependencies)Schedule(TimeSpan delay, bool repeat = false)StartAsync(),Pause()/PauseAsync(),Resume(),Cancel()Subscribe(IObserver<TaskStateChangedEvent>)Report(double progress)(0..100)Completion(awaitable snapshot)FlowTaskManager.Shared:RunAllAsync(),PauseAllAsync(),ResumeAllAsync(),CancelAllAsync(),CancelAsync(name),TaskChangedevent,WithAutoCleanup(bool)
src/library sourceTasks/task types and managerEvents/task events and transitionsSupervision/supervisor job, builder, strategies
tests/xUnit test suite (dotnet test)