Commit

Merge branch 'main' into zj/refine_cast_error

mergify[bot] committed Feb 22, 2023
2 parents c30487f + 05e7a0e commit 8bd6b66
Showing 11 changed files with 286 additions and 194 deletions.
278 changes: 180 additions & 98 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions dashboard/proto/gen/common.ts

Some generated files are not rendered by default.

9 changes: 9 additions & 0 deletions grafana/risingwave-dashboard.dashboard.py
@@ -524,6 +524,15 @@ def section_cluster_node(panels):
)
],
),
panels.timeseries_count(
"Meta Cluster",
"",
[
panels.target(f"sum({metric('meta_num')}) by (worker_addr,role)",
"{{worker_addr}} @ {{role}}")
],
["last"],
),
]


2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions proto/common.proto
@@ -32,6 +32,7 @@ enum WorkerType {
COMPUTE_NODE = 2;
RISE_CTL = 3;
COMPACTOR = 4;
META = 5;
}

message ParallelUnit {
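
For reference, a minimal sketch (assuming the standard prost-generated bindings in risingwave_pb) of how the new variant surfaces on the Rust side; the string it yields is the label value the meta metrics code below attaches to the worker-count gauge:

    use risingwave_pb::common::WorkerType;

    // prost generates `as_str_name()`, which returns the proto variant name.
    // This is what the meta metrics use as the label for meta nodes (sketch only).
    assert_eq!(WorkerType::Meta.as_str_name(), "META");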
35 changes: 2 additions & 33 deletions src/meta/src/manager/cluster.rs
@@ -29,7 +29,6 @@ use tokio::task::JoinHandle;

use crate::manager::{IdCategory, LocalNotification, MetaSrvEnv};
use crate::model::{MetadataModel, Worker, INVALID_EXPIRE_AT};
use crate::rpc::metrics::MetaMetrics;
use crate::storage::MetaStore;
use crate::{MetaError, MetaResult};

@@ -87,38 +86,8 @@ where
self.core.read().await
}

pub async fn start_worker_num_monitor(
cluster_manager: ClusterManagerRef<S>,
interval: Duration,
meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
let join_handle = tokio::spawn(async move {
let mut monitor_interval = tokio::time::interval(interval);
monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
// Wait for interval
_ = monitor_interval.tick() => {},
// Shutdown monitor
_ = &mut shutdown_rx => {
tracing::info!("Worker number monitor is stopped");
return;
}
}

for (worker_type, worker_num) in
cluster_manager.core.read().await.count_worker_node()
{
meta_metrics
.worker_num
.with_label_values(&[(worker_type.as_str_name())])
.set(worker_num as i64);
}
}
});

(join_handle, shutdown_tx)
pub async fn count_worker_node(&self) -> HashMap<WorkerType, u64> {
self.core.read().await.count_worker_node()
}

/// A worker node will immediately register itself to meta when it bootstraps.
66 changes: 65 additions & 1 deletion src/meta/src/rpc/metrics.rs
@@ -13,13 +13,22 @@
// limitations under the License.

use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;

use prometheus::{
exponential_buckets, histogram_opts, register_histogram_vec_with_registry,
register_histogram_with_registry, register_int_counter_vec_with_registry,
register_int_gauge_vec_with_registry, register_int_gauge_with_registry, Histogram,
HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
};
use risingwave_pb::common::WorkerType;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

use crate::manager::ClusterManagerRef;
use crate::rpc::server::ElectionClientRef;
use crate::storage::MetaStore;

pub struct MetaMetrics {
registry: Registry,
@@ -34,7 +43,7 @@ pub struct MetaMetrics {

/// Latency between each barrier send
pub barrier_send_latency: Histogram,
/// The number of all barriers. It is the sum of barreriers that are in-flight or completed but
/// The number of all barriers. It is the sum of barriers that are in-flight or completed but
/// waiting for other barriers
pub all_barrier_nums: IntGauge,
/// The number of in-flight barriers
@@ -81,6 +90,9 @@ pub struct MetaMetrics {
/// The number of workers in the cluster.
pub worker_num: IntGaugeVec,
pub compact_skip_frequency: IntCounterVec,

/// The roles of all meta nodes in the cluster.
pub meta_type: IntGaugeVec,
}

impl MetaMetrics {
@@ -256,6 +268,14 @@ impl MetaMetrics {
)
.unwrap();

let meta_type = register_int_gauge_vec_with_registry!(
"meta_num",
"role of meta nodes in the cluster",
&["worker_addr", "role"],
registry,
)
.unwrap();

Self {
registry,

@@ -286,6 +306,7 @@ impl MetaMetrics {
time_after_last_observation: AtomicU64::new(0),

worker_num,
meta_type,
}
}

@@ -298,3 +319,46 @@ impl Default for MetaMetrics {
Self::new()
}
}

pub async fn start_worker_info_monitor<S: MetaStore>(
cluster_manager: ClusterManagerRef<S>,
election_client: Option<ElectionClientRef>,
interval: Duration,
meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
let join_handle = tokio::spawn(async move {
let mut monitor_interval = tokio::time::interval(interval);
monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
// Wait for interval
_ = monitor_interval.tick() => {},
// Shutdown monitor
_ = &mut shutdown_rx => {
tracing::info!("Worker number monitor is stopped");
return;
}
}

for (worker_type, worker_num) in cluster_manager.count_worker_node().await {
meta_metrics
.worker_num
.with_label_values(&[(worker_type.as_str_name())])
.set(worker_num as i64);
}
if let Some(client) = &election_client && let Ok(meta_members) = client.get_members().await {
meta_metrics
.worker_num
.with_label_values(&[WorkerType::Meta.as_str_name()])
.set(meta_members.len() as i64);
meta_members.into_iter().for_each(|m| {
let role = if m.is_leader {"leader"} else {"follower"};
meta_metrics.meta_type.with_label_values(&[&m.id, role]).set(1);
});
}
}
});

(join_handle, shutdown_tx)
}
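
For orientation, here is a minimal sketch (not part of the diff; the interval and bindings are assumed) of how the returned handle/sender pair is typically driven; the real wiring lives in src/meta/src/rpc/server.rs below:

    // Hypothetical call site mirroring the sub_tasks registration in server.rs.
    let (join_handle, shutdown_tx) = start_worker_info_monitor(
        cluster_manager.clone(),
        election_client.clone(),
        Duration::from_secs(60),
        meta_metrics.clone(),
    )
    .await;

    // On meta node shutdown, signal the monitor and wait for it to exit.
    let _ = shutdown_tx.send(());
    let _ = join_handle.await;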
7 changes: 4 additions & 3 deletions src/meta/src/rpc/server.rs
@@ -53,7 +53,7 @@ use crate::manager::{
SystemParamManager,
};
use crate::rpc::election_client::{ElectionClient, EtcdElectionClient};
use crate::rpc::metrics::MetaMetrics;
use crate::rpc::metrics::{start_worker_info_monitor, MetaMetrics};
use crate::rpc::service::backup_service::BackupServiceImpl;
use crate::rpc::service::cluster_service::ClusterServiceImpl;
use crate::rpc::service::heartbeat_service::HeartbeatServiceImpl;
@@ -338,7 +338,7 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
.await
.unwrap();

let meta_member_srv = MetaMemberServiceImpl::new(match election_client {
let meta_member_srv = MetaMemberServiceImpl::new(match election_client.clone() {
None => Either::Right(address_info.clone()),
Some(election_client) => Either::Left(election_client),
});
@@ -509,8 +509,9 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
let mut sub_tasks =
hummock::start_hummock_workers(vacuum_manager, compaction_scheduler, &env.opts);
sub_tasks.push(
ClusterManager::start_worker_num_monitor(
start_worker_info_monitor(
cluster_manager.clone(),
election_client.clone(),
Duration::from_secs(env.opts.node_num_monitor_interval_sec),
meta_metrics.clone(),
)
2 changes: 1 addition & 1 deletion src/object_store/Cargo.toml
@@ -21,7 +21,7 @@ fail = "0.5"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
hyper = "0.14"
itertools = "0.10"
opendal = "0.26.2"
opendal = "0.27.2"
prometheus = { version = "0.13", features = ["process"] }
random-string = "1.0"
risingwave_common = { path = "../common" }
39 changes: 17 additions & 22 deletions src/object_store/src/object/opendal_engine/opendal_object_store.rs
@@ -150,9 +150,7 @@ impl ObjectStore for OpendalObjectStore {
/// Deletes the objects with the given paths permanently from the storage. If an object
/// specified in the request is not found, it will be considered as successfully deleted.
async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
for path in paths {
self.op.object(path).delete().await?;
}
self.op.batch().remove(paths.to_vec()).await?;
Ok(())
}
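
In effect, callers now issue a single batched request instead of deleting objects one by one; a rough usage sketch (the keys are made up):

    // Hypothetical keys; delete_objects forwards them as one OpenDAL batch remove.
    let paths = vec!["dir/a.data".to_string(), "dir/b.data".to_string()];
    store.delete_objects(&paths).await?;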

@@ -287,29 +285,26 @@ mod tests {
obj_store.delete(&path).await.unwrap();
}

// Currently OpenDAL does not support delete objects operation, but they are planning this
// feature. So let's not delete this unit test now. https://github.com/datafuselabs/opendal/issues/1279

// #[tokio::test]
// async fn test_memory_delete_objects() {
// let block1 = Bytes::from("123456");
// let block2 = Bytes::from("987654");
// let store = OpendalObjectStore::new_memory_engine().unwrap();
// store.upload("/abc", block1).await.unwrap();
// store.upload("/klm", block2).await.unwrap();
#[tokio::test]
async fn test_memory_delete_objects() {
let block1 = Bytes::from("123456");
let block2 = Bytes::from("987654");
let store = OpendalObjectStore::new_memory_engine().unwrap();
store.upload("abc", block1).await.unwrap();
store.upload("/klm", block2).await.unwrap();

// assert_eq!(store.list("").await.unwrap().len(), 2);
assert_eq!(store.list("").await.unwrap().len(), 2);

// let str_list = [
// String::from("/abc"),
// String::from("/klm"),
// String::from("/xyz"),
// ];
let str_list = [
String::from("abc"),
String::from("klm"),
String::from("xyz"),
];

// store.delete_objects(&str_list).await.unwrap();
store.delete_objects(&str_list).await.unwrap();

// assert_eq!(store.list("").await.unwrap().len(), 0);
// }
assert_eq!(store.list("").await.unwrap().len(), 0);
}

#[tokio::test]
async fn test_memory_read_multi_block() {
35 changes: 0 additions & 35 deletions src/object_store/src/object/s3.rs
@@ -29,7 +29,6 @@ use futures::future::try_join_all;
use futures::stream;
use hyper::Body;
use itertools::Itertools;
use random_string::generate;
use tokio::io::AsyncRead;
use tokio::task::JoinHandle;

@@ -292,7 +291,6 @@ pub struct S3ObjectStore {
part_size: usize,
/// For S3 specific metrics.
metrics: Arc<ObjectStoreMetrics>,
object_store_use_batch_delete: bool,
}

#[async_trait::async_trait]
@@ -433,12 +431,6 @@ impl ObjectStore for S3ObjectStore {
async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
// AWS restricts the number of objects per request to 1000.
const MAX_LEN: usize = 1000;
if !self.object_store_use_batch_delete {
for path in paths {
self.delete(path).await?;
}
return Ok(());
}

// If needed, split the given set into subsets, each containing no more than `MAX_LEN` objects.
for start_idx /* inclusive */ in (0..paths.len()).step_by(MAX_LEN) {
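
The body of this loop is collapsed in this view; below is a rough sketch of the per-chunk batched delete it performs, reusing the builder calls visible in the code removed further down (chunk bounds and error handling are assumed):

    // Sketch only: one DeleteObjects request per chunk of at most MAX_LEN (1000) keys.
    let end_idx = (start_idx + MAX_LEN).min(paths.len());
    let obj_ids = paths[start_idx..end_idx]
        .iter()
        .map(|path| ObjectIdentifier::builder().key(path).build())
        .collect::<Vec<_>>();
    self.client
        .delete_objects()
        .bucket(&self.bucket)
        .delete(Delete::builder().set_objects(Some(obj_ids)).build())
        .send()
        .await?;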
@@ -531,7 +523,6 @@ impl S3ObjectStore {
bucket,
part_size: S3_PART_SIZE,
metrics,
object_store_use_batch_delete: true,
}
}

@@ -566,36 +557,11 @@
let client = Client::new(&sdk_config);
Self::configure_bucket_lifecycle(&client, bucket.as_str()).await;

// check whether use batch delete
let charset = "1234567890";
let test_path = "risingwave_check_batch_delete/".to_string() + &generate(10, charset);
client
.put_object()
.bucket(&bucket)
.body(aws_sdk_s3::types::ByteStream::from(Bytes::from(
"test batch delete",
)))
.key(&test_path)
.send()
.await
.unwrap();
let obj_ids = vec![ObjectIdentifier::builder().key(&test_path).build()];

let delete_builder = Delete::builder().set_objects(Some(obj_ids));
let object_store_use_batch_delete = client
.delete_objects()
.bucket(&bucket)
.delete(delete_builder.build())
.send()
.await
.is_ok();

Self {
client,
bucket: bucket.to_string(),
part_size: S3_PART_SIZE,
metrics,
object_store_use_batch_delete,
}
}

@@ -630,7 +596,6 @@ impl S3ObjectStore {
bucket: bucket.to_string(),
part_size: MINIO_PART_SIZE,
metrics,
object_store_use_batch_delete: true,
}
}

Expand Down

0 comments on commit 8bd6b66

Please sign in to comment.