Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENG-1513] Better integration between Jobs and processing Actors #1974

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ To run the landing page:

If you encounter any issues, ensure that you are using the following versions of Rust, Node and Pnpm:

- Rust version: **1.73.0**
- Rust version: **1.75.0**
- Node version: **18.17**
- Pnpm version: **8.0.0**

Expand Down
39 changes: 39 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ swift-rs = { version = "1.0.6" }
# Third party dependencies used by one or more of our crates
anyhow = "1.0.75"
async-channel = "2.0.0"
async-trait = "0.1.77"
axum = "0.6.20"
base64 = "0.21.5"
blake3 = "1.5.0"
Expand Down
4 changes: 2 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "sd-core"
version = "0.2.4"
description = "Virtual distributed filesystem engine that powers Spacedrive."
authors = ["Spacedrive Technology Inc."]
rust-version = "1.73.0"
rust-version = "1.75.0"
license = { workspace = true }
repository = { workspace = true }
edition = { workspace = true }
Expand Down Expand Up @@ -51,6 +51,7 @@ sd-cloud-api = { version = "0.1.0", path = "../crates/cloud-api" }

# Workspace dependencies
async-channel = { workspace = true }
async-trait = { workspace = true }
axum = { workspace = true }
base64 = { workspace = true }
blake3 = { workspace = true }
Expand Down Expand Up @@ -100,7 +101,6 @@ webp = { workspace = true }
# Specific Core dependencies
async-recursion = "1.0.5"
async-stream = "0.3.5"
async-trait = "^0.1.74"
bytes = "1.5.0"
ctor = "0.2.5"
directories = "5.0.1"
Expand Down
2 changes: 1 addition & 1 deletion crates/ai/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = "0.1.0"
authors = ["Ericson Soares <ericson@spacedrive.com>"]
readme = "README.md"
description = "A simple library to generate video thumbnails using ffmpeg with the webp format"
rust-version = "1.73.0"
rust-version = "1.75.0"
license = { workspace = true }
repository = { workspace = true }
edition = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/file-path-helper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "sd-file-path-helper"
version = "0.1.0"
authors = ["Ericson Soares <ericson@spacedrive.com>"]
readme = "README.md"
rust-version = "1.73.0"
rust-version = "1.75.0"
license = { workspace = true }
repository = { workspace = true }
edition = { workspace = true }
Expand Down
42 changes: 42 additions & 0 deletions crates/task-system/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
[package]
name = "sd-task-system"
version = "0.1.0"
authors = ["Ericson \"Fogo\" Soares <ericson.ds999@gmail.com>"]
rust-version = "1.75.0"
license.workspace = true
edition.workspace = true
repository.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
# Workspace deps
async-channel = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }
futures-concurrency = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = [
"sync",
"parking_lot",
"rt-multi-thread",
"time",
] }
tokio-stream = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true, features = ["v4"] }

# External deps
downcast-rs = "1.2.0"
pin-project = "1.1.4"

[dev-dependencies]
tokio = { workspace = true, features = ["macros", "test-util", "fs"] }
tempfile = { workspace = true }
rand = "0.8.5"
tracing-test = { version = "^0.2.4", features = ["no-env-filter"] }
thiserror = { workspace = true }
lending-stream = "1.0.0"
serde = { workspace = true, features = ["derive"] }
rmp-serde = { workspace = true }
uuid = { workspace = true, features = ["serde"] }
28 changes: 28 additions & 0 deletions crates/task-system/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use std::{error::Error, fmt};

use super::task::TaskId;

/// Task system's error type definition, representing when internal errors occurs.
#[derive(Debug, thiserror::Error)]
pub enum SystemError {
#[error("task not found <task_id='{0}'>")]
TaskNotFound(TaskId),
#[error("task aborted <task_id='{0}'>")]
TaskAborted(TaskId),
#[error("task join error <task_id='{0}'>")]
TaskJoin(TaskId),
#[error("forced abortion for task <task_id='{0}'> timed out")]
TaskForcedAbortTimeout(TaskId),
}

/// Trait for errors that can be returned by tasks, we use this trait as a bound for the task system generic
/// error type.
///
///With this trait, we can have a unified error type through all the tasks in the system.
pub trait RunError: Error + fmt::Debug + Send + Sync + 'static {}

