Skip to content

Commit

Permalink
fix: Don't loop forever on shutdown of the multiplexed connection
Browse files Browse the repository at this point in the history
The async decoder would return unexpected end of input errors forever
instead of `None` to indicate that the stream was done. Since the
`MultiplexedConnection` would just drop these errors if there was no
in_flight requests, ending up in an infinite loop inside `poll_read` on
shutdown.

cc #488
  • Loading branch information
Markus Westerlind committed Jun 16, 2021
1 parent 0e08189 commit ddecce9
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -30,7 +30,7 @@ url = "2.1"
# We need this for script support
sha1 = { version = ">= 0.2, < 0.7", optional = true }

combine = { version = "4.5", default-features = false, features = ["std"] }
combine = { version = "4.6", default-features = false, features = ["std"] }

# Only needed for AIO
bytes = { version = "1", optional = true }
Expand Down
42 changes: 29 additions & 13 deletions src/parser.rs
@@ -1,12 +1,9 @@
use std::{
io::{self, Read},
str,
};
use std::{io::Read, str};

use crate::types::{make_extension_error, ErrorKind, RedisError, RedisResult, Value};

use combine::{
any, choice, eof,
any,
error::StreamError,
opaque,
parser::{
Expand Down Expand Up @@ -59,8 +56,8 @@ where
I: RangeStream<Token = u8, Range = &'a [u8]>,
I::Error: combine::ParseError<u8, &'a [u8], I::Position>,
{
opaque!(any_send_sync_partial_state(choice((
any().then_partial(move |&mut b| {
opaque!(any_send_sync_partial_state(any().then_partial(
move |&mut b| {
let line = || {
recognize(take_until_bytes(&b"\r\n"[..]).with(take(2).map(|_| ()))).and_then(
|line: &[u8]| {
Expand Down Expand Up @@ -147,11 +144,8 @@ where
b'-' => error().map(Err),
b => combine::unexpected_any(combine::error::Token(b))
)
}),
eof().map(|_| Err(RedisError::from(io::Error::from(
io::ErrorKind::UnexpectedEof,
))))
))))
}
)))
}

#[cfg(feature = "aio")]
Expand All @@ -173,7 +167,7 @@ mod aio_support {
let buffer = &bytes[..];
let mut stream =
combine::easy::Stream(combine::stream::MaybePartialStream(buffer, !eof));
match combine::stream::decode(value(), &mut stream, &mut self.state) {
match combine::stream::decode_tokio(value(), &mut stream, &mut self.state) {
Ok(x) => x,
Err(err) => {
let err = err
Expand Down Expand Up @@ -210,6 +204,7 @@ mod aio_support {
type Error = RedisError;

fn decode(&mut self, bytes: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
dbg!("decode");
self.decode_stream(bytes, false)
}

Expand Down Expand Up @@ -307,3 +302,24 @@ pub fn parse_redis_value(bytes: &[u8]) -> RedisResult<Value> {
let mut parser = Parser::new();
parser.parse_value(bytes)
}

#[cfg(test)]
mod tests {
#[cfg(feature = "aio")]
use super::*;

#[cfg(feature = "aio")]
#[test]
fn decode_eof_returns_none_at_eof() {
use tokio_util::codec::Decoder;
let mut codec = ValueCodec::default();

let mut bytes = bytes::BytesMut::from(&b"+GET 123\r\n"[..]);
assert_eq!(
codec.decode_eof(&mut bytes),
Ok(Some(parse_redis_value(b"+GET 123\r\n").unwrap()))
);
assert_eq!(codec.decode_eof(&mut bytes), Ok(None));
assert_eq!(codec.decode_eof(&mut bytes), Ok(None));
}
}

0 comments on commit ddecce9

Please sign in to comment.