Add new, more robust scheduler test (#1045)
* The new `test_scheduler` is significantly more robust than
  the old one. Currently, the test isn't particularly useful because
  we don't have task migration enabled, but #1042 will add
  implicit task migration when unblocking a task.
  * Hence, the test currently focuses on blocking/unblocking tasks.

* Add a function to iterate over all initialized CPUs (see the usage sketch below).
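
A rough usage sketch (not part of this commit) of the new `cpu::cpus()` iterator combined with the existing task-builder API. `spawn_one_worker_per_cpu` and `report_cpu` are hypothetical names, and the sketch assumes `format!` from `alloc` is in scope:

    // Hypothetical example: spawn one worker pinned to every initialized CPU,
    // mirroring how the new test_scheduler sets up its pinned tasks.
    fn spawn_one_worker_per_cpu() {
        for cpu in cpu::cpus() {
            let _task = spawn::new_task_builder(report_cpu, cpu)
                .name(format!("per-cpu-worker-{cpu}"))
                .pin_on_cpu(cpu)
                .spawn()
                .expect("failed to spawn per-CPU worker");
        }
    }

    // Hypothetical worker: asserts that it actually runs on the CPU it was pinned to.
    fn report_cpu(pinned_cpu: cpu::CpuId) {
        assert_eq!(cpu::current_cpu(), pinned_cpu, "worker migrated CPUs");
    }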

Signed-off-by: Klimenty Tsoutsman <klim@tsoutsman.com>
tsoutsman committed Oct 5, 2023
1 parent 1be62d4 commit e9416d6
Showing 6 changed files with 156 additions and 72 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

29 changes: 14 additions & 15 deletions applications/test_scheduler/Cargo.toml
@@ -1,21 +1,20 @@
[package]
name = "test_scheduler"
version = "0.1.0"
authors = ["Namitha Liyanage <namithaliyanage@gmail.com>"]
authors = ["Klim Tsoutsman <klim@tsoutsman.com>"]
description = "An application to test the scheduler"
edition = "2021"

[dependencies]
app_io = { path = "../../kernel/app_io" }
cpu = { path = "../../kernel/cpu" }
log = "0.4.8"
random = { path = "../../kernel/random" }
spawn = { path = "../../kernel/spawn" }
sync_block = { path = "../../kernel/sync_block" }
task = { path = "../../kernel/task" }

[dependencies.log]
version = "0.4.8"

[dependencies.spawn]
path = "../../kernel/spawn"

[dependencies.scheduler]
path = "../../kernel/scheduler"

[dependencies.task]
path = "../../kernel/task"

[dependencies.cpu]
path = "../../kernel/cpu"
[dependencies.rand]
version = "0.8.5"
default-features = false
features = ["small_rng"]
181 changes: 127 additions & 54 deletions applications/test_scheduler/src/lib.rs
@@ -1,78 +1,151 @@
#![no_std]

#[macro_use] extern crate log;
extern crate alloc;
extern crate spawn;
extern crate scheduler;
extern crate task;
extern crate cpu;

use core::convert::TryFrom;

use alloc::string::String;
use alloc::vec::Vec;
use cpu::CpuId;
use alloc::{format, string::String, vec::Vec};
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

use app_io::println;
use cpu::{cpus, CpuId};
use rand::seq::SliceRandom;
use sync_block::RwLock;
use task::TaskRef;

pub fn main(_args: Vec<String>) -> isize {
let cpu_1 = CpuId::try_from(1).expect("CPU ID 1 did not exist");

let taskref1 = spawn::new_task_builder(worker, ())
.name(String::from("test1"))
.pin_on_cpu(cpu_1)
.spawn().expect("failed to initiate task");
println!("testing pinned");
test_pinned();
println!("testing unpinned");
test_unpinned();
0
}

if !scheduler::set_priority(&taskref1, 30) {
error!("scheduler_eval(): Could not set priority to taskref1");
// Spawn a bunch of pinned tasks, and then each pinned task randomly blocks and
// unblocks other tasks that are pinned to the same CPU.
//
// The tasks must be pinned to the same CPU to avoid a deadlock where two tasks
// on different CPUs block each other at the same time and then yield.
pub fn test_pinned() {
static TASKS: RwLock<Vec<(CpuId, Vec<TaskRef>)>> = RwLock::new(Vec::new());
static READY: AtomicBool = AtomicBool::new(false);

let tasks = cpus()
.map(|cpu| {
(
#[allow(clippy::clone_on_copy)]
cpu.clone(),
(0..100)
.map(move |id| {
spawn::new_task_builder(pinned_worker, cpu)
.name(format!("test-scheduler-pinned-{cpu}-{id}"))
.pin_on_cpu(cpu)
.block()
.spawn()
.expect("failed to spawn task")
})
.collect::<Vec<_>>(),
)
})
.collect::<Vec<_>>();

*TASKS.write() = tasks
.iter()
.map(|(cpu, task_iter)| (*cpu, task_iter.iter().map(|task| (*task).clone()).collect()))
.collect();

for (_, task_list) in tasks.iter() {
for task in task_list {
task.unblock().unwrap();
}
}

debug!("Spawned Task 1");

let taskref2 = spawn::new_task_builder(worker, ())
.name(String::from("test2"))
.pin_on_cpu(cpu_1)
.spawn().expect("failed to initiate task");
READY.store(true, Ordering::Release);

if !scheduler::set_priority(&taskref2, 20) {
error!("scheduler_eval(): Could not set priority to taskref2");
for (_, task_list) in tasks {
for task in task_list {
task.join().unwrap();
}
}

debug!("Spawned Task 2");

let taskref3 = spawn::new_task_builder(worker, ())
.name(String::from("test3"))
.pin_on_cpu(cpu_1)
.spawn().expect("failed to initiate task");

if !scheduler::set_priority(&taskref3, 10) {
error!("scheduler_eval(): Could not set priority to taskref3");
// We have to drop the tasks so that the `test-scheduler` crate can be dropped.
*TASKS.write() = Vec::new();

fn pinned_worker(pinned_cpu: CpuId) {
let mut rng = random::init_rng::<rand::rngs::SmallRng>().unwrap();
while !READY.load(Ordering::Acquire) {
core::hint::spin_loop();
}

let locked = TASKS.read();
let tasks = &locked.iter().find(|(cpu, _)| *cpu == pinned_cpu).unwrap().1;
for _ in 0..100 {
assert_eq!(
cpu::current_cpu(),
pinned_cpu,
"pinned worker migrated cores"
);

let random_task = tasks.choose(&mut rng).unwrap();

let chose_self =
task::with_current_task(|current_task| random_task == current_task).unwrap();
if chose_self {
continue;
}

let _ = random_task.block();
task::schedule();
let _ = random_task.unblock();
}
}
}

debug!("Spawned Task 3");
/// Spawn a bunch of unpinned tasks, and then block and unblock random tasks
/// from the main thread.
pub fn test_unpinned() {
const NUM_TASKS: usize = 500;

static READY: AtomicBool = AtomicBool::new(false);
static NUM_RUNNING: AtomicUsize = AtomicUsize::new(NUM_TASKS);

let tasks = (0..NUM_TASKS)
.map(move |id| {
spawn::new_task_builder(unpinned_worker, ())
.name(format!("test-scheduler-unpinned-{id}"))
.block()
.spawn()
.expect("failed to spawn task")
})
.collect::<Vec<_>>();

for task in tasks.iter() {
task.unblock().unwrap();
}

debug!("Spawned all tasks");
READY.store(true, Ordering::Release);

let _priority1 = scheduler::priority(&taskref1);
let _priority2 = scheduler::priority(&taskref2);
let _priority3 = scheduler::priority(&taskref3);
// Cause some mayhem.
let mut rng = random::init_rng::<rand::rngs::SmallRng>().unwrap();
while NUM_RUNNING.load(Ordering::Relaxed) != 0 {
let random_task = tasks.choose(&mut rng).unwrap();
let _ = random_task.block();
// Let the worker tasks on this core run.
task::schedule();
let _ = random_task.unblock();
}

#[cfg(epoch_scheduler)]
{
assert_eq!(_priority1,Some(30));
assert_eq!(_priority2,Some(20));
assert_eq!(_priority3,Some(10));
for task in tasks {
task.join().unwrap();
}

taskref1.join().expect("Task 1 join failed");
taskref2.join().expect("Task 2 join failed");
taskref3.join().expect("Task 3 join failed");
fn unpinned_worker(_: ()) {
while !READY.load(Ordering::Acquire) {
core::hint::spin_loop();
}

0
}
for _ in 0..1000 {
task::schedule();
}

fn worker(_: ()) {
for i in 0..1000 {
debug!("Task_ID : {} , Instance : {}", task::get_my_current_task_id(), i);
scheduler::schedule();
NUM_RUNNING.fetch_sub(1, Ordering::Relaxed);
}
}
5 changes: 5 additions & 0 deletions kernel/cpu/src/aarch64.rs
@@ -40,6 +40,11 @@ pub fn register_cpu(bootstrap: bool) -> Result<(), &'static str> {
}
}

/// Returns an iterator over the available CPUs.
pub fn cpus() -> impl Iterator<Item = CpuId> {
ONLINE_CPUS.read().clone().into_iter()
}

/// Returns the number of CPUs (SMP cores) that exist and
/// are currently initialized on this system.
pub fn cpu_count() -> u32 {
4 changes: 4 additions & 0 deletions kernel/cpu/src/x86_64.rs
@@ -26,6 +26,10 @@ impl TryFrom<u32> for CpuId {
}
}

/// Returns an iterator over the available CPUs.
pub fn cpus() -> impl Iterator<Item = CpuId> {
apic::get_lapics().iter().map(|(apic_id, _)| (*apic_id).into())
}

/// Returns the number of CPUs (SMP cores) that exist and
/// are currently initialized on this system.
4 changes: 2 additions & 2 deletions kernel/task_struct/src/lib.rs
@@ -478,7 +478,7 @@ impl Task {
if self.runstate.compare_exchange(Runnable, Blocked).is_ok() {
Ok(Runnable)
} else if self.runstate.compare_exchange(Blocked, Blocked).is_ok() {
warn!("Blocked an already blocked task: {:?}", self);
// warn!("Blocked an already blocked task: {:?}", self);
Ok(Blocked)
} else {
Err(self.runstate.load())
@@ -510,7 +510,7 @@ impl Task {
if self.runstate.compare_exchange(Blocked, Runnable).is_ok() {
Ok(Blocked)
} else if self.runstate.compare_exchange(Runnable, Runnable).is_ok() {
warn!("Unblocked an already runnable task: {:?}", self);
// warn!("Unblocked an already runnable task: {:?}", self);
Ok(Runnable)
} else {
Err(self.runstate.load())
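
For context, a minimal, hypothetical sketch (not part of this commit) of how a caller exercises the `block`/`unblock` contract shown above. Both methods return the task's previous runstate on success and its current runstate on failure, so repeatedly blocking an already-blocked task or unblocking an already-runnable one is harmless, which is presumably why those warnings were silenced for the new test's heavy block/unblock churn.

    use task::TaskRef;

    // Hypothetical helper: briefly block a task, let this CPU reschedule, then unblock it.
    fn toggle(task: &TaskRef) {
        // Blocking an already-Blocked task is not an error; it simply returns Ok(Blocked),
        // so the result can safely be ignored here.
        let _ = task.block();
        // Yield so that other tasks on this CPU (but not the blocked one) can run.
        task::schedule();
        // Unblocking fails only if the task cannot become runnable, e.g. it has already exited.
        if task.unblock().is_err() {
            log::warn!("could not unblock task; it may have already exited");
        }
    }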
