Skip to content
Permalink
Browse files

Auto merge of #487 - Zeegomo:lazy-worker-test, r=pietroalbini

Lazy Workers

This PR sets up metrics to detect whether an agent is supposed to work or not so that we can set up some alert for lazy workers
  • Loading branch information
bors committed Nov 18, 2019
2 parents 0904dda + 09fd063 commit e53154fbfc3df91a39652b03901c36380ad2d79d
Showing with 167 additions and 30 deletions.
  1. +38 −16 src/experiments.rs
  2. +112 −10 src/server/metrics.rs
  3. +1 −1 src/server/mod.rs
  4. +16 −3 src/server/routes/metrics.rs
@@ -313,10 +313,10 @@ impl Experiment {
}
}

pub fn next(db: &Database, assignee: &Assignee) -> Fallible<Option<(bool, Experiment)>> {
pub fn find_next(db: &Database, assignee: &Assignee) -> Fallible<Option<Experiment>> {
// Avoid assigning two experiments to the same agent
if let Some(experiment) = Experiment::run_by(db, assignee)? {
return Ok(Some((false, experiment)));
return Ok(Some(experiment));
}

// Get an experiment whose requirements are met by this agent, preferring (in order of
@@ -340,13 +340,43 @@ impl Experiment {
})
}

pub fn next(db: &Database, assignee: &Assignee) -> Fallible<Option<(bool, Experiment)>> {
Self::find_next(db, assignee).and_then(|ex| Self::assign_experiment(db, ex))
}
pub fn has_next(db: &Database, assignee: &Assignee) -> Fallible<bool> {
Ok(Self::find_next(db, assignee)?.is_some())
}

fn assign_experiment(
db: &Database,
ex: Option<Experiment>,
) -> Fallible<Option<(bool, Experiment)>> {
if let Some(mut experiment) = ex {
let new_ex = experiment.status != Status::Running;
if new_ex {
experiment.set_status(&db, Status::Running)?;
// If this experiment was not assigned to a specific agent make it distributed
experiment.set_assigned_to(
&db,
experiment
.assigned_to
.clone()
.or(Some(Assignee::Distributed))
.as_ref(),
)?;
}
return Ok(Some((new_ex, experiment)));
}
Ok(None)
}

