Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(risedev): support multiple etcd and multiple meta node #6421

Merged
merged 3 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 43 additions & 9 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@ risedev:
# If you want to use the local s3 storage, enable the following line
# - use: minio

# If you want to use aws-s3, configure AK and SK in env var and enable the following lines:
# If you want to use aws-s3, configure AK and SK in env var and enable the following lines:
# - use: aws-s3

# bucket: test-bucket

# If you want to use other s3 compatible object store, open this flag:
# s3-compatible: false

# bucket: test-bucket
# If you want to use other s3 compatible object store, open this flag:
# s3-compatible: false

# if you want to enable etcd backend, uncomment the following lines.
# - use: etcd
Expand Down Expand Up @@ -130,6 +127,40 @@ risedev:
- use: kafka
persist-data: true

3etcd-3meta-1cn-1fe:
- use: etcd
unsafe-no-fsync: true
port: 2388
peer-port: 2389
exporter-port: 2379
- use: etcd
unsafe-no-fsync: true
port: 12388
peer-port: 12389
exporter-port: 12379
- use: etcd
unsafe-no-fsync: true
port: 22388
peer-port: 22389
exporter-port: 22379
- use: meta-node
unsafe-disable-recovery: true
port: 5690
dashboard-port: 5691
exporter-port: 1250
- use: meta-node
unsafe-disable-recovery: true
port: 15690
dashboard-port: 15691
exporter-port: 11250
- use: meta-node
unsafe-disable-recovery: true
port: 25690
dashboard-port: 25691
exporter-port: 21250
- use: compute-node
- use: frontend

##########################################
### Configuration used for ./risedev p ###
##########################################
Expand Down Expand Up @@ -500,7 +531,7 @@ template:

etcd:
# Id of this instance
id: "etcd"
id: etcd-${port}

# Advertise address of the single-node etcd.
address: "127.0.0.1"
Expand All @@ -520,6 +551,9 @@ template:
# Whether to enable fsync (NEVER SET TO TRUE IN PRODUCTION ENVIRONMENT!)
unsafe-no-fsync: false

# Other etcd nodes
provide-etcd: "etcd*"

compute-node:
# Compute-node advertise address
address: "127.0.0.1"
Expand Down Expand Up @@ -755,7 +789,7 @@ template:

# access key, secret key and region should be set in aws config (either by env var or .aws/config)

# used to support other s3 compatible object store.
# used to support other s3 compatible object store.
s3-compatible: false

