Skip to content

Commit

Permalink
batch insertion of gauges and counts
Browse files Browse the repository at this point in the history
  • Loading branch information
Geal authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent 3418b6f commit f2d07eb
Showing 1 changed file with 26 additions and 24 deletions.
50 changes: 26 additions & 24 deletions lib/src/metrics/local_drain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,11 +563,8 @@ impl LocalDrain {
let complete_key = format!("{}\t{}", key, timestamp);

trace!("store gauge at {} -> {}", complete_key, i);
if is_backend {
self.backend_tree.insert(complete_key.as_bytes(), &i.to_le_bytes())?;
} else {
self.cluster_tree.insert(complete_key.as_bytes(), &i.to_le_bytes())?;
}
let mut batch = sled::Batch::default();
batch.insert(complete_key.as_bytes(), &i.to_le_bytes());

// aggregate at the last hour
let second = now.second();
Expand All @@ -578,7 +575,7 @@ impl LocalDrain {
let complete_key = format!("{}\t{}", key, timestamp);

if !self.tree(is_backend).contains_key(complete_key.as_bytes())? {
self.tree(is_backend).insert(complete_key.as_bytes(), &i.to_le_bytes())?;
batch.insert(complete_key.as_bytes(), &i.to_le_bytes());
}

let minute = previous_minute.minute();
Expand All @@ -588,11 +585,13 @@ impl LocalDrain {

let complete_key = format!("{}\t{}", key, timestamp);
if !self.tree(is_backend).contains_key(complete_key.as_bytes())? {
self.tree(is_backend).insert(complete_key.as_bytes(), &i.to_le_bytes())?;
batch.insert(complete_key.as_bytes(), &i.to_le_bytes());
}
}
}

self.tree(is_backend).apply_batch(batch)?;

Ok(())
}

Expand All @@ -602,6 +601,8 @@ impl LocalDrain {
let complete_key = format!("{}\t{}", key, timestamp);

trace!("add gauge at {} -> {}", complete_key, i);
let mut batch = sled::Batch::default();

let value = match self.tree(is_backend).get(complete_key.as_bytes())? {
Some(v) => i64::from_le_bytes((*v).try_into().unwrap()),
// start from the last known value, or zero
Expand All @@ -614,7 +615,7 @@ impl LocalDrain {
};

let new_value = value + i;
self.tree(is_backend).insert(complete_key.as_bytes(), &new_value.to_le_bytes())?;
batch.insert(complete_key.as_bytes(), &new_value.to_le_bytes());

// aggregate at the last hour
let second = now.second();
Expand All @@ -636,7 +637,7 @@ impl LocalDrain {
};

let new_value = value + i;
self.tree(is_backend).insert(complete_key.as_bytes(), &new_value.to_le_bytes())?;
batch.insert(complete_key.as_bytes(), &new_value.to_le_bytes());

let minute = previous_minute.minute();
if minute != 0 {
Expand All @@ -657,10 +658,12 @@ impl LocalDrain {
};

let new_value = value + i;
self.tree(is_backend).insert(complete_key.as_bytes(), &new_value.to_le_bytes())?;
batch.insert(complete_key.as_bytes(), &new_value.to_le_bytes());
}
}

self.tree(is_backend).apply_batch(batch)?;

Ok(())
}

Expand All @@ -669,19 +672,16 @@ impl LocalDrain {
let timestamp = now.unix_timestamp();
let complete_key = format!("{}\t{}", key, timestamp);

let tree = if is_backend {
&mut self.backend_tree
} else {
&mut self.cluster_tree
};
trace!("store count at {} -> {}", complete_key, i);
match tree.get(complete_key.as_bytes())? {
let mut batch = sled::Batch::default();

match self.tree(is_backend).get(complete_key.as_bytes())? {
None => {
tree.insert(complete_key.as_bytes(), &i.to_le_bytes())?;
batch.insert(complete_key.as_bytes(), &i.to_le_bytes());
},
Some(v) => {
let i2 = i64::from_le_bytes((*v).try_into().unwrap());
tree.insert(complete_key.as_bytes(), &(i+i2).to_le_bytes())?;
batch.insert(complete_key.as_bytes(), &(i+i2).to_le_bytes());
}
};

Expand All @@ -692,13 +692,13 @@ impl LocalDrain {
let timestamp = previous_minute.unix_timestamp();

let complete_key = format!("{}\t{}", key, timestamp);
match tree.get(complete_key.as_bytes())? {
match self.tree(is_backend).get(complete_key.as_bytes())? {
None => {
tree.insert(complete_key.as_bytes(), &i.to_le_bytes())?;
batch.insert(complete_key.as_bytes(), &i.to_le_bytes());
},
Some(v) => {
let i2 = i64::from_le_bytes((*v).try_into().unwrap());
tree.insert(complete_key.as_bytes(), &(i+i2).to_le_bytes())?;
batch.insert(complete_key.as_bytes(), &(i+i2).to_le_bytes());
}
};

Expand All @@ -708,18 +708,20 @@ impl LocalDrain {
let timestamp = previous_hour.unix_timestamp();

let complete_key = format!("{}\t{}", key, timestamp);
match tree.get(complete_key.as_bytes())? {
match self.tree(is_backend).get(complete_key.as_bytes())? {
None => {
tree.insert(complete_key.as_bytes(), &i.to_le_bytes())?;
batch.insert(complete_key.as_bytes(), &i.to_le_bytes());
},
Some(v) => {
let i2 = i64::from_le_bytes((*v).try_into().unwrap());
tree.insert(complete_key.as_bytes(), &(i+i2).to_le_bytes())?;
batch.insert(complete_key.as_bytes(), &(i+i2).to_le_bytes());
}
};
}
}

self.tree(is_backend).apply_batch(batch)?;

Ok(())
}

Expand Down

0 comments on commit f2d07eb

Please sign in to comment.