Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
CodeSandwich committed Dec 13, 2019
1 parent 7868354 commit 96c50b3
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 0 deletions.
179 changes: 179 additions & 0 deletions jormungandr/src/utils/fire_forget_scheduler.rs
@@ -0,0 +1,179 @@
use futures::{Async, Future, Poll, Stream};
use std::collections::VecDeque;
use std::convert::Infallible;
use std::hash::Hash;
use std::time::Duration;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::timer::delay_queue::{DelayQueue, Key};

const COMMAND_CHANNEL_SIZE: usize = 16;

/// never finishes, used only to drive scheduling
/// DATA should be consistent across calls, first TID completion clears all
/// TID - Task ID
/// WID - Worker ID
/// Must be run in Tokio context
/// Launcher must be fast and non-blocking
pub struct FireForgetSchedulerFuture<TID, WID, DATA, LAUNCHER>
where
TID: Clone + Hash + Eq,
WID: Clone + PartialEq,
LAUNCHER: Fn(TID, WID, DATA),
{
command_sender: FireForgetScheduler<TID, WID, DATA>,
command_receiver: Receiver<Command<TID, WID, DATA>>,
scheduled: VecDeque<(WorkerTask<TID, WID>, DATA)>, //TODO refactor into separate structs
running: Vec<(WorkerTask<TID, WID>, Key)>,
timeouts: DelayQueue<WorkerTask<TID, WID>>,
launcher: LAUNCHER,
max_running_same_task: usize,
timeout: Duration,
}

impl<TID, WID, DATA, LAUNCHER> FireForgetSchedulerFuture<TID, WID, DATA, LAUNCHER>
where
TID: Clone + Hash + Eq,
WID: Clone + PartialEq,
LAUNCHER: Fn(TID, WID, DATA),
{
pub fn new(
max_running: usize,
max_running_same_task: usize,
timeout: Duration,
launcher: LAUNCHER,
) -> Self {
let (sender, command_receiver) = mpsc::channel(COMMAND_CHANNEL_SIZE);
let command_sender = FireForgetScheduler { sender };
FireForgetSchedulerFuture {
command_sender,
command_receiver,
scheduled: VecDeque::new(),
running: Vec::with_capacity(max_running),
timeouts: DelayQueue::with_capacity(max_running),
launcher,
max_running_same_task,
timeout,
}
}

pub fn scheduler(&self) -> FireForgetScheduler<TID, WID, DATA> {
self.command_sender.clone()
}

fn schedule(&mut self, tid: TID, wid: WID, data: DATA) {
self.scheduled.push_back((WorkerTask { tid, wid }, data));
self.try_run_scheduled();
}

fn declare_completed(&mut self, tid: TID) {
self.scheduled.retain(|scheduled| scheduled.0.tid != tid);
let timeouts = &mut self.timeouts;
self.running.retain(|running| {
if running.0.tid == tid {
timeouts.remove(&running.1);
false
} else {
true
}
});
self.try_run_scheduled();
}

fn declare_timed_out(&mut self, assigned_task: WorkerTask<TID, WID>) {
self.running
.retain(|(ref assigned_task, _)| assigned_task != assigned_task);
self.try_run_scheduled();
}

fn try_run_scheduled(&mut self) {
while self.running.len() < self.running.capacity() {
let run_idx_opt = self.scheduled.iter().enumerate().find(|scheduled| {
let running_same_task = self
.running
.iter()
.filter(|running| running.0.tid == (scheduled.1).0.tid)
.count();
running_same_task < self.max_running_same_task
});
let (worker_task, data) = match run_idx_opt {
Some((run_idx, _)) => self.scheduled.remove(run_idx).unwrap(),
None => break,
};
let key = self.timeouts.insert(worker_task.clone(), self.timeout);
self.running.push((worker_task.clone(), key));
(self.launcher)(worker_task.tid, worker_task.wid, data);
}
}
}

impl<TID, WID, DATA, LAUNCHER> Future for FireForgetSchedulerFuture<TID, WID, DATA, LAUNCHER>
where
TID: Clone + Hash + Eq,
WID: Clone + PartialEq,
LAUNCHER: Fn(TID, WID, DATA),
{
type Item = Infallible;
type Error = Infallible;

fn poll(&mut self) -> Poll<Infallible, Infallible> {
loop {
match self.command_receiver.poll() {
Err(e) => panic!("FireForgetScheduler command queue failed: {}", e), // TODO PROPER ERRORS
Ok(Async::Ready(None)) => panic!("FireForgetScheduler command queue closed"), // TODO PROPER ERRORS
Ok(Async::Ready(Some(Command::Schedule { tid, wid, data }))) => {
self.schedule(tid, wid, data)
}
Ok(Async::Ready(Some(Command::DeclareCompleted { tid }))) => {
self.declare_completed(tid)
}
Ok(Async::NotReady) => break,
}
}
loop {
match self.timeouts.poll() {
Err(e) => panic!("FireForgetScheduler timeouts failed: {}", e), // TODO PROPER ERRORS
Ok(Async::Ready(None)) => panic!("FireForgetScheduler timeouts closed"), // TODO PROPER ERRORS
Ok(Async::Ready(Some(expired))) => self.declare_timed_out(expired.into_inner()),
Ok(Async::NotReady) => break,
}
}
Ok(Async::NotReady)
}
}

enum Command<TID, WID, DATA> {
Schedule { tid: TID, wid: WID, data: DATA },
DeclareCompleted { tid: TID },
}

#[derive(Clone, PartialEq)]
struct WorkerTask<TID, WID> {
pub tid: TID,
pub wid: WID,
}

pub struct FireForgetScheduler<TID, WID, DATA> {
sender: Sender<Command<TID, WID, DATA>>,
}

impl<TID, WID, DATA> Clone for FireForgetScheduler<TID, WID, DATA> {
fn clone(&self) -> Self {
FireForgetScheduler {
sender: self.sender.clone(),
}
}
}

impl<TID, WID, DATA> FireForgetScheduler<TID, WID, DATA> {
pub fn schedule(&mut self, tid: TID, wid: WID, data: DATA) {
self.sender
.try_send(Command::Schedule { tid, wid, data })
.map_err(|_| unimplemented!()); //TODO PROPER ERRORS
}

pub fn declare_completed(&mut self, tid: TID) {
self.sender
.try_send(Command::DeclareCompleted { tid })
.map_err(|_| unimplemented!()); //TODO PROPER ERRORS
}
}
1 change: 1 addition & 0 deletions jormungandr/src/utils/mod.rs
@@ -1,3 +1,4 @@
pub mod async_msg;
pub mod borrow;
pub mod fire_forget_scheduler;
pub mod task;

0 comments on commit 96c50b3

Please sign in to comment.