Skip to content

Commit

Permalink
Upgrade to hyper/http v1.0 (#1674)
Browse files Browse the repository at this point in the history
Co-authored-by: Lalit Kumar Bhasin <labhas@microsoft.com>
  • Loading branch information
aumetra and lalitb committed Jul 12, 2024
1 parent 166a127 commit 621a5a9
Show file tree
Hide file tree
Showing 26 changed files with 187 additions and 137 deletions.
18 changes: 10 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,25 @@ criterion = "0.5"
futures-core = "0.3"
futures-executor = "0.3"
futures-util = { version = "0.3", default-features = false }
hyper = { version = "0.14", default-features = false }
http = { version = "0.2", default-features = false }
http = { version = "1.1", default-features = false, features = ["std"] }
http-body-util = "0.1"
hyper = { version = "1.3", default-features = false }
hyper-util = "0.1"
log = "0.4.21"
once_cell = "1.13"
ordered-float = "4.0"
pin-project-lite = "0.2"
prost = "0.12"
prost-build = "0.12"
prost-types = "0.12"
prost = "0.13"
prost-build = "0.13"
prost-types = "0.13"
rand = { version = "0.8", default-features = false }
reqwest = { version = "0.11", default-features = false }
reqwest = { version = "0.12", default-features = false }
serde = { version = "1.0", default-features = false }
serde_json = "1.0"
temp-env = "0.3.6"
thiserror = { version = "1", default-features = false }
tonic = { version = "0.11", default-features = false }
tonic-build = "0.11"
tonic = { version = "0.12", default-features = false }
tonic-build = "0.12"
tokio = { version = "1", default-features = false }
tokio-stream = "0.1.1"
tracing = { version = "0.1", default-features = false }
Expand Down
2 changes: 2 additions & 0 deletions examples/tracing-http-propagator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ path = "src/client.rs"
doc = false

[dependencies]
http-body-util = { workspace = true }
hyper = { workspace = true, features = ["full"] }
hyper-util = { workspace = true, features = ["full"] }
tokio = { workspace = true, features = ["full"] }
opentelemetry = { path = "../../opentelemetry" }
opentelemetry_sdk = { path = "../../opentelemetry-sdk" }
Expand Down
9 changes: 5 additions & 4 deletions examples/tracing-http-propagator/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use hyper::{body::Body, Client};
use http_body_util::Full;
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
use opentelemetry::{
global,
trace::{SpanKind, TraceContextExt, Tracer},
Context, KeyValue,
};
use opentelemetry_http::HeaderInjector;
use opentelemetry_http::{Bytes, HeaderInjector};
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::TracerProvider};
use opentelemetry_stdout::SpanExporter;

Expand All @@ -24,7 +25,7 @@ async fn send_request(
body_content: &str,
span_name: &str,
) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let client = Client::new();
let client = Client::builder(TokioExecutor::new()).build_http();
let tracer = global::tracer("example/client");
let span = tracer
.span_builder(String::from(span_name))
Expand All @@ -37,7 +38,7 @@ async fn send_request(
propagator.inject_context(&cx, &mut HeaderInjector(req.headers_mut().unwrap()))
});
let res = client
.request(req.body(Body::from(String::from(body_content)))?)
.request(req.body(Full::new(Bytes::from(body_content.to_string())))?)
.await?;

cx.span().add_event(
Expand Down
56 changes: 37 additions & 19 deletions examples/tracing-http-propagator/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,64 @@
use hyper::{
service::{make_service_fn, service_fn},
Body, Request, Response, Server, StatusCode,
};
use http_body_util::{combinators::BoxBody, BodyExt, Full};
use hyper::{body::Incoming, service::service_fn, Request, Response, StatusCode};
use hyper_util::rt::{TokioExecutor, TokioIo};
use opentelemetry::{
global,
trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer},
Context, KeyValue,
};
use opentelemetry_http::HeaderExtractor;
use opentelemetry_http::{Bytes, HeaderExtractor};
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::TracerProvider};
use opentelemetry_semantic_conventions::trace;
use opentelemetry_stdout::SpanExporter;
use std::{convert::Infallible, net::SocketAddr};
use tokio::net::TcpListener;

