Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scheds/rust/scx_rustland/src/bpf/intf.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ typedef long long s64;
*/
struct queued_task_ctx {
s32 pid;
s32 cpu; /* CPU where the task is running */
s32 cpu; /* CPU where the task is running (-1 = exiting) */
u64 sum_exec_runtime; /* Total cpu time */
u64 weight; /* Task static priority */
};
Expand Down
44 changes: 40 additions & 4 deletions scheds/rust/scx_rustland/src/bpf/main.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -348,13 +348,17 @@ static bool is_task_cpu_available(struct task_struct *p, u64 enq_flags)
* Fill @task with all the information that needs to be sent to the user-space
* scheduler.
*/
static void
get_task_info(struct queued_task_ctx *task, const struct task_struct *p)
static void get_task_info(struct queued_task_ctx *task,
const struct task_struct *p, bool exiting)
{
task->pid = p->pid;
task->sum_exec_runtime = p->se.sum_exec_runtime;
task->weight = p->scx.weight;
task->cpu = scx_bpf_task_cpu(p);
/*
* Use a negative CPU number to notify that the task is exiting, so
* that we can free up its resources in the user-space scheduler.
*/
task->cpu = exiting ? -1 : scx_bpf_task_cpu(p);
}

/*
Expand Down Expand Up @@ -389,7 +393,7 @@ void BPF_STRUCT_OPS(rustland_enqueue, struct task_struct *p, u64 enq_flags)
* will be dispatched directly from the kernel (re-using their
* previously used CPU in this case).
*/
get_task_info(&task, p);
get_task_info(&task, p, false);
dbg_msg("enqueue: pid=%d", task.pid);
if (bpf_map_push_elem(&queued, &task, 0)) {
dbg_msg("scheduler congested: pid=%d", task.pid);
Expand Down Expand Up @@ -556,6 +560,37 @@ s32 BPF_STRUCT_OPS(rustland_prep_enable, struct task_struct *p,
return -ENOMEM;
}

/*
 * Task @p is exiting.
 *
 * Notify the user-space scheduler that it can free up all the resources
 * allocated for this task. The notification is a regular queued message
 * whose CPU field is negative (see get_task_info()).
 */
void BPF_STRUCT_OPS(rustland_disable, struct task_struct *p)
{
	struct queued_task_ctx task;

	/*
	 * Populate @task before reading any of its fields: it is an
	 * uninitialized stack variable until get_task_info() fills it in.
	 */
	get_task_info(&task, p, true);
	dbg_msg("exiting: pid=%d", task.pid);
	if (bpf_map_push_elem(&queued, &task, 0)) {
		/*
		 * We may have a memory leak in the scheduler at this point,
		 * because we failed to notify it about this exiting task and
		 * some resources may remain allocated.
		 *
		 * Do not worry too much about this condition for now, since
		 * it should be pretty rare (and it happens only when the
		 * scheduler is already congested, so it is probably a good
		 * thing to avoid introducing extra overhead to free up
		 * resources).
		 */
		dbg_msg("scheduler congested: pid=%d", task.pid);
		__sync_fetch_and_add(&nr_sched_congested, 1);
		return;
	}
	/* The exit notification was queued: account for it like any other message. */
	__sync_fetch_and_add(&nr_queued, 1);
}

/*
* Heartbeat scheduler timer callback.
*/
Expand Down Expand Up @@ -633,6 +668,7 @@ struct sched_ext_ops rustland = {
.stopping = (void *)rustland_stopping,
.update_idle = (void *)rustland_update_idle,
.prep_enable = (void *)rustland_prep_enable,
.disable = (void *)rustland_disable,
.init = (void *)rustland_init,
.exit = (void *)rustland_exit,
.flags = SCX_OPS_ENQ_LAST,
Expand Down
24 changes: 7 additions & 17 deletions scheds/rust/scx_rustland/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use std::fs::metadata;
use std::fs::File;
use std::io::{self, Read};
use std::path::Path;
Expand Down Expand Up @@ -158,20 +157,6 @@ impl TaskInfoMap {
tasks: HashMap::new(),
}
}

// Clean up old entries (pids that don't exist anymore).
//
// A pid is treated as still running when `/proc/<pid>` can be stat'ed; every
// other entry is dropped from the map. The filtering is done in place with
// `HashMap::retain`, so no temporary copy of the key set is allocated and
// each entry is visited exactly once.
fn gc(&mut self) {
    self.tasks
        .retain(|pid, _| metadata(format!("/proc/{}", pid)).is_ok());
}
}

// Basic task item stored in the task pool.
Expand Down Expand Up @@ -363,6 +348,13 @@ impl<'a> Scheduler<'a> {
// Extract the task object from the message.
let task = EnqueuedMessage::from_bytes(msg.as_slice()).as_queued_task_ctx();

// Check for exiting tasks (cpu < 0) and remove their corresponding entries in
// the task map (if present).
if task.cpu < 0 {
self.task_map.tasks.remove(&task.pid);
continue;
}

// Get task information if the task is already stored in the task map,
// otherwise create a new entry for it.
let task_info =
Expand Down Expand Up @@ -560,8 +552,6 @@ impl<'a> Scheduler<'a> {

// Print scheduler statistics every second.
if elapsed > Duration::from_secs(1) {
// Free up unused scheduler resources.
self.task_map.gc();
// Print scheduler statistics.
self.print_stats();

Expand Down