Skip to content
This repository has been archived by the owner on May 6, 2018. It is now read-only.

Commit

Permalink
Remove generic Encoder/Decoder error
Browse files Browse the repository at this point in the history
We'll consider adding this later in a future PR
  • Loading branch information
alexcrichton committed Feb 28, 2017
1 parent e73d2c0 commit cddbdb3
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 43 deletions.
21 changes: 10 additions & 11 deletions src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ pub struct Framed<T, U> {

pub struct Fuse<T, U>(pub T, pub U);

pub fn framed<T, U>(inner: T, codec: U) -> Framed<T, U> {
pub fn framed<T, U>(inner: T, codec: U) -> Framed<T, U>
where T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
{
Framed {
inner: framed_read2(framed_write2(Fuse(inner, codec))),
}
Expand All @@ -26,10 +29,9 @@ pub fn framed<T, U>(inner: T, codec: U) -> Framed<T, U> {
impl<T, U> Stream for Framed<T, U>
where T: AsyncRead,
U: Decoder,
U::Error: From<io::Error>
{
type Item = U::Item;
type Error = U::Error;
type Error = io::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.inner.poll()
Expand All @@ -39,10 +41,9 @@ impl<T, U> Stream for Framed<T, U>
impl<T, U> Sink for Framed<T, U>
where T: AsyncWrite,
U: Encoder,
U::Error: From<io::Error>
{
type SinkItem = U::Item;
type SinkError = U::Error;
type SinkError = io::Error;

fn start_send(&mut self,
item: Self::SinkItem)
Expand Down Expand Up @@ -85,22 +86,20 @@ impl<T: AsyncWrite, U> AsyncWrite for Fuse<T, U> {

impl<T, U: Decoder> Decoder for Fuse<T, U> {
type Item = U::Item;
type Error = U::Error;

fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
fn decode(&mut self, buffer: &mut BytesMut) -> io::Result<Option<Self::Item>> {
self.1.decode(buffer)
}

fn eof(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
self.1.eof(buffer)
fn decode_eof(&mut self, buffer: &mut BytesMut) -> io::Result<Self::Item> {
self.1.decode_eof(buffer)
}
}

impl<T, U: Encoder> Encoder for Fuse<T, U> {
type Item = U::Item;
type Error = U::Error;

fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> io::Result<()> {
self.1.encode(item, dst)
}
}
33 changes: 16 additions & 17 deletions src/framed_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,24 @@ pub trait Decoder {
/// The type of decoded frames.
type Item;

/// The type of fatal decoding errors.
///
/// Non-fatal errors should be encoded in Item values.
type Error;

/// Attempts to decode a message from the provided buffer of bytes.
///
/// This method is called by `FramedRead` whenever new data becomes
/// available. If a complete message is available, its constituent bytes
/// should be consumed (for example, with `BytesMut::drain_to`) and
/// Ok(Some(message)) returned.
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>;
fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Self::Item>>;

/// A method that can optionally be overridden to handle EOF specially.
///
/// This method will never be provided with bytes that have not previously
/// been provided to `decode`.
#[allow(unused_variables)]
fn eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
Ok(None)
fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Self::Item> {
match try!(self.decode(buf)) {
Some(frame) => Ok(frame),
None => Err(io::Error::new(io::ErrorKind::Other,
"bytes remaining on stream")),
}
}
}

Expand All @@ -59,7 +57,10 @@ const INITIAL_CAPACITY: usize = 8 * 1024;

// ===== impl FramedRead =====

impl<T, D> FramedRead<T, D> {
impl<T, D> FramedRead<T, D>
where T: AsyncRead,
D: Decoder,
{
/// Creates a new `FramedRead` with the given `decoder`.
pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
FramedRead {
Expand Down Expand Up @@ -110,12 +111,11 @@ impl<T, D> FramedRead<T, D> {
impl<T, D> Stream for FramedRead<T, D>
where T: AsyncRead,
D: Decoder,
D::Error: From<io::Error>
{
type Item = D::Item;
type Error = D::Error;
type Error = io::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
self.inner.poll()
}
}
Expand Down Expand Up @@ -157,10 +157,9 @@ impl<T> FramedRead2<T> {

impl<T> Stream for FramedRead2<T>
where T: AsyncRead + Decoder,
T::Error: From<io::Error>
{
type Item = T::Item;
type Error = T::Error;
type Error = io::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
Expand All @@ -172,8 +171,8 @@ impl<T> Stream for FramedRead2<T>
if self.buffer.is_empty() {
return Ok(None.into())
} else {
let frame = try!(self.inner.eof(&mut self.buffer));
return Ok(Async::Ready(frame));
let frame = try!(self.inner.decode_eof(&mut self.buffer));
return Ok(Async::Ready(Some(frame)));
}
}

Expand Down
28 changes: 13 additions & 15 deletions src/framed_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@ pub trait Encoder {
/// The type of items consumed by the `Encoder`
type Item;

/// Encoding error
type Error;

/// Encode a complete Item into a buffer
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error>;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut)
-> io::Result<()>;
}

/// A `Sink` of frames encoded to an `AsyncWrite`.
Expand All @@ -41,7 +39,10 @@ pub struct FramedWrite2<T> {
const INITIAL_CAPACITY: usize = 8 * 1024;
const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;

impl<T, E> FramedWrite<T, E> {
impl<T, E> FramedWrite<T, E>
where T: AsyncWrite,
E: Encoder,
{
/// Creates a new `FramedWrite` with the given `encoder`.
pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
FramedWrite {
Expand Down Expand Up @@ -92,12 +93,11 @@ impl<T, E> FramedWrite<T, E> {
impl<T, E> Sink for FramedWrite<T, E>
where T: AsyncWrite,
E: Encoder,
E::Error: From<io::Error>,
{
type SinkItem = E::Item;
type SinkError = E::Error;
type SinkError = io::Error;

fn start_send(&mut self, item: E::Item) -> StartSend<E::Item, E::Error> {
fn start_send(&mut self, item: E::Item) -> StartSend<E::Item, io::Error> {
self.inner.start_send(item)
}

Expand Down Expand Up @@ -128,12 +128,11 @@ pub fn framed_write2<T>(inner: T) -> FramedWrite2<T> {

impl<T> Sink for FramedWrite2<T>
where T: AsyncWrite + Encoder,
T::Error: From<io::Error>,
{
type SinkItem = T::Item;
type SinkError = T::Error;
type SinkError = io::Error;

fn start_send(&mut self, item: T::Item) -> StartSend<T::Item, T::Error> {
fn start_send(&mut self, item: T::Item) -> StartSend<T::Item, io::Error> {
// If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's
// *still* over 8KiB, then apply backpressure (reject the send).
if self.buffer.len() >= BACKPRESSURE_BOUNDARY {
Expand Down Expand Up @@ -177,14 +176,13 @@ impl<T> Sink for FramedWrite2<T>

impl<T: Decoder> Decoder for FramedWrite2<T> {
type Item = T::Item;
type Error = T::Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<T::Item>, T::Error> {
fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<T::Item>> {
self.inner.decode(src)
}

fn eof(&mut self, src: &mut BytesMut) -> Result<Option<T::Item>, T::Error> {
self.inner.eof(src)
fn decode_eof(&mut self, src: &mut BytesMut) -> io::Result<T::Item> {
self.inner.decode_eof(src)
}
}

Expand Down

0 comments on commit cddbdb3

Please sign in to comment.