diff --git a/Cargo.toml b/Cargo.toml index be3c81b70..71239babe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ url = "2.1" # We need this for script support sha1 = { version = ">= 0.2, < 0.7", optional = true } -combine = { version = "4.5", default-features = false, features = ["std"] } +combine = { version = "4.6", default-features = false, features = ["std"] } # Only needed for AIO bytes = { version = "1", optional = true } diff --git a/src/parser.rs b/src/parser.rs index 17e3fc708..d715528a0 100644 --- a/src/parser.rs +++ b/src/parser.rs @@ -1,12 +1,9 @@ -use std::{ - io::{self, Read}, - str, -}; +use std::{io::Read, str}; use crate::types::{make_extension_error, ErrorKind, RedisError, RedisResult, Value}; use combine::{ - any, choice, eof, + any, error::StreamError, opaque, parser::{ @@ -59,8 +56,8 @@ where I: RangeStream, I::Error: combine::ParseError, { - opaque!(any_send_sync_partial_state(choice(( - any().then_partial(move |&mut b| { + opaque!(any_send_sync_partial_state(any().then_partial( + move |&mut b| { let line = || { recognize(take_until_bytes(&b"\r\n"[..]).with(take(2).map(|_| ()))).and_then( |line: &[u8]| { @@ -147,11 +144,8 @@ where b'-' => error().map(Err), b => combine::unexpected_any(combine::error::Token(b)) ) - }), - eof().map(|_| Err(RedisError::from(io::Error::from( - io::ErrorKind::UnexpectedEof, - )))) - )))) + } + ))) } #[cfg(feature = "aio")] @@ -173,7 +167,7 @@ mod aio_support { let buffer = &bytes[..]; let mut stream = combine::easy::Stream(combine::stream::MaybePartialStream(buffer, !eof)); - match combine::stream::decode(value(), &mut stream, &mut self.state) { + match combine::stream::decode_tokio(value(), &mut stream, &mut self.state) { Ok(x) => x, Err(err) => { let err = err @@ -210,6 +204,7 @@ mod aio_support { type Error = RedisError; fn decode(&mut self, bytes: &mut BytesMut) -> Result, Self::Error> { + dbg!("decode"); self.decode_stream(bytes, false) } @@ -307,3 +302,24 @@ pub fn parse_redis_value(bytes: &[u8]) -> RedisResult { let mut parser = Parser::new(); parser.parse_value(bytes) } + +#[cfg(test)] +mod tests { + #[cfg(feature = "aio")] + use super::*; + + #[cfg(feature = "aio")] + #[test] + fn decode_eof_returns_none_at_eof() { + use tokio_util::codec::Decoder; + let mut codec = ValueCodec::default(); + + let mut bytes = bytes::BytesMut::from(&b"+GET 123\r\n"[..]); + assert_eq!( + codec.decode_eof(&mut bytes), + Ok(Some(parse_redis_value(b"+GET 123\r\n").unwrap())) + ); + assert_eq!(codec.decode_eof(&mut bytes), Ok(None)); + assert_eq!(codec.decode_eof(&mut bytes), Ok(None)); + } +}