Skip to content

Commit

Permalink
Move resolve api to async-stream (#599)
Browse files Browse the repository at this point in the history
Now that the resolution API uses a `Stream`, we can use the
`try_stream` macro to simplify its implementations.
  • Loading branch information
zaharidichev committed Aug 20, 2020
1 parent 6d58ee7 commit 591191c
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 72 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,7 @@ dependencies = [
name = "linkerd2-proxy-api-resolve"
version = "0.1.0"
dependencies = [
"async-stream",
"futures 0.3.5",
"http 0.2.1",
"http-body",
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/integration/tests/orig_proto.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![deny(warnings, rust_2018_idioms)]
#![type_length_limit = "3361329"]
#![type_length_limit = "16289823"]

use linkerd2_app_integration::*;

Expand Down
1 change: 1 addition & 0 deletions linkerd/proxy/api-resolve/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Implements the Resolve trait using the proxy's gRPC API
"""

[dependencies]
async-stream = "0.2.1"
futures = "0.3"
linkerd2-identity = { path = "../../identity" }
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.13" }
Expand Down
1 change: 1 addition & 0 deletions linkerd/proxy/api-resolve/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![deny(warnings, rust_2018_idioms)]
#![recursion_limit = "512"]

use linkerd2_identity as identity;
use linkerd2_proxy_api as api;
Expand Down
123 changes: 60 additions & 63 deletions linkerd/proxy/api-resolve/src/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ use crate::core::resolve::{self, Update};
use crate::metadata::Metadata;
use crate::pb;
use api::destination_client::DestinationClient;
use futures::{ready, Stream};
use async_stream::try_stream;
use futures::{future, stream::StreamExt, Stream};
use http_body::Body as HttpBody;
use pin_project::pin_project;
use std::error::Error;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tonic::{
Expand All @@ -25,13 +24,7 @@ pub struct Resolve<S> {
context_token: String,
}

#[pin_project]
pub struct Resolution {
#[pin]
inner: grpc::Streaming<api::Update>,
}

// === impl Resolver ===
// === impl Resolve ===

impl<S> Resolve<S>
where
Expand Down Expand Up @@ -65,6 +58,9 @@ where
}
}

type UpdatesStream =
Pin<Box<dyn Stream<Item = Result<Update<Metadata>, grpc::Status>> + Send + 'static>>;

impl<T, S> Service<T> for Resolve<S>
where
T: ToString,
Expand All @@ -75,10 +71,9 @@ where
<S::ResponseBody as HttpBody>::Error: Into<Box<dyn Error + Send + Sync + 'static>> + Send,
S::Future: Send,
{
type Response = Resolution;
type Response = UpdatesStream;
type Error = grpc::Status;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
type Future = futures::future::Ready<Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// The future returned by the Tonic generated `DestinationClient`'s `get` method will drive the service to readiness before calling it, so we can always return `Ready` here.
Expand All @@ -88,69 +83,71 @@ where
fn call(&mut self, target: T) -> Self::Future {
let path = target.to_string();
debug!(dst = %path, context = %self.context_token, "Resolving");
let mut svc = self.service.clone();
let req = api::GetDestination {
path,
scheme: self.scheme.clone(),
context_token: self.context_token.clone(),
};
Box::pin(async move {
let rsp = svc.get(grpc::Request::new(req)).await?;
trace!(metadata = ?rsp.metadata());
Ok(Resolution {
inner: rsp.into_inner(),
})
})

future::ok(Box::pin(resolution(self.service.clone(), req)))
}
}

impl Stream for Resolution {
type Item = Result<resolve::Update<Metadata>, grpc::Status>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
match ready!(this.inner.as_mut().poll_next(cx)) {
Some(update) => match update?.update {
Some(api::update::Update::Add(api::WeightedAddrSet {
addrs,
metric_labels,
})) => {
let addr_metas = addrs
.into_iter()
.filter_map(|addr| pb::to_addr_meta(addr, &metric_labels))
.collect::<Vec<_>>();
if !addr_metas.is_empty() {
debug!(endpoints = %addr_metas.len(), "Add");
return Poll::Ready(Some(Ok(Update::Add(addr_metas))));
}
fn resolution<S>(
mut client: DestinationClient<S>,
req: api::GetDestination,
) -> impl Stream<Item = Result<resolve::Update<Metadata>, grpc::Status>>
where
S: GrpcService<BoxBody> + Clone + Send + 'static,
S::Error: Into<Box<dyn Error + Send + Sync>> + Send,
S::ResponseBody: Send,
<S::ResponseBody as Body>::Data: Send,
<S::ResponseBody as HttpBody>::Error: Into<Box<dyn Error + Send + Sync + 'static>> + Send,
S::Future: Send,
{
try_stream! {
let rsp = client.get(grpc::Request::new(req)).await?;
trace!(metadata = ?rsp.metadata());
let mut stream = rsp.into_inner();
while let Some(update) = stream.next().await {
match update?.update {
Some(api::update::Update::Add(api::WeightedAddrSet {
addrs,
metric_labels,
})) => {
let addr_metas = addrs
.into_iter()
.filter_map(|addr| pb::to_addr_meta(addr, &metric_labels))
.collect::<Vec<_>>();
if !addr_metas.is_empty() {
debug!(endpoints = %addr_metas.len(), "Add");
yield Update::Add(addr_metas);
}
}

Some(api::update::Update::Remove(api::AddrSet { addrs })) => {
let sock_addrs = addrs
.into_iter()
.filter_map(pb::to_sock_addr)
.collect::<Vec<_>>();
if !sock_addrs.is_empty() {
debug!(endpoints = %sock_addrs.len(), "Remove");
return Poll::Ready(Some(Ok(Update::Remove(sock_addrs))));
}
Some(api::update::Update::Remove(api::AddrSet { addrs })) => {
let sock_addrs = addrs
.into_iter()
.filter_map(pb::to_sock_addr)
.collect::<Vec<_>>();
if !sock_addrs.is_empty() {
debug!(endpoints = %sock_addrs.len(), "Remove");
yield Update::Remove(sock_addrs);
}
}

Some(api::update::Update::NoEndpoints(api::NoEndpoints { exists })) => {
info!("No endpoints");
let update = if exists {
Update::Empty
} else {
Update::DoesNotExist
};
return Poll::Ready(Some(Ok(update.into())));
}
Some(api::update::Update::NoEndpoints(api::NoEndpoints { exists })) => {
info!("No endpoints");
let update = if exists {
Update::Empty
} else {
Update::DoesNotExist
};
yield update.into();
}

None => {} // continue
},
None => return Poll::Ready(None),
};
None => {} // continue
}
}
}
}
18 changes: 11 additions & 7 deletions linkerd/proxy/discover/src/make_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,10 @@ where
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Change<D::Key, E::Response>, Error>>> {
if let Poll::Ready(key) = self.poll_removals(cx) {
return Poll::Ready(Some(Ok(Change::Remove(key?))));
if let Poll::Ready(result) = self.poll_removals(cx) {
if let Some(key) = result? {
return Poll::Ready(Some(Ok(Change::Remove(key))));
}
}

if let Poll::Ready(Some(res)) = self.project().make_futures.poll_next(cx) {
Expand All @@ -182,21 +184,21 @@ where
fn poll_removals(
self: &mut Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<D::Key, Error>> {
) -> Poll<Result<Option<D::Key>, Error>> {
loop {
let mut this = self.as_mut().project();
if let Some(key) = this.pending_removals.pop() {
this.make_futures.remove(&key);
return Poll::Ready(Ok(key));
return Poll::Ready(Ok(Some(key)));
}

// Before polling the resolution, where we could potentially receive
// an `Add`, poll_ready to ensure that `make` is ready to build new
// services. Don't process any updates until we can do so.
ready!(this.make_endpoint.poll_ready(cx)).map_err(Into::into)?;

if let Some(change) = ready!(this.discover.poll_discover(cx)) {
match change.map_err(Into::into)? {
match ready!(this.discover.poll_discover(cx)) {
Some(change) => match change.map_err(Into::into)? {
Change::Insert(key, target) => {
// Start building the service and continue. If a pending
// service exists for this addr, it will be canceled.
Expand All @@ -206,7 +208,9 @@ where
Change::Remove(key) => {
this.pending_removals.push(key);
}
}
},

None => return Poll::Ready(Ok(None)),
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion linkerd/proxy/resolve/src/recover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,12 @@ where
backoff: None,
}
}
None => return Poll::Ready(None),
None => {
this.inner.state = State::Recover {
error: Some(Eos(()).into()),
backoff: None,
}
}
}
}
// XXX(eliza): note that this match was originally an `if let`,
Expand Down

0 comments on commit 591191c

Please sign in to comment.