//CLI query is only partially implemented and is therefore preceded by "unimplemented!"
#[allow(unreachable_code)]
fn next_inner(
db: &Database,
assignee: Option<&Assignee>,
agent: &Assignee,
) -> Fallible<Option<(bool, Experiment)>> {
) -> Fallible<Option<Experiment>> {
let agent_name = if let Assignee::Agent(agent_name) = agent {
agent_name.to_string()
} else {
@@ -431,21 +461,13 @@ impl Experiment {
(AGENT_UNASSIGNED_QUERY, vec![agent_name])
};

let next = db.get_row(query, params.as_slice(), |r| {
if let Some(record) = db.get_row(query, params.as_slice(), |r| {
ExperimentDBRecord::from_row(r)
})?;

if let Some(record) = next {
let mut experiment = record.into_experiment()?;
let new_ex = experiment.status != Status::Running;
if new_ex {
experiment.set_status(&db, Status::Running)?;
// If this experiment was not assigned to a specific agent make it distributed
experiment.set_assigned_to(&db, assignee.or(Some(&Assignee::Distributed)))?;
}
return Ok(Some((new_ex, experiment)));
})? {
Ok(Some(record.into_experiment()?))
} else {
Ok(None)
}
Ok(None)
}

pub fn get(db: &Database, name: &str) -> Fallible<Option<Experiment>> {
@@ -1,20 +1,29 @@
use crate::db::Database;
use crate::experiments::{Assignee, Experiment};
use crate::prelude::*;
use crate::server::agents::Agent;
use prometheus::proto::{Metric, MetricFamily};
use prometheus::{IntCounterVec, __register_counter_vec};
use prometheus::{IntCounterVec, IntGaugeVec, __register_counter_vec, __register_gauge_vec};
const JOBS_METRIC: &str = "crater_completed_jobs_total";
const AGENT_WORK_METRIC: &str = "crater_agent_supposed_to_work";

#[derive(Clone)]
pub struct Metrics {
crater_completed_jobs_total: IntCounterVec,
crater_work_status: IntGaugeVec,
}

impl Metrics {
pub fn new() -> Fallible<Self> {
let opts = prometheus::opts!(JOBS_METRIC, "total completed jobs");
let jobs_opts = prometheus::opts!(JOBS_METRIC, "total completed jobs");
let crater_completed_jobs_total =
prometheus::register_int_counter_vec!(opts, &["agent", "experiment"])?;
prometheus::register_int_counter_vec!(jobs_opts, &["agent", "experiment"])?;
let agent_opts = prometheus::opts!(AGENT_WORK_METRIC, "is agent supposed to work");
let crater_work_status = prometheus::register_int_gauge_vec!(agent_opts, &["agent"])?;

Ok(Metrics {
crater_completed_jobs_total,
crater_work_status,
})
}

@@ -55,16 +64,42 @@ impl Metrics {
Ok(())
}

pub fn update_agent_status(&self, db: &Database, agents: &[&Agent]) -> Fallible<()> {
self.crater_work_status.reset();

for agent in agents {
let assignee = Assignee::Agent(agent.name().to_string());
let has_work = Experiment::has_next(db, &assignee)?;

self.crater_work_status
.with_label_values(&[agent.name()])
.set(has_work as i64);
}

Ok(())
}

pub fn on_complete_experiment(&self, experiment: &str) -> Fallible<()> {
self.remove_experiment_jobs(experiment)
}
}

#[cfg(test)]
mod tests {
use super::{Metrics, JOBS_METRIC};
use super::{Metrics, AGENT_WORK_METRIC, JOBS_METRIC};
use crate::actions::{Action, ActionsCtx, CreateExperiment, EditExperiment};
use crate::config::Config;
use crate::db::Database;
use crate::experiments::{Assignee, Experiment};
use crate::server::agents::{Agent, Agents};
use crate::server::tokens::Tokens;
use lazy_static::lazy_static;
use prometheus::proto::MetricFamily;

lazy_static! {
static ref METRICS: Metrics = Metrics::new().unwrap();
}

fn test_experiment_presence(metric: &MetricFamily, experiment: &str) -> bool {
metric
.get_metric()
@@ -79,25 +114,92 @@ mod tests {
let agent1 = "agent-1";
let agent2 = "agent-2";

let metrics = Metrics::new().unwrap();
metrics.record_completed_jobs(agent1, ex1, 1);
metrics.record_completed_jobs(agent2, ex1, 1);
metrics.record_completed_jobs(agent2, ex2, 1);
METRICS.record_completed_jobs(agent1, ex1, 1);
METRICS.record_completed_jobs(agent2, ex1, 1);
METRICS.record_completed_jobs(agent2, ex2, 1);

//test metrics are correctly registered
let jobs = Metrics::get_metric_by_name(JOBS_METRIC).unwrap();
assert!(test_experiment_presence(&jobs, ex1));
assert!(test_experiment_presence(&jobs, ex2));

//test metrics are correctly removed when an experiment is completed
metrics.on_complete_experiment(ex1).unwrap();
METRICS.on_complete_experiment(ex1).unwrap();

let jobs = Metrics::get_metric_by_name(JOBS_METRIC).unwrap();
assert!(!test_experiment_presence(&jobs, ex1));
assert!(test_experiment_presence(&jobs, ex2));

//test nothing bad happens when a specific
//experiment is executed by a subset of the agents
metrics.on_complete_experiment(ex2).unwrap();
METRICS.on_complete_experiment(ex2).unwrap();
}

fn supposed_to_work(metric: &MetricFamily, agent_filter: Option<&str>) -> bool {
metric
.get_metric()
.iter()
.filter(|met| {
agent_filter.map_or_else(
|| true,
|agent| Metrics::get_label_by_name(met, "agent").unwrap() == agent,
)
})
.all(|met| met.get_gauge().get_value() as u64 == 1)
}

#[test]
fn test_lazy_agents() {
let agent1 = "agent-1";
let agent2 = "agent-2";

let db = Database::temp().unwrap();

let mut tokens = Tokens::default();
tokens.agents.insert("token1".into(), agent1.into());
tokens.agents.insert("token2".into(), agent2.into());
let agents = Agents::new(db.clone(), &tokens).unwrap();

for agent in agents.all().unwrap().iter() {
agents.record_heartbeat(agent.name()).unwrap();
}

let agent_list = agents.all().unwrap();
let agent_list_ref = agent_list.iter().collect::<Vec<&Agent>>();

METRICS.update_agent_status(&db, &agent_list_ref).unwrap();

// Nothing to do
let status = Metrics::get_metric_by_name(AGENT_WORK_METRIC).unwrap();
assert!(!supposed_to_work(&status, None));

let config = Config::default();
let ctx = ActionsCtx::new(&db, &config);
let assignee = Assignee::Agent(agent1.to_string());
crate::crates::lists::setup_test_lists(&db, &config).unwrap();
CreateExperiment::dummy("dummy").apply(&ctx).unwrap();

METRICS.update_agent_status(&db, &agent_list_ref).unwrap();

// Experiment is queued, all the agents should have work to do
let status = Metrics::get_metric_by_name(AGENT_WORK_METRIC).unwrap();
assert!(supposed_to_work(&status, None));

// Assign experiment to agent-1 so that get_uncompleted_crates returns all the crates
EditExperiment {
assign: Some(assignee.clone()),
..EditExperiment::dummy("dummy")
}
.apply(&ctx)
.unwrap();
let ex = Experiment::next(&db, &assignee).unwrap().unwrap().1;
ex.get_uncompleted_crates(&db, &config, &assignee).unwrap();
METRICS.update_agent_status(&db, &agent_list_ref).unwrap();

// There are no experiments in the queue but agent1 is still executing the
// last chunk of the previous experiment
let status = Metrics::get_metric_by_name(AGENT_WORK_METRIC).unwrap();
assert!(supposed_to_work(&status, Some(agent1)));
assert!(!supposed_to_work(&status, Some(agent2)));
}
}
@@ -85,7 +85,7 @@ pub fn run(config: Config) -> Fallible<()> {
.and(warp::path("webhooks").and(routes::webhooks::routes(data.clone())))
.or(warp::path("agent-api").and(routes::agent::routes(data.clone(), mutex.clone())))
.unify()
.or(warp::path("metrics").and(routes::metrics::routes()))
.or(warp::path("metrics").and(routes::metrics::routes(data.clone())))
.unify()
.or(routes::ui::routes(data.clone()))
.unify(),
@@ -1,13 +1,22 @@
use crate::prelude::*;
use crate::server::agents::Agent;
use crate::server::Data;
use http::{Response, StatusCode};
use hyper::Body;
use prometheus::{Encoder, TextEncoder};
use std::sync::Arc;
use warp::{self, Filter, Rejection};

pub fn routes() -> impl Filter<Extract = (Response<Body>,), Error = Rejection> + Clone {
pub fn routes(
data: Arc<Data>,
) -> impl Filter<Extract = (Response<Body>,), Error = Rejection> + Clone {
let data_cloned = data.clone();
let data_filter = warp::any().map(move || data_cloned.clone());

warp::get2()
.and(warp::path::end())
.map(|| match endpoint_metrics() {
.and(data_filter.clone())
.map(|data| match endpoint_metrics(data) {
Ok(resp) => resp,
Err(err) => {
error!("error while processing metrics");
@@ -20,7 +29,11 @@ pub fn routes() -> impl Filter<Extract = (Response<Body>,), Error = Rejection> +
})
}

fn endpoint_metrics() -> Fallible<Response<Body>> {
fn endpoint_metrics(data: Arc<Data>) -> Fallible<Response<Body>> {
data.metrics.update_agent_status(
&data.db,
&data.agents.all()?.iter().collect::<Vec<&Agent>>(),
)?;
let mut buffer = Vec::new();
let families = prometheus::gather();
TextEncoder::new().encode(&families, &mut buffer)?;

0 comments on commit e53154f

Please sign in to comment.
You can’t perform that action at this time.