-
-
Notifications
You must be signed in to change notification settings - Fork 957
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[ENG-1513] Better integration between Jobs and processing Actors (#1974)
* First draft on new task system * Removing save to disk from task system * Bunch of concurrency issues * Solving Future impl issue when pausing tasks * Fix cancel and abort * Bunch of fixes on pause, suspend, resume, cancel and abort Also better error handling on task completion for the user * New capabilities to return an output on a task * Introducing a simple way to linear backoff on failed steal * Sample actor where tasks can dispatch more tasks * Rustfmt * Steal test to make sure * Stale deps cleanup * Removing unused utils * Initial lib docs * Docs ok * Memory cleanup on idle --------- Co-authored-by: Vítor Vasconcellos <vasconcellos.dev@gmail.com>
- Loading branch information
1 parent
53713a9
commit dba85eb
Showing
22 changed files
with
4,064 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
[package] | ||
name = "sd-task-system" | ||
version = "0.1.0" | ||
authors = ["Ericson \"Fogo\" Soares <ericson.ds999@gmail.com>"] | ||
rust-version = "1.75.0" | ||
license.workspace = true | ||
edition.workspace = true | ||
repository.workspace = true | ||
|
||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||
|
||
[dependencies] | ||
# Workspace deps | ||
async-channel = { workspace = true } | ||
async-trait = { workspace = true } | ||
futures = { workspace = true } | ||
futures-concurrency = { workspace = true } | ||
thiserror = { workspace = true } | ||
tokio = { workspace = true, features = [ | ||
"sync", | ||
"parking_lot", | ||
"rt-multi-thread", | ||
"time", | ||
] } | ||
tokio-stream = { workspace = true } | ||
tracing = { workspace = true } | ||
uuid = { workspace = true, features = ["v4"] } | ||
|
||
# External deps | ||
downcast-rs = "1.2.0" | ||
pin-project = "1.1.4" | ||
|
||
[dev-dependencies] | ||
tokio = { workspace = true, features = ["macros", "test-util", "fs"] } | ||
tempfile = { workspace = true } | ||
rand = "0.8.5" | ||
tracing-test = { version = "^0.2.4", features = ["no-env-filter"] } | ||
thiserror = { workspace = true } | ||
lending-stream = "1.0.0" | ||
serde = { workspace = true, features = ["derive"] } | ||
rmp-serde = { workspace = true } | ||
uuid = { workspace = true, features = ["serde"] } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
use std::{error::Error, fmt}; | ||
|
||
use super::task::TaskId; | ||
|
||
/// Task system's error type definition, used to report internal errors when they occur. | ||
#[derive(Debug, thiserror::Error)] | ||
pub enum SystemError { | ||
/// No task was found for the given task id. | ||
#[error("task not found <task_id='{0}'>")] | ||
TaskNotFound(TaskId), | ||
/// The task with the given id was aborted. | ||
#[error("task aborted <task_id='{0}'>")] | ||
TaskAborted(TaskId), | ||
/// A join error happened for the task with the given id. | ||
#[error("task join error <task_id='{0}'>")] | ||
TaskJoin(TaskId), | ||
/// A forced abortion of the task with the given id timed out. | ||
#[error("forced abortion for task <task_id='{0}'> timed out")] | ||
TaskForcedAbortTimeout(TaskId), | ||
} | ||
|
||
/// Trait for errors that can be returned by tasks. We use this trait as a bound for the task system's | ||
/// generic error type. | ||
/// | ||
/// With this trait, we can have a unified error type through all the tasks in the system. | ||
pub trait RunError: Error + fmt::Debug + Send + Sync + 'static {} | ||
|
||
/// We provide a blanket implementation for all types that also implement | ||
/// [`std::error::Error`](https://doc.rust-lang.org/std/error/trait.Error.html) and | ||
/// [`std::fmt::Debug`](https://doc.rust-lang.org/std/fmt/trait.Debug.html), and that are | ||
/// `Send + Sync + 'static`. | ||
/// So you will not need to implement this trait for your error type, just implement the `Error` and `Debug` | ||
/// traits. | ||
impl<T: Error + fmt::Debug + Send + Sync + 'static> RunError for T {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
//! | ||
//! # Task System | ||
//! | ||
//! Spacedrive's Task System is a library that provides a way to manage and execute tasks in a concurrent | ||
//! and parallel environment. | ||
//! | ||
//! Just bring your own unified error type and dispatch some tasks; the system will handle enqueueing, | ||
//! parallel execution, and error handling for you. It also provides some niceties, like: | ||
//! - Round robin scheduling between workers following the available CPU cores on the user machine; | ||
//! - Work stealing between workers for better load balancing; | ||
//! - Gracefully pause and cancel tasks; | ||
//! - Forced abortion of tasks; | ||
//! - Prioritizing tasks that will suspend running tasks without priority; | ||
//! - When the system is shut down, it will return all pending and running tasks to their dispatchers, so the user can store them on disk or any other storage to be re-dispatched later. | ||
//! | ||
//! | ||
//! ## Basic example | ||
//! | ||
//! ``` | ||
//! use sd_task_system::{TaskSystem, Task, TaskId, ExecStatus, TaskOutput, Interrupter, TaskStatus}; | ||
//! use async_trait::async_trait; | ||
//! use thiserror::Error; | ||
//! | ||
//! #[derive(Debug, Error)] | ||
//! pub enum SampleError { | ||
//! #[error("Sample error")] | ||
//! SampleError, | ||
//! } | ||
//! | ||
//! #[derive(Debug)] | ||
//! pub struct ReadyTask { | ||
//! id: TaskId, | ||
//! } | ||
//! | ||
//! #[async_trait] | ||
//! impl Task<SampleError> for ReadyTask { | ||
//! fn id(&self) -> TaskId { | ||
//! self.id | ||
//! } | ||
//! | ||
//! async fn run(&mut self, _interrupter: &Interrupter) -> Result<ExecStatus, SampleError> { | ||
//! Ok(ExecStatus::Done(TaskOutput::Empty)) | ||
//! } | ||
//! } | ||
//! | ||
//! #[tokio::main] | ||
//! async fn main() { | ||
//! let system = TaskSystem::new(); | ||
//! | ||
//! let handle = system.dispatch(ReadyTask { id: TaskId::new_v4() }).await; | ||
//! | ||
//! assert!(matches!( | ||
//! handle.await, | ||
//! Ok(TaskStatus::Done(TaskOutput::Empty)) | ||
//! )); | ||
//! | ||
//! system.shutdown().await; | ||
//! } | ||
//! ``` | ||
mod error; | ||
mod message; | ||
mod system; | ||
mod task; | ||
mod worker; | ||
|
||
pub use error::{RunError, SystemError as TaskSystemError}; | ||
pub use system::{Dispatcher as TaskDispatcher, System as TaskSystem}; | ||
pub use task::{ | ||
AnyTaskOutput, ExecStatus, Interrupter, InterrupterFuture, InterruptionKind, IntoAnyTaskOutput, | ||
IntoTask, Task, TaskHandle, TaskId, TaskOutput, TaskStatus, | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
use tokio::sync::oneshot; | ||
|
||
use super::{ | ||
error::{RunError, SystemError}, | ||
task::{TaskId, TaskWorkState}, | ||
worker::WorkerId, | ||
}; | ||
|
||
/// Crate-internal message type named after the task system. | ||
/// | ||
/// Most variants identify a task and a worker by id and carry an `ack` oneshot sender through which a | ||
/// `Result<(), SystemError>` can be sent back. | ||
/// NOTE(review): presumably the system's message loop consumes these and replies on `ack` - confirm | ||
/// against the `system` module. | ||
#[derive(Debug)] | ||
pub(crate) enum SystemMessage { | ||
/// Carries the id of a worker; presumably reports that this worker became idle (TODO confirm). | ||
IdleReport(WorkerId), | ||
/// Carries the id of a worker; presumably reports that this worker is working again (TODO confirm). | ||
WorkingReport(WorkerId), | ||
/// Resume request for task `task_id` on worker `worker_id`; `ack` can carry back the result. | ||
ResumeTask { | ||
task_id: TaskId, | ||
worker_id: WorkerId, | ||
ack: oneshot::Sender<Result<(), SystemError>>, | ||
}, | ||
/// Pause request for task `task_id` on worker `worker_id`; going by the variant name, it targets a | ||
/// task that is not currently running (TODO confirm). | ||
PauseNotRunningTask { | ||
task_id: TaskId, | ||
worker_id: WorkerId, | ||
ack: oneshot::Sender<Result<(), SystemError>>, | ||
}, | ||
/// Cancel request for task `task_id` on worker `worker_id`; going by the variant name, it targets a | ||
/// task that is not currently running (TODO confirm). | ||
CancelNotRunningTask { | ||
task_id: TaskId, | ||
worker_id: WorkerId, | ||
ack: oneshot::Sender<Result<(), SystemError>>, | ||
}, | ||
/// Forced abortion request for task `task_id` on worker `worker_id`; `ack` can carry back the result. | ||
ForceAbortion { | ||
task_id: TaskId, | ||
worker_id: WorkerId, | ||
ack: oneshot::Sender<Result<(), SystemError>>, | ||
}, | ||
/// Carries a starting worker id and a task count. | ||
/// NOTE(review): presumably asks the system to wake idle workers, beginning at `start_from`, so they | ||
/// can pick up `task_count` tasks - confirm against the handler. | ||
NotifyIdleWorkers { | ||
start_from: WorkerId, | ||
task_count: usize, | ||
}, | ||
/// Shutdown request; the wrapped oneshot sender can carry back a `Result<(), SystemError>`. | ||
ShutdownRequest(oneshot::Sender<Result<(), SystemError>>), | ||
} | ||
|
||
/// Crate-internal message type named after workers, generic over the task error type `E`. | ||
/// | ||
/// The task-control variants mirror the ones in `SystemMessage`, but without a `worker_id` field. | ||
/// NOTE(review): presumably each worker receives these on its own channel - confirm against the | ||
/// `worker` module. | ||
#[derive(Debug)] | ||
pub(crate) enum WorkerMessage<E: RunError> { | ||
/// Carries the work state of a task; presumably hands a new task to the worker (TODO confirm). | ||
NewTask(TaskWorkState<E>), | ||
/// Carries a oneshot sender through which a `usize` count can be sent back. | ||
TaskCountRequest(oneshot::Sender<usize>), | ||
/// Resume request for task `task_id`; `ack` can carry back a `Result<(), SystemError>`. | ||
ResumeTask { | ||
task_id: TaskId, | ||
ack: oneshot::Sender<Result<(), SystemError>>, | ||
}, | ||
/// Pause request for task `task_id`; going by the variant name, it targets a task that is not | ||
/// currently running (TODO confirm). | ||
PauseNotRunningTask { | ||
task_id: TaskId, | ||
ack: oneshot::Sender<Result<(), SystemError>>, | ||
}, | ||
/// Cancel request for task `task_id`; going by the variant name, it targets a task that is not | ||
/// currently running (TODO confirm). | ||
CancelNotRunningTask { | ||
task_id: TaskId, | ||
ack: oneshot::Sender<Result<(), SystemError>>, | ||
}, | ||
/// Forced abortion request for task `task_id`; `ack` can carry back a `Result<(), SystemError>`. | ||
ForceAbortion { | ||
task_id: TaskId, | ||
ack: oneshot::Sender<Result<(), SystemError>>, | ||
}, | ||
/// Shutdown request; unlike `SystemMessage::ShutdownRequest`, the sender carries only `()`. | ||
ShutdownRequest(oneshot::Sender<()>), | ||
/// Carries a oneshot sender through which an optional task work state can be sent back. | ||
/// NOTE(review): presumably `None` means the worker had no task to give away - confirm. | ||
StealRequest(oneshot::Sender<Option<TaskWorkState<E>>>), | ||
/// Payload-free message; presumably wakes up an idle worker (TODO confirm). | ||
WakeUp, | ||
} |
Oops, something went wrong.