Skip to content

Commit

Permalink
HttpRecord pooling
Browse the repository at this point in the history
  • Loading branch information
lrowe committed Oct 3, 2023
1 parent 3cec588 commit 3502a2c
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 41 deletions.
22 changes: 11 additions & 11 deletions ext/http/http_next.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::service::handle_request;
use crate::service::http_trace;
use crate::service::HttpRecord;
use crate::service::HttpRequestBodyAutocloser;
use crate::service::RefCount;
use crate::service::HttpServerState;
use crate::websocket_upgrade::WebSocketUpgrade;
use crate::LocalExecutor;
use cache_control::CacheControl;
Expand Down Expand Up @@ -816,13 +816,13 @@ fn serve_https(
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> JoinHandle<Result<(), AnyError>> {
let HttpLifetime {
refcount,
server_state,
connection_cancel_handle,
listen_cancel_handle,
} = lifetime;

let svc = service_fn(move |req: Request| {
handle_request(req, request_info.clone(), refcount.clone(), tx.clone())
handle_request(req, request_info.clone(), server_state.clone(), tx.clone())
});
spawn(
async {
Expand Down Expand Up @@ -853,13 +853,13 @@ fn serve_http(
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> JoinHandle<Result<(), AnyError>> {
let HttpLifetime {
refcount,
server_state,
connection_cancel_handle,
listen_cancel_handle,
} = lifetime;

let svc = service_fn(move |req: Request| {
handle_request(req, request_info.clone(), refcount.clone(), tx.clone())
handle_request(req, request_info.clone(), server_state.clone(), tx.clone())
});
spawn(
serve_http2_autodetect(io, svc, listen_cancel_handle)
Expand Down Expand Up @@ -899,15 +899,15 @@ where
struct HttpLifetime {
connection_cancel_handle: Rc<CancelHandle>,
listen_cancel_handle: Rc<CancelHandle>,
refcount: RefCount,
server_state: Rc<HttpServerState>,
}

struct HttpJoinHandle {
join_handle: AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
connection_cancel_handle: Rc<CancelHandle>,
listen_cancel_handle: Rc<CancelHandle>,
rx: AsyncRefCell<tokio::sync::mpsc::Receiver<Rc<HttpRecord>>>,
refcount: RefCount,
server_state: Rc<HttpServerState>,
}

impl HttpJoinHandle {
Expand All @@ -917,15 +917,15 @@ impl HttpJoinHandle {
connection_cancel_handle: CancelHandle::new_rc(),
listen_cancel_handle: CancelHandle::new_rc(),
rx: AsyncRefCell::new(rx),
refcount: RefCount::default(),
server_state: HttpServerState::new(),
}
}

fn lifetime(self: &Rc<Self>) -> HttpLifetime {
HttpLifetime {
connection_cancel_handle: self.connection_cancel_handle.clone(),
listen_cancel_handle: self.listen_cancel_handle.clone(),
refcount: self.refcount.clone(),
server_state: self.server_state.clone(),
}
}

Expand Down Expand Up @@ -1161,8 +1161,8 @@ pub async fn op_http_close(
join_handle.connection_cancel_handle().cancel();
}

// Async spin on the refcount while we wait for everything to drain
while Rc::strong_count(&join_handle.refcount.0) > 1 {
// Async spin on the server_state while we wait for everything to drain
while Rc::strong_count(&join_handle.server_state) > 1 {
tokio::time::sleep(Duration::from_millis(10)).await;
}

Expand Down
99 changes: 69 additions & 30 deletions ext/http/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::cell::RefCell;
use std::cell::RefMut;
use std::future::Future;
use std::rc::Rc;
use std::rc::Weak;

pub type Request = hyper1::Request<Incoming>;
pub type Response = hyper1::Response<ResponseBytes>;
Expand All @@ -37,9 +38,23 @@ macro_rules! http_trace {

pub(crate) use http_trace;

// Liveness token handed to each in-flight request: a transparent wrapper
// around `Rc<()>`. The server watches `Rc::strong_count` on the inner `Rc`
// and treats a count above 1 as "requests still draining".
#[repr(transparent)]
#[derive(Clone, Default)]
pub struct RefCount(pub Rc<()>);
/// Maximum number of finished `HttpRecord`s retained for reuse per server.
/// Without a cap the pool only ever grows, so a burst of concurrent requests
/// would pin its high-water mark of allocations for the server's lifetime.
const RECORD_POOL_LIMIT: usize = 16;

/// Interior state shared by all connections of one HTTP server: a free list
/// of finished `HttpRecord`s that can be reused by later requests, avoiding
/// a fresh allocation per request.
struct HttpServerStateInner {
  pool: Vec<Rc<HttpRecord>>,
}

/// Per-server shared state, handed out as `Rc<HttpServerState>`. The strong
/// count of that `Rc` doubles as a liveness signal — `op_http_close` spins
/// until it drops back to 1 — so clones must only be held for the duration
/// of request handling.
pub struct HttpServerState(RefCell<HttpServerStateInner>);

impl HttpServerState {
  /// Create the shared server state with an empty record pool.
  pub fn new() -> Rc<Self> {
    Rc::new(Self(RefCell::new(HttpServerStateInner {
      pool: Vec::new(),
    })))
  }

  /// Return a finished record to the pool for reuse.
  ///
  /// The record's inner state must already have been taken (the
  /// `RefCell<Option<..>>` inside `HttpRecord` holds `None` at this point).
  /// Once the pool is at capacity the record is simply dropped instead,
  /// bounding the memory the free list can hold onto.
  fn recycle(&self, record: Rc<HttpRecord>) {
    let mut inner = self.0.borrow_mut();
    if inner.pool.len() < RECORD_POOL_LIMIT {
      inner.pool.push(record);
    }
  }
}

enum RequestBodyState {
Incoming(Incoming),
Expand Down Expand Up @@ -72,15 +87,17 @@ impl Drop for HttpRequestBodyAutocloser {
pub async fn handle_request(
request: Request,
request_info: HttpConnectionProperties,
_refcount: RefCount, // Keep server alive for duration of this future.
server_info: Rc<HttpServerState>, // Keep server alive for duration of this future.
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> Result<Response, hyper::Error> {
// If the underlying TCP connection is closed, this future will be dropped
// and execution could stop at any await point.
// The HttpRecord must live until JavaScript is done processing so is wrapped
// in an Rc. The guard ensures unneeded resources are freed at cancellation.
let guarded_record =
guard(HttpRecord::new(request, request_info), HttpRecord::cancel);
let guarded_record = guard(
HttpRecord::new(request, request_info, &server_info),
HttpRecord::cancel,
);

// Clone HttpRecord and send to JavaScript for processing.
// Safe to unwrap as channel receiver is never closed.
Expand All @@ -97,11 +114,14 @@ pub async fn handle_request(
Rc::strong_count(&record) == 1,
"HTTP state error: Expected to be last strong reference (handle_request)"
);
let response = record.take_response();
let inner = record.0.borrow_mut().take().unwrap();
let response = inner.response.unwrap();
server_info.recycle(record);
Ok(response)
}

struct HttpRecordInner {
server_info: Weak<HttpServerState>,
request_info: HttpConnectionProperties,
request_parts: Parts,
request_body: Option<RequestBodyState>,
Expand All @@ -113,15 +133,20 @@ struct HttpRecordInner {
been_dropped: bool,
}

pub struct HttpRecord(RefCell<HttpRecordInner>);
pub struct HttpRecord(RefCell<Option<HttpRecordInner>>);

impl HttpRecord {
fn new(request: Request, request_info: HttpConnectionProperties) -> Rc<Self> {
fn new(
request: Request,
request_info: HttpConnectionProperties,
server_info: &Rc<HttpServerState>,
) -> Rc<Self> {
let (request_parts, request_body) = request.into_parts();
let body = ResponseBytes::default();
let trailers = body.trailers();
let request_body = Some(request_body.into());
let inner = HttpRecordInner {
server_info: Rc::downgrade(server_info),
request_info,
request_parts,
request_body,
Expand All @@ -131,18 +156,24 @@ impl HttpRecord {
trailers,
been_dropped: false,
};
#[allow(clippy::let_and_return)]
let record = Rc::new(Self(RefCell::new(inner)));
http_trace!(record, "HttpRecord::new");
record
if let Some(record) = server_info.0.borrow_mut().pool.pop() {
http_trace!(record, "HttpRecord::recycled");
*record.0.borrow_mut() = Some(inner);
record
} else {
#[allow(clippy::let_and_return)]
let record = Rc::new(Self(RefCell::new(Some(inner))));
http_trace!(record, "HttpRecord::new");
record
}
}

fn self_ref(&self) -> Ref<'_, HttpRecordInner> {
self.0.borrow()
Ref::map(self.0.borrow(), |option| option.as_ref().unwrap())
}

fn self_mut(&self) -> RefMut<'_, HttpRecordInner> {
self.0.borrow_mut()
RefMut::map(self.0.borrow_mut(), |option| option.as_mut().unwrap())
}

/// Perform the Hyper upgrade on this record.
Expand Down Expand Up @@ -191,7 +222,7 @@ impl HttpRecord {
/// Cleanup resources not needed after the future is dropped.
fn cancel(self: Rc<Self>) {
http_trace!(self, "HttpRecord::cancel");
let mut inner = self.0.borrow_mut();
let mut inner = self.self_mut();
inner.been_dropped = true;
// The request body might include actual resources.
inner.request_body.take();
Expand All @@ -201,14 +232,22 @@ impl HttpRecord {
pub fn complete(self: Rc<Self>) {
http_trace!(self, "HttpRecord::complete");
let mut inner = self.self_mut();
assert!(
!inner.been_dropped || Rc::strong_count(&self) == 1,
"HTTP state error: Expected to be last strong reference (been_dropped)"
);
assert!(
!inner.response_ready,
"HTTP state error: Entry has already been completed"
);
if inner.been_dropped {
assert!(
Rc::strong_count(&self) == 1,
"HTTP state error: Expected to be last strong reference (been_dropped)"
);
if let Some(server_info) = inner.server_info.upgrade() {
drop(inner);
self.0.borrow_mut().take();
server_info.recycle(self);
}
return;
}
inner.response_ready = true;
if let Some(waker) = inner.response_waker.take() {
drop(inner);
Expand All @@ -232,11 +271,6 @@ impl HttpRecord {
Ref::map(self.self_ref(), |inner| &inner.trailers)
}

/// Take the response.
fn take_response(&self) -> Response {
self.self_mut().response.take().unwrap()
}

/// Get a reference to the connection properties.
pub fn request_info(&self) -> Ref<'_, HttpConnectionProperties> {
Ref::map(self.self_ref(), |inner| &inner.request_info)
Expand All @@ -258,7 +292,7 @@ impl HttpRecord {
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let mut mut_self = self.0 .0.borrow_mut();
let mut mut_self = self.0.self_mut();
if mut_self.response_ready {
return std::task::Poll::Ready(());
}
Expand Down Expand Up @@ -333,16 +367,21 @@ mod tests {
#[tokio::test]
async fn test_handle_request() -> Result<(), AnyError> {
let (tx, mut rx) = tokio::sync::mpsc::channel(10);
let refcount = RefCount::default();
let refcount_check = refcount.clone();
let server_state = HttpServerState::new();
let refcount_check = server_state.clone();
let request_info = HttpConnectionProperties {
peer_address: "".into(),
peer_port: None,
local_port: None,
stream_type: NetworkStreamType::Tcp,
};
let svc = service_fn(move |req: hyper1::Request<Incoming>| {
handle_request(req, request_info.clone(), refcount.clone(), tx.clone())
handle_request(
req,
request_info.clone(),
server_state.clone(),
tx.clone(),
)
});

let client_req = http::Request::builder().uri("/").body("".to_string())?;
Expand Down Expand Up @@ -376,7 +415,7 @@ mod tests {
.await
},
)?;
assert_eq!(Rc::strong_count(&refcount_check.0), 1);
assert_eq!(Rc::strong_count(&refcount_check), 1);
Ok(())
}
}

0 comments on commit 3502a2c

Please sign in to comment.