Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
CodeSandwich committed Dec 13, 2019
1 parent 96c50b3 commit c37e8df
Showing 1 changed file with 119 additions and 58 deletions.
177 changes: 119 additions & 58 deletions jormungandr/src/utils/fire_forget_scheduler.rs
Expand Up @@ -3,11 +3,41 @@ use std::collections::VecDeque;
use std::convert::Infallible;
use std::hash::Hash;
use std::time::Duration;
use tokio::sync::mpsc::{self, Receiver, Sender};
use thiserror::Error;
use tokio::sync::mpsc::{
self,
error::{RecvError, TrySendError},
Receiver, Sender,
};
use tokio::timer::delay_queue::{DelayQueue, Key};

const COMMAND_CHANNEL_SIZE: usize = 16;

#[derive(Error, Debug)]
pub enum Error {
#[error("failed to send a command: {0}")]
CommandSend(&'static str),
#[error("failed to receive a command")]
CommandReceive(#[from] RecvError),
#[error("command queue closed")]
CommandQueueClosed,
#[error("timer error")]
Timer(#[from] tokio::timer::Error),
}

impl<T> From<TrySendError<T>> for Error {
fn from(error: TrySendError<T>) -> Self {
let cause = if error.is_closed() {
"channel closed"
} else if error.is_full() {
"no available capacity"
} else {
"unknown channel error"
};
Error::CommandSend(cause)
}
}

/// never finishes, used only to drive scheduling
/// DATA should be consistent across calls, first TID completion clears all
/// TID - Task ID
Expand All @@ -22,9 +52,9 @@ where
{
command_sender: FireForgetScheduler<TID, WID, DATA>,
command_receiver: Receiver<Command<TID, WID, DATA>>,
scheduled: VecDeque<(WorkerTask<TID, WID>, DATA)>, //TODO refactor into separate structs
running: Vec<(WorkerTask<TID, WID>, Key)>,
timeouts: DelayQueue<WorkerTask<TID, WID>>,
scheduled: VecDeque<ScheduledTask<TID, WID, DATA>>,
running: Vec<RunningTask<TID, WID>>,
timeouts: DelayQueue<TimedOutTask<TID, WID>>,
launcher: LAUNCHER,
max_running_same_task: usize,
timeout: Duration,
Expand Down Expand Up @@ -60,17 +90,17 @@ where
self.command_sender.clone()
}

fn schedule(&mut self, tid: TID, wid: WID, data: DATA) {
self.scheduled.push_back((WorkerTask { tid, wid }, data));
fn schedule(&mut self, task: ScheduledTask<TID, WID, DATA>) {
self.scheduled.push_back(task);
self.try_run_scheduled();
}

fn declare_completed(&mut self, tid: TID) {
self.scheduled.retain(|scheduled| scheduled.0.tid != tid);
fn declare_completed(&mut self, task: TID) {
self.scheduled.retain(|scheduled| scheduled.tid != task);
let timeouts = &mut self.timeouts;
self.running.retain(|running| {
if running.0.tid == tid {
timeouts.remove(&running.1);
if running.tid == task {
timeouts.remove(&running.timeout_key);
false
} else {
true
Expand All @@ -79,31 +109,37 @@ where
self.try_run_scheduled();
}

fn declare_timed_out(&mut self, assigned_task: WorkerTask<TID, WID>) {
fn declare_timed_out(&mut self, timed_out: TimedOutTask<TID, WID>) {
self.running
.retain(|(ref assigned_task, _)| assigned_task != assigned_task);
.retain(|running| !running.is_timed_out(&timed_out));
self.try_run_scheduled();
}

fn try_run_scheduled(&mut self) {
while self.running.len() < self.running.capacity() {
let run_idx_opt = self.scheduled.iter().enumerate().find(|scheduled| {
let running_same_task = self
.running
.iter()
.filter(|running| running.0.tid == (scheduled.1).0.tid)
.count();
running_same_task < self.max_running_same_task
});
let (worker_task, data) = match run_idx_opt {
Some((run_idx, _)) => self.scheduled.remove(run_idx).unwrap(),
let scheduled = match self.pop_next_runnable_task() {
Some(scheduled) => scheduled,
None => break,
};
let key = self.timeouts.insert(worker_task.clone(), self.timeout);
self.running.push((worker_task.clone(), key));
(self.launcher)(worker_task.tid, worker_task.wid, data);
let timeout_key = self.timeouts.insert(scheduled.to_timed_out(), self.timeout);
self.running.push(scheduled.to_running(timeout_key));
(self.launcher)(scheduled.tid, scheduled.wid, scheduled.data);
}
}

fn pop_next_runnable_task(&mut self) -> Option<ScheduledTask<TID, WID, DATA>> {
self.scheduled
.iter()
.position(|scheduled| self.task_run_count(&scheduled.tid) < self.max_running_same_task)
.and_then(|run_idx| self.scheduled.remove(run_idx))
}

fn task_run_count(&self, tid: &TID) -> usize {
self.running
.iter()
.filter(|running| running.tid == *tid)
.count()
}
}

impl<TID, WID, DATA, LAUNCHER> Future for FireForgetSchedulerFuture<TID, WID, DATA, LAUNCHER>
Expand All @@ -113,41 +149,64 @@ where
LAUNCHER: Fn(TID, WID, DATA),
{
type Item = Infallible;
type Error = Infallible;

fn poll(&mut self) -> Poll<Infallible, Infallible> {
loop {
match self.command_receiver.poll() {
Err(e) => panic!("FireForgetScheduler command queue failed: {}", e), // TODO PROPER ERRORS
Ok(Async::Ready(None)) => panic!("FireForgetScheduler command queue closed"), // TODO PROPER ERRORS
Ok(Async::Ready(Some(Command::Schedule { tid, wid, data }))) => {
self.schedule(tid, wid, data)
}
Ok(Async::Ready(Some(Command::DeclareCompleted { tid }))) => {
self.declare_completed(tid)
}
Ok(Async::NotReady) => break,
type Error = Error;

fn poll(&mut self) -> Poll<Infallible, Error> {
while let Async::Ready(command_opt) = self.command_receiver.poll()? {
match command_opt {
None => return Err(Error::CommandQueueClosed),
Some(Command::Schedule { task }) => self.schedule(task),
Some(Command::DeclareCompleted { task }) => self.declare_completed(task),
}
}
loop {
match self.timeouts.poll() {
Err(e) => panic!("FireForgetScheduler timeouts failed: {}", e), // TODO PROPER ERRORS
Ok(Async::Ready(None)) => panic!("FireForgetScheduler timeouts closed"), // TODO PROPER ERRORS
Ok(Async::Ready(Some(expired))) => self.declare_timed_out(expired.into_inner()),
Ok(Async::NotReady) => break,
}
while let Async::Ready(Some(expired)) = self.timeouts.poll()? {
self.declare_timed_out(expired.into_inner());
}
Ok(Async::NotReady)
}
}

enum Command<TID, WID, DATA> {
Schedule { tid: TID, wid: WID, data: DATA },
DeclareCompleted { tid: TID },
Schedule { task: ScheduledTask<TID, WID, DATA> },
DeclareCompleted { task: TID },
}

#[derive(Clone, PartialEq)]
struct WorkerTask<TID, WID> {
struct ScheduledTask<TID, WID, DATA> {
pub tid: TID,
pub wid: WID,
pub data: DATA,
}

impl<TID: Clone, WID: Clone, DATA> ScheduledTask<TID, WID, DATA> {
fn to_running(&self, timeout_key: Key) -> RunningTask<TID, WID> {
RunningTask {
tid: self.tid.clone(),
wid: self.wid.clone(),
timeout_key,
}
}

fn to_timed_out(&self) -> TimedOutTask<TID, WID> {
TimedOutTask {
tid: self.tid.clone(),
wid: self.wid.clone(),
}
}
}

struct RunningTask<TID, WID> {
pub tid: TID,
pub wid: WID,
pub timeout_key: Key,
}

impl<TID: PartialEq, WID: PartialEq> RunningTask<TID, WID> {
fn is_timed_out(&self, timed_out: &TimedOutTask<TID, WID>) -> bool {
self.tid == timed_out.tid && self.wid == timed_out.wid
}
}

struct TimedOutTask<TID, WID> {
pub tid: TID,
pub wid: WID,
}
Expand All @@ -165,15 +224,17 @@ impl<TID, WID, DATA> Clone for FireForgetScheduler<TID, WID, DATA> {
}

impl<TID, WID, DATA> FireForgetScheduler<TID, WID, DATA> {
pub fn schedule(&mut self, tid: TID, wid: WID, data: DATA) {
self.sender
.try_send(Command::Schedule { tid, wid, data })
.map_err(|_| unimplemented!()); //TODO PROPER ERRORS
pub fn schedule(&mut self, tid: TID, wid: WID, data: DATA) -> Result<(), Error> {
self.try_send(Command::Schedule {
task: ScheduledTask { tid, wid, data },
})
}

pub fn declare_completed(&mut self, task: TID) -> Result<(), Error> {
self.try_send(Command::DeclareCompleted { task })
}

pub fn declare_completed(&mut self, tid: TID) {
self.sender
.try_send(Command::DeclareCompleted { tid })
.map_err(|_| unimplemented!()); //TODO PROPER ERRORS
fn try_send(&mut self, command: Command<TID, WID, DATA>) -> Result<(), Error> {
self.sender.try_send(command).map_err(Into::into)
}
}

0 comments on commit c37e8df

Please sign in to comment.