Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR Rollup #92

Merged
merged 16 commits into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ In addition to `config.kdl`, additional process scheduling profiles are stored i
- User-config: `/etc/system76-scheduler/process-scheduler/`
- Distribution: `/usr/share/system76-scheduler/process-scheduler/`

An [example configuration is provided here](./data/pop-os.kdl). It is parsed the same as the assignments and exceptions nodes in the main config, and profiles can inherit values from the previous assignment of the same name.
An [example configuration is provided here](./data/pop_os.kdl). It is parsed the same as the assignments and exceptions nodes in the main config, and profiles can inherit values from the previous assignment of the same name.

### Profile

Expand Down
100 changes: 63 additions & 37 deletions daemon/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ mod pw;
mod service;
mod utils;

use cfs::paths::SchedPaths;
use clap::ArgMatches;
use dbus::{CpuMode, Server};
use std::time::{Duration, Instant};
use std::{
path::Path,
time::{Duration, Instant},
};
use tokio::sync::mpsc::Sender;
use upower_dbus::UPowerProxy;
use zbus::{Connection, PropertyStream};
Expand Down Expand Up @@ -158,7 +160,7 @@ async fn daemon(
return reload(connection).await;
}

let service = &mut service::Service::new(owner, SchedPaths::new()?);
let service = &mut service::Service::new(owner);
service.reload_configuration();

let (tx, mut rx) = tokio::sync::mpsc::channel(4);
Expand Down Expand Up @@ -194,39 +196,14 @@ async fn daemon(

// Use execsnoop-bpfcc to watch for new processes being created.
if service.config.process_scheduler.execsnoop {
tracing::debug!("monitoring process IDs in realtime with execsnoop");
let tx = tx.clone();
let (scheduled_tx, mut scheduled_rx) = tokio::sync::mpsc::unbounded_channel();
std::thread::spawn(move || {
if let Ok(mut watcher) = execsnoop::watch() {
// Listen for spawned process, scheduling them to be handled with a delay of 1 second after creation.
// The delay is to ensure that a process has been added to a cgroup
while let Some(process) = watcher.next() {
let Ok(cmdline) = std::str::from_utf8(process.cmd) else {
continue
};

let name = process::name(cmdline);

let _res = scheduled_tx.send((
Instant::now() + Duration::from_secs(2),
ExecCreate {
pid: process.pid,
parent_pid: process.parent_pid,
name: name.to_owned(),
cmdline: cmdline.to_owned(),
},
));
}
}
});

tokio::task::spawn_local(async move {
while let Some((delay, process)) = scheduled_rx.recv().await {
tokio::time::sleep_until(delay.into()).await;
let _res = tx.send(Event::ExecCreate(process)).await;
}
});
if Path::new(execsnoop::EXECSNOOP_PATH).exists() {
integrate_execsnoop(tx.clone());
} else {
tracing::warn!(
"install {} to monitor processes in realtime",
execsnoop::EXECSNOOP_PATH
);
}
}

// Monitors pipewire-connected processes.
Expand Down Expand Up @@ -266,6 +243,7 @@ async fn daemon(
}) => {
service.assign_new_process(&mut buffer, pid, parent_pid, name, cmdline);
service.assign_children(&mut buffer, pid);
service.garbage_clean(&mut buffer);
}

