From 1f54c42a2bc628ef37bf2d23da3bbb6174f549bf Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Wed, 1 Feb 2023 13:58:36 +0800 Subject: [PATCH 01/12] simulate worker node heartbeat timeout Signed-off-by: Runji Wang --- src/tests/simulation/src/cluster.rs | 9 ++++++++- src/tests/simulation/src/risingwave.toml | 4 ++++ src/tests/simulation/src/slt.rs | 2 +- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 6d6e0e289371..229dd0b2580d 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -421,7 +421,14 @@ impl Cluster { tracing::info!("kill {name}"); madsim::runtime::Handle::current().kill(name); - let t = rand::thread_rng().gen_range(Duration::from_secs(0)..Duration::from_secs(1)); + let mut t = + rand::thread_rng().gen_range(Duration::from_secs(0)..Duration::from_secs(1)); + // has a small chance to restart after a long time + // so that the node is expired and removed from the cluster + if rand::thread_rng().gen_bool(0.1) { + // max_heartbeat_interval_secs = 60 + t += Duration::from_secs(120); + } tokio::time::sleep(t).await; tracing::info!("restart {name}"); madsim::runtime::Handle::current().restart(name); diff --git a/src/tests/simulation/src/risingwave.toml b/src/tests/simulation/src/risingwave.toml index 8a4e81637021..3033d4094719 100644 --- a/src/tests/simulation/src/risingwave.toml +++ b/src/tests/simulation/src/risingwave.toml @@ -2,6 +2,10 @@ # # Note: this file is embedded in the binary and cannot be changed without recompiling. +[meta] +# a relatively small number to make it easier to timeout +max_heartbeat_interval_secs = 60 + [streaming] barrier_interval_ms = 250 checkpoint_frequency = 4 diff --git a/src/tests/simulation/src/slt.rs b/src/tests/simulation/src/slt.rs index 9821c143118d..ea5ecaa507d8 100644 --- a/src/tests/simulation/src/slt.rs +++ b/src/tests/simulation/src/slt.rs @@ -133,7 +133,7 @@ pub async fn run_slt_task(cluster: Arc, glob: &str, opts: &KillOpts) { Err(e) => panic!("{}", e), } } - let should_kill = thread_rng().gen_ratio((opts.kill_rate * 1000.0) as u32, 1000); + let should_kill = thread_rng().gen_bool(opts.kill_rate as f64); // spawn a background task to kill nodes let handle = if should_kill { let cluster = cluster.clone(); From 76e32b78ae8c0db6285c9644c95df8d69d00a33c Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Wed, 1 Feb 2023 16:58:31 +0800 Subject: [PATCH 02/12] patch madsim Signed-off-by: Runji Wang --- Cargo.lock | 9 +++------ Cargo.toml | 2 ++ 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9b24f4abcbb9..7467e92976ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3541,8 +3541,7 @@ dependencies = [ [[package]] name = "madsim" version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "150508f130d4ab8b00ff33bbe7d2f6276a943d8198efddb7c5a69c6b89ee6547" +source = "git+https://github.com/madsim-rs/madsim.git?rev=dca0e30#dca0e30c307c5f657a0e90976d1553e546f42714" dependencies = [ "ahash 0.7.6", "async-channel", @@ -3569,8 +3568,7 @@ dependencies = [ [[package]] name = "madsim-etcd-client" version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "399abea1dd4b45ed8564c67e400dfac5b65fe8f21356ea5f0cc32c793ea61285" +source = "git+https://github.com/madsim-rs/madsim.git?rev=dca0e30#dca0e30c307c5f657a0e90976d1553e546f42714" dependencies = [ "etcd-client", "futures-util", @@ -3589,8 +3587,7 @@ dependencies = [ [[package]] name = "madsim-macros" version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3d248e97b1a48826a12c3828d921e8548e714394bf17274dd0a93910dc946e1" +source = "git+https://github.com/madsim-rs/madsim.git?rev=dca0e30#dca0e30c307c5f657a0e90976d1553e546f42714" dependencies = [ "darling", "proc-macro2", diff --git a/Cargo.toml b/Cargo.toml index dc4e1f5b8c92..6e19c0c47853 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,3 +120,5 @@ getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "cc95ee3 tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" } tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" } postgres-types = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" } +madsim = { git = "https://github.com/madsim-rs/madsim.git", rev = "dca0e30" } +madsim-etcd-client = { git = "https://github.com/madsim-rs/madsim.git", rev = "dca0e30" } From 9dc237f770ec0e8dd4c9c7a1d10edce889b66222 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Wed, 1 Feb 2023 17:18:39 +0800 Subject: [PATCH 03/12] don't kill all meta services Signed-off-by: Runji Wang --- src/tests/simulation/src/cluster.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 229dd0b2580d..7404784a1042 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -378,6 +378,10 @@ impl Cluster { } nodes.push(format!("meta-{}", i)); } + // don't kill all meta services + if nodes.len() == self.config.meta_nodes { + nodes.truncate(1); + } } if opts.kill_frontend { let rand = rand::thread_rng().gen_range(0..3); From 6a9909c0e669283f2eb89bc9088af2834265a539 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Tue, 7 Feb 2023 14:56:54 +0800 Subject: [PATCH 04/12] switch madsim to crates.io version Signed-off-by: Runji Wang --- Cargo.lock | 13 ++++++++----- Cargo.toml | 2 -- src/tests/simulation/Cargo.toml | 4 ++-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d3bbe3452fed..e26a9a8345d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3589,8 +3589,9 @@ dependencies = [ [[package]] name = "madsim" -version = "0.2.14" -source = "git+https://github.com/madsim-rs/madsim.git?rev=dca0e30#dca0e30c307c5f657a0e90976d1553e546f42714" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad8ec77ebfe5067154eb35baeb03c402e3c57a3db4866e9a1a276d40435c7fb8" dependencies = [ "ahash 0.7.6", "async-channel", @@ -3616,8 +3617,9 @@ dependencies = [ [[package]] name = "madsim-etcd-client" -version = "0.2.14" -source = "git+https://github.com/madsim-rs/madsim.git?rev=dca0e30#dca0e30c307c5f657a0e90976d1553e546f42714" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b25dd57e0c5b84c838451374177f34be28df7b7885d950f736d46937afc42e96" dependencies = [ "etcd-client", "futures-util", @@ -3636,7 +3638,8 @@ dependencies = [ [[package]] name = "madsim-macros" version = "0.2.12" -source = "git+https://github.com/madsim-rs/madsim.git?rev=dca0e30#dca0e30c307c5f657a0e90976d1553e546f42714" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3d248e97b1a48826a12c3828d921e8548e714394bf17274dd0a93910dc946e1" dependencies = [ "darling", "proc-macro2", diff --git a/Cargo.toml b/Cargo.toml index 6e19c0c47853..dc4e1f5b8c92 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,5 +120,3 @@ getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "cc95ee3 tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" } tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" } postgres-types = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" } -madsim = { git = "https://github.com/madsim-rs/madsim.git", rev = "dca0e30" } -madsim-etcd-client = { git = "https://github.com/madsim-rs/madsim.git", rev = "dca0e30" } diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index 30ceefca117a..d95a171f0d8a 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -9,11 +9,11 @@ anyhow = "1.0" async-trait = "0.1" clap = "3" console = "0.15" -etcd-client = { version = "0.2.14", package = "madsim-etcd-client" } +etcd-client = { version = "0.2.15", package = "madsim-etcd-client" } futures = { version = "0.3", default-features = false, features = ["alloc"] } glob = "0.3" itertools = "0.10" -madsim = "0.2.14" +madsim = "0.2.15" paste = "1" rand = "0.8" rdkafka = { package = "madsim-rdkafka", version = "=0.2.14-alpha", features = ["cmake-build"] } From 2a964506296520e64fea32051b1809dc1835872f Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Mon, 27 Feb 2023 16:08:09 +0800 Subject: [PATCH 05/12] tune timeout interval Signed-off-by: Runji Wang --- src/tests/simulation/src/cluster.rs | 2 +- src/tests/simulation/src/risingwave.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index de175454c3b5..4b62be0ec8fd 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -445,7 +445,7 @@ impl Cluster { // so that the node is expired and removed from the cluster if rand::thread_rng().gen_bool(0.1) { // max_heartbeat_interval_secs = 60 - t += Duration::from_secs(120); + t += Duration::from_secs(20); } tokio::time::sleep(t).await; tracing::info!("restart {name}"); diff --git a/src/tests/simulation/src/risingwave.toml b/src/tests/simulation/src/risingwave.toml index 3033d4094719..758a98337694 100644 --- a/src/tests/simulation/src/risingwave.toml +++ b/src/tests/simulation/src/risingwave.toml @@ -4,7 +4,7 @@ [meta] # a relatively small number to make it easier to timeout -max_heartbeat_interval_secs = 60 +max_heartbeat_interval_secs = 10 [streaming] barrier_interval_ms = 250 From 2c17361b6665e2b3a31a04f4530293aff6b46fe8 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Mon, 6 Mar 2023 14:59:36 +0800 Subject: [PATCH 06/12] fix etcd election simulation Signed-off-by: Runji Wang --- Cargo.lock | 19 +++++++++++++------ Cargo.toml | 2 ++ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e880947fcd4a..37c64558c203 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3512,8 +3512,7 @@ dependencies = [ [[package]] name = "madsim" version = "0.2.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e3c98b41d46214f4ae435a95e246710ad7fb1100754f809dd7c18606a7607c4" +source = "git+https://github.com/madsim-rs/madsim.git?rev=c7b0b3d#c7b0b3d3ea82c8265ea78ca6722b30ad1a99f9f3" dependencies = [ "ahash 0.7.6", "async-channel", @@ -3527,6 +3526,7 @@ dependencies = [ "madsim-macros", "naive-timer", "rand 0.8.5", + "rand_xoshiro", "rustversion", "serde", "spin 0.9.5", @@ -3557,8 +3557,7 @@ dependencies = [ [[package]] name = "madsim-etcd-client" version = "0.2.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb918383c4f5966f29760ec48820e1c2846739e4ae411c2a8aaa4466ce1421b7" +source = "git+https://github.com/madsim-rs/madsim.git?rev=c7b0b3d#c7b0b3d3ea82c8265ea78ca6722b30ad1a99f9f3" dependencies = [ "etcd-client", "futures-util", @@ -3577,8 +3576,7 @@ dependencies = [ [[package]] name = "madsim-macros" version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3d248e97b1a48826a12c3828d921e8548e714394bf17274dd0a93910dc946e1" +source = "git+https://github.com/madsim-rs/madsim.git?rev=c7b0b3d#c7b0b3d3ea82c8265ea78ca6722b30ad1a99f9f3" dependencies = [ "darling", "proc-macro2", @@ -5275,6 +5273,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rand_xoshiro" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa" +dependencies = [ + "rand_core 0.6.4", +] + [[package]] name = "random-string" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index 218ba6d38213..82be796e1b8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -122,3 +122,5 @@ tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0c25710" tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" } tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" } postgres-types = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" } +madsim = { git = "https://github.com/madsim-rs/madsim.git", rev = "c7b0b3d" } +madsim-etcd-client = { git = "https://github.com/madsim-rs/madsim.git", rev = "c7b0b3d" } From ac5d6ae316a341ebb903893ecd7a37a10b97a8ca Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Wed, 8 Mar 2023 14:48:56 +0800 Subject: [PATCH 07/12] bump madsim version Signed-off-by: Runji Wang --- Cargo.lock | 17 ++++++++++------- Cargo.toml | 2 -- src/prost/Cargo.toml | 2 +- src/tests/simulation/Cargo.toml | 4 ++-- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ee5114423957..f40879ff8ad7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3554,8 +3554,9 @@ dependencies = [ [[package]] name = "madsim" -version = "0.2.17" -source = "git+https://github.com/madsim-rs/madsim.git?rev=c7b0b3d#c7b0b3d3ea82c8265ea78ca6722b30ad1a99f9f3" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c846a15d407458f1ac5da7da965810277229be9c96ed8082a3eaf2787ef81c23" dependencies = [ "ahash 0.7.6", "async-channel", @@ -3599,8 +3600,9 @@ dependencies = [ [[package]] name = "madsim-etcd-client" -version = "0.2.17" -source = "git+https://github.com/madsim-rs/madsim.git?rev=c7b0b3d#c7b0b3d3ea82c8265ea78ca6722b30ad1a99f9f3" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3894525ac4b7d5732b2123f9d29d018005c96a218e5a7c38d1f42601b927d" dependencies = [ "etcd-client", "futures-util", @@ -3619,7 +3621,8 @@ dependencies = [ [[package]] name = "madsim-macros" version = "0.2.12" -source = "git+https://github.com/madsim-rs/madsim.git?rev=c7b0b3d#c7b0b3d3ea82c8265ea78ca6722b30ad1a99f9f3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3d248e97b1a48826a12c3828d921e8548e714394bf17274dd0a93910dc946e1" dependencies = [ "darling 0.14.3", "proc-macro2", @@ -3663,9 +3666,9 @@ dependencies = [ [[package]] name = "madsim-tonic" -version = "0.2.14" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "420ca55ac297f5a3555cb03fdb085e7e91b1287dd872751a6b30dd3c3573277c" +checksum = "0a0d4e7468777e5885b6c3b88a97e3dd81547e0f3304324126c1a07ae89be470" dependencies = [ "async-stream", "chrono", diff --git a/Cargo.toml b/Cargo.toml index d1784321acfb..69cecc51c723 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -121,5 +121,3 @@ tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0c25710" tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" } tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" } postgres-types = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" } -madsim = { git = "https://github.com/madsim-rs/madsim.git", rev = "c7b0b3d" } -madsim-etcd-client = { git = "https://github.com/madsim-rs/madsim.git", rev = "c7b0b3d" } diff --git a/src/prost/Cargo.toml b/src/prost/Cargo.toml index 1b1c98d5ca28..4a7e73dc037f 100644 --- a/src/prost/Cargo.toml +++ b/src/prost/Cargo.toml @@ -12,7 +12,7 @@ pbjson = "0.5" prost = "0.11" prost-helpers = { path = "helpers" } serde = { version = "1", features = ["derive"] } -tonic = { version = "0.2.14", package = "madsim-tonic" } +tonic = { version = "0.2.18", package = "madsim-tonic" } [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index 0cd3764d8066..0ab15b7a56ab 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -16,11 +16,11 @@ async-trait = "0.1" aws-sdk-s3 = { version = "0.2.17", package = "madsim-aws-sdk-s3" } clap = { version = "4", features = ["derive"] } console = "0.15" -etcd-client = { version = "0.2.17", package = "madsim-etcd-client" } +etcd-client = { version = "0.2.18", package = "madsim-etcd-client" } futures = { version = "0.3", default-features = false, features = ["alloc"] } glob = "0.3" itertools = "0.10" -madsim = "0.2.17" +madsim = "0.2.18" paste = "1" pretty_assertions = "1" rand = "0.8" From 54eb2b138d0c22e8dac6796bb9c50e363cee0eae Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Thu, 30 Mar 2023 22:17:29 +0800 Subject: [PATCH 08/12] Increase meta_nodes in Configuration struct to 3. --- src/tests/simulation/src/cluster.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 1c5f6066ac48..ce3db27fb35b 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -79,7 +79,7 @@ impl Configuration { config_path: CONFIG_PATH.as_os_str().to_string_lossy().into(), frontend_nodes: 2, compute_nodes: 3, - meta_nodes: 1, + meta_nodes: 3, compactor_nodes: 2, compute_node_cores: 2, etcd_timeout_rate: 0.0, From 919744e98d698e077e45282d50d17c3f90c5a8f4 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Fri, 31 Mar 2023 15:52:39 +0800 Subject: [PATCH 09/12] Define new struct and retry constants for MetaMemberManagement, modifying function implementation. Update GrpcMetaClient retry strategy & max retries. Pass `MetaConfig` to functions. Optimized retry logic in RPC client & added `retry_strategy_for_init` function. Refactored `try_build_rpc_channel` & `connect_to_endpoint`. Removed unnecessary retry strategies. Add error msg, re-order methods, remove warning. Refactor MetaClient creation and GrpcMetaClient await call. Modify GrpcMetaClient retry & add MetaConfig parameter to start_meta_member_monitor Refactor code for cleaner error handling. --- src/compute/src/server.rs | 1 + src/ctl/src/common/meta_service.rs | 2 + src/frontend/src/session.rs | 1 + src/meta/src/rpc/server.rs | 2 + src/rpc_client/src/meta_client.rs | 192 ++++++++++-------- src/storage/compactor/src/server.rs | 2 + .../src/compaction_test_runner.rs | 29 ++- 7 files changed, 141 insertions(+), 88 deletions(-) diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 6f1793bdf08f..9a85ad991f27 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -104,6 +104,7 @@ pub async fn compute_node_serve( WorkerType::ComputeNode, &advertise_addr, opts.parallelism, + &config.meta, ) .await .unwrap(); diff --git a/src/ctl/src/common/meta_service.rs b/src/ctl/src/common/meta_service.rs index c0617cfa2473..d40b46640505 100644 --- a/src/ctl/src/common/meta_service.rs +++ b/src/ctl/src/common/meta_service.rs @@ -15,6 +15,7 @@ use std::env; use anyhow::{bail, Result}; +use risingwave_common::config::MetaConfig; use risingwave_common::util::addr::HostAddr; use risingwave_pb::common::WorkerType; use risingwave_rpc_client::MetaClient; @@ -58,6 +59,7 @@ Note: the default value of `RW_META_ADDR` is 'http://127.0.0.1:5690'."; WorkerType::RiseCtl, &get_new_ctl_identity(), 0, + &MetaConfig::default(), ) .await?; let worker_id = client.worker_id(); diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 58389391f6ff..00ed8d68e90b 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -191,6 +191,7 @@ impl FrontendEnv { WorkerType::Frontend, &frontend_address, 0, + &config.meta, ) .await?; diff --git a/src/meta/src/rpc/server.rs b/src/meta/src/rpc/server.rs index 5b357e85282c..1321f9a16f96 100644 --- a/src/meta/src/rpc/server.rs +++ b/src/meta/src/rpc/server.rs @@ -619,6 +619,8 @@ pub async fn start_service_as_election_leader( } }; + tracing::info!("Starting meta services"); + tonic::transport::Server::builder() .layer(MetricsMiddlewareLayer::new(meta_metrics)) .add_service(HeartbeatServiceServer::new(heartbeat_srv)) diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 230853ab06d8..ab0159215d77 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -25,7 +25,7 @@ use futures::stream::BoxStream; use itertools::Itertools; use lru::LruCache; use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId}; -use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE; +use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE}; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::telemetry::report::TelemetryInfoFetcher; use risingwave_common::util::addr::HostAddr; @@ -89,6 +89,7 @@ pub struct MetaClient { worker_type: WorkerType, host_addr: HostAddr, inner: GrpcMetaClient, + meta_config: MetaConfig, } impl MetaClient { @@ -116,12 +117,22 @@ impl MetaClient { host: Some(self.host_addr.to_protobuf()), worker_id: self.worker_id(), }; - let retry_strategy = GrpcMetaClient::retry_strategy_for_request(); - tokio_retry::Retry::spawn(retry_strategy, || async { + + let retry_strategy = GrpcMetaClient::retry_strategy_to_bound( + Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64), + true, + ); + + let result = tokio_retry::Retry::spawn(retry_strategy, || async { let request = request.clone(); self.inner.subscribe(request).await }) - .await + .await; + if let Err(_e) = result.as_ref() { + tracing::error!("debug error {}", self.host_addr.to_string()) + } + + result } pub async fn create_connection(&self, req: create_connection_request::Payload) -> Result { @@ -188,41 +199,49 @@ impl MetaClient { worker_type: WorkerType, addr: &HostAddr, worker_node_parallelism: usize, + meta_config: &MetaConfig, ) -> Result<(Self, SystemParamsReader)> { let addr_strategy = Self::parse_meta_addr(meta_addr)?; tracing::info!("register meta client using strategy: {}", addr_strategy); - let grpc_meta_client = GrpcMetaClient::new(addr_strategy).await?; + // Retry until reaching `max_heartbeat_interval_secs` + let retry_strategy = GrpcMetaClient::retry_strategy_to_bound( + Duration::from_secs(meta_config.max_heartbeat_interval_secs as u64), + true, + ); + + let init_result: Result<_> = tokio_retry::Retry::spawn(retry_strategy, || async { + let grpc_meta_client = GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?; + + let add_worker_resp = grpc_meta_client + .add_worker_node(AddWorkerNodeRequest { + worker_type: worker_type as i32, + host: Some(addr.to_protobuf()), + worker_node_parallelism: worker_node_parallelism as u64, + }) + .await?; + + let system_params_resp = grpc_meta_client + .get_system_params(GetSystemParamsRequest {}) + .await?; + + Ok((add_worker_resp, system_params_resp, grpc_meta_client)) + }) + .await; + + let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?; - let add_worker_request = AddWorkerNodeRequest { - worker_type: worker_type as i32, - host: Some(addr.to_protobuf()), - worker_node_parallelism: worker_node_parallelism as u64, - }; - let add_worker_resp = - tokio_retry::Retry::spawn(GrpcMetaClient::retry_strategy_for_request(), || async { - let request = add_worker_request.clone(); - grpc_meta_client.add_worker_node(request).await - }) - .await?; let worker_node = add_worker_resp .node .expect("AddWorkerNodeResponse::node is empty"); - let system_params_request = GetSystemParamsRequest {}; - let system_params_resp = - tokio_retry::Retry::spawn(GrpcMetaClient::retry_strategy_for_request(), || async { - let request = system_params_request.clone(); - grpc_meta_client.get_system_params(request).await - }) - .await?; - Ok(( Self { worker_id: worker_node.id, worker_type, host_addr: addr.clone(), inner: grpc_meta_client, + meta_config: meta_config.to_owned(), }, system_params_resp.params.unwrap().into(), )) @@ -233,7 +252,10 @@ impl MetaClient { let request = ActivateWorkerNodeRequest { host: Some(addr.to_protobuf()), }; - let retry_strategy = GrpcMetaClient::retry_strategy_for_request(); + let retry_strategy = GrpcMetaClient::retry_strategy_to_bound( + Duration::from_secs(self.meta_config.max_heartbeat_interval_secs as u64), + true, + ); tokio_retry::Retry::spawn(retry_strategy, || async { let request = request.clone(); self.inner.activate_worker_node(request).await @@ -1107,14 +1129,15 @@ struct MetaMemberGroup { members: LruCache>, } -struct ElectionMemberManagement { +struct MetaMemberManagement { core_ref: Arc>, members: Either, current_leader: String, + meta_config: MetaConfig, } -impl ElectionMemberManagement { - const ELECTION_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5); +impl MetaMemberManagement { + const META_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5); fn host_address_to_url(addr: HostAddress) -> String { format!("http://{}:{}", addr.host, addr.port) @@ -1193,8 +1216,17 @@ impl ElectionMemberManagement { if discovered_leader != self.current_leader { tracing::info!("new meta leader {} discovered", discovered_leader); - let (channel, _) = - GrpcMetaClient::try_build_rpc_channel(vec![discovered_leader.clone()]).await?; + + let retry_strategy = GrpcMetaClient::retry_strategy_to_bound( + Duration::from_secs(self.meta_config.meta_leader_lease_secs), + false, + ); + + let channel = tokio_retry::Retry::spawn(retry_strategy, || async { + let endpoint = GrpcMetaClient::addr_to_endpoint(discovered_leader.clone())?; + GrpcMetaClient::connect_to_endpoint(endpoint).await + }) + .await?; self.recreate_core(channel).await; self.current_leader = discovered_leader; @@ -1206,44 +1238,39 @@ impl ElectionMemberManagement { } impl GrpcMetaClient { - // Retry base interval in ms for connecting to meta server. - const CONN_RETRY_BASE_INTERVAL_MS: u64 = 100; - // Max retry interval in ms for connecting to meta server. - const CONN_RETRY_MAX_INTERVAL_MS: u64 = 5000; // See `Endpoint::http2_keep_alive_interval` const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60; // See `Endpoint::keep_alive_timeout` const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60; - // Max retry times for request to meta server. - const REQUEST_RETRY_BASE_INTERVAL_MS: u64 = 50; + // Retry base interval in ms for connecting to meta server. + const INIT_RETRY_BASE_INTERVAL_MS: u64 = 50; // Max retry times for connecting to meta server. - const REQUEST_RETRY_MAX_ATTEMPTS: usize = 10; - // Max retry interval in ms for request to meta server. - const REQUEST_RETRY_MAX_INTERVAL_MS: u64 = 5000; + const INIT_RETRY_MAX_INTERVAL_MS: u64 = 5000; async fn start_meta_member_monitor( &self, init_leader_addr: String, members: Either, force_refresh_receiver: Receiver>>, + meta_config: MetaConfig, ) -> Result<()> { let core_ref = self.core.clone(); let current_leader = init_leader_addr; let enable_period_tick = matches!(members, Either::Right(_)); - let member_management = ElectionMemberManagement { + let member_management = MetaMemberManagement { core_ref, members, current_leader, + meta_config, }; let mut force_refresh_receiver = force_refresh_receiver; tokio::spawn(async move { let mut member_management = member_management; - let mut ticker = - time::interval(ElectionMemberManagement::ELECTION_MEMBER_REFRESH_PERIOD); + let mut ticker = time::interval(MetaMemberManagement::META_MEMBER_REFRESH_PERIOD); loop { let result_sender: Option>> = if enable_period_tick { @@ -1257,7 +1284,7 @@ impl GrpcMetaClient { let tick_result = member_management.refresh_members().await; if let Err(e) = tick_result.as_ref() { - tracing::warn!("refresh election client failed {}", e); + tracing::warn!("refresh meta member client failed {}", e); } if let Some(sender) = result_sender { @@ -1282,8 +1309,8 @@ impl GrpcMetaClient { } /// Connect to the meta server from `addrs`. - pub async fn new(strategy: MetaAddressStrategy) -> Result { - let (channel, addr) = match &strategy { + pub async fn new(strategy: &MetaAddressStrategy, config: MetaConfig) -> Result { + let (channel, addr) = match strategy { MetaAddressStrategy::LoadBalance(addr) => { Self::try_build_rpc_channel(vec![addr.clone()]).await } @@ -1296,7 +1323,7 @@ impl GrpcMetaClient { }; let meta_member_client = client.core.read().await.meta_member_client.clone(); - let members = match &strategy { + let members = match strategy { MetaAddressStrategy::LoadBalance(_) => Either::Left(meta_member_client), MetaAddressStrategy::List(addrs) => { let mut members = LruCache::new(20); @@ -1310,12 +1337,10 @@ impl GrpcMetaClient { }; client - .start_meta_member_monitor(addr, members, force_refresh_receiver) + .start_meta_member_monitor(addr, members, force_refresh_receiver, config) .await?; - if let Err(e) = client.force_refresh_leader().await { - tracing::warn!("force refresh leader failed {}, init leader may failed", e); - } + client.force_refresh_leader().await?; Ok(client) } @@ -1332,35 +1357,27 @@ impl GrpcMetaClient { .map(|addr| Self::addr_to_endpoint(addr.clone()).map(|endpoint| (endpoint, addr))) .try_collect()?; - let retry_strategy = ExponentialBackoff::from_millis(Self::CONN_RETRY_BASE_INTERVAL_MS) - .max_delay(Duration::from_millis(Self::CONN_RETRY_MAX_INTERVAL_MS)) - .map(jitter); - - let channel = tokio_retry::Retry::spawn(retry_strategy, || async { - let endpoints = endpoints.clone(); + let endpoints = endpoints.clone(); - for (endpoint, addr) in endpoints { - match Self::connect_to_endpoint(endpoint).await { - Ok(channel) => { - tracing::info!("Connect to meta server {} successfully", addr); - return Ok((channel, addr)); - } - Err(e) => { - tracing::warn!( - "Failed to connect to meta server {}, trying again: {}", - addr, - e - ) - } + for (endpoint, addr) in endpoints { + match Self::connect_to_endpoint(endpoint).await { + Ok(channel) => { + tracing::info!("Connect to meta server {} successfully", addr); + return Ok((channel, addr)); + } + Err(e) => { + tracing::warn!( + "Failed to connect to meta server {}, trying again: {}", + addr, + e + ) } } + } - Err(RpcError::Internal(anyhow!( - "Failed to connect to meta server" - ))) - }) - .await?; - Ok(channel) + Err(RpcError::Internal(anyhow!( + "Failed to connect to meta server" + ))) } async fn connect_to_endpoint(endpoint: Endpoint) -> Result { @@ -1373,12 +1390,25 @@ impl GrpcMetaClient { .map_err(RpcError::TransportError) } - /// Return retry strategy for retrying meta requests. - pub fn retry_strategy_for_request() -> impl Iterator { - ExponentialBackoff::from_millis(Self::REQUEST_RETRY_BASE_INTERVAL_MS) - .max_delay(Duration::from_millis(Self::REQUEST_RETRY_MAX_INTERVAL_MS)) - .map(jitter) - .take(Self::REQUEST_RETRY_MAX_ATTEMPTS) + pub(crate) fn retry_strategy_to_bound( + high_bound: Duration, + exceed: bool, + ) -> impl Iterator { + let iter = ExponentialBackoff::from_millis(Self::INIT_RETRY_BASE_INTERVAL_MS) + .max_delay(Duration::from_millis(Self::INIT_RETRY_MAX_INTERVAL_MS)) + .map(jitter); + + let mut sum = Duration::default(); + + iter.take_while(move |duration| { + sum += *duration; + + if exceed { + sum < high_bound + *duration + } else { + sum < high_bound + } + }) } } diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index d22e1b2c9fb0..c31efc84faa6 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -70,9 +70,11 @@ pub async fn compactor_serve( WorkerType::Compactor, &advertise_addr, 0, + &config.meta, ) .await .unwrap(); + info!("Assigned compactor id {}", meta_client.worker_id()); meta_client.activate(&advertise_addr).await.unwrap(); diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index 1544214751ae..b1004ead0cde 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -26,7 +26,9 @@ use clap::Parser; use futures::TryStreamExt; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; -use risingwave_common::config::{extract_storage_memory_config, load_config, NO_OVERRIDE}; +use risingwave_common::config::{ + extract_storage_memory_config, load_config, MetaConfig, NO_OVERRIDE, +}; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, FIRST_VERSION_ID}; @@ -230,13 +232,14 @@ async fn init_metadata_for_replay( // filter key manager will fail to acquire a key extractor. tokio::time::sleep(Duration::from_secs(2)).await; + let meta_config = MetaConfig::default(); let meta_client: MetaClient; tokio::select! { _ = tokio::signal::ctrl_c() => { tracing::info!("Ctrl+C received, now exiting"); std::process::exit(0); }, - ret = MetaClient::register_new(cluster_meta_endpoint, WorkerType::RiseCtl, advertise_addr, 0) => { + ret = MetaClient::register_new(cluster_meta_endpoint, WorkerType::RiseCtl, advertise_addr, 0, &meta_config) => { (meta_client, _) = ret.unwrap(); }, } @@ -246,8 +249,14 @@ async fn init_metadata_for_replay( let tables = meta_client.risectl_list_state_tables().await?; - let (new_meta_client, _) = - MetaClient::register_new(new_meta_endpoint, WorkerType::RiseCtl, advertise_addr, 0).await?; + let (new_meta_client, _) = MetaClient::register_new( + new_meta_endpoint, + WorkerType::RiseCtl, + advertise_addr, + 0, + &meta_config, + ) + .await?; new_meta_client.activate(advertise_addr).await.unwrap(); if ci_mode { let table_to_check = tables.iter().find(|t| t.name == "nexmark_q7").unwrap(); @@ -277,6 +286,7 @@ async fn pull_version_deltas( WorkerType::RiseCtl, advertise_addr, 0, + &MetaConfig::default(), ) .await?; let worker_id = meta_client.worker_id(); @@ -325,9 +335,14 @@ async fn start_replay( // Register to the cluster. // We reuse the RiseCtl worker type here - let (meta_client, system_params) = - MetaClient::register_new(&opts.meta_address, WorkerType::RiseCtl, &advertise_addr, 0) - .await?; + let (meta_client, system_params) = MetaClient::register_new( + &opts.meta_address, + WorkerType::RiseCtl, + &advertise_addr, + 0, + &config.meta, + ) + .await?; let worker_id = meta_client.worker_id(); tracing::info!("Assigned replay worker id {}", worker_id); meta_client.activate(&advertise_addr).await.unwrap(); From 887abd810a0f55dbfbc7ee2773af872aec5cce6b Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Tue, 4 Apr 2023 13:45:54 +0800 Subject: [PATCH 10/12] Update pull-request.yml workflow: timeout increased to 25 min, misc check label & command changed. --- ci/workflows/pull-request.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index fefd3868d3d0..2f2912e829ad 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -286,7 +286,7 @@ steps: # files: "*-junit.xml" # format: "junit" - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 18 + timeout_in_minutes: 25 - label: "misc check" command: "ci/scripts/misc-check.sh" From 0d8c2836f7c7debc66c6948e4d3662bc27596674 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Tue, 4 Apr 2023 14:21:30 +0800 Subject: [PATCH 11/12] Simplify async subscription retry in `meta_client.rs` --- src/rpc_client/src/meta_client.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index ab0159215d77..99f066cfd102 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -123,16 +123,11 @@ impl MetaClient { true, ); - let result = tokio_retry::Retry::spawn(retry_strategy, || async { + tokio_retry::Retry::spawn(retry_strategy, || async { let request = request.clone(); self.inner.subscribe(request).await }) - .await; - if let Err(_e) = result.as_ref() { - tracing::error!("debug error {}", self.host_addr.to_string()) - } - - result + .await } pub async fn create_connection(&self, req: create_connection_request::Payload) -> Result { From f77570b11aa8123d1f8a021834187211c059d94e Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Thu, 6 Apr 2023 14:06:06 +0800 Subject: [PATCH 12/12] Downgraded "lru" to 0.7.6. --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index f6e1f73bf93a..adac570e684f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6524,7 +6524,7 @@ dependencies = [ "futures", "glob", "itertools", - "lru 0.10.0", + "lru 0.7.6", "madsim", "madsim-aws-sdk-s3", "madsim-etcd-client",