Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] constellation: Simplify the scheduler so it doesn't create a thread for each event #11272

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file
Failed to load files.

Always

Just for now

@@ -5,78 +5,17 @@
use euclid::length::Length;
use ipc_channel::ipc::{self, IpcSender};
use ipc_channel::router::ROUTER;
use script_traits::{MsDuration, NsDuration, precise_time_ms, precise_time_ns};
use script_traits::{NsDuration, precise_time_ns};
use script_traits::{TimerEvent, TimerEventRequest};
use std::cell::RefCell;
use std::cmp::{self, Ord};
use std::collections::BinaryHeap;
use std::sync::Arc;
use std::sync::atomic::{self, AtomicBool};
use std::sync::mpsc::{channel, Receiver, Select};
use std::thread::{self, spawn, Thread};
use std::time::Duration;
use std::sync::mpsc::Receiver;
use util::thread::spawn_named;

/// A quick hack to work around the removal of [`std::old_io::timer::Timer`](
/// http://doc.rust-lang.org/1.0.0-beta/std/old_io/timer/struct.Timer.html )
struct CancelableOneshotTimer {
thread: Thread,
canceled: Arc<AtomicBool>,
port: Receiver<()>,
}

impl CancelableOneshotTimer {
fn new(duration: MsDuration) -> CancelableOneshotTimer {
let (tx, rx) = channel();
let canceled = Arc::new(AtomicBool::new(false));
let canceled_clone = canceled.clone();

let thread = spawn(move || {
let due_time = precise_time_ms() + duration;

let mut park_time = duration;

loop {
thread::park_timeout(Duration::from_millis(park_time.get()));

if canceled_clone.load(atomic::Ordering::Relaxed) {
return;
}

// park_timeout_ms does not guarantee parking for the
// given amout. We might have woken up early.
let current_time = precise_time_ms();
if current_time >= due_time {
let _ = tx.send(());
return;
}
park_time = due_time - current_time;
}
}).thread().clone();

CancelableOneshotTimer {
thread: thread,
canceled: canceled,
port: rx,
}
}

fn port(&self) -> &Receiver<()> {
&self.port
}

fn cancel(&self) {
self.canceled.store(true, atomic::Ordering::Relaxed);
self.thread.unpark();
}
}

pub struct TimerScheduler {
port: Receiver<TimerEventRequest>,

scheduled_events: RefCell<BinaryHeap<ScheduledEvent>>,

timer: RefCell<Option<CancelableOneshotTimer>>,
scheduled_events: BinaryHeap<ScheduledEvent>,
}

struct ScheduledEvent {
@@ -103,6 +42,18 @@ impl PartialEq for ScheduledEvent {
}
}

fn recv_with_timeout<T: Send>(port: &Receiver<T>, from: NsDuration, timeout: NsDuration) -> Option<T> {

This comment has been minimized.

@metajack

metajack May 19, 2016

Contributor

Taking from as an argument seems a little odd.

This comment has been minimized.

@emilio

emilio May 19, 2016

Author Member

Agreed, it was to avoid another precise_time_ns call. I'll see how I can refactor it so it's cleaner :)

loop {
if let Ok(ret) = port.try_recv() {
return Some(ret);
}
let now = precise_time_ns();
if now - from >= timeout {
return None;
}
}
}

enum Task {
HandleRequest(TimerEventRequest),
DispatchDueEvents,
@@ -112,12 +63,9 @@ impl TimerScheduler {
pub fn start() -> IpcSender<TimerEventRequest> {
let (chan, port) = ipc::channel().unwrap();

let timer_scheduler = TimerScheduler {
let mut timer_scheduler = TimerScheduler {
port: ROUTER.route_ipc_receiver_to_new_mpsc_receiver(port),

scheduled_events: RefCell::new(BinaryHeap::new()),

timer: RefCell::new(None),
scheduled_events: BinaryHeap::new(),
};

spawn_named("TimerScheduler".to_owned(), move || {
@@ -127,95 +75,55 @@ impl TimerScheduler {
chan
}

fn run_event_loop(&self) {
while let Some(thread) = self.receive_next_task() {
match thread {
Task::HandleRequest(request) => self.handle_request(request),
Task::DispatchDueEvents => self.dispatch_due_events(),
fn get_next_task(&mut self) -> Option<Task> {
let now = precise_time_ns();
if let Some(event) = self.scheduled_events.peek() {
if event.for_time < now {
return Some(Task::DispatchDueEvents);
}

let timeout = now - event.for_time;

This comment has been minimized.

@emilio

emilio May 19, 2016

Author Member

This is wrong, and should be the inverse, I'll fix it when I'm in front of a computer

recv_with_timeout(&self.port, now, timeout).map(Task::HandleRequest)
} else {
self.port.recv().ok().map(Task::HandleRequest)
}
}

#[allow(unsafe_code)]
fn receive_next_task(&self) -> Option<Task> {
let port = &self.port;
let timer = self.timer.borrow();
let timer_port = timer.as_ref().map(|timer| timer.port());

if let Some(ref timer_port) = timer_port {
let sel = Select::new();
let mut scheduler_handle = sel.handle(port);
let mut timer_handle = sel.handle(timer_port);

unsafe {
scheduler_handle.add();
timer_handle.add();
}

let ret = sel.wait();
if ret == scheduler_handle.id() {
port.recv().ok().map(Task::HandleRequest)
} else if ret == timer_handle.id() {
timer_port.recv().ok().map(|_| Task::DispatchDueEvents)
} else {
panic!("unexpected select result!")
fn run_event_loop(&mut self) {
loop {
let task = self.get_next_task();
if let Some(task) = task {
match task {
Task::DispatchDueEvents => self.dispatch_due_events(),
Task::HandleRequest(req) => self.add_request(req),
}
}
} else {
port.recv().ok().map(Task::HandleRequest)
}
}

fn handle_request(&self, request: TimerEventRequest) {
fn add_request(&mut self, request: TimerEventRequest) {
let TimerEventRequest(_, _, _, duration_ms) = request;
let duration_ns = Length::new(duration_ms.get() * 1000 * 1000);
let schedule_for = precise_time_ns() + duration_ns;

let previously_earliest = self.scheduled_events.borrow().peek()
.map_or(Length::new(u64::max_value()), |scheduled| scheduled.for_time);

self.scheduled_events.borrow_mut().push(ScheduledEvent {
self.scheduled_events.push(ScheduledEvent {
request: request,
for_time: schedule_for,
});

if schedule_for < previously_earliest {
self.start_timer_for_next_event();
}
}

fn dispatch_due_events(&self) {
fn dispatch_due_events(&mut self) {
let now = precise_time_ns();

{
let mut events = self.scheduled_events.borrow_mut();
let mut events = &mut self.scheduled_events;

{
while !events.is_empty() && events.peek().as_ref().unwrap().for_time <= now {
let event = events.pop().unwrap();
let TimerEventRequest(chan, source, id, _) = event.request;

let _ = chan.send(TimerEvent(source, id));
}
}

self.start_timer_for_next_event();
}

fn start_timer_for_next_event(&self) {
let events = self.scheduled_events.borrow();
let next_event = events.peek();

let mut timer = self.timer.borrow_mut();

if let Some(ref mut timer) = *timer {
timer.cancel();
}

*timer = next_event.map(|next_event| {
let delay_ns = next_event.for_time.get().saturating_sub(precise_time_ns().get());
// Round up, we'd rather be late than early…
let delay_ms = Length::new(delay_ns.saturating_add(999999) / (1000 * 1000));

CancelableOneshotTimer::new(delay_ms)
});
}
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.