Skip to content

Commit

Permalink
feat(node_framework): Support for preconditions and oneshot tasks (#1398
Browse files Browse the repository at this point in the history
)

This PR consists of three parts in one (sorry for that).

## Part 1. Preconditions

An alternative take on
#1290

In short:

- Adds two new concepts to the framework: `Precondition` and
`UnconstrainedTask`.
- Any precondition is a future that must successfully resolve before
tasks can be started. An example of future precondition may be the
readiness of storage after snapshot sync.
- Any `Task` will wait until all of the added preconditions successfully
resolves.
- An `UnconstrainedTask` is a way to opt-out from waiting for
preconditions to complete. These may be useful in two cases:
* If you have a stateless component that should start ASAP (e.g.
healthcheck server).
* If your task is supposed to ensure that precondition is met (e.g. task
that does state recovery).
 
The last point is important, btw: imagine that the node runs as a set of
microservices on different machines. The precondition should be present
on *all* machines (e.g. simply wait until it's ensured), but the
corresponding unconstrained task should be run on *just one* machine
(since it alters state).

In the current impl there is no way to subscribe to a certain set of
preconditions or opt-out from a single precondition. You either have to
wait for them all or may rely on none. This is done on purpose to not
overcomplicate the publicly facing interface. The expectation is that
`Task` is a working default that people may just pick up and it'll be
guaranteed to work. You only need to meet extra complexity when a `Task`
is not enough for you.

## Part 2. Oneshot tasks

Adds a notion of oneshot tasks and adds support of running the service
in an oneshot mode (e.g. when we *only* add oneshot tasks and expect the
node to exit as soon as *all of them* resolve).

It comes in two flavors:

- `OneshotTask`. An oneshot analog of `Task`.
- `UnconstrainedOneshotTask`. An oneshot analog of `UnconstrainedTask`.
May be useful, for example, to enforce some `Precondition` and then
exit.

This way the logical flow of the node is as follows:

1. Start all the unconstrained tasks, unconstrained oneshot tasks, and
preconditions.
2. Wait for all the preconditions to resolve.
3. Start all the tasks and oneshot tasks.
4. Wait for any of long-running tasks (i.e. *not* oneshot) to exit.

## Part 3. Refactoring

There was a lot of new logic added, so the `ZkSyncService::run` became
pretty messy.
The most notable part here is that now all flavors of tasks are managed
by the new `Runnables` collection that encapsulates the logic of turning
different tasks into futures that can be launched by the service.
  • Loading branch information
popzxc committed Mar 18, 2024
1 parent 0affdf8 commit 65ea881
Show file tree
Hide file tree
Showing 11 changed files with 536 additions and 114 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions checks-config/era.dic
Expand Up @@ -916,4 +916,5 @@ balancer
lookups
stateful
WIP
oneshot
p2p
1 change: 1 addition & 0 deletions core/node/node_framework/Cargo.toml
Expand Up @@ -22,6 +22,7 @@ zksync_storage = { path = "../../lib/storage" }
zksync_eth_client = { path = "../../lib/eth_client" }
zksync_contracts = { path = "../../lib/contracts" }
zksync_web3_decl = { path = "../../lib/web3_decl" }
zksync_utils = { path = "../../lib/utils" }

tracing = "0.1"
thiserror = "1"
Expand Down
Expand Up @@ -7,7 +7,7 @@ use zksync_health_check::AppHealthCheck;
use crate::{
implementations::resources::healthcheck::AppHealthCheckResource,
service::{ServiceContext, StopReceiver},
task::Task,
task::UnconstrainedTask,
wiring_layer::{WiringError, WiringLayer},
};

Expand Down Expand Up @@ -39,7 +39,8 @@ impl WiringLayer for HealthCheckLayer {
app_health_check,
};

node.add_task(Box::new(task));
// Healthcheck server only exposes the state provided by other tasks, and also it has to start as soon as possible.
node.add_unconstrained_task(Box::new(task));
Ok(())
}
}
Expand All @@ -51,12 +52,15 @@ struct HealthCheckTask {
}

