
Question - getting last byte latency metric with trace layer #119

Closed
lkts opened this issue Aug 10, 2021 · 5 comments

@lkts

lkts commented Aug 10, 2021

First of all, thanks to everyone involved for the amazing work on the trace layer.

I want to get a last-byte latency metric from it, to understand how much time clients spend overall getting a response from the server (I use on_response to get first-byte latency now, but it does not tell the full picture).
Using on_body_chunk, I don't have enough information to decide whether a chunk is the last one in the stream. Initially I thought I could get this out of the box thanks to on_eos. Later, however, I discovered that the classifier from new_for_http never classifies responses in a way that invokes it, and I don't have enough context to understand why.

What would be the way to get this working? Can I just have a classifier that returns ClassifiedResponse::RequiresEos for successful responses?

@lkts
Author

lkts commented Aug 14, 2021

After some digging I believe I understand the reason for this implementation: poll_trailers is never actually called (at least by hyper) in an HTTP/1 context, so RequiresEos will not result in on_eos being called.

I think my initial expectation was that on_eos fires here:

return Poll::Ready(None);

Does it make sense to expose an additional callback from there (something like on_response_end)?
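To make the expectation concrete (this is not tower-http's actual API, just an analogy): a wrapper that fires a callback the first time the inner source yields nothing more — the synchronous equivalent of invoking on_eos at the point where poll_data returns Poll::Ready(None). A minimal stdlib sketch, with made-up names:

```rust
/// Synchronous analogue of a body stream: yields chunks, then `None`.
struct EosNotify<I, F: FnMut()> {
    inner: I,
    on_eos: Option<F>, // fired once, at end of stream
}

impl<I: Iterator, F: FnMut()> Iterator for EosNotify<I, F> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        match self.inner.next() {
            Some(item) => Some(item),
            None => {
                // The point analogous to `Poll::Ready(None)`: the stream
                // is exhausted, so fire the callback exactly once.
                if let Some(mut f) = self.on_eos.take() {
                    f();
                }
                None
            }
        }
    }
}

fn main() {
    let mut ended = false;
    let chunks = vec!["foo", "bar", "baz"].into_iter();
    let stream = EosNotify { inner: chunks, on_eos: Some(|| ended = true) };
    let collected: Vec<_> = stream.collect();
    assert_eq!(collected, ["foo", "bar", "baz"]);
    assert!(ended);
}
```

As the later comments note, this only works if the server actually polls the body to exhaustion, which hyper may not do.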

@davidpdrsn
Member

davidpdrsn commented Aug 18, 2021

After some digging I believe I understand the reason for this implementation: poll_trailers is never actually called (at least by hyper) in an HTTP/1 context, so RequiresEos will not result in on_eos being called.

Right. I've wondered about this myself. Not sure how it's supposed to work.

Later, however, I discovered that the classifier from new_for_http never classifies responses in a way that invokes it, and I don't have enough context to understand why.

The main reason for this is that there is no standard way to classify the end of a stream based on the status code, because, well, at that point there is no status code. It could always be classified as success, but that feels kind of arbitrary.

I think this can be achieved by writing a custom classifier and abusing the fact that the classifier is dropped when the body has been fully sent:

use axum::body::Bytes;
use http::{HeaderMap, Request, Response};
use hyper::Body;
use std::convert::Infallible;
use std::time::Instant;
use std::{net::SocketAddr, time::Duration};
use tower::ServiceBuilder;
use tower_http::classify::{ClassifiedResponse, ClassifyEos, ClassifyResponse, MakeClassifier};
use tower_http::trace::TraceLayer;

#[tokio::main]
async fn main() {
    let svc = ServiceBuilder::new()
        .layer(TraceLayer::new(MyMakeClassifier))
        .service_fn(|request: Request<Body>| async move {
            let (mut tx, body) = Body::channel();

            tokio::spawn(async move {
                // simulate sending a slow response
                tokio::time::sleep(Duration::from_secs(1)).await;
                tx.send_data(Bytes::from("foo")).await.unwrap();
                tx.send_data(Bytes::from("bar")).await.unwrap();
                tx.send_data(Bytes::from("baz")).await.unwrap();
            });

            let response = Response::new(body);
            Ok::<_, Infallible>(response)
        });

    // run it
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    tracing::debug!("listening on {}", addr);
    axum::Server::bind(&addr)
        .serve(tower::make::Shared::new(svc))
        .await
        .unwrap();
}

#[derive(Clone)]
struct MyMakeClassifier;

impl MakeClassifier for MyMakeClassifier {
    type Classifier = MyClassifier;
    type FailureClass = Infallible;
    type ClassifyEos = MyClassifier;

    fn make_classifier<B>(&self, _req: &Request<B>) -> Self::Classifier {
        MyClassifier {
            request_received_at: Instant::now(),
        }
    }
}

#[derive(Clone)]
struct MyClassifier {
    request_received_at: Instant,
}

impl ClassifyResponse for MyClassifier {
    type FailureClass = Infallible;
    type ClassifyEos = Self;

    fn classify_response<B>(
        self,
        res: &http::Response<B>,
    ) -> ClassifiedResponse<Self::FailureClass, Self::ClassifyEos> {
        ClassifiedResponse::RequiresEos(self)
    }

    fn classify_error<E>(self, error: &E) -> Self::FailureClass
    where
        E: std::fmt::Display + 'static,
    {
        unimplemented!()
    }
}

impl ClassifyEos for MyClassifier {
    type FailureClass = Infallible;

    fn classify_eos(self, _trailers: Option<&HeaderMap>) -> Result<(), Self::FailureClass> {
        Ok(())
    }

    fn classify_error<E>(self, error: &E) -> Self::FailureClass
    where
        E: std::fmt::Display + 'static,
    {
        unimplemented!()
    }
}

impl Drop for MyClassifier {
    fn drop(&mut self) {
        // or whatever else you need to do
        println!(
            "response sent after {:?}",
            self.request_received_at.elapsed()
        );
    }
}

@lkts
Author

lkts commented Aug 18, 2021

Thanks for your response. I took a similar approach: I wrapped the Stream that I produce and send as the response, and treat the moment its Drop is called as the end of the response.

Regarding my earlier suggestion of on_response_end: I have discovered that it may not actually work either, because hyper may not poll the stream until Poll::Ready(None) if it knows it has already received all the bytes specified in the Content-Length header. I am not sure if anything generic can be done here in tower.
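A minimal stdlib sketch of that Drop-based approach (using an Iterator in place of a Stream, and a callback in place of a real metrics sink — both are stand-ins, not the actual code from this thread):

```rust
use std::time::{Duration, Instant};

/// Wraps a body "stream"; when the server drops it (after the last byte
/// is written, or on client disconnect), `Drop` records the elapsed time.
struct TimedBody<I, F: FnMut(Duration)> {
    inner: I,
    started_at: Instant,
    record: F,
}

impl<I: Iterator, F: FnMut(Duration)> Iterator for TimedBody<I, F> {
    type Item = I::Item;
    fn next(&mut self) -> Option<I::Item> {
        self.inner.next()
    }
}

impl<I, F: FnMut(Duration)> Drop for TimedBody<I, F> {
    fn drop(&mut self) {
        // This fires even if hyper stops polling early (e.g. once the
        // Content-Length is satisfied): the body is still dropped when
        // the response ends.
        (self.record)(self.started_at.elapsed());
    }
}

fn main() {
    let mut recorded = None;
    {
        let body = TimedBody {
            inner: ["foo", "bar"].into_iter(),
            started_at: Instant::now(),
            record: |d| recorded = Some(d),
        };
        for _chunk in body { /* send each chunk to the client */ }
    }
    assert!(recorded.is_some());
}
```

This is why the Drop-based variants in this thread sidestep the Content-Length problem: drop happens regardless of how the body's polling ends.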

@davidpdrsn
Member

Thanks for your response. I took a similar approach: I wrapped the Stream that I produce and send as the response, and treat the moment its Drop is called as the end of the response.

That actually sounds easier than what I did.

I'll close this for now. Feel free to re-open if there is more to discuss.

@livioribeiro


I defined a response extension with a struct holding a closure; the struct calls the closure when it is dropped.
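That pattern can be sketched as a small guard type (names are hypothetical; note that to store it in `http::Extensions` the closure would additionally need to be `Sync`):

```rust
/// Calls the stored closure exactly once, when dropped. Attaching one to
/// a response (e.g. via `Response::extensions_mut`) means the closure
/// runs when the response — and its body — is dropped.
struct OnDrop(Option<Box<dyn FnOnce() + Send>>);

impl OnDrop {
    fn new(f: impl FnOnce() + Send + 'static) -> Self {
        OnDrop(Some(Box::new(f)))
    }
}

impl Drop for OnDrop {
    fn drop(&mut self) {
        // `take()` guarantees the closure runs at most once.
        if let Some(f) = self.0.take() {
            f();
        }
    }
}

fn main() {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    let done = Arc::new(AtomicBool::new(false));
    let flag = done.clone();
    let guard = OnDrop::new(move || flag.store(true, Ordering::SeqCst));
    drop(guard); // in practice: dropped when the response is dropped
    assert!(done.load(Ordering::SeqCst));
}
```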
