
Correct Overload Issues (#300)

* Investigate cernan freeze-up issue

Latest cernan has an issue with freezing up and allocating memory
while it's frozen. I don't know where or why, but it seems to freeze
the whole topology when it happens. That is... weird.

This behaviour can be reproduced with the following config:

```
data-directory = "data/"
scripts-directory = "examples/scripts/"

flush-interval = 5

[tags]
source = "cernan"

[sources]
  [sources.internal]
  # forwards = ["filters.delay.two_seconds"]
  forwards = ["sinks.console"]

  [sources.graphite.primary]
  port = 2004
  forwards = ["filters.delay.two_seconds"]

[filters]
  [filters.delay.two_seconds]
  tolerance = 2
  forwards = ["sinks.console"]

[sinks]
  [sinks.console]
```

and the emission script at https://gist.github.com/blt/045160a243741d2390a03762900d4329

Signed-off-by: Brian L. Troutwine <blt@postmates.com>

* Use atomic usizes in place of report_telemetry

The report_telemetry function created a new Telemetry for every
point recorded. This was very expensive. We now require that
modules which want statically named telemetry expose atomic
usizes that internal.rs will pull and emit.

This significantly reduces cernan's CPU and memory load in high
self-telemetry situations.
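
Roughly, the pattern is the sketch below. The names here (TELEM_PROCESSED,
record_point, drain_count) are illustrative only; the real counters and the
internal.rs plumbing are in the diff further down, and whether the reporter
resets the count each flush is a choice, not something this commit dictates.

```
// Sketch of the counter pattern, not cernan's actual identifiers.
#[macro_use]
extern crate lazy_static;

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

lazy_static! {
    /// Total telemetry processed by this module since the last report.
    pub static ref TELEM_PROCESSED: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
}

/// Hot path: bump a counter instead of allocating a Telemetry per point.
fn record_point() {
    TELEM_PROCESSED.fetch_add(1, Ordering::Relaxed);
}

/// Reporter side: read the accumulated count once per flush and emit a
/// single Telemetry for it. Swapping to zero reports per-interval counts;
/// a plain load would report a running total instead.
fn drain_count() -> usize {
    TELEM_PROCESSED.swap(0, Ordering::Relaxed)
}

fn main() {
    record_point();
    record_point();
    assert_eq!(drain_count(), 2);
}
```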

Signed-off-by: Brian L. Troutwine <blt@postmates.com>

* Add further self-telemetry

This commit adds self-telemetry to the delay_filter and more to
the graphite source. cernan.graphite.packet and cernan.graphite.telemetry.received
should be roughly the same when each payload contains a single line,
but this is not the case.

Worth investigation.
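
For reference, the expected relationship is roughly the sketch below. The
statics and the parsing loop here are schematic stand-ins for the two counters
named above, not the actual graphite source code.

```
use std::sync::atomic::{AtomicUsize, Ordering};

// Schematic counters standing in for cernan.graphite.packet and
// cernan.graphite.telemetry.received.
static PACKETS: AtomicUsize = AtomicUsize::new(0);
static TELEMETRY_RECEIVED: AtomicUsize = AtomicUsize::new(0);

/// One bump per payload received, one bump per non-empty line parsed out of
/// it. With single-line payloads the two counts should stay roughly equal.
fn handle_payload(payload: &str) {
    PACKETS.fetch_add(1, Ordering::Relaxed);
    for _line in payload.lines().filter(|l| !l.trim().is_empty()) {
        // parsing of `_line` into a Telemetry elided
        TELEMETRY_RECEIVED.fetch_add(1, Ordering::Relaxed);
    }
}

fn main() {
    handle_payload("a.metric 1 100\n");
    handle_payload("b.metric 2 101\n");
    assert_eq!(PACKETS.load(Ordering::Relaxed), 2);
    assert_eq!(TELEMETRY_RECEIVED.load(Ordering::Relaxed), 2);
}
```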

Signed-off-by: Brian L. Troutwine <blt@postmates.com>

* Adjust constraints on telem read/write

Signed-off-by: Brian L. Troutwine <blt@postmates.com>

* Simplify Buckets iteration, correct missing metrics

This commit corrects the issue with cernan that started off the whole
chain of investigation. I'm not 100% sure what the issue was, other
than that the indexing in the old implementation of iteration
for buckets::Iter was too complicated and gave wrong results. While
simplifying the implementation to understand it better, I also fixed
what ailed cernan.

Welp.
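
Outside the diff, the simplified iteration is easier to see as a stand-alone
model. Types are simplified here (u64 standing in for Telemetry); the real
change to buckets::Iter is in the diff below.

```
/// Stand-alone model of the simplified Buckets iteration: a plain
/// (key_index, value_index) cursor over parallel keys/values vectors.
struct Buckets {
    keys: Vec<String>,
    values: Vec<Vec<u64>>,
}

struct Iter<'a> {
    buckets: &'a Buckets,
    key_index: usize,
    value_index: usize,
}

impl<'a> Iterator for Iter<'a> {
    type Item = &'a u64;

    fn next(&mut self) -> Option<&'a u64> {
        while self.key_index < self.buckets.keys.len() {
            if self.value_index < self.buckets.values[self.key_index].len() {
                let v = &self.buckets.values[self.key_index][self.value_index];
                self.value_index += 1;
                return Some(v);
            } else {
                // This key is exhausted (possibly empty): reset the value
                // cursor and move to the next key rather than stopping.
                self.value_index = 0;
                self.key_index += 1;
            }
        }
        None
    }
}

fn main() {
    let buckets = Buckets {
        keys: vec!["a".into(), "b".into(), "c".into()],
        values: vec![vec![1, 2], vec![], vec![3]],
    };
    let iter = Iter { buckets: &buckets, key_index: 0, value_index: 0 };
    assert_eq!(iter.cloned().collect::<Vec<u64>>(), vec![1, 2, 3]);
}
```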

Signed-off-by: Brian L. Troutwine <blt@postmates.com>

* peg hopper to 0.3.1

Signed-off-by: Tom Santero <tom.santero@postmates.com>

* Repair basic.toml

This configuration file had some pretty drastic changes checked
into it. The majority of these changes were not intended.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
blt authored and tsantero committed Aug 31, 2017
1 parent 9831b87 commit b1b1a5a78656b6187e01ec6f45bdec875a037974
@@ -17,11 +17,12 @@ doc = false
 byteorder = "1.0"
 chrono = "0.4"
 clap = "2.20.5"
+coco = "0.2.0"
 fern = "0.4"
 elastic = "0.13"
 elastic_types = "0.19"
 glob = "0.2.11"
-hopper = "0.3"
+hopper = "0.3.1"
 hyper = "0.10"
 hyper-native-tls = "0.2"
 lazy_static = "0.2.1"
@@ -13,17 +13,17 @@ source = "cernan"
 [sources.statsd.primary]
 enabled = true
 port = 8125
-forwards = ["sinks.console", "sinks.null", "sinks.influxdb", "sinks.prometheus"]
+forwards = ["sinks.console", "filters.delay.two_seconds"]
 
 [sources.native.primary]
 ip = "127.0.0.1"
 port = 1972
-forwards = ["sinks.console", "sinks.null", "sinks.influxdb", "sinks.prometheus"]
+forwards = ["filters.delay.two_seconds"]
 
 [sources.graphite.primary]
 enabled = true
 port = 2004
-forwards = ["filters.collectd_scrub"]
+forwards = ["filters.programmable.collectd_scrub"]
 
 [sources.files]
 [sources.files.example_log]
@@ -35,8 +35,16 @@ source = "cernan"
forwards = ["sinks.firehose.stream_two"]
[filters]
[filters.collectd_scrub]
[filters.programmable.collectd_scrub]
script = "collectd_scrub.lua"
forwards = ["filters.delay.two_seconds"]
[filters.delay.two_seconds]
tolerance = 2
forwards = ["filters.flush_boundary.two_seconds"]
[filters.flush_boundary.two_seconds]
tolerance = 2
forwards = ["sinks.console", "sinks.null", "sinks.influxdb", "sinks.prometheus"]
[sinks]
@@ -64,4 +72,3 @@ source = "cernan"
delivery_stream = "stream_two"
batch_size = 800
region = "us-east-1"
@@ -43,29 +43,21 @@ impl Default for Buckets {
 pub struct Iter<'a> {
     buckets: &'a Buckets,
     key_index: usize,
-    value_index: Option<usize>,
+    value_index: usize,
 }
 
 impl<'a> Iterator for Iter<'a> {
     type Item = &'a Telemetry;
 
     fn next(&mut self) -> Option<&'a Telemetry> {
         while self.key_index < self.buckets.keys.len() {
-            if let Some(value_index) = self.value_index {
-                if value_index < self.buckets.values[self.key_index].len() {
-                    let v = &self.buckets.values[self.key_index][value_index];
-                    self.value_index = Some(value_index + 1);
-                    return Some(v);
-                } else {
-                    self.value_index = None;
-                    self.key_index += 1;
-                }
-            } else if !self.buckets.values[self.key_index].is_empty() {
-                let v = &self.buckets.values[self.key_index][0];
-                self.value_index = Some(1);
+            if self.value_index < self.buckets.values[self.key_index].len() {
+                let v = &self.buckets.values[self.key_index][self.value_index];
+                self.value_index += 1;
                 return Some(v);
             } else {
-                return None;
+                self.value_index = 0;
+                self.key_index += 1;
             }
         }
         None
@@ -212,7 +204,7 @@ impl Buckets {
         Iter {
             buckets: self,
             key_index: 0,
-            value_index: None,
+            value_index: 0,
         }
     }
 }
@@ -829,6 +821,40 @@ mod test {
QuickCheck::new().quickcheck(inner as fn(Vec<Telemetry>) -> TestResult);
}
#[test]
fn same_names_in_and_out() {
fn inner(ms: Vec<Telemetry>, loops: usize) -> TestResult {
if loops == 0 {
return TestResult::discard()
}
let mut bucket = Buckets::new(1);
let mut expected_names: HashSet<String> = HashSet::new();
for m in ms.clone() {
expected_names.insert(m.name);
}
if expected_names.len() == 1 {
return TestResult::discard()
}
for _ in 0..loops {
for m in ms.clone() {
bucket.add(m);
}
let mut names: HashSet<String> = HashSet::new();
for telem in bucket.iter() {
names.insert(telem.name.clone());
}
bucket.reset();
assert_eq!(expected_names, names);
}
TestResult::passed()
}
QuickCheck::new().quickcheck(inner as fn(Vec<Telemetry>, usize) -> TestResult);
}
#[test]
fn test_reset_clears_space() {
fn qos_ret(ms: Vec<Telemetry>) -> TestResult {
@@ -1,7 +1,27 @@
 //! Filter streams to within a bounded interval of current time.
+//!
+//! This filter is intended to remove items from the stream which are too old,
+//! as defined by the current time and the configured `tolerance`. That is, if
+//! for some time `T`, `(T - time::now()).abs() > tolerance` the item associated
+//! with `T` will be rejected.
 
 use filter;
 use metric;
-use source::report_telemetry;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use time;
 use util;
+
+lazy_static! {
+    /// Total number of telemetry rejected for age
+    pub static ref DELAY_TELEM_REJECT: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
+    /// Total number of telemetry accepted for age
+    pub static ref DELAY_TELEM_ACCEPT: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
+    /// Total number of logline rejected for age
+    pub static ref DELAY_LOG_REJECT: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
+    /// Total number of logline accepted for age
+    pub static ref DELAY_LOG_ACCEPT: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
+}
+
+/// Filter streams to within a bounded interval of current time.
+///
@@ -10,7 +30,6 @@ use time;
 /// for some time `T`, `(T - time::now()).abs() > tolerance` the item associated
 /// with `T` will be rejected.
 pub struct DelayFilter {
-    config_path: String,
     tolerance: i64,
 }
@@ -29,39 +48,41 @@ impl DelayFilter {
     /// Create a new DelayFilter
     pub fn new(config: DelayFilterConfig) -> DelayFilter {
         DelayFilter {
-            config_path: config
-                .config_path
-                .expect("must supply config_path for delay filter"),
             tolerance: config.tolerance,
         }
     }
 }
 
 impl filter::Filter for DelayFilter {
+    fn valve_state(&self) -> util::Valve {
+        util::Valve::Open
+    }
+
     fn process(
         &mut self,
         event: metric::Event,
         res: &mut Vec<metric::Event>,
     ) -> Result<(), filter::FilterError> {
         match event {
-            metric::Event::Telemetry(m) => {
-                report_telemetry(format!("{}.telemetry", self.config_path), 1.0);
-                if let Some(ref telem) = *m {
-                    let telem = telem.clone();
-                    if (telem.timestamp - time::now()).abs() < self.tolerance {
-                        res.push(metric::Event::new_telemetry(telem));
-                    }
+            metric::Event::Telemetry(m) => if let Some(ref telem) = *m {
+                let telem = telem.clone();
+                if (telem.timestamp - time::now()).abs() < self.tolerance {
+                    DELAY_TELEM_ACCEPT.fetch_add(1, Ordering::Relaxed);
+                    res.push(metric::Event::new_telemetry(telem));
+                } else {
+                    DELAY_TELEM_REJECT.fetch_add(1, Ordering::Relaxed);
                 }
-            }
+            },
             metric::Event::Log(l) => if let Some(ref log) = *l {
-                report_telemetry(format!("{}.log", self.config_path), 1.0);
                 let log = log.clone();
                 if (log.time - time::now()).abs() < self.tolerance {
+                    DELAY_LOG_ACCEPT.fetch_add(1, Ordering::Relaxed);
                     res.push(metric::Event::new_log(log));
+                } else {
+                    DELAY_LOG_REJECT.fetch_add(1, Ordering::Relaxed);
                 }
             },
             metric::Event::TimerFlush(f) => {
-                report_telemetry(format!("{}.flush", self.config_path), 1.0);
                 res.push(metric::Event::TimerFlush(f));
             }
         }
@@ -1,6 +1,7 @@
use filter;
use metric;
use std::mem;
use util;
/// Buffer events for a set period of flushes
///
@@ -51,9 +52,22 @@ impl FlushBoundaryFilter {
holds: Vec::new(),
}
}
/// Count the number of stored events in the filter
pub fn count(&self) -> usize {
self.holds.iter().fold(0, |acc, hld| acc + hld.events.len())
}
}
impl filter::Filter for FlushBoundaryFilter {
fn valve_state(&self) -> util::Valve {
if self.count() > 10_000 {
util::Valve::Closed
} else {
util::Valve::Open
}
}
fn process(
&mut self,
event: metric::Event,
@@ -11,7 +11,7 @@ use time;
 use util;
 
 mod programmable_filter;
-mod delay_filter;
+pub mod delay_filter;
 mod flush_boundary_filter;
 
 pub use self::delay_filter::{DelayFilter, DelayFilterConfig};
@@ -54,6 +54,10 @@ pub trait Filter {
res: &mut Vec<metric::Event>,
) -> Result<(), FilterError>;
/// Lookup the `Filter` valve state. See `Valve` documentation for more
/// information.
fn valve_state(&self) -> util::Valve;
/// Run the Filter
///
/// It is not expected that most Filters will re-implement this. If this is
@@ -73,22 +77,29 @@ pub trait Filter {
                 None => attempts += 1,
                 Some(event) => {
                     attempts = 0;
-                    match self.process(event, &mut events) {
-                        Ok(()) => {
+                    match self.valve_state() {
+                        util::Valve::Open => match self.process(event, &mut events) {
+                            Ok(()) => {
                                 for ev in events.drain(..) {
                                     util::send(&mut chans, ev)
                                 }
                             }
-                        Err(fe) => {
-                            error!(
-                                "Failed to run filter with error: {:?}",
-                                name_in_fe(&fe)
-                            );
-                            let event = event_in_fe(fe);
-                            util::send(&mut chans, event);
+                            Err(fe) => {
+                                error!(
+                                    "Failed to run filter with error: {:?}",
+                                    name_in_fe(&fe)
+                                );
+                                let event = event_in_fe(fe);
+                                util::send(&mut chans, event);
+                            }
+                        },
+                        util::Valve::Closed => {
+                            attempts += 1;
+                            continue;
                         }
                     }
                 }
             }
         }
     }
@@ -1,12 +1,12 @@
use filter;
use libc::c_int;
use lua;
use lua::{Function, State, ThreadStatus};
use lua::ffi::lua_State;
use metric;
use std::path::PathBuf;
use std::sync;
use util;
struct Payload<'a> {
metrics: Vec<Box<metric::Telemetry>>,
@@ -508,6 +508,10 @@ impl ProgrammableFilter {
}
impl filter::Filter for ProgrammableFilter {
fn valve_state(&self) -> util::Valve {
util::Valve::Open
}
fn process(
&mut self,
event: metric::Event,
@@ -31,6 +31,7 @@ extern crate rusoto_core;
extern crate rusoto_firehose;
extern crate seahash;
extern crate serde;
extern crate coco;
#[macro_use]
extern crate serde_json;
extern crate toml;
@@ -41,7 +43,9 @@ where
         if self.inner.len() != other.inner.len() {
             false
         } else {
-            for (&(ref k, ref v), &(ref o_k, ref o_v)) in self.inner.iter().zip(other.inner.iter()) {
+            for (&(ref k, ref v), &(ref o_k, ref o_v)) in
+                self.inner.iter().zip(other.inner.iter())
+            {
                 if (k != o_k) || (v != o_v) {
                     return false;
                 }