Skip to content

Commit

Permalink
Trying to fix race-conditions of /proc traversal
Browse files Browse the repository at this point in the history
  • Loading branch information
tailhook committed Aug 2, 2018
1 parent b51d670 commit 131de34
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 48 deletions.
10 changes: 10 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@ Lithos Changes By Release
=========================


.. _changelog 0.18.3:

v0.18.3
=======

* Bugfix: it looks like that reading through ``/proc/`` is inherently racy,
i.e. some process may be skipped. This commit fixes walk faster and traverse
directory twice. More elaborate fix will be implemented in future.


.. _changelog 0.18.2:

v0.18.2
Expand Down
106 changes: 58 additions & 48 deletions src/bin/lithos_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::mem::replace;
use std::fs::{File, OpenOptions, metadata, remove_file, rename};
use std::io::{self, stderr, Read, Write};
use std::str::{FromStr};
use std::fs::{remove_dir};
use std::fs::{remove_dir, read_dir};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::time::{SystemTime, Instant, Duration};
Expand Down Expand Up @@ -301,66 +301,76 @@ fn recover_sockets(sockets: &mut HashMap<InetAddr, Socket>) {
}).map_err(|e| error!("Error enumerating my fds: {}", e)).ok();
}

fn list_proc(pids: &mut HashSet<Pid>) -> Result<(), io::Error> {
for item in read_dir("/proc")? {
item?.file_name().to_str()
.and_then(|pid| FromStr::from_str(&pid).map(Pid::from_raw).ok())
.map(|pid| pids.insert(pid));
}
Ok(())
}

fn recover_processes(children: &mut HashMap<Pid, Child>,
configs: &mut HashMap<String, Process>,
queue: &mut Queue<Timeout>, metrics: &metrics::Metrics, config_file: &Path)
{
let mypid = getpid();
let now = Instant::now();

// Recover old workers
scan_dir::ScanDir::all().read("/proc", |iter| {
let pids = iter.filter_map(|(_, pid)| {
FromStr::from_str(&pid).map(Pid::from_raw).ok()
});
for pid in pids {
if !_is_child(pid, mypid) {
continue;
}
if let Ok((name, cfg_text)) = _read_args(pid, config_file) {
match configs.remove(&name) {
Some(child) => {
if &child.config[..] != &cfg_text[..] {
warn!("Config mismatch: {}, pid: {}. Upgrading...",
name, pid);
kill(pid, Signal::SIGTERM)
.map_err(|e|
error!("Error sending TERM to {}: {:?}",
pid, e)).ok();
queue.add(now +
duration(child.inner_config.kill_timeout),
Kill(pid));
}
metrics.processes[&child.base_name].running.incr(1);
metrics.running.incr(1);
children.insert(pid, Child::Process(child));
}
None => {
warn!("Undefined child name: {}, pid: {}. \
Sending SIGTERM...", name, pid);
children.insert(pid, Child::Unidentified(name));
let mut pids = HashSet::new();
// Read pids ast fast as possible,...
list_proc(&mut pids).expect("can read /proc");
// .. then read again to ensure that we didn't miss any process because
// of process reordering
list_proc(&mut pids).expect("can read /proc");

for pid in pids {
if !_is_child(pid, mypid) {
continue;
}
if let Ok((name, cfg_text)) = _read_args(pid, config_file) {
match configs.remove(&name) {
Some(child) => {
if &child.config[..] != &cfg_text[..] {
warn!("Config mismatch: {}, pid: {}. Upgrading...",
name, pid);
kill(pid, Signal::SIGTERM)
.map_err(|e| error!("Error sending TERM to {}: {:?}",
pid, e)).ok();
queue.add(
now + duration(DEFAULT_KILL_TIMEOUT),
.map_err(|e|
error!("Error sending TERM to {}: {:?}",
pid, e)).ok();
queue.add(now +
duration(child.inner_config.kill_timeout),
Kill(pid));
metrics.unknown.incr(1);
}
};
} else {
warn!("Undefined child, pid: {}. Sending SIGTERM...", pid);
kill(pid, Signal::SIGTERM)
metrics.processes[&child.base_name].running.incr(1);
metrics.running.incr(1);
children.insert(pid, Child::Process(child));
}
None => {
warn!("Undefined child name: {}, pid: {}. \
Sending SIGTERM...", name, pid);
children.insert(pid, Child::Unidentified(name));
kill(pid, Signal::SIGTERM)
.map_err(|e| error!("Error sending TERM to {}: {:?}",
pid, e)).ok();
queue.add(
now + duration(DEFAULT_KILL_TIMEOUT),
Kill(pid));
metrics.unknown.incr(1);
continue;
}
queue.add(
now + duration(DEFAULT_KILL_TIMEOUT),
Kill(pid));
metrics.unknown.incr(1);
}
};
} else {
warn!("Undefined child, pid: {}. Sending SIGTERM...", pid);
kill(pid, Signal::SIGTERM)
.map_err(|e| error!("Error sending TERM to {}: {:?}",
pid, e)).ok();
queue.add(
now + duration(DEFAULT_KILL_TIMEOUT),
Kill(pid));
metrics.unknown.incr(1);
continue;
}
}).map_err(|e| error!("Error reading /proc: {}", e)).ok();
}
}

fn remove_dangling_state_dirs(names: &HashSet<&str>, master: &MasterConfig)
Expand Down

0 comments on commit 131de34

Please sign in to comment.