Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Fix state management in the scheduler #337

Merged
13 commits merged into from
Nov 24, 2020
22 changes: 9 additions & 13 deletions src/agent/onefuzz-supervisor/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,17 @@ impl Agent {

async fn update(&mut self) -> Result<bool> {
let last = self.scheduler.take().ok_or_else(scheduler_error)?;

let next = match last {
Scheduler::Free(s) => self.free(s).await?,
Scheduler::SettingUp(s) => self.setting_up(s).await?,
Scheduler::PendingReboot(s) => self.pending_reboot(s).await?,
Scheduler::Ready(s) => self.ready(s).await?,
Scheduler::Busy(s) => self.busy(s).await?,
Scheduler::Done(s) => self.done(s).await?,
let previous_state = NodeState::from(&last);
chkeita marked this conversation as resolved.
Show resolved Hide resolved
let (next, done) = match last {
Scheduler::Free(s) => (self.free(s).await?, false),
Scheduler::SettingUp(s) => (self.setting_up(s).await?, false),
Scheduler::PendingReboot(s) => (self.pending_reboot(s).await?, false),
Scheduler::Ready(s) => (self.ready(s).await?, false),
Scheduler::Busy(s) => (self.busy(s).await?, false),
Scheduler::Done(s) => (self.done(s).await?, true),
};

self.previous_state = NodeState::from(&next);
let done = matches!(next, Scheduler::Done(..));

self.previous_state = previous_state;
self.scheduler = Some(next);

Ok(done)
}

Expand Down
89 changes: 88 additions & 1 deletion src/agent/onefuzz-supervisor/src/agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use crate::setup::double::*;
use crate::work::double::*;
use crate::work::*;
use crate::worker::double::*;
use crate::worker::WorkerEvent;
use onefuzz::process::ExitStatus;

use super::*;

Expand All @@ -36,6 +38,13 @@ impl Fixture {
)
}

pub fn fail_setup_agent(&self, error_message: &str) -> Agent {
Agent {
setup_runner: Box::new(FailSetupRunnerDouble::new(String::from(error_message))),
..self.agent()
}
}

pub fn job_id(&self) -> Uuid {
"83267e88-efdd-4b1d-92c0-6b80d01887f8".parse().unwrap()
}
Expand Down Expand Up @@ -64,7 +73,7 @@ impl Fixture {
WorkSet {
reboot: false,
setup_url: self.setup_url(),
script: false,
script: true,
chkeita marked this conversation as resolved.
Show resolved Hide resolved
work_units: vec![self.work_unit()],
}
}
Expand Down Expand Up @@ -119,3 +128,81 @@ async fn test_update_free_has_work() {
let double: &WorkQueueDouble = agent.work_queue.downcast_ref().unwrap();
assert_eq!(double.claimed, &[Fixture.receipt()]);
}

#[tokio::test]
async fn test_emitted_state() {
let mut agent = Fixture.agent();
agent
.work_queue
.downcast_mut::<WorkQueueDouble>()
.unwrap()
.available
.push(Fixture.message());

for _i in 0..10 {
if agent.update().await.unwrap() {
break;
}
}

let expected_events: Vec<NodeEvent> = vec![
NodeEvent::StateUpdate(StateUpdateEvent::Free),
NodeEvent::StateUpdate(StateUpdateEvent::SettingUp {
tasks: vec![Fixture.task_id()],
}),
NodeEvent::StateUpdate(StateUpdateEvent::Ready),
NodeEvent::StateUpdate(StateUpdateEvent::Busy),
NodeEvent::WorkerEvent(WorkerEvent::Running {
task_id: Fixture.task_id(),
}),
NodeEvent::WorkerEvent(WorkerEvent::Done {
task_id: Fixture.task_id(),
exit_status: ExitStatus {
code: None,
signal: None,
success: true,
},
stderr: String::default(),
stdout: String::default(),
}),
NodeEvent::StateUpdate(StateUpdateEvent::Done {
error: None,
script_output: None,
}),
];
let coordinator: &CoordinatorDouble = agent.coordinator.downcast_ref().unwrap();
let events = &coordinator.events;
assert_eq!(events, &expected_events);
}

#[tokio::test]
async fn test_emitted_state_failed_setup() {
let error_message = "Failed setup";
let mut agent = Fixture.fail_setup_agent(error_message);
agent
.work_queue
.downcast_mut::<WorkQueueDouble>()
.unwrap()
.available
.push(Fixture.message());

for _i in 0..10 {
if agent.update().await.unwrap() {
break;
}
}

let expected_events: Vec<NodeEvent> = vec![
NodeEvent::StateUpdate(StateUpdateEvent::Free),
NodeEvent::StateUpdate(StateUpdateEvent::SettingUp {
tasks: vec![Fixture.task_id()],
}),
NodeEvent::StateUpdate(StateUpdateEvent::Done {
error: Some(String::from(error_message)),
script_output: None,
}),
];
let coordinator: &CoordinatorDouble = agent.coordinator.downcast_ref().unwrap();
let events = &coordinator.events;
assert_eq!(events, &expected_events);
}
18 changes: 18 additions & 0 deletions src/agent/onefuzz-supervisor/src/setup/double.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,21 @@ impl ISetupRunner for SetupRunnerDouble {
Ok(self.script.clone())
}
}

#[derive(Clone, Debug, Default)]
pub struct FailSetupRunnerDouble {
error_message: String,
}

impl FailSetupRunnerDouble {
pub fn new(error_message: String) -> Self {
Self { error_message }
}
}

#[async_trait]
impl ISetupRunner for FailSetupRunnerDouble {
async fn run(&mut self, _work_set: &WorkSet) -> Result<SetupOutput> {
anyhow::bail!(self.error_message.clone());
}
}
18 changes: 17 additions & 1 deletion src/agent/onefuzz-supervisor/src/worker/double.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ impl IWorkerRunner for WorkerRunnerDouble {
}
}

#[derive(Clone, Debug, Default, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ChildDouble {
pub id: u64,
pub exit_status: Option<ExitStatus>,
Expand All @@ -24,6 +24,22 @@ pub struct ChildDouble {
pub killed: bool,
}

impl Default for ChildDouble {
fn default() -> Self {
Self {
id: 0,
exit_status: Some(ExitStatus {
code: None,
signal: None,
success: true,
}),
stderr: String::default(),
stdout: String::default(),
killed: false,
}
}
}

impl IWorkerChild for ChildDouble {
fn try_wait(&mut self) -> Result<Option<Output>> {
let output = if let Some(exit_status) = self.exit_status {
Expand Down
18 changes: 17 additions & 1 deletion src/agent/onefuzz-supervisor/src/worker/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl IWorkerRunner for RunnerDouble {
}
}

#[derive(Clone, Debug, Default, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ChildDouble {
id: u64,
exit_status: Option<ExitStatus>,
Expand All @@ -67,6 +67,22 @@ pub struct ChildDouble {
killed: bool,
}

impl Default for ChildDouble {
fn default() -> Self {
chkeita marked this conversation as resolved.
Show resolved Hide resolved
Self {
id: 0,
exit_status: Some(ExitStatus {
chkeita marked this conversation as resolved.
Show resolved Hide resolved
code: None,
signal: None,
success: true,
}),
stderr: String::default(),
stdout: String::default(),
killed: false,
}
}
}

impl IWorkerChild for ChildDouble {
fn try_wait(&mut self) -> Result<Option<Output>> {
let output = if let Some(exit_status) = self.exit_status {
Expand Down