Skip to content
Permalink
Browse files

WIP ordering

  • Loading branch information
syntacticsugarglider committed Jan 27, 2020
1 parent 2413ce4 commit 8ecf1ea5840a2128cdfd9d56627257fd2087e797
Showing with 39 additions and 6 deletions.
  1. +1 −0 Cargo.toml
  2. +16 −3 src/channel/id_channel/context.rs
  3. +5 −0 src/channel/id_channel/mod.rs
  4. +9 −1 src/channel/mod.rs
  5. +8 −2 src/format/mod.rs
@@ -29,6 +29,7 @@ url = "2.1.0"
thiserror = "1.0.9"
anyhow = "1.0.26"
setwaker = { git = "https://github.com/noocene/setwaker" }
predicated_ordered = { git = "https://github.com/noocene/predicated-ordered" }
parking_lot = "0.10.0"

[target.wasm32-unknown-unknown.dependencies]
@@ -1,7 +1,7 @@
use alloc::sync::{Arc, Weak};
use core::{any::TypeId, pin::Pin};
use std::{
collections::HashMap,
collections::{HashMap, HashSet},
sync::{Mutex, RwLock},
};

@@ -17,6 +17,7 @@ use futures::{
struct ContextState {
channel_types: HashMap<ForkHandle, TypeId>,
unused_indices: Vec<ForkHandle>,
failed: HashSet<ForkHandle>,
next_index: ForkHandle,
}

@@ -74,16 +75,27 @@ impl Context {
}
}

pub(crate) fn failed(&self) -> bool {
self.state.read().unwrap().failed.len() != 0
}

pub(crate) fn get(&self, id: ForkHandle) -> Option<TypeId> {
let state = self.state.read().unwrap();
state.channel_types.get(&id).cloned()
let mut state = self.state.write().unwrap();
let ty = state.channel_types.get(&id).cloned();
if ty.is_none() {
state.failed.insert(id);
} else {
state.failed.remove(&id);
}
ty
}

pub(crate) fn new() -> Self {
Context {
state: Arc::new(RwLock::new(ContextState {
channel_types: HashMap::new(),
next_index: ForkHandle(0),
failed: HashSet::new(),
unused_indices: vec![],
})),
tasks: Arc::new(Mutex::new(HashMap::new())),
@@ -94,6 +106,7 @@ impl Context {
Context {
state: Arc::new(RwLock::new(ContextState {
channel_types: HashMap::new(),
failed: HashSet::new(),
next_index: ForkHandle(1),
unused_indices: vec![],
})),
@@ -218,9 +218,14 @@ impl ISink<Item> for IdChannel {
}

impl Waiter for Context {
type Item = Item;

fn wait_for(&self, data: String) -> Future<()> {
Box::pin(self.wait_for(ForkHandle(data.parse().unwrap())))
}
fn predicate(&self, item: &Self::Item) -> bool {
self.failed()
}
}

impl<'de> IContext<'de> for IdChannel {
@@ -72,12 +72,20 @@ pub trait Target<'a, K: Kind>: Context<'a> + Sized + Send + Sync {
}

pub trait Waiter {
type Item;

fn wait_for(&self, data: String) -> Future<()>;
fn predicate(&self, item: &Self::Item) -> bool;
}

pub trait Context<'de> {
type Item: Serialize + Sync + Send + 'static;
type Target: Waiter + DeserializeSeed<'de, Value = Self::Item> + Clone + Sync + Send + 'static;
type Target: Waiter<Item = Self::Item>
+ DeserializeSeed<'de, Value = Self::Item>
+ Clone
+ Sync
+ Send
+ 'static;

fn context(&self) -> Self::Target;
}
@@ -12,6 +12,8 @@ pub mod bincode;
#[doc(inline)]
pub use bincode::Bincode;

use predicated_ordered::BufferedPredicatedExt;

use futures::{
channel::mpsc::{unbounded, UnboundedReceiver},
future::ok,
@@ -144,6 +146,7 @@ where
{
let shim = U::new_shim();
let context = shim.context();
let ctx = context.clone();
let (sink, stream) = input.split();
Box::pin(
shim.complete(SinkStream::new(
@@ -168,7 +171,7 @@ where
})
.unwrap_or_else(|e| panic!(format!("{:?}", e.0)))
})
.buffer_unordered(core::usize::MAX),
.buffered_predicated(core::usize::MAX, move |i| ctx.predicate(i)),
)),
)
}
@@ -234,6 +237,7 @@ where
let (sink, stream) = input.split();
let (sender, receiver): (_, UnboundedReceiver<<Self as Format>::Representation>) =
unbounded();
let predicate_ctx = ctx.clone();
let receiver = receiver
.map(move |item: <Self as Format>::Representation| {
let ct = ctx.clone();
@@ -251,7 +255,9 @@ where
panic!(format!("{:?}", e))
})
})
.buffer_unordered(core::usize::MAX);
.buffered_predicated(core::usize::MAX, move |i| {
predicate_ctx.predicate(i.as_ref().unwrap_or_else(|_| panic!()))
});
spawn(
receiver
.forward(sink.sink_map_err(|e| panic!(format!("{}", e))))

0 comments on commit 8ecf1ea

Please sign in to comment.
You can’t perform that action at this time.