From fbce21d70c3c5f74110d294640ec5822faea68d0 Mon Sep 17 00:00:00 2001 From: Xinzhao Xu Date: Thu, 16 May 2024 09:57:35 +0800 Subject: [PATCH] Refactor body-related method of the Response --- src/response.rs | 60 +++++++++++++++++++++++----------------- tests/program/src/lib.rs | 25 +++++++++++++---- 2 files changed, 53 insertions(+), 32 deletions(-) diff --git a/src/response.rs b/src/response.rs index a8c302f..cba17ce 100644 --- a/src/response.rs +++ b/src/response.rs @@ -2,13 +2,15 @@ use anyhow::{anyhow, Result}; #[cfg(feature = "json")] use serde::de::DeserializeOwned; use std::collections::HashMap; -use wasi::http::types::{IncomingResponse, StatusCode}; -use wasi::io::streams::StreamError; +use wasi::http::types::{IncomingBody, IncomingResponse, StatusCode}; +use wasi::io::streams::{InputStream, StreamError}; pub struct Response { status: StatusCode, headers: HashMap, - body: Vec, + // input-stream resource is a child: it must be dropped before the parent incoming-body is dropped + input_stream: InputStream, + _incoming_body: IncomingBody, } impl Response { @@ -22,42 +24,48 @@ impl Response { } drop(headers_handle); - let incoming_body = incoming_response - .consume() - .map_err(|()| anyhow!("incoming response has no body stream"))?; + // The consume() method can only be called once + let incoming_body = incoming_response.consume().unwrap(); drop(incoming_response); + // The stream() method can only be called once let input_stream = incoming_body.stream().unwrap(); - let mut body = vec![]; - loop { - let mut body_chunk = match input_stream.read(1024 * 1024) { - Ok(c) => c, - Err(StreamError::Closed) => break, - Err(e) => Err(anyhow!("input_stream read failed: {e:?}"))?, - }; - - if !body_chunk.is_empty() { - body.append(&mut body_chunk); - } - } - Ok(Self { status, headers, - body, + input_stream, + _incoming_body: incoming_body, }) } - pub fn status(&self) -> &StatusCode { - &self.status + pub fn status(&self) -> StatusCode { + self.status } pub fn headers(&self) -> &HashMap { &self.headers } - pub fn body(&self) -> &Vec { - &self.body + /// Get a chunk of the response body. + /// + /// It will block until at least one byte can be read or the stream is closed. + pub fn chunk(&self, len: u64) -> Result>> { + match self.input_stream.blocking_read(len) { + Ok(c) => Ok(Some(c)), + Err(StreamError::Closed) => Ok(None), + Err(e) => Err(anyhow!("input_stream read failed: {e:?}"))?, + } + } + + /// Get the full response body. + /// + /// It will block until the stream is closed. + pub fn body(self) -> Result> { + let mut body = Vec::new(); + while let Some(mut chunk) = self.chunk(1024 * 1024)? { + body.append(&mut chunk); + } + Ok(body) } /// Deserialize the response body as JSON. @@ -67,7 +75,7 @@ impl Response { /// This requires the `json` feature enabled. #[cfg(feature = "json")] #[cfg_attr(docsrs, doc(cfg(feature = "json")))] - pub fn json(&self) -> Result { - Ok(serde_json::from_slice(&self.body)?) + pub fn json(self) -> Result { + Ok(serde_json::from_slice(self.body()?.as_ref())?) } } diff --git a/tests/program/src/lib.rs b/tests/program/src/lib.rs index 3fad9c7..728c436 100644 --- a/tests/program/src/lib.rs +++ b/tests/program/src/lib.rs @@ -27,17 +27,30 @@ impl Guest for Component { println!( "GET https://httpbin.org/get, status code: {}, body:\n{}", resp.status(), - String::from_utf8_lossy(resp.body()) + String::from_utf8(resp.body().unwrap()).unwrap() ); // get with json response let resp = Client::new().get("https://httpbin.org/get").send().unwrap(); + let status = resp.status(); let json_data = resp.json::().unwrap(); println!( "GET https://httpbin.org/get, status code: {}, body:\n{:?}\n", - resp.status(), - json_data, + status, json_data, + ); + + let resp = Client::new() + .get("https://httpbin.org/range/20?duration=5&chunk_size=10") + .send() + .unwrap(); + println!( + "GET https://httpbin.org/range, status code: {}, body:", + resp.status() ); + while let Some(chunk) = resp.chunk(1024).unwrap() { + println!("{}", String::from_utf8(chunk).unwrap()); + } + println!(); // post with json data let resp = Client::new() @@ -51,7 +64,7 @@ impl Guest for Component { println!( "POST https://httpbin.org/post, status code: {}, body:\n{}", resp.status(), - String::from_utf8_lossy(resp.body()) + String::from_utf8(resp.body().unwrap()).unwrap() ); // post with form data @@ -66,7 +79,7 @@ impl Guest for Component { println!( "POST https://httpbin.org/post, status code: {}, body:\n{}", resp.status(), - String::from_utf8_lossy(resp.body()) + String::from_utf8(resp.body().unwrap()).unwrap() ); // post with file form data @@ -93,7 +106,7 @@ hello println!( "POST https://httpbin.org/post, status code: {}, body:\n{}", resp.status(), - String::from_utf8_lossy(resp.body()) + String::from_utf8(resp.body().unwrap()).unwrap() ); Ok(()) }