diff --git a/Cargo.lock b/Cargo.lock index f6e1f73bf93a..adac570e684f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6524,7 +6524,7 @@ dependencies = [ "futures", "glob", "itertools", - "lru 0.10.0", + "lru 0.7.6", "madsim", "madsim-aws-sdk-s3", "madsim-etcd-client", diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 74bc97d5c212..24d500ade27d 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -288,7 +288,7 @@ steps: # files: "*-junit.xml" # format: "junit" - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 18 + timeout_in_minutes: 25 - label: "misc check" command: "ci/scripts/misc-check.sh" diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 6f1793bdf08f..9a85ad991f27 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -104,6 +104,7 @@ pub async fn compute_node_serve( WorkerType::ComputeNode, &advertise_addr, opts.parallelism, + &config.meta, ) .await .unwrap(); diff --git a/src/ctl/src/common/meta_service.rs b/src/ctl/src/common/meta_service.rs index c0617cfa2473..d40b46640505 100644 --- a/src/ctl/src/common/meta_service.rs +++ b/src/ctl/src/common/meta_service.rs @@ -15,6 +15,7 @@ use std::env; use anyhow::{bail, Result}; +use risingwave_common::config::MetaConfig; use risingwave_common::util::addr::HostAddr; use risingwave_pb::common::WorkerType; use risingwave_rpc_client::MetaClient; @@ -58,6 +59,7 @@ Note: the default value of `RW_META_ADDR` is 'http://127.0.0.1:5690'."; WorkerType::RiseCtl, &get_new_ctl_identity(), 0, + &MetaConfig::default(), ) .await?; let worker_id = client.worker_id(); diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 7be89190dec4..9abe51cdef25 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -190,6 +190,7 @@ impl FrontendEnv { WorkerType::Frontend, &frontend_address, 0, + &config.meta, ) .await?; diff --git a/src/meta/src/rpc/server.rs b/src/meta/src/rpc/server.rs index 5b357e85282c..1321f9a16f96 100644 --- a/src/meta/src/rpc/server.rs +++ b/src/meta/src/rpc/server.rs @@ -619,6 +619,8 @@ pub async fn start_service_as_election_leader( } }; + tracing::info!("Starting meta services"); + tonic::transport::Server::builder() .layer(MetricsMiddlewareLayer::new(meta_metrics)) .add_service(HeartbeatServiceServer::new(heartbeat_srv)) diff --git a/src/prost/Cargo.toml b/src/prost/Cargo.toml index 4547a0decf2d..dd64bda49720 100644 --- a/src/prost/Cargo.toml +++ b/src/prost/Cargo.toml @@ -13,7 +13,7 @@ pbjson = "0.5" prost = "0.11" prost-helpers = { path = "helpers" } serde = { version = "1", features = ["derive"] } -tonic = { version = "0.2.14", package = "madsim-tonic" } +tonic = { version = "0.2.18", package = "madsim-tonic" } [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 58602b678062..58fb11f8add6 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -26,7 +26,7 @@ use futures::stream::BoxStream; use itertools::Itertools; use lru::LruCache; use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId}; -use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE; +use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE}; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::telemetry::report::TelemetryInfoFetcher; use risingwave_common::util::addr::HostAddr; @@ -91,6 +91,7 @@ pub struct MetaClient { worker_type: WorkerType, host_addr: HostAddr, inner: GrpcMetaClient, + meta_config: MetaConfig, } impl MetaClient { @@ -118,7 +119,12 @@ impl MetaClient { host: Some(self.host_addr.to_protobuf()), worker_id: self.worker_id(), }; - let retry_strategy = GrpcMetaClient::retry_strategy_for_request(); + + let retry_strategy = GrpcMetaClient::retry_strategy_to_bound( + Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64), + true, + ); + tokio_retry::Retry::spawn(retry_strategy, || async { let request = request.clone(); self.inner.subscribe(request).await @@ -197,41 +203,49 @@ impl MetaClient { worker_type: WorkerType, addr: &HostAddr, worker_node_parallelism: usize, + meta_config: &MetaConfig, ) -> Result<(Self, SystemParamsReader)> { let addr_strategy = Self::parse_meta_addr(meta_addr)?; tracing::info!("register meta client using strategy: {}", addr_strategy); - let grpc_meta_client = GrpcMetaClient::new(addr_strategy).await?; + // Retry until reaching `max_heartbeat_interval_secs` + let retry_strategy = GrpcMetaClient::retry_strategy_to_bound( + Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64), + true, + ); + + let init_result: Result<_> = tokio_retry::Retry::spawn(retry_strategy, || async { + let grpc_meta_client = GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?; + + let add_worker_resp = grpc_meta_client + .add_worker_node(AddWorkerNodeRequest { + worker_type: worker_type as i32, + host: Some(addr.to_protobuf()), + worker_node_parallelism: worker_node_parallelism as u64, + }) + .await?; + + let system_params_resp = grpc_meta_client + .get_system_params(GetSystemParamsRequest {}) + .await?; + + Ok((add_worker_resp, system_params_resp, grpc_meta_client)) + }) + .await; + + let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?; - let add_worker_request = AddWorkerNodeRequest { - worker_type: worker_type as i32, - host: Some(addr.to_protobuf()), - worker_node_parallelism: worker_node_parallelism as u64, - }; - let add_worker_resp = - tokio_retry::Retry::spawn(GrpcMetaClient::retry_strategy_for_request(), || async { - let request = add_worker_request.clone(); - grpc_meta_client.add_worker_node(request).await - }) - .await?; let worker_node = add_worker_resp .node .expect("AddWorkerNodeResponse::node is empty"); - let system_params_request = GetSystemParamsRequest {}; - let system_params_resp = - tokio_retry::Retry::spawn(GrpcMetaClient::retry_strategy_for_request(), || async { - let request = system_params_request.clone(); - grpc_meta_client.get_system_params(request).await - }) - .await?; - Ok(( Self { worker_id: worker_node.id, worker_type, host_addr: addr.clone(), inner: grpc_meta_client, + meta_config: meta_config.to_owned(), }, system_params_resp.params.unwrap().into(), )) @@ -242,7 +256,10 @@ impl MetaClient { let request = ActivateWorkerNodeRequest { host: Some(addr.to_protobuf()), }; - let retry_strategy = GrpcMetaClient::retry_strategy_for_request(); + let retry_strategy = GrpcMetaClient::retry_strategy_to_bound( + Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64), + true, + ); tokio_retry::Retry::spawn(retry_strategy, || async { let request = request.clone(); self.inner.activate_worker_node(request).await @@ -1116,14 +1133,15 @@ struct MetaMemberGroup { members: LruCache>, } -struct ElectionMemberManagement { +struct MetaMemberManagement { core_ref: Arc>, members: Either, current_leader: String, + meta_config: MetaConfig, } -impl ElectionMemberManagement { - const ELECTION_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5); +impl MetaMemberManagement { + const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5); fn host_address_to_url(addr: HostAddress) -> String { format!("http://{}:{}", addr.host, addr.port) @@ -1202,8 +1220,17 @@ impl ElectionMemberManagement { if discovered_leader != self.current_leader { tracing::info!("new meta leader {} discovered", discovered_leader); - let (channel, _) = - GrpcMetaClient::try_build_rpc_channel(vec![discovered_leader.clone()]).await?; + + let retry_strategy = GrpcMetaClient::retry_strategy_to_bound( + Duration::from_secs(self.meta_config.meta_leader_lease_secs), + false, + ); + + let channel = tokio_retry::Retry::spawn(retry_strategy, || async { + let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone())?; + GrpcMetaClient::connect_to_endpoint(endpoint).await + }) + .await?; self.recreate_core(channel).await; self.current_leader = discovered_leader; @@ -1215,44 +1242,39 @@ impl ElectionMemberManagement { } impl GrpcMetaClient { - // Retry base interval in ms for connecting to meta server. - const CONN_RETRY_BASE_INTERVAL_MS: u64 = 100; - // Max retry interval in ms for connecting to meta server. - const CONN_RETRY_MAX_INTERVAL_MS: u64 = 5000; // See `Endpoint::http2_keep_alive_interval` const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60; // See `Endpoint::keep_alive_timeout` const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60; - // Max retry times for request to meta server. - const REQUEST_RETRY_BASE_INTERVAL_MS: u64 = 50; + // Retry base interval in ms for connecting to meta server. + const INIT_RETRY_BASE_INTERVAL_MS: u64 = 50; // Max retry times for connecting to meta server. - const REQUEST_RETRY_MAX_ATTEMPTS: usize = 10; - // Max retry interval in ms for request to meta server. - const REQUEST_RETRY_MAX_INTERVAL_MS: u64 = 5000; + const INIT_RETRY_MAX_INTERVAL_MS: u64 = 5000; async fn start_meta_member_monitor( &self, init_leader_addr: String, members: Either, force_refresh_receiver: Receiver>>, + meta_config: MetaConfig, ) -> Result<()> { let core_ref = self.core.clone(); let current_leader = init_leader_addr; let enable_period_tick = matches!(members, Either::Right(_)); - let member_management = ElectionMemberManagement { + let member_management = MetaMemberManagement { core_ref, members, current_leader, + meta_config, }; let mut force_refresh_receiver = force_refresh_receiver; tokio::spawn(async move { let mut member_management = member_management; - let mut ticker = - time::interval(ElectionMemberManagement::ELECTION_MEMBER_REFRESH_PERIOD); + let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD); loop { let result_sender: Option>> = if enable_period_tick { @@ -1266,7 +1288,7 @@ impl GrpcMetaClient { let tick_result = member_management.refresh_members().await; if let Err(e) = tick_result.as_ref() { - tracing::warn!("refresh election client failed {}", e); + tracing::warn!("refresh meta member client failed {}", e); } if let Some(sender) = result_sender { @@ -1291,8 +1313,8 @@ impl GrpcMetaClient { } /// Connect to the meta server from `addrs`. - pub async fn new(strategy: MetaAddressStrategy) -> Result { - let (channel, addr) = match &strategy { + pub async fn new(strategy: &MetaAddressStrategy, config: MetaConfig) -> Result { + let (channel, addr) = match strategy { MetaAddressStrategy::LoadBalance(addr) => { Self::try_build_rpc_channel(vec![addr.clone()]).await } @@ -1305,7 +1327,7 @@ impl GrpcMetaClient { }; let meta_member_client = client.core.read().await.meta_member_client.clone(); - let members = match &strategy { + let members = match strategy { MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client), MetaAddressStrategy::List(addrs) => { let mut members = LruCache::new(NonZeroUsize::new(20).unwrap()); @@ -1319,12 +1341,10 @@ impl GrpcMetaClient { }; client - .start_meta_member_monitor(addr, members, force_refresh_receiver) + .start_meta_member_monitor(addr, members, force_refresh_receiver, config) .await?; - if let Err(e) = client.force_refresh_leader().await { - tracing::warn!("force refresh leader failed {}, init leader may failed", e); - } + client.force_refresh_leader().await?; Ok(client) } @@ -1341,35 +1361,27 @@ impl GrpcMetaClient { .map(|addr| Self::addr_to_endpoint(addr.clone()).map(|endpoint| (endpoint, addr))) .try_collect()?; - let retry_strategy = ExponentialBackoff::from_millis(Self::CONN_RETRY_BASE_INTERVAL_MS) - .max_delay(Duration::from_millis(Self::CONN_RETRY_MAX_INTERVAL_MS)) - .map(jitter); + let endpoints = endpoints.clone(); - let channel = tokio_retry::Retry::spawn(retry_strategy, || async { - let endpoints = endpoints.clone(); - - for (endpoint, addr) in endpoints { - match Self::connect_to_endpoint(endpoint).await { - Ok(channel) => { - tracing::info!("Connect to meta server {} successfully", addr); - return Ok((channel, addr)); - } - Err(e) => { - tracing::warn!( - "Failed to connect to meta server {}, trying again: {}", - addr, - e - ) - } + for (endpoint, addr) in endpoints { + match Self::connect_to_endpoint(endpoint).await { + Ok(channel) => { + tracing::info!("Connect to meta server {} successfully", addr); + return Ok((channel, addr)); + } + Err(e) => { + tracing::warn!( + "Failed to connect to meta server {}, trying again: {}", + addr, + e + ) } } + } - Err(RpcError::Internal(anyhow!( - "Failed to connect to meta server" - ))) - }) - .await?; - Ok(channel) + Err(RpcError::Internal(anyhow!( + "Failed to connect to meta server" + ))) } async fn connect_to_endpoint(endpoint: Endpoint) -> Result { @@ -1382,12 +1394,25 @@ impl GrpcMetaClient { .map_err(RpcError::TransportError) } - /// Return retry strategy for retrying meta requests. - pub fn retry_strategy_for_request() -> impl Iterator { - ExponentialBackoff::from_millis(Self::REQUEST_RETRY_BASE_INTERVAL_MS) - .max_delay(Duration::from_millis(Self::REQUEST_RETRY_MAX_INTERVAL_MS)) - .map(jitter) - .take(Self::REQUEST_RETRY_MAX_ATTEMPTS) + pub(crate) fn retry_strategy_to_bound( + high_bound: Duration, + exceed: bool, + ) -> impl Iterator { + let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS) + .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS)) + .map(jitter); + + let mut sum = Duration::default(); + + iter.take_while(move |duration| { + sum += *duration; + + if exceed { + sum < high_bound + *duration + } else { + sum < high_bound + } + }) } } diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index d22e1b2c9fb0..c31efc84faa6 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -70,9 +70,11 @@ pub async fn compactor_serve( WorkerType::Compactor, &advertise_addr, 0, + &config.meta, ) .await .unwrap(); + info!("Assigned compactor id {}", meta_client.worker_id()); meta_client.activate(&advertise_addr).await.unwrap(); diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index 1544214751ae..b1004ead0cde 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -26,7 +26,9 @@ use clap::Parser; use futures::TryStreamExt; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; -use risingwave_common::config::{extract_storage_memory_config, load_config, NO_OVERRIDE}; +use risingwave_common::config::{ + extract_storage_memory_config, load_config, MetaConfig, NO_OVERRIDE, +}; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, FIRST_VERSION_ID}; @@ -230,13 +232,14 @@ async fn init_metadata_for_replay( // filter key manager will fail to acquire a key extractor. tokio::time::sleep(Duration::from_secs(2)).await; + let meta_config = MetaConfig::default(); let meta_client: MetaClient; tokio::select! { _ = tokio::signal::ctrl_c() => { tracing::info!("Ctrl+C received, now exiting"); std::process::exit(0); }, - ret = MetaClient::register_new(cluster_meta_endpoint, WorkerType::RiseCtl, advertise_addr, 0) => { + ret = MetaClient::register_new(cluster_meta_endpoint, WorkerType::RiseCtl, advertise_addr, 0, &meta_config) => { (meta_client, _) = ret.unwrap(); }, } @@ -246,8 +249,14 @@ async fn init_metadata_for_replay( let tables = meta_client.risectl_list_state_tables().await?; - let (new_meta_client, _) = - MetaClient::register_new(new_meta_endpoint, WorkerType::RiseCtl, advertise_addr, 0).await?; + let (new_meta_client, _) = MetaClient::register_new( + new_meta_endpoint, + WorkerType::RiseCtl, + advertise_addr, + 0, + &meta_config, + ) + .await?; new_meta_client.activate(advertise_addr).await.unwrap(); if ci_mode { let table_to_check = tables.iter().find(|t| t.name == "nexmark_q7").unwrap(); @@ -277,6 +286,7 @@ async fn pull_version_deltas( WorkerType::RiseCtl, advertise_addr, 0, + &MetaConfig::default(), ) .await?; let worker_id = meta_client.worker_id(); @@ -325,9 +335,14 @@ async fn start_replay( // Register to the cluster. // We reuse the RiseCtl worker type here - let (meta_client, system_params) = - MetaClient::register_new(&opts.meta_address, WorkerType::RiseCtl, &advertise_addr, 0) - .await?; + let (meta_client, system_params) = MetaClient::register_new( + &opts.meta_address, + WorkerType::RiseCtl, + &advertise_addr, + 0, + &config.meta, + ) + .await?; let worker_id = meta_client.worker_id(); tracing::info!("Assigned replay worker id {}", worker_id); meta_client.activate(&advertise_addr).await.unwrap(); diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index 801a76801c48..f788a50d9f3f 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -16,12 +16,12 @@ async-trait = "0.1" aws-sdk-s3 = { version = "0.2.17", package = "madsim-aws-sdk-s3" } clap = { version = "4", features = ["derive"] } console = "0.15" -etcd-client = { version = "0.2.17", package = "madsim-etcd-client" } +etcd-client = { version = "0.2.18", package = "madsim-etcd-client" } futures = { version = "0.3", default-features = false, features = ["alloc"] } glob = "0.3" itertools = "0.10" -lru = "0.10.0" -madsim = "0.2.17" +lru = { git = "https://github.com/risingwavelabs/lru-rs.git", branch = "evict_by_timestamp" } +madsim = "0.2.18" paste = "1" pin-project = "1.0" pretty_assertions = "1" diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 48c1f08539ac..c2e55c416b2e 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -98,7 +98,7 @@ impl Configuration { config_path: ConfigPath::Temp(config_path.into()), frontend_nodes: 2, compute_nodes: 3, - meta_nodes: 1, + meta_nodes: 3, compactor_nodes: 2, compute_node_cores: 2, etcd_timeout_rate: 0.0, @@ -432,6 +432,10 @@ impl Cluster { } nodes.push(format!("meta-{}", i)); } + // don't kill all meta services + if nodes.len() == self.config.meta_nodes { + nodes.truncate(1); + } } if opts.kill_frontend { let rand = rand::thread_rng().gen_range(0..3); @@ -475,7 +479,14 @@ impl Cluster { tracing::info!("kill {name}"); madsim::runtime::Handle::current().kill(name); - let t = rand::thread_rng().gen_range(Duration::from_secs(0)..Duration::from_secs(1)); + let mut t = + rand::thread_rng().gen_range(Duration::from_secs(0)..Duration::from_secs(1)); + // has a small chance to restart after a long time + // so that the node is expired and removed from the cluster + if rand::thread_rng().gen_bool(0.1) { + // max_heartbeat_interval_secs = 60 + t += Duration::from_secs(20); + } tokio::time::sleep(t).await; tracing::info!("restart {name}"); madsim::runtime::Handle::current().restart(name); diff --git a/src/tests/simulation/src/risingwave-scale.toml b/src/tests/simulation/src/risingwave-scale.toml index b88e8a0644f7..6757f4cf288b 100644 --- a/src/tests/simulation/src/risingwave-scale.toml +++ b/src/tests/simulation/src/risingwave-scale.toml @@ -2,6 +2,10 @@ # # Note: this file is embedded in the binary and cannot be changed without recompiling. +[meta] +# a relatively small number to make it easier to timeout +max_heartbeat_interval_secs = 10 + [system] barrier_interval_ms = 250 checkpoint_frequency = 4 diff --git a/src/tests/simulation/src/slt.rs b/src/tests/simulation/src/slt.rs index 9dc83472b98e..8a31f5c0cc96 100644 --- a/src/tests/simulation/src/slt.rs +++ b/src/tests/simulation/src/slt.rs @@ -158,7 +158,7 @@ pub async fn run_slt_task(cluster: Arc, glob: &str, opts: &KillOpts) { continue; } - let should_kill = thread_rng().gen_ratio((opts.kill_rate * 1000.0) as u32, 1000); + let should_kill = thread_rng().gen_bool(opts.kill_rate as f64); // spawn a background task to kill nodes let handle = if should_kill { let cluster = cluster.clone();