Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion edge-http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ pub async fn run(server: &mut DefaultServer) -> Result<(), anyhow::Error> {
.bind(addr.parse().unwrap())
.await?;

server.run(acceptor, HttpHandler).await?;
server.run(None, acceptor, HttpHandler).await?;

Ok(())
}
Expand Down
65 changes: 52 additions & 13 deletions edge-http/src/io/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use core::fmt::{self, Debug, Display};
use core::mem::{self, MaybeUninit};
use core::pin::pin;

use edge_nal::{with_timeout, Close, TcpShutdown, WithTimeout, WithTimeoutError};
use edge_nal::{with_timeout, Close, Readable, TcpShutdown, WithTimeout, WithTimeoutError};

use embassy_sync::blocking_mutex::raw::NoopRawMutex;
use embassy_sync::mutex::Mutex;
Expand Down Expand Up @@ -437,22 +437,51 @@ where
/// The socket stream will be closed only in case of error, or until the client explicitly requests that
/// either with a hard socket close, or with a `Connection: Close` header.
///
/// A note on timeouts:
/// - The function does NOT - by default - establish any timeouts on the IO operations _except_
/// an optional timeout for detecting idle connections, so that they can be closed and thus make
/// the server available for accepting new connections.
/// It is up to the caller to wrap the acceptor type with `edge_nal::WithTimeout` to establish
/// timeouts on the socket produced by the acceptor.
/// - Similarly, the server does NOT establish any timeouts on the complete request-response cycle.
/// It is up to the caller to wrap their complete or partial handling logic with
/// `edge_nal::with_timeout`, or its whole handler with `edge_nal::WithTimeout`, so as to establish
/// a global or semi-global request-response timeout.
///
/// Parameters:
/// - `io`: A socket stream
/// - `buf`: A work-area buffer used by the implementation
/// - `keepalive_timeout_ms`: An optional timeout in milliseconds for detecting an idle keepalive connection
/// that should be closed. If not provided, the server will not close idle connections.
/// - `task_id`: An identifier for the task, used for logging purposes
/// - `handler`: An implementation of `Handler` to handle incoming requests
pub async fn handle_connection<H, T, const N: usize>(
mut io: T,
buf: &mut [u8],
keepalive_timeout_ms: Option<u32>,
task_id: impl Display + Copy,
handler: H,
) where
H: Handler,
T: Read + Write + TcpShutdown,
T: Read + Write + Readable + TcpShutdown,
{
let close = loop {
debug!("Handler task {task_id}: Waiting for new request");
debug!("Handler task {task_id}: Waiting for a new request");

if let Some(keepalive_timeout_ms) = keepalive_timeout_ms {
let wait_data = with_timeout(keepalive_timeout_ms, io.readable()).await;
match wait_data {
Err(WithTimeoutError::Timeout) => {
info!("Handler task {task_id}: Closing connection due to inactivity");
break true;
}
Err(e) => {
warn!("Handler task {task_id}: Error when handling request: {e:?}");
break true;
}
Ok(_) => {}
}
}

let result = handle_request::<_, _, N>(buf, &mut io, task_id, &handler).await;

Expand Down Expand Up @@ -598,22 +627,31 @@ impl<const P: usize, const B: usize, const N: usize> Server<P, B, N> {

/// Run the server with the specified acceptor and handler
///
/// A note on timeouts:
/// - The function does NOT - by default - establish any timeouts on the IO operations _except_
/// an optional timeout on idle connections, so that they can be closed.
/// It is up to the caller to wrap the acceptor type with `edge_nal::WithTimeout` to establish
/// timeouts on the socket produced by the acceptor.
/// - Similarly, the function does NOT establish any timeouts on the complete request-response cycle.
/// It is up to the caller to wrap their complete or partial handling logic with
/// `edge_nal::with_timeout`, or its whole handler with `edge_nal::WithTimeout`, so as to establish
/// a global or semi-global request-response timeout.
///
/// Parameters:
/// - `keepalive_timeout_ms`: An optional timeout in milliseconds for detecting an idle keepalive
/// connection that should be closed. If not provided, the function will not close idle connections
/// and the connection - in the absence of other timeouts - will remain active forever.
/// - `acceptor`: An implementation of `edge_nal::TcpAccept` to accept incoming connections
/// - `handler`: An implementation of `Handler` to handle incoming requests
/// If not provided, a default timeout of 50 seconds is used.
///
/// Note that the server does NOT - by default - establish any timeouts on the IO operations.
/// It is up to the caller to wrap the acceptor type with `edge_nal::WithTimeout` to establish
/// timeouts on the socket produced by the acceptor.
///
/// Similarly, the server does NOT establish any timeouts on the complete request-response cycle.
/// It is up to the caller to wrap their complete or partial handling logic with
/// `edge_nal::with_timeout`, or its whole handler with `edge_nal::WithTimeout`, so as to establish
/// a global or semi-global request-response timeout.
#[inline(never)]
#[cold]
pub async fn run<A, H>(&mut self, acceptor: A, handler: H) -> Result<(), Error<A::Error>>
pub async fn run<A, H>(
&mut self,
keepalive_timeout_ms: Option<u32>,
acceptor: A,
handler: H,
) -> Result<(), Error<A::Error>>
where
A: edge_nal::TcpAccept,
H: Handler,
Expand Down Expand Up @@ -649,6 +687,7 @@ impl<const P: usize, const B: usize, const N: usize> Server<P, B, N> {
handle_connection::<_, _, N>(
io,
unsafe { buf.as_mut() }.unwrap(),
keepalive_timeout_ms,
task_id,
handler,
)
Expand Down
2 changes: 1 addition & 1 deletion edge-ws/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ pub async fn run(server: &mut DefaultServer) -> Result<(), anyhow::Error> {
.bind(addr.parse().unwrap())
.await?;

server.run(acceptor, WsHandler).await?;
server.run(None, acceptor, WsHandler).await?;

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion examples/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub async fn run(server: &mut DefaultServer) -> Result<(), anyhow::Error> {
.bind(addr.parse().unwrap())
.await?;

server.run(acceptor, HttpHandler).await?;
server.run(None, acceptor, HttpHandler).await?;

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion examples/ws_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub async fn run(server: &mut DefaultServer) -> Result<(), anyhow::Error> {
.bind(addr.parse().unwrap())
.await?;

server.run(acceptor, WsHandler).await?;
server.run(None, acceptor, WsHandler).await?;

Ok(())
}
Expand Down
Loading