Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: telemetry #7384

Merged
merged 54 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
37871ec
boil telemetry
odysa Jan 14, 2023
b6d5252
sys info and ticker
odysa Jan 14, 2023
0cac0e8
add telemetry starting
odysa Jan 16, 2023
f8b7eac
move to start_leader_srv
odysa Jan 16, 2023
085d146
post telemetry report
odysa Jan 16, 2023
396bb50
add telemetry_enabled
odysa Jan 17, 2023
de56427
refactor telemetry loop
odysa Jan 17, 2023
17b0842
add telemetry fail test
odysa Jan 17, 2023
991135a
add telemetry rpc
odysa Jan 19, 2023
e3f347b
move telemetry to common
odysa Feb 7, 2023
b41da80
refactor reporting loop
odysa Feb 7, 2023
93c1c9b
add telemetry rpc service
odysa Feb 8, 2023
eb3ef04
Merge branch 'main' into feature/telemetry
odysa Feb 8, 2023
be5e248
refactor telemetry config
odysa Feb 8, 2023
a096021
refactor telemetry report
odysa Feb 8, 2023
6caee0c
add frontend telemetry
odysa Feb 10, 2023
8453623
refactor report creator
odysa Feb 13, 2023
50fd0bd
refactor url
odysa Feb 15, 2023
f3d2c05
Merge branch 'main' into feature/telemetry
odysa Feb 15, 2023
87ef555
etcd config
odysa Feb 16, 2023
93e638c
incorpa config
odysa Feb 16, 2023
433d122
fix lience header
odysa Feb 16, 2023
31dc7fe
add post telemetry tests
odysa Feb 16, 2023
c14902b
fix dashboard proto
odysa Feb 16, 2023
c29d8ba
add to system params
odysa Feb 23, 2023
99b9f71
Merge branch 'main' into feature/telemetry
odysa Feb 24, 2023
bfc0c2c
merge main
odysa Mar 7, 2023
4dbd365
introduce telemetry mgr
odysa Mar 7, 2023
5ee5f8f
refacotor system params
odysa Mar 7, 2023
b7e9fb7
refactor meta reporting and telemetry info fetching
odysa Mar 8, 2023
317de3a
make sys mutable and test
odysa Mar 9, 2023
b93f89b
Merge branch 'main' into feature/telemetry
odysa Mar 9, 2023
21f44f4
fix proto lint error
odysa Mar 10, 2023
7dd86bf
Merge branch 'main' into feature/telemetry
odysa Mar 10, 2023
6472186
try to fix CI type infer error
odysa Mar 10, 2023
adc046d
update url
odysa Mar 14, 2023
16b968d
fix simulation tests
odysa Mar 14, 2023
58b29fd
update hakari
odysa Mar 14, 2023
f268cf3
Merge branch 'main' into feature/telemetry
odysa Mar 14, 2023
8e985a2
fix dependecies
odysa Mar 14, 2023
0a986ba
fix borrow issues
odysa Mar 14, 2023
4a5b39e
Merge branch 'main' into feature/telemetry
odysa Mar 14, 2023
2bd09ca
Merge branch 'main' into feature/telemetry
odysa Mar 15, 2023
41358ea
add tracking_id to backup
odysa Mar 15, 2023
d944b6c
interval and starting change
odysa Mar 16, 2023
cfa3324
Merge branch 'main' into feature/telemetry
odysa Mar 17, 2023
3e54af6
disable in risedev and revert backup
odysa Mar 17, 2023
013b392
disable in integration tests
odysa Mar 17, 2023
5c67e35
fix integration test env
odysa Mar 17, 2023
dec9296
fix check
odysa Mar 17, 2023
d0f8933
Merge branch 'main' into feature/telemetry
odysa Mar 18, 2023
68d0317
Merge branch 'main' into feature/telemetry
odysa Mar 21, 2023
0fe4322
refine env tests
odysa Mar 21, 2023
64778d7
fix meta reporting
odysa Mar 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
472 changes: 227 additions & 245 deletions Cargo.lock

