Skip to content

Commit

Permalink
Fix timeout behaviour, add a test
Browse files Browse the repository at this point in the history
As described in issue #22, minreq does not handle timeouts as it implies
in the documentation. This commit fixes it by setting the right timeout*
for each byte read, if a timeout is specified.

* To be specific, it sets each read's timeout to:
  `initial_request_time + timeout - current_time`

Note: this commit might cause some slowdown, as set_read_timeout is
called for each byte read. This only applies to requests with a timeout,
however.
  • Loading branch information
neonmoe committed Jan 11, 2020
1 parent 267cf39 commit 41ef16e
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 43 deletions.
82 changes: 43 additions & 39 deletions src/connection.rs
Expand Up @@ -3,10 +3,10 @@ use crate::{Error, Method, Request, ResponseLazy};
use rustls::{self, ClientConfig, ClientSession, StreamOwned};
use std::env;
use std::io::{self, BufReader, BufWriter, Read, Write};
use std::net::{TcpStream, ToSocketAddrs};
use std::net::TcpStream;
#[cfg(feature = "https")]
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
#[cfg(feature = "https")]
use webpki::DNSNameRef;
#[cfg(feature = "https")]
Expand All @@ -28,28 +28,49 @@ type UnsecuredStream = BufReader<TcpStream>;
type SecuredStream = StreamOwned<ClientSession, TcpStream>;

pub(crate) enum HttpStream {
Unsecured(UnsecuredStream),
Unsecured(UnsecuredStream, Option<Instant>),
#[cfg(feature = "https")]
Secured(Box<SecuredStream>),
Secured(Box<SecuredStream>, Option<Instant>),
}

impl HttpStream {
fn create_unsecured(reader: UnsecuredStream) -> HttpStream {
HttpStream::Unsecured(reader)
fn create_unsecured(reader: UnsecuredStream, timeout_at: Option<Instant>) -> HttpStream {
HttpStream::Unsecured(reader, timeout_at)
}

#[cfg(feature = "https")]
fn create_secured(reader: SecuredStream) -> HttpStream {
HttpStream::Secured(Box::new(reader))
fn create_secured(reader: SecuredStream, timeout_at: Option<Instant>) -> HttpStream {
HttpStream::Secured(Box::new(reader), timeout_at)
}
}

impl Read for HttpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let timeout = |tcp: &TcpStream, timeout_at: Option<Instant>| {
if let Some(timeout_at) = timeout_at {
let now = Instant::now();
if timeout_at <= now {
return Err(io::Error::new(
io::ErrorKind::TimedOut,
"The request's timeout was reached.",
));
} else {
tcp.set_read_timeout(Some(timeout_at - now)).ok();
}
}
Ok(())
};

match self {
HttpStream::Unsecured(inner) => inner.read(buf),
HttpStream::Unsecured(inner, timeout_at) => {
timeout(inner.get_ref(), *timeout_at)?;
inner.read(buf)
}
#[cfg(feature = "https")]
HttpStream::Secured(inner) => inner.read(buf),
HttpStream::Secured(inner, timeout_at) => {
timeout(inner.get_ref(), *timeout_at)?;
inner.read(buf)
}
}
}
}
Expand Down Expand Up @@ -81,26 +102,24 @@ impl Connection {
pub(crate) fn send_https(mut self) -> Result<ResponseLazy, Error> {
self.request.host = ensure_ascii_host(self.request.host)?;
let bytes = self.request.as_bytes();
let timeout_duration = self.timeout.map(|d| Duration::from_secs(d));
let timeout_at = timeout_duration.map(|d| Instant::now() + d);

// Rustls setup
let dns_name = &self.request.host;
let dns_name = dns_name.split(':').next().unwrap();
let dns_name = DNSNameRef::try_from_ascii_str(dns_name).unwrap();
let sess = ClientSession::new(&CONFIG, dns_name);

let tcp = match create_tcp_stream(&self.request.host, self.timeout) {
Ok(tcp) => tcp,
Err(err) => return Err(Error::IoError(err)),
};
let tcp = TcpStream::connect(&self.request.host)?;

// Send request
let mut tls = StreamOwned::new(sess, tcp);
if let Err(err) = tls.write(&bytes) {
return Err(Error::IoError(err));
}
tls.get_ref().set_write_timeout(timeout_duration).ok();
tls.write(&bytes)?;

// Receive request
let response = ResponseLazy::from_stream(HttpStream::create_secured(tls))?;
let response = ResponseLazy::from_stream(HttpStream::create_secured(tls, timeout_at))?;
handle_redirects(self, response)
}

Expand All @@ -109,17 +128,15 @@ impl Connection {
pub(crate) fn send(mut self) -> Result<ResponseLazy, Error> {
self.request.host = ensure_ascii_host(self.request.host)?;
let bytes = self.request.as_bytes();
let timeout_duration = self.timeout.map(|d| Duration::from_secs(d));
let timeout_at = timeout_duration.map(|d| Instant::now() + d);

let tcp = match create_tcp_stream(&self.request.host, self.timeout) {
Ok(tcp) => tcp,
Err(err) => return Err(Error::IoError(err)),
};
let tcp = TcpStream::connect(&self.request.host)?;

// Send request
let mut stream = BufWriter::new(tcp);
if let Err(err) = stream.write_all(&bytes) {
return Err(Error::IoError(err));
}
stream.get_ref().set_write_timeout(timeout_duration).ok();
stream.write_all(&bytes)?;

// Receive response
let tcp = match stream.into_inner() {
Expand All @@ -130,7 +147,7 @@ impl Connection {
));
}
};
let stream = HttpStream::create_unsecured(BufReader::new(tcp));
let stream = HttpStream::create_unsecured(BufReader::new(tcp), timeout_at);
let response = ResponseLazy::from_stream(stream)?;
handle_redirects(self, response)
}
Expand Down Expand Up @@ -179,19 +196,6 @@ fn get_redirect(
}
}