Event::RefreshProcessMap => {
Expand All @@ -275,10 +253,12 @@ async fn daemon(
Event::SetForegroundProcess(pid) => {
tracing::debug!("setting {pid} as foreground process");
service.set_foreground_process(&mut buffer, pid);
service.garbage_clean(&mut buffer);
}

Event::Pipewire(scheduler_pipewire::ProcessEvent::Add(process)) => {
service.set_pipewire_process(&mut buffer, process);
service.garbage_clean(&mut buffer);
}

Event::Pipewire(scheduler_pipewire::ProcessEvent::Remove(process)) => {
Expand Down Expand Up @@ -363,8 +343,54 @@ fn autogroup_set(enable: bool) {
let _res = std::fs::write(PATH, if enable { b"1" } else { b"0" });
}

/// Listens to exec events from the kernel to get process IDs in realtime.
fn integrate_execsnoop(tx: Sender<Event>) {
tracing::info!("monitoring process IDs in realtime with execsnoop");
let (scheduled_tx, mut scheduled_rx) = tokio::sync::mpsc::unbounded_channel();
std::thread::spawn(move || {
match execsnoop::watch() {
Ok(mut watcher) => {
// Listen for spawned process, scheduling them to be handled with a delay of 1 second after creation.
// The delay is to ensure that a process has been added to a cgroup
while let Some(process) = watcher.next() {
let Ok(cmdline) = std::str::from_utf8(process.cmd) else {
continue
};

let name = process::name(cmdline);

tracing::debug!(
"{:?} created by {:?} ({name})",
process.pid,
process.parent_pid
);
let _res = scheduled_tx.send((
Instant::now() + Duration::from_secs(2),
ExecCreate {
pid: process.pid,
parent_pid: process.parent_pid,
name: name.to_owned(),
cmdline: cmdline.to_owned(),
},
));
}
}
Err(error) => {
tracing::error!("failed to start execsnoop: {error}");
}
}
});

tokio::task::spawn_local(async move {
while let Some((delay, process)) = scheduled_rx.recv().await {
tokio::time::sleep_until(delay.into()).await;
let _res = tx.send(Event::ExecCreate(process)).await;
}
});
}

fn uptime() -> Option<u64> {
let uptime = std::fs::read_to_string("/proc/uptime").ok()?;
let seconds = uptime.split('.').next()?;
seconds.parse::<u64>().ok()
}
}
21 changes: 11 additions & 10 deletions daemon/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,20 @@ impl<'owner> Process<'owner> {
#[derive(Default)]
pub struct Map<'owner> {
pub map: HashMap<u64, Arc<LCell<'owner, Process<'owner>>>>,
pub pid_map: HashMap<u32, Arc<LCell<'owner, Process<'owner>>>>,
pub drain: HashSet<u64>,
}

impl<'owner> Map<'owner> {
/// Removes processes that remain in the drain filter.
pub fn drain_filter(&mut self) {
pub fn drain_filter(&mut self, owner: &LCellOwner<'owner>) {
for hash in self.drain.drain() {
self.map.remove(&hash);
if let Some(process) = self.map.remove(&hash) {
self.pid_map.remove(&process.ro(owner).id);
}
}

self.map.shrink_to(1024);
}

/// This will be used to keep track of what processes were destroyed since the last refresh.
Expand All @@ -82,14 +87,8 @@ impl<'owner> Map<'owner> {
}
}

pub fn get_pid(
&self,
token: &LCellOwner<'owner>,
pid: u32,
) -> Option<&Arc<LCell<'owner, Process<'owner>>>> {
self.map
.values()
.find(|&process| process.ro(token).id == pid)
pub fn get_pid(&self, pid: u32) -> Option<&Arc<LCell<'owner, Process<'owner>>>> {
self.pid_map.get(&pid)
}

pub fn insert(
Expand Down Expand Up @@ -117,9 +116,11 @@ impl<'owner> Map<'owner> {
entry.get().clone()
}
Entry::Vacant(entry) => {
let pid = process.id;
let process = Arc::new(LCell::new(process));

entry.insert(process.clone());
self.pid_map.insert(pid, process.clone());
process
}
}
Expand Down
24 changes: 20 additions & 4 deletions daemon/src/pw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ async fn pipewire_service(tx: Sender<ProcessEvent>) {
SocketEvent::Add(socket) => {
if !active_sessions.contains(&socket) {
if let Ok(stream) = UnixStream::connect(&socket) {
active_sessions.insert(socket.clone());
let tx = tx.clone();
let pw_tx = pw_tx.clone();
std::thread::spawn(move || {
Expand Down Expand Up @@ -106,23 +107,34 @@ pub(crate) async fn monitor(tx: Sender<Event>) {
loop {
tokio::time::sleep(Duration::from_secs(3)).await;

let result = std::process::Command::new("system76-scheduler")
let exe_link_target = std::fs::read_link("/proc/self/exe");
let Ok(exe) = exe_link_target else {
tracing::error!("failed to determine the daemon exe name: {:?}", exe_link_target.err());
break;
};

tracing::debug!("connected to pipewire");

let result = std::process::Command::new(exe)
.arg("pipewire")
.stdin(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.stdout(std::process::Stdio::piped())
.spawn();

let Ok(mut child) = result else {
continue;
tracing::error!("failed to spawn pipewire watcher: {:?}", result.err());
break;
};

let Some(stdout) = child.stdout.take() else {
continue;
tracing::error!("pipewire process is missing the stdout pipe");
break;
};

let Ok(stdout) = tokio::process::ChildStdout::from_std(stdout) else {
continue;
tracing::error!("failed to create tokio stdout from pipewire process");
break;
};

let mut stdout = tokio::io::BufReader::new(stdout);
Expand All @@ -142,16 +154,20 @@ pub(crate) async fn monitor(tx: Sender<Event>) {
if !managed.insert(pid) {
continue;
}
tracing::debug!("{pid} started using pipewire");
}
ProcessEvent::Remove(pid) => {
if !managed.remove(&pid) {
continue;
}
tracing::debug!("{pid} stopped using pipewire");
}
}

let _res = tx.send(Event::Pipewire(event)).await;
}
}
}

tracing::info!("stopped listening to pipewire");
}