Large diffs are not rendered by default.

54 changes: 54 additions & 0 deletions dashboard/proto/gen/meta.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ import "user.proto";
option java_package = "com.risingwave.proto";
option optimize_for = SPEED;

message GetTelemetryInfoRequest {}

message TelemetryInfoResponse {
optional string tracking_id = 1;
}

service TelemetryInfoService {
// Request telemetry info from meta node
rpc GetTelemetryInfo(GetTelemetryInfoRequest) returns (TelemetryInfoResponse);
}

message HeartbeatRequest {
message ExtraInfo {
oneof info {
Expand Down Expand Up @@ -344,6 +355,7 @@ message SystemParams {
optional string data_directory = 7;
optional string backup_storage_url = 8;
optional string backup_storage_directory = 9;
optional bool telemetry_enabled = 10;
}

message GetSystemParamsRequest {}
Expand Down
2 changes: 2 additions & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ prometheus = { version = "0.13" }
prost = "0.11"
rand = "0.8"
regex = "1"
reqwest = { version = "0.11", features = ["json"] }
risingwave_pb = { path = "../prost" }
rust_decimal = { version = "1", features = ["db-tokio-postgres"] }
ryu = "1.0"
Expand All @@ -64,6 +65,7 @@ toml = "0.5"
tonic = { version = "0.2", package = "madsim-tonic" }
tracing = "0.1"
url = "2"
uuid = "1.2.2"

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }
Expand Down
7 changes: 7 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ pub struct ServerConfig {
/// >0 = open metrics
pub metrics_level: u32,

#[serde(default = "default::server::telemetry_enabled")]
pub telemetry_enabled: bool,
odysa marked this conversation as resolved.
Show resolved Hide resolved

#[serde(flatten)]
pub unrecognized: HashMap<String, Value>,
}
Expand Down Expand Up @@ -572,6 +575,10 @@ mod default {
pub fn metrics_level() -> u32 {
0
}

pub fn telemetry_enabled() -> bool {
true
}
}

pub mod storage {
Expand Down
2 changes: 2 additions & 0 deletions src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub mod monitor;
pub mod row;
pub mod session_config;
pub mod system_param;
pub mod telemetry;

#[cfg(test)]
pub mod test_utils;
pub mod types;
Expand Down
6 changes: 6 additions & 0 deletions src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ macro_rules! for_all_undeprecated_params {
{ data_directory, String, "hummock_001".to_string() },
{ backup_storage_url, String, "memory".to_string() },
{ backup_storage_directory, String, "backup".to_string() },
{ telemetry_enabled, bool, true},
$({ $field, $type, $default },)*
}
};
Expand Down Expand Up @@ -291,6 +292,10 @@ impl ValidateOnSet for OverrideValidateOnSet {
// TODO
Ok(())
}

fn telemetry_enabled(_: &bool) -> Result<()> {
Ok(())
}
}

for_all_undeprecated_params!(impl_default_from_other_params);
Expand All @@ -315,6 +320,7 @@ mod tests {
(DATA_DIRECTORY_KEY, "a"),
(BACKUP_STORAGE_URL_KEY, "a"),
(BACKUP_STORAGE_DIRECTORY_KEY, "a"),
(TELEMETRY_ENABLED_KEY, "false"),
];

// To kv - missing field.
Expand Down
4 changes: 4 additions & 0 deletions src/common/src/system_param/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ impl SystemParamsReader {
self.prost.backup_storage_directory.as_ref().unwrap()
}

pub fn telemetry_enabled(&self) -> bool {
self.prost.telemetry_enabled.unwrap()
}

pub fn to_kv(&self) -> Vec<(String, String)> {
system_params_to_kv(&self.prost).unwrap()
}
Expand Down
163 changes: 163 additions & 0 deletions src/common/src/telemetry/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use parking_lot::RwLock;
use tokio::select;
use tokio::sync::oneshot::{self, Sender};
use tokio::sync::watch::Receiver;
use tokio::task::JoinHandle;

use super::report::{start_telemetry_reporting, TelemetryInfoFetcher, TelemetryReportCreator};
use crate::system_param::local_manager::SystemParamsReaderRef;
use crate::telemetry::telemetry_env_enabled;

pub struct TelemetryManager<F, I>
where
F: TelemetryReportCreator + Send + Sync + 'static,
I: TelemetryInfoFetcher + Send + Sync + 'static,
{
core: Arc<RwLock<TelemetryManagerCore<F, I>>>,
sys_params_change_rx: Receiver<SystemParamsReaderRef>,
}

impl<F, I> TelemetryManager<F, I>
where
F: TelemetryReportCreator + Send + Sync + 'static,
I: TelemetryInfoFetcher + Send + Sync + 'static,
{
pub fn new(
sys_params_change_rx: Receiver<SystemParamsReaderRef>,
info_fetcher: Arc<I>,
report_creator: Arc<F>,
) -> Self {
Self {
core: Arc::new(RwLock::new(TelemetryManagerCore::new(
info_fetcher,
report_creator,
))),
sys_params_change_rx,
}
}

pub fn start_telemetry_reporting(&self) {
self.core.write().start();
}

pub fn watch_params_change(mut self) -> (JoinHandle<()>, Sender<()>) {
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let handle = tokio::spawn(async move {
loop {
select! {
Ok(_) = self.sys_params_change_rx.changed() => {
let telemetry_enabled = {
let params = self.sys_params_change_rx.borrow().load();
// check both environment variable and system params
// if either is false, then stop telemetry
params.telemetry_enabled() && telemetry_env_enabled()
};

let telemetry_running = {
let core = self.core.read();
core.telemetry_running()
};

match (telemetry_running, telemetry_enabled) {
(false, true) => {
tracing::info!("telemetry config changed to true, start reporting");
self.core.write().start();
}
(true, false) => {
tracing::info!("telemetry config changed to false, stop reporting");
self.core.write().stop();
}
_ => {}
};
}
,
_ = &mut shutdown_rx =>{
tracing::info!("Telemetry exit");
return;
}
}
}
});
(handle, shutdown_tx)
}
}

struct TelemetryManagerCore<F, I>
where
F: TelemetryReportCreator + Send + Sync + 'static,
I: TelemetryInfoFetcher + Send + Sync + 'static,
{
telemetry_handle: Option<JoinHandle<()>>,
telemetry_shutdown_tx: Option<Sender<()>>,
telemetry_running: Arc<AtomicBool>,
info_fetcher: Arc<I>,
report_creator: Arc<F>,
}

impl<F, I> TelemetryManagerCore<F, I>
where
F: TelemetryReportCreator + Send + Sync + 'static,
I: TelemetryInfoFetcher + Send + Sync + 'static,
{
fn new(info_fetcher: Arc<I>, report_creator: Arc<F>) -> Self {
Self {
telemetry_handle: None,
telemetry_shutdown_tx: None,
telemetry_running: Arc::new(AtomicBool::new(false)),
info_fetcher,
report_creator,
}
}

fn telemetry_running(&self) -> bool {
self.telemetry_running.load(Ordering::Relaxed)
}

fn start(&mut self) {
if self.telemetry_running() {
return;
}

let (handle, tx) =
start_telemetry_reporting(self.info_fetcher.clone(), self.report_creator.clone());
self.telemetry_handle = Some(handle);
self.telemetry_shutdown_tx = Some(tx);
self.telemetry_running.store(true, Ordering::Relaxed);
}

fn stop(&mut self) {
match (
self.telemetry_running.load(Ordering::Relaxed),
self.telemetry_shutdown_tx.take(),
self.telemetry_handle.take(),
) {
(true, Some(shutdown_rx), Some(_)) => {
if let Err(()) = shutdown_rx.send(()) {
tracing::error!("telemetry mgr failed to send stop signal");
} else {
self.telemetry_running.store(false, Ordering::Relaxed)
}
}
// do nothing if telemetry is not running
(false, None, None) => {}
_ => unreachable!("impossible telemetry handler"),
}
}
}