// Utility function to extract the context from the incoming request headers
fn extract_context_from_request(req: &Request<Body>) -> Context {
fn extract_context_from_request(req: &Request<Incoming>) -> Context {
global::get_text_map_propagator(|propagator| {
propagator.extract(&HeaderExtractor(req.headers()))
})
}

// Separate async function for the handle endpoint
async fn handle_health_check(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
async fn handle_health_check(
_req: Request<Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible> {
let tracer = global::tracer("example/server");
let mut span = tracer
.span_builder("health_check")
.with_kind(SpanKind::Internal)
.start(&tracer);
span.add_event("Health check accessed", vec![]);
let res = Response::new(Body::from("Server is up and running!"));

let res = Response::new(
Full::new(Bytes::from_static(b"Server is up and running!"))
.map_err(|err| match err {})
.boxed(),
);

Ok(res)
}

// Separate async function for the echo endpoint
async fn handle_echo(req: Request<Body>) -> Result<Response<Body>, Infallible> {
async fn handle_echo(
req: Request<Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible> {
let tracer = global::tracer("example/server");
let mut span = tracer
.span_builder("echo")
.with_kind(SpanKind::Internal)
.start(&tracer);
span.add_event("Echoing back the request", vec![]);
let res = Response::new(req.into_body());

let res = Response::new(req.into_body().boxed());

Ok(res)
}

async fn router(req: Request<Body>) -> Result<Response<Body>, Infallible> {
async fn router(
req: Request<Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible> {
// Extract the context from the incoming request headers
let parent_cx = extract_context_from_request(&req);
let response = {
Expand All @@ -64,12 +78,13 @@ async fn router(req: Request<Body>) -> Result<Response<Body>, Infallible> {
_ => {
cx.span()
.set_attribute(KeyValue::new(trace::HTTP_RESPONSE_STATUS_CODE, 404));
let mut not_found = Response::default();
let mut not_found = Response::new(BoxBody::default());
*not_found.status_mut() = StatusCode::NOT_FOUND;
Ok(not_found)
}
}
};

response
}

Expand All @@ -87,15 +102,18 @@ fn init_tracer() {

#[tokio::main]
async fn main() {
use hyper_util::server::conn::auto::Builder;

init_tracer();
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let listener = TcpListener::bind(addr).await.unwrap();

let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(router)) });

let server = Server::bind(&addr).serve(make_svc);

println!("Listening on {addr}");
if let Err(e) = server.await {
eprintln!("server error: {e}");
while let Ok((stream, _addr)) = listener.accept().await {
if let Err(err) = Builder::new(TokioExecutor::new())
.serve_connection(TokioIo::new(stream), service_fn(router))
.await
{
eprintln!("{err}");
}
}
}
1 change: 0 additions & 1 deletion opentelemetry-appender-tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ rust-version = "1.65"

[dependencies]
log = { workspace = true, optional = true }
once_cell = { workspace = true }
opentelemetry = { version = "0.23", path = "../opentelemetry", features = ["logs"] }
tracing = { workspace = true, features = ["std"]}
tracing-core = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-http/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- **Breaking** Correct the misspelling of "webkpi" to "webpki" in features [#1842](https://github.com/open-telemetry/opentelemetry-rust/pull/1842)
- **Breaking** Remove support for the `isahc` HTTP client [#1924](https://github.com/open-telemetry/opentelemetry-rust/pull/1924)
- Update to `http` v1 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674)

## v0.12.0

Expand Down
5 changes: 4 additions & 1 deletion opentelemetry-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@ edition = "2021"
rust-version = "1.65"

[features]
hyper = ["dep:http-body-util", "dep:hyper", "dep:hyper-util", "dep:tokio"]
reqwest-rustls = ["reqwest", "reqwest/rustls-tls-native-roots"]
reqwest-rustls-webpki-roots = ["reqwest", "reqwest/rustls-tls-webpki-roots"]

[dependencies]
async-trait = { workspace = true }
bytes = { workspace = true }
http = { workspace = true }
hyper = { workspace = true, features = ["http2", "client", "tcp"], optional = true }
http-body-util = { workspace = true, optional = true }
hyper = { workspace = true, optional = true }
hyper-util = { workspace = true, features = ["client-legacy", "http2"], optional = true }
opentelemetry = { version = "0.23", path = "../opentelemetry", features = ["trace"] }
reqwest = { workspace = true, features = ["blocking"], optional = true }
tokio = { workspace = true, features = ["time"], optional = true }
44 changes: 37 additions & 7 deletions opentelemetry-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,24 @@ pub mod hyper {

use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response};
use http::HeaderValue;
use hyper::client::connect::Connect;
use hyper::Client;
use http_body_util::{BodyExt, Full};
use hyper::body::{Body as HttpBody, Frame};
use hyper_util::client::legacy::{connect::Connect, Client};
use std::fmt::Debug;
use std::pin::Pin;
use std::task::{self, Poll};
use std::time::Duration;
use tokio::time;

#[derive(Debug, Clone)]
pub struct HyperClient<C> {
inner: Client<C>,
inner: Client<C, Body>,
timeout: Duration,
authorization: Option<HeaderValue>,
}

impl<C> HyperClient<C> {
pub fn new_with_timeout(inner: Client<C>, timeout: Duration) -> Self {
pub fn new_with_timeout(inner: Client<C, Body>, timeout: Duration) -> Self {
Self {
inner,
timeout,
Expand All @@ -128,7 +131,7 @@ pub mod hyper {
}

pub fn new_with_timeout_and_authorization_header(
inner: Client<C>,
inner: Client<C, Body>,
timeout: Duration,
authorization: HeaderValue,
) -> Self {
Expand All @@ -147,22 +150,49 @@ pub mod hyper {
{
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
let (parts, body) = request.into_parts();
let mut request = Request::from_parts(parts, body.into());
let mut request = Request::from_parts(parts, Body(Full::from(body)));
if let Some(ref authorization) = self.authorization {
request
.headers_mut()
.insert(http::header::AUTHORIZATION, authorization.clone());
}
let mut response = time::timeout(self.timeout, self.inner.request(request)).await??;
let headers = std::mem::take(response.headers_mut());

let mut http_response = Response::builder()
.status(response.status())
.body(hyper::body::to_bytes(response.into_body()).await?)?;
.body(response.into_body().collect().await?.to_bytes())?;
*http_response.headers_mut() = headers;

Ok(http_response.error_for_status()?)
}
}

pub struct Body(Full<Bytes>);

impl HttpBody for Body {
type Data = Bytes;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

#[inline]
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let inner_body = unsafe { self.map_unchecked_mut(|b| &mut b.0) };
inner_body.poll_frame(cx).map_err(Into::into)
}

#[inline]
fn is_end_stream(&self) -> bool {
self.0.is_end_stream()
}

#[inline]
fn size_hint(&self) -> hyper::body::SizeHint {
self.0.size_hint()
}
}
}

/// Methods to make working with responses from the [`HttpClient`] trait easier.
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ now use `.with_resource(RESOURCE::default())` to configure Resource when using
previous release.
- **Breaking** [1869](https://github.com/open-telemetry/opentelemetry-rust/pull/1869) The OTLP logs exporter now overrides the [InstrumentationScope::name](https://github.com/open-telemetry/opentelemetry-proto/blob/b3060d2104df364136d75a35779e6bd48bac449a/opentelemetry/proto/common/v1/common.proto#L73) field with the `target` from `LogRecord`, if target is populated.
- Groups batch of `LogRecord` and `Span` by their resource and instrumentation scope before exporting, for better efficiency [#1873](https://github.com/open-telemetry/opentelemetry-rust/pull/1873).

- **Breaking** Update to `http` v1 and `tonic` v0.12 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674)

## v0.16.0

Expand Down
4 changes: 3 additions & 1 deletion opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ publish = false
[features]
default = ["reqwest"]
reqwest = ["opentelemetry-otlp/reqwest-client"]
hyper = ["dep:async-trait", "dep:http", "dep:hyper", "dep:opentelemetry-http", "dep:bytes"]
hyper = ["dep:async-trait", "dep:http", "dep:http-body-util", "dep:hyper", "dep:hyper-util", "dep:opentelemetry-http", "dep:bytes"]


[dependencies]
Expand All @@ -23,7 +23,9 @@ opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-c
async-trait = { workspace = true, optional = true }
bytes = { workspace = true, optional = true }
http = { workspace = true, optional = true }
http-body-util = { workspace = true, optional = true }
hyper = { workspace = true, features = ["client"], optional = true }
hyper-util = { workspace = true, features = ["client-legacy"], optional = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true, features = ["std"]}
tracing-core = { workspace = true }
Expand Down
18 changes: 11 additions & 7 deletions opentelemetry-otlp/examples/basic-otlp-http/src/hyper.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
use async_trait::async_trait;
use bytes::Bytes;
use http::{Request, Response};
use hyper::{
client::{connect::Connect, HttpConnector},
Body, Client,
use http_body_util::{BodyExt, Full};
use hyper_util::{
client::legacy::{
connect::{Connect, HttpConnector},
Client,
},
rt::TokioExecutor,
};
use opentelemetry_http::{HttpClient, HttpError, ResponseExt};

pub struct HyperClient<C> {
inner: hyper::Client<C>,
inner: hyper_util::client::legacy::Client<C, Full<Bytes>>,
}

impl Default for HyperClient<HttpConnector> {
fn default() -> Self {
Self {
inner: Client::new(),
inner: Client::builder(TokioExecutor::new()).build_http(),
}
}
}
Expand All @@ -30,15 +34,15 @@ impl<C> std::fmt::Debug for HyperClient<C> {
#[async_trait]
impl<C: Connect + Clone + Send + Sync + 'static> HttpClient for HyperClient<C> {
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
let request = request.map(Body::from);
let request = request.map(|body| Full::new(Bytes::from(body)));

let (parts, body) = self
.inner
.request(request)
.await?
.error_for_status()?
.into_parts();
let body = hyper::body::to_bytes(body).await?;
let body = body.collect().await?.to_bytes();

Ok(Response::from_parts(parts, body))
}
Expand Down
Loading

0 comments on commit 621a5a9

Please sign in to comment.