fn create_tcp_stream<A>(host: A, timeout: Option<u64>) -> Result<TcpStream, std::io::Error>
where
A: ToSocketAddrs,
{
let stream = TcpStream::connect(host)?;
if let Some(secs) = timeout {
let dur = Some(Duration::from_secs(secs));
stream.set_read_timeout(dur)?;
stream.set_write_timeout(dur)?;
}
Ok(stream)
}

fn ensure_ascii_host(host: String) -> Result<String, Error> {
if host.is_ascii() {
Ok(host)
Expand Down
6 changes: 6 additions & 0 deletions src/error.rs
Expand Up @@ -86,3 +86,9 @@ impl error::Error for Error {
}
}
}

impl From<io::Error> for Error {
fn from(other: io::Error) -> Error {
Error::IoError(other)
}
}
2 changes: 1 addition & 1 deletion src/request.rs
Expand Up @@ -134,7 +134,7 @@ impl Request {
}
}

/// Sets the request timeout.
/// Sets the request timeout in seconds.
pub fn with_timeout(mut self, timeout: u64) -> Request {
self.timeout = Some(timeout);
self
Expand Down
16 changes: 13 additions & 3 deletions tests/main.rs
Expand Up @@ -42,15 +42,25 @@ fn test_json_using_serde() {
}

#[test]
fn test_latency() {
fn test_timeout_too_low() {
setup();
let result = minreq::get(url("/slow_a"))
.with_body("Q".to_string())
.with_timeout(1)
.send();
assert!(result.is_err());
}

#[test]
fn test_timeout_high_enough() {
setup();
let body = get_body(
minreq::get(url("/slow_a"))
.with_body("Q".to_string())
.with_timeout(1)
.with_timeout(3)
.send(),
);
assert_ne!(body, "j: Q");
assert_eq!(body, "j: Q");
}

#[test]
Expand Down

0 comments on commit 41ef16e

Please sign in to comment.