Skip to content

Commit

Permalink
Add metrics for request size and row count
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Jun 20, 2024
1 parent 01080d0 commit 34556d4
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 24 deletions.
51 changes: 37 additions & 14 deletions src/frontend/flight/sync/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ use metrics::{
Gauge, Histogram,
};

const IN_MEMORY_SIZE: &str = "seafowl_sync_writer_in_memory_size";
const IN_MEMORY_ROWS: &str = "seafowl_sync_writer_in_memory_rows";
const REQUEST: &str = "seafowl_sync_writer_request";
const IN_MEMORY: &str = "seafowl_sync_writer_in_memory";
const COMPACTION_TIME: &str = "seafowl_sync_writer_compaction_time";
const COMPACTED: &str = "seafowl_sync_writer_compacted";
const FLUSHING_TIME: &str = "seafowl_sync_writer_flushing_time";
const FLUSHED: &str = "seafowl_sync_writer_flushed";
const UNIT_LABEL: &str = "unit";
const UNIT_SIZE: &str = "size";
const UNIT_ROWS: &str = "rows";

#[derive(Clone)]
pub struct SyncMetrics {
pub in_memory_size: Gauge,
pub in_memory_rows: Gauge,
pub compaction_time: Histogram,
pub flushing_time: Histogram,
}
Expand All @@ -26,12 +27,18 @@ impl Default for SyncMetrics {

impl SyncMetrics {
fn new() -> Self {
describe_counter!(
REQUEST,
"The total byte size and row count of of all batches in the sync message"
);
describe_gauge!(
IN_MEMORY_SIZE,
"The total size of all pending syncs in bytes"
IN_MEMORY,
"The total byte size and row count of all pending syncs"
);
describe_histogram!(
COMPACTION_TIME,
"The time taken to compact a single sync message"
);
describe_gauge!(IN_MEMORY_ROWS, "The total row count of all pending syncs");
describe_histogram!(COMPACTION_TIME, "The time taken to compact a sync");
describe_counter!(
COMPACTED,
"The reduction in rows and size due to batch compaction"
Expand All @@ -43,30 +50,46 @@ impl SyncMetrics {
describe_counter!(FLUSHED, "The total rows and size flushed");

Self {
in_memory_size: gauge!(IN_MEMORY_SIZE),
in_memory_rows: gauge!(IN_MEMORY_ROWS),
compaction_time: histogram!(COMPACTION_TIME),
flushing_time: histogram!(FLUSHING_TIME),
}
}

pub fn request_size(&self, size: u64) {
let request_size = counter!(REQUEST, UNIT_LABEL => UNIT_SIZE);
request_size.increment(size);
}

pub fn request_rows(&self, rows: u64) {
let request_rows = counter!(REQUEST, UNIT_LABEL => UNIT_ROWS);
request_rows.increment(rows);
}

pub fn in_memory_size(&self) -> Gauge {
gauge!(IN_MEMORY, UNIT_LABEL => UNIT_SIZE)
}

pub fn in_memory_rows(&self) -> Gauge {
gauge!(IN_MEMORY, UNIT_LABEL => UNIT_ROWS)
}

pub fn compacted_size(&self, size: u64) {
let compacted_size = counter!(COMPACTED, "unit" => "size");
let compacted_size = counter!(COMPACTED, UNIT_LABEL => UNIT_SIZE);
compacted_size.increment(size);
}

pub fn compacted_rows(&self, rows: u64) {
let compacted_rows = counter!(COMPACTED, "unit" => "rows");
let compacted_rows = counter!(COMPACTED, UNIT_LABEL => UNIT_ROWS);
compacted_rows.increment(rows);
}

pub fn flushed_size(&self, size: u64) {
let flushed_size = counter!(FLUSHED, "unit" => "size");
let flushed_size = counter!(FLUSHED, UNIT_LABEL => UNIT_SIZE);
flushed_size.increment(size);
}

pub fn flushed_rows(&self, rows: u64) {
let flushed_rows = counter!(FLUSHED, "unit" => "rows");
let flushed_rows = counter!(FLUSHED, UNIT_LABEL => UNIT_ROWS);
flushed_rows.increment(rows);
}
}
22 changes: 12 additions & 10 deletions src/frontend/flight/sync/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,14 @@ impl SeafowlDataSyncWriter {
.or_insert(IndexMap::from([(sequence_number, sequence)]));

// Compactify the batches and measure the time it took and the reduction in rows/size
let old_rows = batches
.iter()
.fold(0, |rows, batch| rows + batch.num_rows());
let old_size = batches
.iter()
.fold(0, |size, batch| size + batch.get_array_memory_size());
let (old_size, old_rows) = batches.iter().fold((0, 0), |(size, rows), batch| {
(
size + batch.get_array_memory_size(),
rows + batch.num_rows(),
)
});
self.metrics.request_size(old_size as u64);
self.metrics.request_rows(old_rows as u64);
let start = Instant::now();
let batch = compact_batches(&sync_schema, batches)?;
let duration = start.elapsed().as_millis();
Expand Down Expand Up @@ -233,8 +235,8 @@ impl SeafowlDataSyncWriter {

// Update the total size and metrics
self.size += size;
self.metrics.in_memory_size.increment(size as f64);
self.metrics.in_memory_rows.increment(rows as f64);
self.metrics.in_memory_size().increment(size as f64);
self.metrics.in_memory_rows().increment(rows as f64);
self.metrics.compaction_time.record(duration as f64);
self.metrics.compacted_size((old_size - size) as u64);
self.metrics.compacted_rows((old_rows - rows) as u64);
Expand Down Expand Up @@ -611,8 +613,8 @@ impl SeafowlDataSyncWriter {
fn remove_sync(&mut self, url: &String) {
if let Some(sync) = self.syncs.shift_remove(url) {
self.size -= sync.size;
self.metrics.in_memory_size.decrement(sync.size as f64);
self.metrics.in_memory_rows.decrement(sync.rows as f64);
self.metrics.in_memory_size().decrement(sync.size as f64);
self.metrics.in_memory_rows().decrement(sync.rows as f64);
}
}

Expand Down

0 comments on commit 34556d4

Please sign in to comment.