Skip to content

Commit

Permalink
feat(en): Add health checks for EN components (#1088)
Browse files Browse the repository at this point in the history
## What ❔

Adds health checks with intelligent details for most of the components run
by the external node.

## Why ❔

These health checks make it easier for both humans and machines to monitor
what's going on with an EN. Machine consumption of the checks could be used
in integration tests etc.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
  • Loading branch information
slowli committed Feb 16, 2024
1 parent de4d729 commit 4ea1520
Show file tree
Hide file tree
Showing 16 changed files with 476 additions and 190 deletions.
32 changes: 32 additions & 0 deletions core/bin/external_node/src/helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//! Miscellaneous helpers for the EN.

use zksync_health_check::{async_trait, CheckHealth, Health, HealthStatus};
use zksync_web3_decl::{jsonrpsee::http_client::HttpClient, namespaces::EthNamespaceClient};

/// Main node health check.
///
/// Newtype around the HTTP JSON-RPC client; the wrapped client is used by the
/// [`CheckHealth`] implementation to probe the main node.
#[derive(Debug)]
pub(crate) struct MainNodeHealthCheck(HttpClient);

impl From<HttpClient> for MainNodeHealthCheck {
fn from(client: HttpClient) -> Self {
Self(client)
}
}

#[async_trait]
impl CheckHealth for MainNodeHealthCheck {
    /// Name under which this check is reported.
    fn name(&self) -> &'static str {
        "main_node_http_rpc"
    }

    /// Probes the main node by requesting the block number via the wrapped client.
    ///
    /// Returns `Ready` health if the request succeeds. On failure, logs a warning and
    /// returns `NotReady` health with the error message stored under the `"error"` key
    /// of the details.
    async fn check_health(&self) -> Health {
        match self.0.get_block_number().await {
            Ok(_) => Health::from(HealthStatus::Ready),
            Err(err) => {
                tracing::warn!("Health-check call to main node HTTP RPC failed: {err}");
                let details = serde_json::json!({
                    "error": err.to_string(),
                });
                Health::from(HealthStatus::NotReady).with_details(details)
            }
        }
    }
}
70 changes: 44 additions & 26 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@ use zksync_core::{
},
};
use zksync_dal::{healthcheck::ConnectionPoolHealthCheck, ConnectionPool};
use zksync_health_check::CheckHealth;
use zksync_health_check::{CheckHealth, HealthStatus, ReactiveHealthCheck};
use zksync_state::PostgresStorageCaches;
use zksync_storage::RocksDB;
use zksync_utils::wait_for_tasks::wait_for_tasks;

use crate::{config::ExternalNodeConfig, init::ensure_storage_initialized};
use crate::{
config::ExternalNodeConfig, helpers::MainNodeHealthCheck, init::ensure_storage_initialized,
};

mod config;
mod helpers;
mod init;
mod metrics;

Expand Down Expand Up @@ -104,13 +107,12 @@ async fn build_state_keeper(
}