/// We provide a blanket implementation for all types that also implements
/// [`std::error::Error`](https://doc.rust-lang.org/std/error/trait.Error.html) and
/// [`std::fmt::Debug`](https://doc.rust-lang.org/std/fmt/trait.Debug.html).
/// So you will not need to implement this trait for your error type, just implement the `Error` and `Debug`
impl<T: Error + fmt::Debug + Send + Sync + 'static> RunError for T {}
71 changes: 71 additions & 0 deletions crates/task-system/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//!
//! # Task System
//!
//! Spacedrive's Task System is a library that provides a way to manage and execute tasks in a concurrent
//! and parallel environment.
//!
//! Just bring your own unified error type and dispatch some tasks, the system will handle enqueueing,
//! parallel execution, and error handling for you. Aside from some niceties like:
//! - Round robin scheduling between workers following the available CPU cores on the user machine;
//! - Work stealing between workers for better load balancing;
//! - Gracefully pause and cancel tasks;
//! - Forced abortion of tasks;
//! - Prioritizing tasks that will suspend running tasks without priority;
//! - When the system is shutdown, it will return all pending and running tasks to theirs dispatchers, so the user can store them on disk or any other storage to be re-dispatched later;
//!
//!
//! ## Basic example
//!
//! ```
//! use sd_task_system::{TaskSystem, Task, TaskId, ExecStatus, TaskOutput, Interrupter, TaskStatus};
//! use async_trait::async_trait;
//! use thiserror::Error;
//!
//! #[derive(Debug, Error)]
//! pub enum SampleError {
//! #[error("Sample error")]
//! SampleError,
//! }
//!
//! #[derive(Debug)]
//! pub struct ReadyTask {
//! id: TaskId,
//! }
//!
//! #[async_trait]
//! impl Task<SampleError> for ReadyTask {
//! fn id(&self) -> TaskId {
//! self.id
//! }
//!
//! async fn run(&mut self, _interrupter: &Interrupter) -> Result<ExecStatus, SampleError> {
//! Ok(ExecStatus::Done(TaskOutput::Empty))
//! }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//! let system = TaskSystem::new();
//!
//! let handle = system.dispatch(ReadyTask { id: TaskId::new_v4() }).await;
//!
//! assert!(matches!(
//! handle.await,
//! Ok(TaskStatus::Done(TaskOutput::Empty))
//! ));
//!
//! system.shutdown().await;
//! }
//! ```
mod error;
mod message;
mod system;
mod task;
mod worker;

pub use error::{RunError, SystemError as TaskSystemError};
pub use system::{Dispatcher as TaskDispatcher, System as TaskSystem};
pub use task::{
AnyTaskOutput, ExecStatus, Interrupter, InterrupterFuture, InterruptionKind, IntoAnyTaskOutput,
IntoTask, Task, TaskHandle, TaskId, TaskOutput, TaskStatus,
};
63 changes: 63 additions & 0 deletions crates/task-system/src/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use tokio::sync::oneshot;

use super::{
error::{RunError, SystemError},
task::{TaskId, TaskWorkState},
worker::WorkerId,
};

#[derive(Debug)]
pub(crate) enum SystemMessage {
IdleReport(WorkerId),
WorkingReport(WorkerId),
ResumeTask {
task_id: TaskId,
worker_id: WorkerId,
ack: oneshot::Sender<Result<(), SystemError>>,
},
PauseNotRunningTask {
task_id: TaskId,
worker_id: WorkerId,
ack: oneshot::Sender<Result<(), SystemError>>,
},
CancelNotRunningTask {
task_id: TaskId,
worker_id: WorkerId,
ack: oneshot::Sender<Result<(), SystemError>>,
},
ForceAbortion {
task_id: TaskId,
worker_id: WorkerId,
ack: oneshot::Sender<Result<(), SystemError>>,
},
NotifyIdleWorkers {
start_from: WorkerId,
task_count: usize,
},
ShutdownRequest(oneshot::Sender<Result<(), SystemError>>),
}

#[derive(Debug)]
pub(crate) enum WorkerMessage<E: RunError> {
NewTask(TaskWorkState<E>),
TaskCountRequest(oneshot::Sender<usize>),
ResumeTask {
task_id: TaskId,
ack: oneshot::Sender<Result<(), SystemError>>,
},
PauseNotRunningTask {
task_id: TaskId,
ack: oneshot::Sender<Result<(), SystemError>>,
},
CancelNotRunningTask {
task_id: TaskId,
ack: oneshot::Sender<Result<(), SystemError>>,
},
ForceAbortion {
task_id: TaskId,
ack: oneshot::Sender<Result<(), SystemError>>,
},
ShutdownRequest(oneshot::Sender<()>),
StealRequest(oneshot::Sender<Option<TaskWorkState<E>>>),
WakeUp,
}
Loading
Loading