Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel runs if no progress is made in the manifest #12

Merged
merged 13 commits into from Apr 5, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -43,6 +43,7 @@ tokio = { version = "1.26.0", features = [
] }
tokio-rustls = "0.23.4"
futures = "0.3.27"
pin-project-lite = "0.2.9"

parking_lot = "0.12.1"
moka = { version = "0.10.0", features = ["future"] }
Expand Down
4 changes: 4 additions & 0 deletions crates/abq_cli/src/instance.rs
@@ -1,6 +1,7 @@
use abq_queue::persistence;
use abq_queue::persistence::remote::NoopPersister;
use abq_queue::queue::{Abq, QueueConfig};
use abq_queue::RunTimeoutStrategy;
use abq_utils::auth::{AdminToken, ServerAuthStrategy, UserToken};
use abq_utils::exit::ExitCode;
use abq_utils::net_opt::ServerOptions;
Expand Down Expand Up @@ -57,6 +58,8 @@ pub async fn start_abq_forever(
RESULTS_PERSISTENCE_LRU_CAPACITY,
);

let run_timeout_strategy = RunTimeoutStrategy::RUN_BASED;

let queue_config = QueueConfig {
public_ip,
bind_ip,
Expand All @@ -66,6 +69,7 @@ pub async fn start_abq_forever(
server_options,
persist_manifest,
persist_results,
run_timeout_strategy,
};
let mut abq = Abq::start(queue_config).await;

Expand Down
1 change: 1 addition & 0 deletions crates/abq_queue/Cargo.toml
Expand Up @@ -23,6 +23,7 @@ tracing.workspace = true
tokio.workspace = true
futures.workspace = true
async-trait.workspace = true
pin-project-lite.workspace = true

anyhow.workspace = true

Expand Down
5 changes: 5 additions & 0 deletions crates/abq_queue/src/job_queue.rs
Expand Up @@ -120,6 +120,11 @@ impl JobQueue {

persistence::manifest::ManifestView::new(queue, assigned_entities)
}

/// Reads the current index at a given point in time. Not atomic.
pub fn read_index(&self) -> usize {
self.ptr.load(atomic::ORDERING)
}
}

#[cfg(test)]
Expand Down
3 changes: 3 additions & 0 deletions crates/abq_queue/src/lib.rs
Expand Up @@ -5,5 +5,8 @@ mod prelude;
mod job_queue;
pub mod persistence;
pub mod queue;
mod timeout;
mod worker_timings;
mod worker_tracking;

pub use timeout::{RunTimeoutStrategy, TimeoutReason};