Skip to content

Commit

Permalink
Merge pull request #198 from tottoto/update-to-hyper-1
Browse files Browse the repository at this point in the history
Update to hyper 1
  • Loading branch information
lipanski committed Mar 23, 2024
2 parents 3ce41f0 + e8694ae commit f477e54
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 44 deletions.
9 changes: 7 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ appveyor = { repository = "lipanski/mockito", branch = "master", service = "gith

[dependencies]
assert-json-diff = "2.0"
bytes = "1"
colored = { version = "2.0", optional = true }
futures-core = "0.3"
hyper = { version = "0.14", features = ["http1", "http2", "server", "stream"] }
http = "1"
http-body = "1"
http-body-util = "0.1"
hyper = "1"
hyper-util = { version = "0.1", features = ["server-auto", "tokio"] }
log = "0.4"
rand = "0.8"
regex = "1.7"
Expand All @@ -34,7 +39,7 @@ tokio = { version = "1.25", features = ["net", "parking_lot", "rt", "sync"] }
env_logger = "0.8"
testing_logger = "0.1"
futures = { version = "0.3", default-features = false, features = ["alloc", "async-await"] }
reqwest = "0.11"
reqwest = { version = "0.12", default-features = false, features = ["http2"] }
tokio = { version = "1.25", features = ["macros", "rt-multi-thread"] }

[features]
Expand Down
2 changes: 1 addition & 1 deletion src/matcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use assert_json_diff::{assert_json_matches_no_panic, CompareMode};
use hyper::header::HeaderValue;
use http::header::HeaderValue;
use regex::Regex;
use std::collections::HashMap;
use std::convert::From;
Expand Down
13 changes: 7 additions & 6 deletions src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ use crate::server::RemoteMock;
use crate::server::State;
use crate::Request;
use crate::{Error, ErrorKind};
use hyper::header::HeaderName;
use hyper::HeaderMap;
use hyper::StatusCode;
use bytes::Bytes;
use http::{HeaderMap, HeaderName, StatusCode};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::convert::Into;
Expand Down Expand Up @@ -356,7 +355,7 @@ impl Mock {
/// ```
///
pub fn with_body<StrOrBytes: AsRef<[u8]>>(mut self, body: StrOrBytes) -> Self {
self.inner.response.body = Body::Bytes(body.as_ref().to_owned());
self.inner.response.body = Body::Bytes(Bytes::from(body.as_ref().to_owned()));
self
}

Expand Down Expand Up @@ -425,7 +424,8 @@ impl Mock {
mut self,
callback: impl Fn(&Request) -> Vec<u8> + Send + Sync + 'static,
) -> Self {
self.inner.response.body = Body::FnWithRequest(Arc::new(callback));
self.inner.response.body =
Body::FnWithRequest(Arc::new(move |req| Bytes::from(callback(req))));
self
}

Expand All @@ -446,7 +446,8 @@ impl Mock {
self.inner.response.body = Body::Bytes(
std::fs::read(path)
.map_err(|_| Error::new(ErrorKind::FileNotFound))
.unwrap(),
.unwrap()
.into(),
);
self
}
Expand Down
25 changes: 13 additions & 12 deletions src/request.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
use crate::{Error, ErrorKind};
use hyper::body;
use hyper::body::Buf;
use hyper::header::AsHeaderName;
use hyper::header::HeaderValue;
use hyper::Body as HyperBody;
use hyper::Request as HyperRequest;
use http::header::{AsHeaderName, HeaderValue};
use http::Request as HttpRequest;
use http_body_util::BodyExt;
use hyper::body::Incoming;

///
/// Stores a HTTP request
///
#[derive(Debug)]
pub struct Request {
inner: HyperRequest<HyperBody>,
inner: HttpRequest<Incoming>,
body: Option<Vec<u8>>,
}

impl Request {
pub(crate) fn new(request: HyperRequest<HyperBody>) -> Self {
pub(crate) fn new(request: HttpRequest<Incoming>) -> Self {
Request {
inner: request,
body: None,
Expand Down Expand Up @@ -64,12 +62,15 @@ impl Request {
pub(crate) async fn read_body(&mut self) -> &Vec<u8> {
if self.body.is_none() {
let raw_body = self.inner.body_mut();
let mut buf = body::aggregate(raw_body)

let bytes = raw_body
.collect()
.await
.map_err(|err| Error::new_with_context(ErrorKind::RequestBodyFailure, err))
.unwrap();
let bytes = buf.copy_to_bytes(buf.remaining()).to_vec();
self.body = Some(bytes);
.unwrap()
.to_bytes();

self.body = Some(bytes.to_vec());
}

self.body.as_ref().unwrap()
Expand Down
19 changes: 12 additions & 7 deletions src/response.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::error::Error;
use crate::Request;
use bytes::Bytes;
use futures_core::stream::Stream;
use hyper::HeaderMap;
use hyper::StatusCode;
use http::{HeaderMap, StatusCode};
use http_body::Frame;
use std::fmt;
use std::io;
use std::sync::Arc;
Expand All @@ -18,11 +19,11 @@ pub(crate) struct Response {
}

type BodyFnWithWriter = dyn Fn(&mut dyn io::Write) -> io::Result<()> + Send + Sync + 'static;
type BodyFnWithRequest = dyn Fn(&Request) -> Vec<u8> + Send + Sync + 'static;
type BodyFnWithRequest = dyn Fn(&Request) -> Bytes + Send + Sync + 'static;

#[derive(Clone)]
pub(crate) enum Body {
Bytes(Vec<u8>),
Bytes(Bytes),
FnWithWriter(Arc<BodyFnWithWriter>),
FnWithRequest(Arc<BodyFnWithRequest>),
}
Expand Down Expand Up @@ -61,7 +62,7 @@ impl Default for Response {
Self {
status: StatusCode::OK,
headers,
body: Body::Bytes(Vec::new()),
body: Body::Bytes(Bytes::new()),
}
}
}
Expand Down Expand Up @@ -116,15 +117,19 @@ impl Drop for ChunkedStream {
}

impl Stream for ChunkedStream {
type Item = io::Result<Box<[u8]>>;
type Item = io::Result<Frame<Bytes>>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
self.receiver
.as_mut()
.map(move |r| r.poll_recv(cx))
.map(move |receiver| {
receiver.poll_recv(cx).map(|received| {
received.map(|result| result.map(|data| Frame::data(Bytes::from(data))))
})
})
.unwrap_or(Poll::Ready(None))
}
}
53 changes: 37 additions & 16 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ use crate::request::Request;
use crate::response::{Body as ResponseBody, ChunkedStream};
use crate::ServerGuard;
use crate::{Error, ErrorKind, Matcher, Mock};
use hyper::server::conn::Http;
use http::{Request as HttpRequest, Response, StatusCode};
use http_body_util::{BodyExt, Empty, Full, StreamBody};
use hyper::body::Incoming;
use hyper::service::service_fn;
use hyper::{Body, Request as HyperRequest, Response, StatusCode};
use hyper_util::rt::{TokioExecutor, TokioIo};
use hyper_util::server::conn::auto::Builder as ConnectionBuilder;
use std::default::Default;
use std::error::Error as StdError;
use std::fmt;
use std::net::{IpAddr, SocketAddr};
use std::ops::Drop;
Expand Down Expand Up @@ -352,10 +356,10 @@ impl Server {
let mutex = state.clone();

spawn_local(async move {
let _ = Http::new()
let _ = ConnectionBuilder::new(TokioExecutor::new())
.serve_connection(
stream,
service_fn(move |request: HyperRequest<Body>| {
TokioIo::new(stream),
service_fn(move |request: HttpRequest<Incoming>| {
handle_request(request, mutex.clone())
}),
)
Expand Down Expand Up @@ -441,10 +445,27 @@ impl fmt::Display for Server {
}
}

type BoxError = Box<dyn StdError + Send + Sync>;
type BoxBody = http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, BoxError>;

trait IntoBoxBody {
fn into_box_body(self) -> BoxBody;
}

impl<B> IntoBoxBody for B
where
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
B::Error: Into<BoxError>,
{
fn into_box_body(self) -> BoxBody {
self.map_err(Into::into).boxed_unsync()
}
}

async fn handle_request(
hyper_request: HyperRequest<Body>,
hyper_request: HttpRequest<Incoming>,
state: Arc<RwLock<State>>,
) -> Result<Response<Body>, Error> {
) -> Result<Response<BoxBody>, Error> {
let mut request = Request::new(hyper_request);
request.read_body().await;
log::debug!("Request received: {}", request.formatted());
Expand Down Expand Up @@ -477,7 +498,7 @@ async fn handle_request(
}
}

fn respond_with_mock(request: Request, mock: &RemoteMock) -> Result<Response<Body>, Error> {
fn respond_with_mock(request: Request, mock: &RemoteMock) -> Result<Response<BoxBody>, Error> {
let status: StatusCode = mock.inner.response.status;
let mut response = Response::builder().status(status);

Expand All @@ -491,32 +512,32 @@ fn respond_with_mock(request: Request, mock: &RemoteMock) -> Result<Response<Bod
if !request.has_header("content-length") {
response = response.header("content-length", bytes.len());
}
Body::from(bytes.clone())
Full::new(bytes.to_owned()).into_box_body()
}
ResponseBody::FnWithWriter(body_fn) => {
let stream = ChunkedStream::new(Arc::clone(body_fn))?;
Body::wrap_stream(stream)
StreamBody::new(stream).into_box_body()
}
ResponseBody::FnWithRequest(body_fn) => {
let bytes = body_fn(&request);
Body::from(bytes)
Full::new(bytes.to_owned()).into_box_body()
}
}
} else {
Body::empty()
Empty::new().into_box_body()
};

let response: Response<Body> = response
let response: Response<BoxBody> = response
.body(body)
.map_err(|err| Error::new_with_context(ErrorKind::ResponseFailure, err))?;

Ok(response)
}

fn respond_with_mock_not_found() -> Result<Response<Body>, Error> {
let response: Response<Body> = Response::builder()
fn respond_with_mock_not_found() -> Result<Response<BoxBody>, Error> {
let response: Response<BoxBody> = Response::builder()
.status(StatusCode::NOT_IMPLEMENTED)
.body(Body::empty())
.body(Empty::new().into_box_body())
.map_err(|err| Error::new_with_context(ErrorKind::ResponseFailure, err))?;

Ok(response)
Expand Down

0 comments on commit f477e54

Please sign in to comment.