#[async_trait::async_trait]
impl Task for HealthCheckTask {
impl UnconstrainedTask for HealthCheckTask {
fn name(&self) -> &'static str {
"healthcheck_server"
}

async fn run(mut self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
async fn run_unconstrained(
mut self: Box<Self>,
mut stop_receiver: StopReceiver,
) -> anyhow::Result<()> {
let handle =
HealthCheckHandle::spawn_server(self.config.bind_addr(), self.app_health_check.clone());
stop_receiver.0.changed().await?;
Expand Down
1 change: 1 addition & 0 deletions core/node/node_framework/src/lib.rs
Expand Up @@ -19,6 +19,7 @@
//! - Run it.

pub mod implementations;
pub mod precondition;
pub mod resource;
pub mod service;
pub mod task;
Expand Down
33 changes: 33 additions & 0 deletions core/node/node_framework/src/precondition.rs
@@ -0,0 +1,33 @@
use std::sync::Arc;

use tokio::sync::Barrier;

use crate::service::StopReceiver;

#[async_trait::async_trait]
pub trait Precondition: 'static + Send + Sync {
/// Unique name of the precondition.
fn name(&self) -> &'static str;

async fn check(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()>;
}

impl dyn Precondition {
/// An internal helper method that runs a precondition check and lifts the barrier as soon
/// as the check is finished.
pub(super) async fn check_with_barrier(
self: Box<Self>,
mut stop_receiver: StopReceiver,
preconditions_barrier: Arc<Barrier>,
) -> anyhow::Result<()> {
self.check(stop_receiver.clone()).await?;
tokio::select! {
_ = preconditions_barrier.wait() => {
Ok(())
}
_ = stop_receiver.0.changed() => {
Ok(())
}
}
}
}
59 changes: 56 additions & 3 deletions core/node/node_framework/src/service/context.rs
@@ -1,7 +1,8 @@
use crate::{
precondition::Precondition,
resource::{Resource, StoredResource},
service::ZkStackService,
task::Task,
task::{OneshotTask, Task, UnconstrainedOneshotTask, UnconstrainedTask},
wiring_layer::WiringError,
};

Expand Down Expand Up @@ -33,10 +34,62 @@ impl<'a> ServiceContext<'a> {
}

/// Adds a task to the service.
/// Added tasks will be launched after the wiring process will be finished.
/// Added tasks will be launched after the wiring process will be finished and all the preconditions
/// are met.
pub fn add_task(&mut self, task: Box<dyn Task>) -> &mut Self {
tracing::info!("Layer {} has added a new task: {}", self.layer, task.name());
self.service.tasks.push(task);
self.service.runnables.tasks.push(task);
self
}

/// Adds an unconstrained task to the service.
/// Unconstrained tasks will be launched immediately after the wiring process is finished.
pub fn add_unconstrained_task(&mut self, task: Box<dyn UnconstrainedTask>) -> &mut Self {
tracing::info!(
"Layer {} has added a new unconstrained task: {}",
self.layer,
task.name()
);
self.service.runnables.unconstrained_tasks.push(task);
self
}

/// Adds a precondition to the service.
pub fn add_precondition(&mut self, precondition: Box<dyn Precondition>) -> &mut Self {
tracing::info!(
"Layer {} has added a new precondition: {}",
self.layer,
precondition.name()
);
self.service.runnables.preconditions.push(precondition);
self
}

/// Adds an oneshot task to the service.
pub fn add_oneshot_task(&mut self, task: Box<dyn OneshotTask>) -> &mut Self {
tracing::info!(
"Layer {} has added a new oneshot task: {}",
self.layer,
task.name()
);
self.service.runnables.oneshot_tasks.push(task);
self
}

/// Adds an unconstrained oneshot task to the service.
pub fn add_unconstrained_oneshot_task(
&mut self,
task: Box<dyn UnconstrainedOneshotTask>,
) -> &mut Self {
tracing::info!(
"Layer {} has added a new unconstrained oneshot task: {}",
self.layer,
task.name()
);
self.service
.runnables
.unconstrained_oneshot_tasks
.push(task);
self
}

Expand Down

0 comments on commit 65ea881

Please sign in to comment.