Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions src/asgi/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ impl<'py> IntoPyObject<'py> for HttpConnectionScope {
dict.set_item("method", self.method.into_pyobject(py)?)?;
dict.set_item("scheme", self.scheme)?;
dict.set_item("path", self.path)?;
dict.set_item("raw_path", self.raw_path)?;
dict.set_item("query_string", self.query_string)?;
dict.set_item("raw_path", self.raw_path.as_bytes())?;
dict.set_item("query_string", self.query_string.as_bytes())?;
dict.set_item("root_path", self.root_path)?;
dict.set_item("headers", self.headers.into_pyobject(py)?)?;
if let Some((host, port)) = self.client {
Expand Down Expand Up @@ -355,10 +355,7 @@ impl<'py> FromPyObject<'py> for HttpSendMessage {
})
}
"http.response.body" => {
let body: Vec<u8> = dict
.get_item("body")?
.ok_or_else(|| PyValueError::new_err("Missing 'body' key in HTTP response body message"))?
.extract()?;
let body: Vec<u8> = dict.get_item("body")?.map_or(Ok(vec![]), |v| v.extract())?;

let more_body: bool = dict
.get_item("more_body")?
Expand Down Expand Up @@ -562,8 +559,8 @@ mod tests {
);
assert_eq!(dict_extract!(py_scope, "scheme", String), "http");
assert_eq!(dict_extract!(py_scope, "path", String), "");
assert_eq!(dict_extract!(py_scope, "raw_path", String), "");
assert_eq!(dict_extract!(py_scope, "query_string", String), "");
assert_eq!(dict_extract!(py_scope, "raw_path", Vec<u8>), b"");
assert_eq!(dict_extract!(py_scope, "query_string", Vec<u8>), b"");
assert_eq!(dict_extract!(py_scope, "root_path", String), "");
assert_eq!(
dict_extract!(py_scope, "headers", Vec<(String, String)>),
Expand Down
116 changes: 82 additions & 34 deletions src/asgi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,25 +237,29 @@ impl Handler for Asgi {
// Create response channel
let (response_tx, response_rx) = oneshot::channel();

// Spawn task to collect response
tokio::spawn(collect_response_messages(tx_receiver, response_tx));

// Submit the ASGI app call to Python event loop
Python::attach(|py| {
let future = Python::attach(|py| {
let scope_py = scope.into_pyobject(py)?;
let coro = self
.app_function
.call1(py, (scope_py, rx_receiver, tx_sender))?;

let asyncio = py.import("asyncio")?;
asyncio.call_method1(
let future = asyncio.call_method1(
"run_coroutine_threadsafe",
(coro, self.event_loop_handle.event_loop()),
)?;

Ok::<(), HandlerError>(())
Ok::<Py<PyAny>, HandlerError>(future.unbind())
})?;

// Spawn task to collect response and monitor for Python exceptions
tokio::spawn(collect_response_with_exception_handling(
tx_receiver,
response_tx,
future,
));

// Wait for response
let (status, headers, body) = response_rx.await??;

Expand Down Expand Up @@ -379,48 +383,92 @@ fn start_python_event_loop_thread(event_loop: Py<PyAny>) {
});
}

/// Collect ASGI response messages
async fn collect_response_messages(
/// Collect ASGI response messages while monitoring for Python exceptions
async fn collect_response_with_exception_handling(
mut tx_receiver: tokio::sync::mpsc::UnboundedReceiver<AcknowledgedMessage<HttpSendMessage>>,
response_tx: oneshot::Sender<HttpResponseResult>,
python_future: Py<PyAny>,
) {
let mut status = 500u16;
let mut headers = Vec::new();
let mut body = Vec::new();
let mut response_started = false;

while let Some(ack_msg) = tx_receiver.recv().await {
let AcknowledgedMessage { message, ack } = ack_msg;

match message {
HttpSendMessage::HttpResponseStart {
status: s,
headers: h,
..
} => {
status = s;
headers = h;
response_started = true;
// Spawn a task to monitor the Python future for exceptions
let future_clone = Python::attach(|py| python_future.clone_ref(py));
let mut exception_handle = tokio::task::spawn_blocking(move || {
Python::attach(|py| {
let future_bound = future_clone.bind(py);
// Wait for the future to complete (with 30 second timeout)
match future_bound.call_method1("result", (30.0,)) {
Ok(_) => None, // Success - no exception
Err(e) => Some(e), // Exception occurred
}
HttpSendMessage::HttpResponseBody { body: b, more_body } => {
if response_started {
body.extend_from_slice(&b);
if !more_body {
})
});

loop {
tokio::select! {
// Check for messages from the ASGI app
msg = tx_receiver.recv() => {
match msg {
Some(ack_msg) => {
let AcknowledgedMessage { message, ack } = ack_msg;

match message {
HttpSendMessage::HttpResponseStart {
status: s,
headers: h,
..
} => {
status = s;
headers = h;
response_started = true;
}
HttpSendMessage::HttpResponseBody { body: b, more_body } => {
if response_started {
body.extend_from_slice(&b);
if !more_body {
let _ = ack.send(());
let _ = response_tx.send(Ok((status, headers, body)));
return;
}
}
}
}

let _ = ack.send(());
let _ = response_tx.send(Ok((status, headers, body)));
}
None => {
// Channel closed without a complete response
let _ = response_tx.send(Err(if response_started {
HandlerError::ResponseInterrupted
} else {
HandlerError::NoResponse
}));
return;
}
}
}
// Check if the Python coroutine raised an exception
exception_result = &mut exception_handle => {
match exception_result {
Ok(Some(py_err)) => {
// Python exception occurred
let _ = response_tx.send(Err(HandlerError::PythonError(py_err)));
return;
}
Ok(None) => {
// Python coroutine completed successfully
// Continue waiting for response messages
}
Err(e) => {
// Tokio task error
let _ = response_tx.send(Err(HandlerError::TokioError(e.to_string())));
return;
}
}
}
}

let _ = ack.send(());
}

// If we got here, the channel closed without a complete response
let _ = response_tx.send(Err(if response_started {
HandlerError::ResponseInterrupted
} else {
HandlerError::NoResponse
}));
}
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,10 @@ pub enum HandlerError {
/// Error when a lock is poisoned
#[error("Lock poisoned: {0}")]
LockPoisoned(String),

/// Error when a Tokio task fails
#[error("Tokio task error: {0}")]
TokioError(String),
}

impl<T> From<std::sync::PoisonError<T>> for HandlerError {
Expand Down
Loading