# Apache Kafka service
Expand Down
5 changes: 4 additions & 1 deletion src/meta/src/rpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,10 @@ pub async fn register_leader_for_meta<S: MetaStore>(
if let Err(e) = meta_store.txn(txn).await {
match e {
MetaStoreError::TransactionAbort() => {
panic!("keep lease failed, another node has become new leader");
tracing::error!(
"keep lease failed, another node has become new leader"
);
futures::future::pending::<()>().await;
}
MetaStoreError::Internal(e) => {
tracing::warn!(
Expand Down
2 changes: 1 addition & 1 deletion src/risedevtool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ clap = { version = "3", features = ["derive"] }
console = "0.15"
dialoguer = "0.10"
enum-iterator = "1"
indicatif = "0.16"
indicatif = "0.17"
isahc = { version = "1", default-features = false, features = ["text-decoding"] }
itertools = "0.10"
kafka = { version = "0.9", default-features = false }
Expand Down
38 changes: 14 additions & 24 deletions src/risedevtool/src/bin/risedev-dev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ use std::fs::{File, OpenOptions};
use std::io::Read;
use std::path::Path;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};

use anyhow::{anyhow, Result};
use console::style;
use indicatif::{MultiProgress, ProgressBar};
use indicatif::ProgressBar;
use risedev::util::{complete_spin, fail_spin};
use risedev::{
compute_risectl_env, preflight_check, AwsS3Config, CompactorService, ComputeNodeService,
Expand All @@ -37,9 +36,7 @@ use yaml_rust::YamlEmitter;

#[derive(Default)]
pub struct ProgressManager {
mp: Arc<MultiProgress>,
pa: Vec<ProgressBar>,
insert: Option<usize>,
pa: Option<ProgressBar>,
}

impl ProgressManager {
Expand All @@ -49,30 +46,21 @@ impl ProgressManager {

/// Create a new progress bar from task
pub fn new_progress(&mut self) -> ProgressBar {
let pb = risedev::util::new_spinner();
if let Some(ref mut insert) = self.insert {
self.mp.insert(*insert, pb.clone());
*insert += 1;
} else {
self.mp.add(pb.clone());
self.insert = Some(0);
if let Some(ref pa) = self.pa {
pa.finish();
}
self.pa.push(pb.clone());
pb.enable_steady_tick(100);
let pb = risedev::util::new_spinner();
pb.enable_steady_tick(Duration::from_millis(100));
self.pa = Some(pb.clone());
pb
}

/// Finish all progress bars.
pub fn finish_all(&self) {
for p in &self.pa {
p.finish();
if let Some(ref pa) = self.pa {
pa.finish();
}
}

pub fn spawn(&self) -> JoinHandle<anyhow::Result<()>> {
let mp = self.mp.clone();
std::thread::spawn(move || mp.join().map_err(|err| err.into()))
}
}

fn task_main(
Expand Down Expand Up @@ -164,7 +152,11 @@ fn task_main(
let mut service = risedev::EtcdService::new(c.clone())?;
service.execute(&mut ctx)?;

let mut task = risedev::EtcdReadyCheckTask::new(c.clone())?;
// let mut task = risedev::EtcdReadyCheckTask::new(c.clone())?;
// TODO(chi): etcd will set its health check to success only after all nodes are
// connected and there's a leader, therefore we cannot do health check for now.
let mut task =
risedev::ConfigureGrpcNodeTask::new(c.address.clone(), c.port, false)?;
task.execute(&mut ctx)?;
}
ServiceConfig::Prometheus(c) => {
Expand Down Expand Up @@ -363,7 +355,6 @@ fn main() -> Result<()> {
steps.len(),
task_name
));
let join_handle = manager.spawn();
let task_result = task_main(&mut manager, &steps, &services);

match task_result {
Expand All @@ -383,7 +374,6 @@ fn main() -> Result<()> {
}
}
manager.finish_all();
join_handle.join().unwrap()?;

match task_result {
Ok((stat, log_buffer)) => {
Expand Down
2 changes: 2 additions & 0 deletions src/risedevtool/src/service_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ pub struct EtcdConfig {
pub unsafe_no_fsync: bool,

pub exporter_port: u16,

pub provide_etcd: Option<Vec<EtcdConfig>>,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
Expand Down
19 changes: 18 additions & 1 deletion src/risedevtool/src/task/etcd_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::path::{Path, PathBuf};
use std::process::Command;

use anyhow::{anyhow, Result};
use itertools::Itertools;

use crate::{EtcdConfig, Task};

Expand Down Expand Up @@ -65,7 +66,23 @@ impl EtcdService {
.arg("--auto-compaction-retention")
.arg("1m")
.arg("--snapshot-count")
.arg("10000");
.arg("10000")
.arg("--name")
.arg(&config.id)
.arg("--initial-cluster-token")
.arg("risingwave-etcd")
.arg("--initial-cluster-state")
.arg("new")
.arg("--initial-cluster")
.arg(
config
.provide_etcd
.as_ref()
.unwrap()
.iter()
.map(|x| format!("{}=http://{}:{}", x.id, x.address, x.peer_port))
.join(","),
);

if config.unsafe_no_fsync {
cmd.arg("--unsafe-no-fsync");
Expand Down
30 changes: 13 additions & 17 deletions src/risedevtool/src/task/frontend_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,20 @@ impl FrontendService {
.arg("1");

let provide_meta_node = config.provide_meta_node.as_ref().unwrap();
match provide_meta_node.len() {
0 => {
return Err(anyhow!(
"Cannot configure node: no meta node found in this configuration."
));
}
1 => {
let meta_node = &provide_meta_node[0];
cmd.arg("--meta-addr")
.arg(format!("http://{}:{}", meta_node.address, meta_node.port));
}
other_size => {
return Err(anyhow!(
"Cannot configure node: {} meta nodes found in this configuration, but only 1 is needed.",
other_size
));
if provide_meta_node.is_empty() {
return Err(anyhow!(
"Cannot configure node: no meta node found in this configuration."
));
} else {
let meta_node = provide_meta_node.last().unwrap();
cmd.arg("--meta-addr")
.arg(format!("http://{}:{}", meta_node.address, meta_node.port));
if provide_meta_node.len() > 1 {
eprintln!("WARN: more than 1 meta node instance is detected, only using the last one for meta node.");
// According to some heruistics, the last etcd node seems always to be elected as
// leader. Therefore we ensure compute node can start by using the last one.
}
};
}

Ok(())
}
Expand Down
7 changes: 5 additions & 2 deletions src/risedevtool/src/task/meta_node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,14 @@ impl MetaNodeService {
Some([]) => {
cmd.arg("--backend").arg("mem");
}
Some([etcd]) => {
Some(etcds) => {
cmd.arg("--backend")
.arg("etcd")
.arg("--etcd-endpoints")
.arg(format!("{}:{}", etcd.address, etcd.port));
.arg(format!("{}:{}", etcds[0].address, etcds[0].port));
if etcds.len() > 1 {
eprintln!("WARN: more than 1 etcd instance is detected, only using the first one for meta node.");
}
}
_ => {
return Err(anyhow!(
Expand Down
18 changes: 10 additions & 8 deletions src/risedevtool/src/task/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,17 @@ pub fn add_meta_node(provide_meta_node: &[MetaNodeConfig], cmd: &mut Command) ->
"Cannot configure node: no meta node found in this configuration."
));
}
[meta_node] => {
cmd.arg("--meta-address")
.arg(format!("http://{}:{}", meta_node.address, meta_node.port));
}
other_meta_nodes => {
return Err(anyhow!(
"Cannot configure node: {} meta nodes found in this configuration, but only 1 is needed.",
other_meta_nodes.len()
meta_nodes => {
cmd.arg("--meta-address").arg(format!(
"http://{}:{}",
meta_nodes.last().unwrap().address,
meta_nodes.last().unwrap().port
));
if meta_nodes.len() > 1 {
eprintln!("WARN: more than 1 meta node instance is detected, only using the last one for meta node.");
// According to some heruistics, the last etcd node seems always to be elected as
// leader. Therefore we ensure compute node can start by using the last one.
}
}
};

Expand Down
18 changes: 15 additions & 3 deletions src/risedevtool/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,28 @@ pub fn get_program_env_cmd(cmd: &Command) -> String {

pub fn new_spinner() -> ProgressBar {
let pb = ProgressBar::new(0);
pb.set_style(ProgressStyle::default_spinner().template("{spinner} {prefix}: {msg}"));
pb.set_style(
ProgressStyle::default_spinner()
.template("{spinner} {prefix}: {msg}")
.unwrap(),
);
pb
}

pub fn complete_spin(pb: &ProgressBar) {
pb.set_style(ProgressStyle::default_spinner().template("✅ {prefix}: {msg}"));
pb.set_style(
ProgressStyle::default_spinner()
.template("✅ {prefix}: {msg}")
.unwrap(),
);
}

pub fn fail_spin(pb: &ProgressBar) {
pb.set_style(ProgressStyle::default_spinner().template("❗ {prefix}: {msg}"));
pb.set_style(
ProgressStyle::default_spinner()
.template("❗ {prefix}: {msg}")
.unwrap(),
);
}

pub fn is_env_set(var: &str) -> bool {
Expand Down