diff --git a/src/asgi/http.rs b/src/asgi/http.rs index cb9de67..f47ee32 100644 --- a/src/asgi/http.rs +++ b/src/asgi/http.rs @@ -154,8 +154,8 @@ impl<'py> IntoPyObject<'py> for HttpConnectionScope { dict.set_item("method", self.method.into_pyobject(py)?)?; dict.set_item("scheme", self.scheme)?; dict.set_item("path", self.path)?; - dict.set_item("raw_path", self.raw_path)?; - dict.set_item("query_string", self.query_string)?; + dict.set_item("raw_path", self.raw_path.as_bytes())?; + dict.set_item("query_string", self.query_string.as_bytes())?; dict.set_item("root_path", self.root_path)?; dict.set_item("headers", self.headers.into_pyobject(py)?)?; if let Some((host, port)) = self.client { @@ -355,10 +355,7 @@ impl<'py> FromPyObject<'py> for HttpSendMessage { }) } "http.response.body" => { - let body: Vec = dict - .get_item("body")? - .ok_or_else(|| PyValueError::new_err("Missing 'body' key in HTTP response body message"))? - .extract()?; + let body: Vec = dict.get_item("body")?.map_or(Ok(vec![]), |v| v.extract())?; let more_body: bool = dict .get_item("more_body")? @@ -562,8 +559,8 @@ mod tests { ); assert_eq!(dict_extract!(py_scope, "scheme", String), "http"); assert_eq!(dict_extract!(py_scope, "path", String), ""); - assert_eq!(dict_extract!(py_scope, "raw_path", String), ""); - assert_eq!(dict_extract!(py_scope, "query_string", String), ""); + assert_eq!(dict_extract!(py_scope, "raw_path", Vec), b""); + assert_eq!(dict_extract!(py_scope, "query_string", Vec), b""); assert_eq!(dict_extract!(py_scope, "root_path", String), ""); assert_eq!( dict_extract!(py_scope, "headers", Vec<(String, String)>), diff --git a/src/asgi/mod.rs b/src/asgi/mod.rs index 01aa427..7cf34b8 100644 --- a/src/asgi/mod.rs +++ b/src/asgi/mod.rs @@ -237,25 +237,29 @@ impl Handler for Asgi { // Create response channel let (response_tx, response_rx) = oneshot::channel(); - // Spawn task to collect response - tokio::spawn(collect_response_messages(tx_receiver, response_tx)); - // Submit the ASGI app call to Python event loop - Python::attach(|py| { + let future = Python::attach(|py| { let scope_py = scope.into_pyobject(py)?; let coro = self .app_function .call1(py, (scope_py, rx_receiver, tx_sender))?; let asyncio = py.import("asyncio")?; - asyncio.call_method1( + let future = asyncio.call_method1( "run_coroutine_threadsafe", (coro, self.event_loop_handle.event_loop()), )?; - Ok::<(), HandlerError>(()) + Ok::, HandlerError>(future.unbind()) })?; + // Spawn task to collect response and monitor for Python exceptions + tokio::spawn(collect_response_with_exception_handling( + tx_receiver, + response_tx, + future, + )); + // Wait for response let (status, headers, body) = response_rx.await??; @@ -379,48 +383,92 @@ fn start_python_event_loop_thread(event_loop: Py) { }); } -/// Collect ASGI response messages -async fn collect_response_messages( +/// Collect ASGI response messages while monitoring for Python exceptions +async fn collect_response_with_exception_handling( mut tx_receiver: tokio::sync::mpsc::UnboundedReceiver>, response_tx: oneshot::Sender, + python_future: Py, ) { let mut status = 500u16; let mut headers = Vec::new(); let mut body = Vec::new(); let mut response_started = false; - while let Some(ack_msg) = tx_receiver.recv().await { - let AcknowledgedMessage { message, ack } = ack_msg; - - match message { - HttpSendMessage::HttpResponseStart { - status: s, - headers: h, - .. - } => { - status = s; - headers = h; - response_started = true; + // Spawn a task to monitor the Python future for exceptions + let future_clone = Python::attach(|py| python_future.clone_ref(py)); + let mut exception_handle = tokio::task::spawn_blocking(move || { + Python::attach(|py| { + let future_bound = future_clone.bind(py); + // Wait for the future to complete (with 30 second timeout) + match future_bound.call_method1("result", (30.0,)) { + Ok(_) => None, // Success - no exception + Err(e) => Some(e), // Exception occurred } - HttpSendMessage::HttpResponseBody { body: b, more_body } => { - if response_started { - body.extend_from_slice(&b); - if !more_body { + }) + }); + + loop { + tokio::select! { + // Check for messages from the ASGI app + msg = tx_receiver.recv() => { + match msg { + Some(ack_msg) => { + let AcknowledgedMessage { message, ack } = ack_msg; + + match message { + HttpSendMessage::HttpResponseStart { + status: s, + headers: h, + .. + } => { + status = s; + headers = h; + response_started = true; + } + HttpSendMessage::HttpResponseBody { body: b, more_body } => { + if response_started { + body.extend_from_slice(&b); + if !more_body { + let _ = ack.send(()); + let _ = response_tx.send(Ok((status, headers, body))); + return; + } + } + } + } + let _ = ack.send(()); - let _ = response_tx.send(Ok((status, headers, body))); + } + None => { + // Channel closed without a complete response + let _ = response_tx.send(Err(if response_started { + HandlerError::ResponseInterrupted + } else { + HandlerError::NoResponse + })); + return; + } + } + } + // Check if the Python coroutine raised an exception + exception_result = &mut exception_handle => { + match exception_result { + Ok(Some(py_err)) => { + // Python exception occurred + let _ = response_tx.send(Err(HandlerError::PythonError(py_err))); + return; + } + Ok(None) => { + // Python coroutine completed successfully + // Continue waiting for response messages + } + Err(e) => { + // Tokio task error + let _ = response_tx.send(Err(HandlerError::TokioError(e.to_string()))); return; } } } } - - let _ = ack.send(()); } - - // If we got here, the channel closed without a complete response - let _ = response_tx.send(Err(if response_started { - HandlerError::ResponseInterrupted - } else { - HandlerError::NoResponse - })); } diff --git a/src/lib.rs b/src/lib.rs index 648360f..2dea575 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -348,6 +348,10 @@ pub enum HandlerError { /// Error when a lock is poisoned #[error("Lock poisoned: {0}")] LockPoisoned(String), + + /// Error when a Tokio task fails + #[error("Tokio task error: {0}")] + TokioError(String), } impl From> for HandlerError {