Skip to content
Permalink
Browse files

working encode buffering

  • Loading branch information
syntacticsugarglider committed Nov 11, 2019
1 parent beea14b commit ff54362911c64d72a458cd6a0643c5ae99910001
Showing with 60 additions and 37 deletions.
  1. +5 −6 .vscode/launch.json
  2. +10 −4 examples/function_args.rs
  3. +12 −1 src/channel/id_channel/context.rs
  4. +4 −4 src/channel/id_channel/mod.rs
  5. +29 −22 src/format/mod.rs
@@ -44,20 +44,19 @@
{
"type": "lldb",
"request": "launch",
"name": "Debug executable 'native_core_test'",
"name": "Debug executable 'function_args'",
"cargo": {
"args": [
"build",
"--bin=native_core_test",
"--package=native_core_test",
"--example=function_args",
],
"filter": {
"name": "native_core_test",
"kind": "bin"
"name": "function_args",
"kind": "example"
}
},
"args": [],
"cwd": "${workspaceFolder}/examples/native_core_test"
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
@@ -2,19 +2,25 @@ use vessels::{
channel::IdChannel,
core,
core::{executor::Spawn, Executor},
format::{ApplyDecode, ApplyEncode, Cbor},
format::{ApplyDecode, ApplyEncode, Json},
kind::Future,
log, OnTo,
};

type Call = Box<dyn Fn(Vec<u8>) -> Future<String> + Send + Sync>;

use futures::StreamExt;

fn main() {
let call: Call = Box::new(|data| Box::pin(async move { format!("{:?}", data) }));

core::<dyn Executor>().unwrap().run(async move {
let encoded = call.on_to::<IdChannel>().await.encode::<Cbor>();
let decoded: Call = encoded.decode::<IdChannel, Cbor>().await.unwrap();
log!("{}", (decoded)(vec![2, 3]).await);
let encoded = call
.on_to::<IdChannel>()
.await
.encode::<Json>()
.inspect(|item| println!("{}", item));
let decoded: Call = encoded.decode::<IdChannel, Json>().await.unwrap();
log!("{}", (decoded)(vec![5, 6, 7]).await);
});
}
@@ -90,6 +90,17 @@ impl Context {
}
}

pub(crate) fn new_shim() -> Self {
Context {
state: Arc::new(RwLock::new(ContextState {
channel_types: HashMap::new(),
next_index: ForkHandle(1),
unused_indices: vec![],
})),
tasks: Arc::new(Mutex::new(HashMap::new())),
}
}

pub(crate) fn create<K: Kind>(&self) -> ForkHandle {
let mut state = self.state.write().unwrap();
let tasks = self.tasks.lock().unwrap();
@@ -99,7 +110,7 @@ impl Context {
id
} else {
let id = state.next_index;
state.next_index = ForkHandle(state.next_index.0 + 1);
state.next_index = ForkHandle(state.next_index.0 + 2);
state.channel_types.insert(id, d);
id
};
@@ -290,7 +290,7 @@ impl<'a, K: Kind> Target<'a, K> for IdChannel {

fn new_shim() -> Self::Shim {
REGISTRY.add_construct::<K>();
let context = Context::new();
let context = Context::new_shim();
context.add::<K>(ForkHandle(0));
Shim {
context,
@@ -379,8 +379,10 @@ impl<
let (oi, receiver): (UnboundedSender<O>, UnboundedReceiver<O>) = unbounded();
let mut in_channels = HashMap::new();
REGISTRY.add_deconstruct::<K>();
let context = Context::new();
let handle = context.create::<K>();
in_channels.insert(
ForkHandle(0),
handle,
Box::pin(
sender
.sink_map_err(|e| panic!(e))
@@ -392,8 +394,6 @@ impl<
}),
) as Pin<Box<dyn Sink<Box<dyn SerdeAny>, Error = ()> + Send>>,
);
let context = Context::new();
let handle = context.create::<K>();
let ct = context.clone();
let (csender, creceiver) = unbounded();
let channel = IdChannel {
@@ -15,6 +15,7 @@ pub mod bincode;
pub use bincode::Bincode;

use futures::{
channel::mpsc::{unbounded, UnboundedReceiver},
future::{ok, BoxFuture},
stream::BoxStream,
task::Context as FContext,
@@ -23,6 +24,8 @@ use futures::{

use crate::{
channel::{Context, Shim, Target, Waiter},
core,
core::{executor::Spawn, Executor},
Kind,
};

@@ -295,30 +298,34 @@ where
fn encode(input: C) -> Self::Output {
let ctx = input.context();
let (sink, stream) = input.split();
let (sender, receiver): (_, UnboundedReceiver<<Self as Format>::Representation>) =
unbounded();
let receiver = receiver
.map(move |item: <Self as Format>::Representation| {
let ct = ctx.clone();
Self::deserialize(item.clone(), ctx.clone()).or_else(move |e| {
let context = ct.clone();
let message = format!("{}", e);
let mut data = message.split(" ");
if data.next() == Some("ASYNC_WAIT") {
if let Some(data) = data.next() {
return context
.wait_for(data.to_owned())
.then(move |_| Self::deserialize(item, context.clone()));
}
}
panic!(format!("{:?}", e))
})
})
.buffer_unordered(std::usize::MAX);
core::<dyn Executor>().unwrap().spawn(
receiver
.forward(sink.sink_map_err(|_| panic!()))
.unwrap_or_else(|_| panic!()),
);
StreamSink(
Box::pin(stream.map(<Self as Format>::serialize)),
Box::pin(
sink.sink_map_err(EncodeError::from_sink_error)
.with_flat_map(move |item: <Self as Format>::Representation| {
let ctx = ctx.clone();
Self::deserialize(item.clone(), ctx.clone())
.or_else(move |e| {
let context = ctx.clone();
let message = format!("{}", e);
let mut data = message.split(" ");
if data.next() == Some("ASYNC_WAIT") {
if let Some(data) = data.next() {
return context.wait_for(data.to_owned()).then(move |_| {
Self::deserialize(item, context.clone())
});
}
}
panic!(format!("{}", e))
})
.map_err(EncodeError::from_format_error)
.into_stream()
}),
),
Box::pin(sender.sink_map_err(|_| panic!())),
)
}
}

0 comments on commit ff54362

Please sign in to comment.
You can’t perform that action at this time.