New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DO NOT MERGE] constellation: Simplify the scheduler so it doesn't create a thread for each event #11272
Closed
+40
−132
Closed
[DO NOT MERGE] constellation: Simplify the scheduler so it doesn't create a thread for each event #11272
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file
Failed to load files.
Loading status checks…
[wip] constellation: Simplify the scheduler so it doesn't create a th…
…read for each event This can't land as-is, just wanted to make sure this idea is fine, and do a try run because I need to leave now.
- Loading branch information
emilio
committed
May 19, 2016
commit 28f0b0964b6b6b3368205434dd524f03b98c52e5
Unverified
This user has not uploaded their public key yet.
GPG key ID: 056B727BB9C1027C
Learn about signing commits
| @@ -5,78 +5,17 @@ | ||
| use euclid::length::Length; | ||
| use ipc_channel::ipc::{self, IpcSender}; | ||
| use ipc_channel::router::ROUTER; | ||
| use script_traits::{MsDuration, NsDuration, precise_time_ms, precise_time_ns}; | ||
| use script_traits::{NsDuration, precise_time_ns}; | ||
| use script_traits::{TimerEvent, TimerEventRequest}; | ||
| use std::cell::RefCell; | ||
| use std::cmp::{self, Ord}; | ||
| use std::collections::BinaryHeap; | ||
| use std::sync::Arc; | ||
| use std::sync::atomic::{self, AtomicBool}; | ||
| use std::sync::mpsc::{channel, Receiver, Select}; | ||
| use std::thread::{self, spawn, Thread}; | ||
| use std::time::Duration; | ||
| use std::sync::mpsc::Receiver; | ||
| use util::thread::spawn_named; | ||
|
|
||
| /// A quick hack to work around the removal of [`std::old_io::timer::Timer`]( | ||
| /// http://doc.rust-lang.org/1.0.0-beta/std/old_io/timer/struct.Timer.html ) | ||
| struct CancelableOneshotTimer { | ||
| thread: Thread, | ||
| canceled: Arc<AtomicBool>, | ||
| port: Receiver<()>, | ||
| } | ||
|
|
||
| impl CancelableOneshotTimer { | ||
| fn new(duration: MsDuration) -> CancelableOneshotTimer { | ||
| let (tx, rx) = channel(); | ||
| let canceled = Arc::new(AtomicBool::new(false)); | ||
| let canceled_clone = canceled.clone(); | ||
|
|
||
| let thread = spawn(move || { | ||
| let due_time = precise_time_ms() + duration; | ||
|
|
||
| let mut park_time = duration; | ||
|
|
||
| loop { | ||
| thread::park_timeout(Duration::from_millis(park_time.get())); | ||
|
|
||
| if canceled_clone.load(atomic::Ordering::Relaxed) { | ||
| return; | ||
| } | ||
|
|
||
| // park_timeout_ms does not guarantee parking for the | ||
| // given amout. We might have woken up early. | ||
| let current_time = precise_time_ms(); | ||
| if current_time >= due_time { | ||
| let _ = tx.send(()); | ||
| return; | ||
| } | ||
| park_time = due_time - current_time; | ||
| } | ||
| }).thread().clone(); | ||
|
|
||
| CancelableOneshotTimer { | ||
| thread: thread, | ||
| canceled: canceled, | ||
| port: rx, | ||
| } | ||
| } | ||
|
|
||
| fn port(&self) -> &Receiver<()> { | ||
| &self.port | ||
| } | ||
|
|
||
| fn cancel(&self) { | ||
| self.canceled.store(true, atomic::Ordering::Relaxed); | ||
| self.thread.unpark(); | ||
| } | ||
| } | ||
|
|
||
| pub struct TimerScheduler { | ||
| port: Receiver<TimerEventRequest>, | ||
|
|
||
| scheduled_events: RefCell<BinaryHeap<ScheduledEvent>>, | ||
|
|
||
| timer: RefCell<Option<CancelableOneshotTimer>>, | ||
| scheduled_events: BinaryHeap<ScheduledEvent>, | ||
| } | ||
|
|
||
| struct ScheduledEvent { | ||
| @@ -103,6 +42,18 @@ impl PartialEq for ScheduledEvent { | ||
| } | ||
| } | ||
|
|
||
| fn recv_with_timeout<T: Send>(port: &Receiver<T>, from: NsDuration, timeout: NsDuration) -> Option<T> { | ||
emilio
Author
Member
|
||
| loop { | ||
| if let Ok(ret) = port.try_recv() { | ||
| return Some(ret); | ||
| } | ||
| let now = precise_time_ns(); | ||
| if now - from >= timeout { | ||
| return None; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| enum Task { | ||
| HandleRequest(TimerEventRequest), | ||
| DispatchDueEvents, | ||
| @@ -112,12 +63,9 @@ impl TimerScheduler { | ||
| pub fn start() -> IpcSender<TimerEventRequest> { | ||
| let (chan, port) = ipc::channel().unwrap(); | ||
|
|
||
| let timer_scheduler = TimerScheduler { | ||
| let mut timer_scheduler = TimerScheduler { | ||
| port: ROUTER.route_ipc_receiver_to_new_mpsc_receiver(port), | ||
|
|
||
| scheduled_events: RefCell::new(BinaryHeap::new()), | ||
|
|
||
| timer: RefCell::new(None), | ||
| scheduled_events: BinaryHeap::new(), | ||
| }; | ||
|
|
||
| spawn_named("TimerScheduler".to_owned(), move || { | ||
| @@ -127,95 +75,55 @@ impl TimerScheduler { | ||
| chan | ||
| } | ||
|
|
||
| fn run_event_loop(&self) { | ||
| while let Some(thread) = self.receive_next_task() { | ||
| match thread { | ||
| Task::HandleRequest(request) => self.handle_request(request), | ||
| Task::DispatchDueEvents => self.dispatch_due_events(), | ||
| fn get_next_task(&mut self) -> Option<Task> { | ||
| let now = precise_time_ns(); | ||
| if let Some(event) = self.scheduled_events.peek() { | ||
| if event.for_time < now { | ||
| return Some(Task::DispatchDueEvents); | ||
| } | ||
|
|
||
| let timeout = now - event.for_time; | ||
emilio
Author
Member
|
||
| recv_with_timeout(&self.port, now, timeout).map(Task::HandleRequest) | ||
| } else { | ||
| self.port.recv().ok().map(Task::HandleRequest) | ||
| } | ||
| } | ||
|
|
||
| #[allow(unsafe_code)] | ||
| fn receive_next_task(&self) -> Option<Task> { | ||
| let port = &self.port; | ||
| let timer = self.timer.borrow(); | ||
| let timer_port = timer.as_ref().map(|timer| timer.port()); | ||
|
|
||
| if let Some(ref timer_port) = timer_port { | ||
| let sel = Select::new(); | ||
| let mut scheduler_handle = sel.handle(port); | ||
| let mut timer_handle = sel.handle(timer_port); | ||
|
|
||
| unsafe { | ||
| scheduler_handle.add(); | ||
| timer_handle.add(); | ||
| } | ||
|
|
||
| let ret = sel.wait(); | ||
| if ret == scheduler_handle.id() { | ||
| port.recv().ok().map(Task::HandleRequest) | ||
| } else if ret == timer_handle.id() { | ||
| timer_port.recv().ok().map(|_| Task::DispatchDueEvents) | ||
| } else { | ||
| panic!("unexpected select result!") | ||
| fn run_event_loop(&mut self) { | ||
| loop { | ||
| let task = self.get_next_task(); | ||
| if let Some(task) = task { | ||
| match task { | ||
| Task::DispatchDueEvents => self.dispatch_due_events(), | ||
| Task::HandleRequest(req) => self.add_request(req), | ||
| } | ||
| } | ||
| } else { | ||
| port.recv().ok().map(Task::HandleRequest) | ||
| } | ||
| } | ||
|
|
||
| fn handle_request(&self, request: TimerEventRequest) { | ||
| fn add_request(&mut self, request: TimerEventRequest) { | ||
| let TimerEventRequest(_, _, _, duration_ms) = request; | ||
| let duration_ns = Length::new(duration_ms.get() * 1000 * 1000); | ||
| let schedule_for = precise_time_ns() + duration_ns; | ||
|
|
||
| let previously_earliest = self.scheduled_events.borrow().peek() | ||
| .map_or(Length::new(u64::max_value()), |scheduled| scheduled.for_time); | ||
|
|
||
| self.scheduled_events.borrow_mut().push(ScheduledEvent { | ||
| self.scheduled_events.push(ScheduledEvent { | ||
| request: request, | ||
| for_time: schedule_for, | ||
| }); | ||
|
|
||
| if schedule_for < previously_earliest { | ||
| self.start_timer_for_next_event(); | ||
| } | ||
| } | ||
|
|
||
| fn dispatch_due_events(&self) { | ||
| fn dispatch_due_events(&mut self) { | ||
| let now = precise_time_ns(); | ||
|
|
||
| { | ||
| let mut events = self.scheduled_events.borrow_mut(); | ||
| let mut events = &mut self.scheduled_events; | ||
|
|
||
| { | ||
| while !events.is_empty() && events.peek().as_ref().unwrap().for_time <= now { | ||
| let event = events.pop().unwrap(); | ||
| let TimerEventRequest(chan, source, id, _) = event.request; | ||
|
|
||
| let _ = chan.send(TimerEvent(source, id)); | ||
| } | ||
| } | ||
|
|
||
| self.start_timer_for_next_event(); | ||
| } | ||
|
|
||
| fn start_timer_for_next_event(&self) { | ||
| let events = self.scheduled_events.borrow(); | ||
| let next_event = events.peek(); | ||
|
|
||
| let mut timer = self.timer.borrow_mut(); | ||
|
|
||
| if let Some(ref mut timer) = *timer { | ||
| timer.cancel(); | ||
| } | ||
|
|
||
| *timer = next_event.map(|next_event| { | ||
| let delay_ns = next_event.for_time.get().saturating_sub(precise_time_ns().get()); | ||
| // Round up, we'd rather be late than early… | ||
| let delay_ms = Length::new(delay_ns.saturating_add(999999) / (1000 * 1000)); | ||
|
|
||
| CancelableOneshotTimer::new(delay_ms) | ||
| }); | ||
| } | ||
| } | ||
ProTip!
Use n and p to navigate between commits in a pull request.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Taking from as an argument seems a little odd.