async fn init_tasks(
config: ExternalNodeConfig,
config: &ExternalNodeConfig,
connection_pool: ConnectionPool,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<(
Vec<task::JoinHandle<anyhow::Result<()>>>,
watch::Sender<bool>,
HealthCheckHandle,
watch::Receiver<bool>,
Vec<Box<dyn CheckHealth>>,
)> {
let release_manifest: serde_json::Value = serde_json::from_str(RELEASE_MANIFEST)
.expect("release manifest is a valid json document; qed");
Expand All @@ -124,12 +126,12 @@ async fn init_tasks(
.required
.main_node_url()
.expect("Main node URL is incorrect");
let (stop_sender, stop_receiver) = watch::channel(false);
let mut healthchecks: Vec<Box<dyn CheckHealth>> = Vec::new();
// Create components.
let fee_params_fetcher = Arc::new(MainNodeFeeParamsFetcher::new(&main_node_url));

let sync_state = SyncState::new();
let sync_state = SyncState::default();
healthchecks.push(Box::new(sync_state.clone()));
let (action_queue_sender, action_queue) = ActionQueue::new();

let mut task_handles = vec![];
Expand Down Expand Up @@ -159,7 +161,7 @@ async fn init_tasks(
let state_keeper = build_state_keeper(
action_queue,
config.required.state_cache_path.clone(),
&config,
config,
connection_pool.clone(),
sync_state.clone(),
config.remote.l2_erc20_bridge_addr,
Expand All @@ -171,6 +173,9 @@ async fn init_tasks(

let main_node_client = <dyn MainNodeClient>::json_rpc(&main_node_url)
.context("Failed creating JSON-RPC client for main node")?;
healthchecks.push(Box::new(MainNodeHealthCheck::from(
main_node_client.clone(),
)));
let singleton_pool_builder = ConnectionPool::singleton(&config.postgres.database_url);

let fetcher_handle = if let Some(cfg) = config.consensus.clone() {
Expand Down Expand Up @@ -245,6 +250,8 @@ async fn init_tasks(
.await
.context("failed to build connection pool for ConsistencyChecker")?,
);
healthchecks.push(Box::new(consistency_checker.health_check().clone()));
let consistency_checker_handle = tokio::spawn(consistency_checker.run(stop_receiver.clone()));

let batch_status_updater = BatchStatusUpdater::new(
&main_node_url,
Expand All @@ -254,6 +261,7 @@ async fn init_tasks(
.context("failed to build a connection pool for BatchStatusUpdater")?,
)
.context("failed initializing batch status updater")?;
healthchecks.push(Box::new(batch_status_updater.health_check()));

// Run the components.
let tree_stop_receiver = stop_receiver.clone();
Expand All @@ -267,11 +275,9 @@ async fn init_tasks(
.build()
.await
.context("failed to build a commitment_generator_pool")?;
let commitment_generator =
CommitmentGenerator::new(commitment_generator_pool, stop_receiver.clone());
let commitment_generator_handle = tokio::spawn(commitment_generator.run());

let consistency_checker_handle = tokio::spawn(consistency_checker.run(stop_receiver.clone()));
let commitment_generator = CommitmentGenerator::new(commitment_generator_pool);
healthchecks.push(Box::new(commitment_generator.health_check()));
let commitment_generator_handle = tokio::spawn(commitment_generator.run(stop_receiver.clone()));

let updater_handle = task::spawn(batch_status_updater.run(stop_receiver.clone()));
let fee_address_migration_handle =
Expand Down Expand Up @@ -347,15 +353,21 @@ async fn init_tasks(
healthchecks.push(Box::new(ws_server_handles.health_check));
healthchecks.push(Box::new(http_server_handles.health_check));
healthchecks.push(Box::new(ConnectionPoolHealthCheck::new(connection_pool)));
let healthcheck_handle = HealthCheckHandle::spawn_server(
([0, 0, 0, 0], config.required.healthcheck_port).into(),
healthchecks,
);

if let Some(port) = config.optional.prometheus_port {
let prometheus_task = PrometheusExporterConfig::pull(port).run(stop_receiver.clone());
task_handles.push(tokio::spawn(prometheus_task));
let (prometheus_health_check, prometheus_health_updater) =
ReactiveHealthCheck::new("prometheus_exporter");
healthchecks.push(Box::new(prometheus_health_check));
task_handles.push(tokio::spawn(async move {
prometheus_health_updater.update(HealthStatus::Ready.into());
let result = PrometheusExporterConfig::pull(port)
.run(stop_receiver)
.await;
drop(prometheus_health_updater);
result
}));
}

task_handles.extend(http_server_handles.tasks);
task_handles.extend(ws_server_handles.tasks);
task_handles.extend(cache_update_handle);
Expand All @@ -370,7 +382,7 @@ async fn init_tasks(
commitment_generator_handle,
]);

Ok((task_handles, stop_sender, healthcheck_handle, stop_receiver))
Ok((task_handles, healthchecks))
}

async fn shutdown_components(
Expand Down Expand Up @@ -515,13 +527,19 @@ async fn main() -> anyhow::Result<()> {
)
.await?;

let (task_handles, stop_sender, health_check_handle, stop_receiver) =
init_tasks(config.clone(), connection_pool.clone())
let (stop_sender, stop_receiver) = watch::channel(false);
let (task_handles, mut healthchecks) =
init_tasks(&config, connection_pool.clone(), stop_receiver.clone())
.await
.context("init_tasks")?;

let reorg_detector = ReorgDetector::new(&main_node_url, connection_pool.clone(), stop_receiver);
let mut reorg_detector_handle = tokio::spawn(reorg_detector.run()).fuse();
let reorg_detector = ReorgDetector::new(&main_node_url, connection_pool.clone());
healthchecks.push(Box::new(reorg_detector.health_check().clone()));
let healthcheck_handle = HealthCheckHandle::spawn_server(
([0, 0, 0, 0], config.required.healthcheck_port).into(),
healthchecks,
);
let mut reorg_detector_handle = tokio::spawn(reorg_detector.run(stop_receiver)).fuse();
let mut reorg_detector_result = None;

let particular_crypto_alerts = None;
Expand All @@ -541,7 +559,7 @@ async fn main() -> anyhow::Result<()> {

// Reaching this point means that either some actor exited unexpectedly or we received a stop signal.
// Broadcast the stop signal to all actors and exit.
shutdown_components(stop_sender, health_check_handle).await;
shutdown_components(stop_sender, healthcheck_handle).await;

if !reorg_detector_handle.is_terminated() {
reorg_detector_result = Some(reorg_detector_handle.await);
Expand Down
2 changes: 1 addition & 1 deletion core/lib/health_check/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async-trait = "0.1"
futures = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["sync"] }
tokio = { version = "1", features = ["sync", "time"] }
tracing = "0.1"

[dev-dependencies]
Expand Down
78 changes: 68 additions & 10 deletions core/lib/health_check/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,27 @@ pub enum HealthStatus {
NotReady,
/// Component is ready for operations.
Ready,
/// Component is affected by some non-fatal issue. The component is still considered healthy.
Affected,
/// Component is shut down.
ShutDown,
/// Component has been abnormally interrupted by a panic.
Panicked,
}

impl HealthStatus {
/// Checks whether a component is ready according to this status.
pub fn is_ready(self) -> bool {
matches!(self, Self::Ready)
/// Checks whether a component is healthy according to this status.
pub fn is_healthy(self) -> bool {
matches!(self, Self::Ready | Self::Affected)
}

fn priority_for_aggregation(self) -> usize {
match self {
Self::Ready => 0,
Self::ShutDown => 1,
Self::NotReady => 2,
Self::Panicked => 3,
Self::Affected => 1,
Self::ShutDown => 2,
Self::NotReady => 3,
Self::Panicked => 4,
}
}
}
Expand Down Expand Up @@ -94,7 +97,7 @@ impl AppHealth {
let inner = aggregated_status.into();

let this = Self { inner, components };
if !this.inner.status.is_ready() {
if !this.inner.status.is_healthy() {
// Only log unhealthy application health so that logs are not spammed without a reason.
tracing::debug!("Aggregated application health: {this:?}");
}
Expand Down Expand Up @@ -129,8 +132,8 @@ impl AppHealth {
}
}

pub fn is_ready(&self) -> bool {
self.inner.status.is_ready()
pub fn is_healthy(&self) -> bool {
self.inner.status.is_healthy()
}
}

Expand All @@ -144,7 +147,7 @@ pub trait CheckHealth: Send + Sync + 'static {
}

/// Basic implementation of [`CheckHealth`] trait that can be updated using a matching [`HealthUpdater`].
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ReactiveHealthCheck {
name: &'static str,
health_receiver: watch::Receiver<Health>,
Expand Down Expand Up @@ -293,4 +296,59 @@ mod tests {
let updated = health_updater.update(health);
assert!(updated);
}

/// Walks two reactive health checks through several status changes and verifies
/// both the per-component statuses and the aggregated application status after each step.
#[tokio::test]
async fn aggregating_health_checks() {
let (first_check, first_updater) = ReactiveHealthCheck::new("first");
let (second_check, second_updater) = ReactiveHealthCheck::new("second");
let checks: Vec<Box<dyn CheckHealth>> = vec![Box::new(first_check), Box::new(second_check)];

// Stage 1: no updates have been sent yet; both components and the app are expected to be `NotReady`.
let app_health = AppHealth::new(&checks).await;
assert!(!app_health.is_healthy());
assert_matches!(app_health.inner.status(), HealthStatus::NotReady);
assert_matches!(
app_health.components["first"].status,
HealthStatus::NotReady
);
assert_matches!(
app_health.components["second"].status,
HealthStatus::NotReady
);

// Stage 2: only the first component becomes `Ready`; the aggregate is expected to stay `NotReady`
// because the second component has not reported yet.
first_updater.update(HealthStatus::Ready.into());

let app_health = AppHealth::new(&checks).await;
assert!(!app_health.is_healthy());
assert_matches!(app_health.inner.status(), HealthStatus::NotReady);
assert_matches!(app_health.components["first"].status, HealthStatus::Ready);
assert_matches!(
app_health.components["second"].status,
HealthStatus::NotReady
);

// Stage 3: the second component reports `Affected`; the aggregate is expected to be `Affected`,
// which still counts as healthy.
second_updater.update(HealthStatus::Affected.into());

let app_health = AppHealth::new(&checks).await;
assert!(app_health.is_healthy());
assert_matches!(app_health.inner.status(), HealthStatus::Affected);
assert_matches!(app_health.components["first"].status, HealthStatus::Ready);
assert_matches!(
app_health.components["second"].status,
HealthStatus::Affected
);

// Stage 4: dropping the first updater is expected to surface as `ShutDown` for that component,
// and the aggregate is expected to become `ShutDown` (unhealthy) even though the second is `Affected`.
drop(first_updater);

let app_health = AppHealth::new(&checks).await;
assert!(!app_health.is_healthy());
assert_matches!(app_health.inner.status(), HealthStatus::ShutDown);
assert_matches!(
app_health.components["first"].status,
HealthStatus::ShutDown
);
assert_matches!(
app_health.components["second"].status,
HealthStatus::Affected
);
}
}
2 changes: 1 addition & 1 deletion core/lib/zksync_core/src/api_server/healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ async fn check_health<T: AsRef<dyn CheckHealth>>(
health_checks: State<Arc<[T]>>,
) -> (StatusCode, Json<AppHealth>) {
let response = AppHealth::new(&health_checks).await;
let response_code = if response.is_ready() {
let response_code = if response.is_healthy() {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
Expand Down
2 changes: 1 addition & 1 deletion core/lib/zksync_core/src/api_server/web3/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl ApiServerHandles {
"Timed out waiting for API server"
);
let health = self.health_check.check_health().await;
if health.status().is_ready() {
if health.status().is_healthy() {
break;
}
tokio::time::sleep(POLL_INTERVAL).await;
Expand Down
Loading

0 comments on commit 4ea1520

Please sign in to comment.