Skip to content
This repository has been archived by the owner on Jun 11, 2022. It is now read-only.

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
qnikst committed Feb 5, 2019
1 parent eb37673 commit e0e4d39
Showing 1 changed file with 99 additions and 38 deletions.
137 changes: 99 additions & 38 deletions network-ntt/src/client.rs
Expand Up @@ -84,11 +84,60 @@ impl<T> Stream for StreamOnce<T> {
}
}

pub struct PullBlocksToTip<T: Block> {
chan: Receiver<(T::Id,T::Date)>,
from: T::Id,
request: mpsc::UnboundedSender<Request<T>>,
}

impl<T:Block> Future for PullBlocksToTip<T> {
type Item = PullBlocksToTipStream<T>;
type Error = core_client::Error;

fn poll(&mut self) -> Poll<PullBlocksToTipStream<T>, Self::Error> {
match self.chan.poll() {
Ok(Async::Ready((tip,_date))) => {
let (sender, receiver) = mpsc::unbounded();
self.request.unbounded_send(Request::Block(sender, self.from, tip)).unwrap();
let stream = PullBlocksToTipStream {
channel: receiver,
};
Ok(Async::Ready(stream))

},
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(core_client::Error::new(core_client::ErrorKind::Rpc, e)),
}
}
}

pub struct PullBlocksToTipStream<T> {
channel: mpsc::UnboundedReceiver<Result<T, core_client::Error>>
}

impl<T:Block> Stream for PullBlocksToTipStream<T> {
type Item = T;
type Error = core_client::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.channel.poll() {
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::Ready(Some(Ok(block)))) =>
Ok(Async::Ready(Some(block))),
Ok(Async::Ready(Some(Err(err)))) =>
Err(err),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) =>
Err(core_client::Error::new(core_client::ErrorKind::Rpc, "error reading from unbounded channel")),
}
}
}

impl<T: Block, Tx> BlockService<T> for ClientHandle<T, Tx> {
type TipFuture = Receiver<(T::Id, T::Date)>;

type PullBlocksToTipStream = StreamOnce<T>;
type PullBlocksToTipFuture = Receiver<Self::GetBlocksStream>;
type PullBlocksToTipStream = PullBlocksToTipStream<T>;
type PullBlocksToTipFuture = PullBlocksToTip<T>;
type GetBlocksStream = StreamOnce<T>;
type GetBlocksFuture = Receiver<StreamOnce<T>>;

Expand All @@ -98,14 +147,20 @@ impl<T: Block, Tx> BlockService<T> for ClientHandle<T, Tx> {
Receiver(sink)
}

fn pull_blocks_to_tip(&mut self, _from: &[T::Id]) -> Self::PullBlocksToTipFuture {
unimplemented!("not yet implemented for the network transport")
fn pull_blocks_to_tip(&mut self, from: &[T::Id]) -> Self::PullBlocksToTipFuture {
let (source, sink) = oneshot::channel();
self.channel.unbounded_send(Request::Tip(source)).unwrap();
PullBlocksToTip{
chan: Receiver(sink),
from: from[0].clone(),
request: self.channel.clone(),
}
}
}

pub enum Request<T: Block> {
Tip(oneshot::Sender<Result<(T::Id, T::Date), core_client::Error>>),
Block(oneshot::Sender<Result<T, core_client::Error>>, T::Id),
Block(mpsc::UnboundedSender<Result<T, core_client::Error>>, T::Id, T::Id),
}

struct ConnectionState<B: Block + HasHeader> {
Expand Down Expand Up @@ -196,46 +251,52 @@ where
.map(|x| (x, cc)),
))
}
Command::Message(message) => future::Either::A(future::Either::B(
sink.send(message).map_err(|_err| ()).map(|x| (x, cc)),
Command::Message(message) =>
future::Either::A(future::Either::B(
sink.send(message)
.map_err(|_err| ()).map(|x| (x, cc)),
)),
Command::BlockHeaders(lwid, resp) => {
let request = cc.requests.remove(&lwid);
match request {
Some(Request::Tip(chan)) => match resp {
Response::Ok(x) => {
let id = x.0[0].id();
let date = x.0[0].date();
chan.send(Ok((id, date))).unwrap()
}
Response::Err(x) => chan
.send(Err(core_client::Error::new(
future::Either::B(future::Either::A(
match request {
Some(Request::Tip(chan)) => match resp {
Response::Ok(x) => {
let id = x.0[0].id();
let date = x.0[0].date();
chan.send(Ok((id, date))).unwrap();
future::Either::A(future::ok((sink,cc)))
},
Response::Err(x) => {
chan.send(Err(core_client::Error::new(
core_client::ErrorKind::Rpc,
x,
))).unwrap();
future::Either::A(future::ok((sink,cc)))
},
},
Some(Request::Block(chan, _, _)) =>
future::Either::B(
chan.send(Err(core_client::Error::new(
core_client::ErrorKind::Rpc,
x,
)))
.unwrap(),
},
Some(Request::Block(chan, _)) => chan
.send(Err(core_client::Error::new(
core_client::ErrorKind::Rpc,
"unexpected reply".to_string(),
)))
.unwrap(),
None => (),
}
future::Either::B(future::Either::A(future::ok((sink, cc))))
"unexpected reply".to_string(),
))).and_then(|_| future::ok((sink,cc)))
),
None =>
future::Either::A(future::ok((sink, cc)))
}
))
}
Command::Blocks(lwid, resp) => {
let val = cc.requests.remove(&lwid);
match val {
Some(Request::Block(chan, _)) => match resp {
Response::Ok(x) => chan.send(Ok(x)).unwrap(),
Some(Request::Block(chan, _, _)) => match resp {
Response::Ok(x) => chan.send(Ok(x)),
Response::Err(x) => chan
.send(Err(core_client::Error::new(
core_client::ErrorKind::Rpc,
x,
)))
.unwrap(),
))),
},
Some(Request::Tip(chan)) => {
chan.send(Err(core_client::Error::new(
Expand Down Expand Up @@ -272,14 +333,14 @@ where
.and_then(|sink| future::ok((sink, cc)))
})
}
Request::Block(t, x) => {
let x1 = x.clone();
let x2 = x.clone();
cc.requests.insert(lwcid, Request::Block(t, x));
Request::Block(t, from, to) => {
let from1 = from.clone();
let to1 = to.clone();
cc.requests.insert(lwcid, Request::Block(t, from1, to1));
future::Either::B({
sink.send(Message::GetBlocks(
lwcid,
GetBlocks { from: x1, to: x2 },
GetBlocks { from, to },
))
.and_then(|sink| future::ok((sink, cc)))
})
Expand Down

0 comments on commit e0e4d39

Please sign in to comment.