From d318d7a19013f5901230ec60d4547b1a1c18d05f Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Thu, 23 Jul 2020 12:09:02 +0300 Subject: [PATCH 1/7] Remove `Resolution` trait Signed-off-by: Zahari Dichev --- linkerd/proxy/api-resolve/src/resolve.rs | 21 +++++----- linkerd/proxy/core/src/lib.rs | 2 +- linkerd/proxy/core/src/resolve.rs | 46 ++++++++++------------ linkerd/proxy/discover/src/from_resolve.rs | 45 ++++++++++++--------- linkerd/proxy/discover/src/lib.rs | 4 +- linkerd/proxy/resolve/src/make_unpin.rs | 15 +++---- linkerd/proxy/resolve/src/map_endpoint.rs | 38 ++++++++++-------- linkerd/proxy/resolve/src/recover.rs | 34 ++++++++-------- 8 files changed, 106 insertions(+), 99 deletions(-) diff --git a/linkerd/proxy/api-resolve/src/resolve.rs b/linkerd/proxy/api-resolve/src/resolve.rs index 78076229a5..13826e6b32 100644 --- a/linkerd/proxy/api-resolve/src/resolve.rs +++ b/linkerd/proxy/api-resolve/src/resolve.rs @@ -104,14 +104,10 @@ where } } -impl resolve::Resolution for Resolution { - type Endpoint = Metadata; - type Error = grpc::Status; +impl Stream for Resolution { + type Item = Result, grpc::Status>; - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); loop { match ready!(this.inner.as_mut().poll_next(cx)) { @@ -126,7 +122,7 @@ impl resolve::Resolution for Resolution { .collect::>(); if !addr_metas.is_empty() { debug!(endpoints = %addr_metas.len(), "Add"); - return Poll::Ready(Ok(Update::Add(addr_metas))); + return Poll::Ready(Some(Ok(Update::Add(addr_metas)))); } } @@ -137,7 +133,7 @@ impl resolve::Resolution for Resolution { .collect::>(); if !sock_addrs.is_empty() { debug!(endpoints = %sock_addrs.len(), "Remove"); - return Poll::Ready(Ok(Update::Remove(sock_addrs))); + return Poll::Ready(Some(Ok(Update::Remove(sock_addrs)))); } } @@ -148,13 +144,16 @@ impl resolve::Resolution for Resolution { } else { Update::DoesNotExist }; - return Poll::Ready(Ok(update.into())); + return Poll::Ready(Some(Ok(update.into()))); } None => {} // continue }, None => { - return Poll::Ready(Err(grpc::Status::new(grpc::Code::Ok, "end of stream"))) + return Poll::Ready(Some(Err(grpc::Status::new( + grpc::Code::Ok, + "end of stream", + )))) } }; } diff --git a/linkerd/proxy/core/src/lib.rs b/linkerd/proxy/core/src/lib.rs index 7102ead4cb..0e5a43e03b 100644 --- a/linkerd/proxy/core/src/lib.rs +++ b/linkerd/proxy/core/src/lib.rs @@ -2,4 +2,4 @@ pub mod resolve; -pub use self::resolve::{Resolution, Resolve}; +pub use self::{resolve::Resolve, resolve::Update}; diff --git a/linkerd/proxy/core/src/resolve.rs b/linkerd/proxy/core/src/resolve.rs index 7fe22fb5ef..4a22d7f99d 100644 --- a/linkerd/proxy/core/src/resolve.rs +++ b/linkerd/proxy/core/src/resolve.rs @@ -1,3 +1,4 @@ +use futures::stream::TryStream; use linkerd2_error::Error; use std::future::Future; use std::net::SocketAddr; @@ -8,7 +9,7 @@ use std::task::{Context, Poll}; pub trait Resolve { type Endpoint; type Error: Into; - type Resolution: Resolution; + type Resolution: TryStream, Error = Self::Error>; type Future: Future>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; @@ -23,27 +24,6 @@ pub trait Resolve { } } -/// An infinite stream of endpoint updates. -pub trait Resolution { - type Endpoint; - type Error: Into; - - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>>; - - fn poll_unpin( - &mut self, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> - where - Self: Unpin, - { - Pin::new(self).poll(cx) - } -} - #[derive(Clone, Debug)] pub struct Service(S); @@ -57,13 +37,13 @@ pub enum Update { // === impl Resolve === -impl Resolve for S +impl Resolve for S where S: tower::Service, S::Error: Into, - R: Resolution, + R: TryStream, Error = S::Error>, { - type Endpoint = ::Endpoint; + type Endpoint = E; type Error = S::Error; type Resolution = S::Response; type Future = S::Future; @@ -100,3 +80,19 @@ where self.0.resolve(target) } } + +impl, Error = T>> ResolutionStreamExt for S {} + +pub trait ResolutionStreamExt: TryStream, Error = T> { + fn next_update(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll, T>> { + self.try_poll_next(cx) + .map(|result| result.expect("resolution stream never ends")) + } + + fn next_update_pin(&mut self, cx: &mut Context<'_>) -> Poll, T>> + where + Self: Unpin, + { + Pin::new(self).next_update(cx) + } +} diff --git a/linkerd/proxy/discover/src/from_resolve.rs b/linkerd/proxy/discover/src/from_resolve.rs index a031b0fc88..79d6818431 100644 --- a/linkerd/proxy/discover/src/from_resolve.rs +++ b/linkerd/proxy/discover/src/from_resolve.rs @@ -1,6 +1,6 @@ -use futures::{ready, Stream, TryFuture}; +use futures::{ready, Stream, TryFuture, TryStream}; use indexmap::IndexSet; -use linkerd2_proxy_core::resolve::{Resolution, Resolve, Update}; +use linkerd2_proxy_core::resolve::{ResolutionStreamExt, Resolve, Update}; use pin_project::pin_project; use std::collections::VecDeque; use std::future::Future; @@ -10,45 +10,50 @@ use std::task::{Context, Poll}; use tower::discover::Change; #[derive(Clone, Debug)] -pub struct FromResolve { +pub struct FromResolve { resolve: R, + _marker: std::marker::PhantomData, } #[pin_project] #[derive(Debug)] -pub struct DiscoverFuture { +pub struct DiscoverFuture { #[pin] future: F, + _marker: std::marker::PhantomData, } /// Observes an `R`-typed resolution stream, using an `M`-typed endpoint stack to /// build a service for each endpoint. #[pin_project] -pub struct Discover { +pub struct Discover { #[pin] resolution: R, active: IndexSet, - pending: VecDeque>, + pending: VecDeque>, } // === impl FromResolve === -impl FromResolve { +impl FromResolve { pub fn new(resolve: R) -> Self where R: Resolve, { - Self { resolve } + Self { + resolve, + _marker: std::marker::PhantomData, + } } } -impl tower::Service for FromResolve +impl tower::Service for FromResolve where R: Resolve + Clone, { - type Response = Discover; + type Response = Discover; type Error = R::Error; - type Future = DiscoverFuture; + type Future = DiscoverFuture; #[inline] fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { @@ -59,18 +64,19 @@ where fn call(&mut self, target: T) -> Self::Future { Self::Future { future: self.resolve.resolve(target), + _marker: std::marker::PhantomData, } } } // === impl DiscoverFuture === -impl Future for DiscoverFuture +impl Future for DiscoverFuture where F: TryFuture, - F::Ok: Resolution, + F::Ok: TryStream, { - type Output = Result, F::Error>; + type Output = Result, F::Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let resolution = ready!(self.project().future.try_poll(cx))?; @@ -80,7 +86,7 @@ where // === impl Discover === -impl Discover { +impl Discover { pub fn new(resolution: R) -> Self { Self { resolution, @@ -90,8 +96,11 @@ impl Discover { } } -impl Stream for Discover { - type Item = Result, R::Error>; +impl Stream for Discover +where + R: TryStream>, +{ + type Item = Result, R::Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { @@ -100,7 +109,7 @@ impl Stream for Discover { return Poll::Ready(Some(Ok(change))); } - match ready!(this.resolution.poll(cx))? { + match ready!(this.resolution.next_update(cx))? { Update::Add(endpoints) => { for (addr, endpoint) in endpoints.into_iter() { this.active.insert(addr); diff --git a/linkerd/proxy/discover/src/lib.rs b/linkerd/proxy/discover/src/lib.rs index 798fde7ff3..cedc7faea7 100644 --- a/linkerd/proxy/discover/src/lib.rs +++ b/linkerd/proxy/discover/src/lib.rs @@ -43,7 +43,7 @@ where T: fmt::Display, R: Resolve + Send + Clone + 'static, R::Error: Into, - R::Endpoint: fmt::Debug + Clone + PartialEq + Send, + R::Endpoint: fmt::Debug + Clone + PartialEq + Send + 'static, R::Resolution: Send + 'static, R::Future: Send + 'static, M: tower::Service + Clone + Send + 'static, @@ -51,7 +51,7 @@ where M::Response: Send + 'static, M::Future: Send + 'static, { - type Service = Buffer, M>>; + type Service = Buffer, M>>; fn layer(&self, make_endpoint: M) -> Self::Service { let make_discover = diff --git a/linkerd/proxy/resolve/src/make_unpin.rs b/linkerd/proxy/resolve/src/make_unpin.rs index fef01e3aa5..fad1a82e6c 100644 --- a/linkerd/proxy/resolve/src/make_unpin.rs +++ b/linkerd/proxy/resolve/src/make_unpin.rs @@ -1,5 +1,6 @@ +use futures::stream::{Stream, TryStream}; use futures::TryFuture; -use linkerd2_proxy_core::resolve::{self, Resolution, Update}; +use linkerd2_proxy_core::resolve; use pin_project::pin_project; use std::future::Future; use std::pin::Pin; @@ -38,15 +39,11 @@ impl MakeUnpin { } } -impl Resolution for MakeUnpin { - type Endpoint = T::Endpoint; - type Error = T::Error; +impl Stream for MakeUnpin { + type Item = Result; - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - self.project().0.as_mut().as_mut().poll(cx) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().0.as_mut().as_mut().try_poll_next(cx) } } diff --git a/linkerd/proxy/resolve/src/map_endpoint.rs b/linkerd/proxy/resolve/src/map_endpoint.rs index 6032ec93f2..01962c7a92 100644 --- a/linkerd/proxy/resolve/src/map_endpoint.rs +++ b/linkerd/proxy/resolve/src/map_endpoint.rs @@ -1,7 +1,11 @@ //! A middleware that wraps `Resolutions`, modifying their endpoint type. +use futures::stream::Stream; +use futures::stream::TryStream; use futures::{ready, TryFuture}; +use linkerd2_error::Error; use linkerd2_proxy_core::resolve; +use linkerd2_proxy_core::resolve::ResolutionStreamExt; use pin_project::pin_project; use std::future::Future; use std::net::SocketAddr; @@ -30,11 +34,12 @@ pub struct ResolveFuture { #[pin_project] #[derive(Clone, Debug)] -pub struct Resolution { +pub struct Resolution { #[pin] resolution: R, target: T, map: M, + _marker: std::marker::PhantomData, } // === impl Resolve === @@ -54,7 +59,7 @@ where R: resolve::Resolve, M: MapEndpoint + Clone, { - type Response = Resolution; + type Response = Resolution; type Error = R::Error; type Future = ResolveFuture; @@ -76,13 +81,14 @@ where // === impl ResolveFuture === -impl Future for ResolveFuture +impl Future for ResolveFuture where F: TryFuture, - F::Ok: resolve::Resolution, - M: MapEndpoint::Endpoint>, + F::Ok: TryStream>, + ::Error: Into, + M: MapEndpoint, { - type Output = Result, F::Error>; + type Output = Result, F::Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); @@ -93,26 +99,24 @@ where resolution, target, map, + _marker: std::marker::PhantomData, })) } } // === impl Resolution === -impl resolve::Resolution for Resolution +impl Stream for Resolution where - R: resolve::Resolution, - M: MapEndpoint, + R: TryStream>, + R::Error: Into, + M: MapEndpoint, { - type Endpoint = M::Out; - type Error = R::Error; + type Item = Result, R::Error>; - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - let update = match ready!(this.resolution.poll(cx))? { + let update = match ready!(this.resolution.next_update(cx))? { resolve::Update::Add(eps) => { let mut update = Vec::new(); for (a, ep) in eps.into_iter() { @@ -126,7 +130,7 @@ where resolve::Update::DoesNotExist => resolve::Update::DoesNotExist, resolve::Update::Empty => resolve::Update::Empty, }; - Poll::Ready(Ok(update)) + Poll::Ready(Some(Ok(update))) } } diff --git a/linkerd/proxy/resolve/src/recover.rs b/linkerd/proxy/resolve/src/recover.rs index 0c562bfd20..b5f9e9113d 100644 --- a/linkerd/proxy/resolve/src/recover.rs +++ b/linkerd/proxy/resolve/src/recover.rs @@ -1,9 +1,10 @@ //! A middleware that recovers a resolution after some failures. -use futures::{prelude::*, ready}; +use futures::stream::TryStream; +use futures::{prelude::*, ready, FutureExt, Stream}; use indexmap::IndexMap; use linkerd2_error::{Error, Recover}; -use linkerd2_proxy_core::resolve::{self, Resolution as _, Update}; +use linkerd2_proxy_core::resolve::{self, ResolutionStreamExt, Update}; use pin_project::pin_project; use std::future::Future; use std::net::SocketAddr; @@ -42,7 +43,7 @@ struct Cache { } #[pin_project] -enum State { +enum State { Disconnected { backoff: Option, }, @@ -62,7 +63,7 @@ enum State { Connected { #[pin] resolution: R, - initial: Option>, + initial: Option, }, Recover { @@ -151,7 +152,7 @@ where // === impl Resolution === -impl resolve::Resolution for Resolution +impl Stream for Resolution where T: Clone, R: resolve::Resolve, @@ -161,20 +162,16 @@ where E: Recover, E::Backoff: Unpin, { - type Endpoint = R::Endpoint; - type Error = Error; + type Item = Result, Error>; - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); loop { // If a reconciliation update is buffered (i.e. after // reconcile_after_reconnect), process it immediately. if let Some(update) = this.reconcile.take() { this.update_active(&update); - return Poll::Ready(Ok(update)); + return Poll::Ready(Some(Ok(update))); } match this.inner.state { @@ -195,17 +192,18 @@ where { *this.reconcile = reconcile; this.update_active(&update); - return Poll::Ready(Ok(update)); + return Poll::Ready(Some(Ok(update))); } } // Process the resolution stream, updating the cache. // // Attempt recovery/backoff if the resolution fails. - match ready!(resolution.poll_unpin(cx)) { + + match ready!(resolution.next_update_pin(cx)) { Ok(update) => { this.update_active(&update); - return Poll::Ready(Ok(update)); + return Poll::Ready(Some(Ok(update))); } Err(e) => { this.inner.state = State::Recover { @@ -300,7 +298,11 @@ where State::Pending { ref mut resolution, ref mut backoff, - } => match ready!(resolution.as_mut().expect("illegal state").poll_unpin(cx)) { + } => match ready!(resolution + .as_mut() + .expect("illegal state") + .next_update_pin(cx)) + { Err(e) => State::Recover { error: Some(e.into()), backoff: backoff.take(), From 4a65a63d842b0db34bff38f51804dc50f1a7e5c3 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Thu, 23 Jul 2020 12:36:00 +0300 Subject: [PATCH 2/7] Move things around Signed-off-by: Zahari Dichev --- linkerd/proxy/core/src/resolve.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/linkerd/proxy/core/src/resolve.rs b/linkerd/proxy/core/src/resolve.rs index 4a22d7f99d..6bf95a758e 100644 --- a/linkerd/proxy/core/src/resolve.rs +++ b/linkerd/proxy/core/src/resolve.rs @@ -81,8 +81,6 @@ where } } -impl, Error = T>> ResolutionStreamExt for S {} - pub trait ResolutionStreamExt: TryStream, Error = T> { fn next_update(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll, T>> { self.try_poll_next(cx) @@ -96,3 +94,5 @@ pub trait ResolutionStreamExt: TryStream, Error = T> { Pin::new(self).next_update(cx) } } + +impl, Error = T>> ResolutionStreamExt for S {} From 0f846347eeab4340cd0ae8261e6133142181c920 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Thu, 23 Jul 2020 12:37:11 +0300 Subject: [PATCH 3/7] Update comment Signed-off-by: Zahari Dichev --- linkerd/proxy/core/src/resolve.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkerd/proxy/core/src/resolve.rs b/linkerd/proxy/core/src/resolve.rs index 6bf95a758e..e7eeec9ca5 100644 --- a/linkerd/proxy/core/src/resolve.rs +++ b/linkerd/proxy/core/src/resolve.rs @@ -5,7 +5,7 @@ use std::net::SocketAddr; use std::pin::Pin; use std::task::{Context, Poll}; -/// Resolves `T`-typed names/addresses as a `Resolution`. +/// Resolves `T`-typed names/addresses as an infinite stream of `Update`. pub trait Resolve { type Endpoint; type Error: Into; From 7f5887e940b2cc6fc708d9e7303214395867062d Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Thu, 23 Jul 2020 13:19:19 +0300 Subject: [PATCH 4/7] Rename methods Signed-off-by: Zahari Dichev --- linkerd/proxy/core/src/resolve.rs | 6 +++--- linkerd/proxy/discover/src/from_resolve.rs | 2 +- linkerd/proxy/resolve/src/map_endpoint.rs | 2 +- linkerd/proxy/resolve/src/recover.rs | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/linkerd/proxy/core/src/resolve.rs b/linkerd/proxy/core/src/resolve.rs index e7eeec9ca5..2446b193eb 100644 --- a/linkerd/proxy/core/src/resolve.rs +++ b/linkerd/proxy/core/src/resolve.rs @@ -82,16 +82,16 @@ where } pub trait ResolutionStreamExt: TryStream, Error = T> { - fn next_update(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll, T>> { + fn poll_next_update(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll, T>> { self.try_poll_next(cx) .map(|result| result.expect("resolution stream never ends")) } - fn next_update_pin(&mut self, cx: &mut Context<'_>) -> Poll, T>> + fn poll_next_update_unpin(&mut self, cx: &mut Context<'_>) -> Poll, T>> where Self: Unpin, { - Pin::new(self).next_update(cx) + Pin::new(self).poll_next_update(cx) } } diff --git a/linkerd/proxy/discover/src/from_resolve.rs b/linkerd/proxy/discover/src/from_resolve.rs index 79d6818431..cc8bd11fa2 100644 --- a/linkerd/proxy/discover/src/from_resolve.rs +++ b/linkerd/proxy/discover/src/from_resolve.rs @@ -109,7 +109,7 @@ where return Poll::Ready(Some(Ok(change))); } - match ready!(this.resolution.next_update(cx))? { + match ready!(this.resolution.poll_next_update(cx))? { Update::Add(endpoints) => { for (addr, endpoint) in endpoints.into_iter() { this.active.insert(addr); diff --git a/linkerd/proxy/resolve/src/map_endpoint.rs b/linkerd/proxy/resolve/src/map_endpoint.rs index 01962c7a92..61ddc8607c 100644 --- a/linkerd/proxy/resolve/src/map_endpoint.rs +++ b/linkerd/proxy/resolve/src/map_endpoint.rs @@ -116,7 +116,7 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - let update = match ready!(this.resolution.next_update(cx))? { + let update = match ready!(this.resolution.poll_next_update(cx))? { resolve::Update::Add(eps) => { let mut update = Vec::new(); for (a, ep) in eps.into_iter() { diff --git a/linkerd/proxy/resolve/src/recover.rs b/linkerd/proxy/resolve/src/recover.rs index b5f9e9113d..cd59a1ff36 100644 --- a/linkerd/proxy/resolve/src/recover.rs +++ b/linkerd/proxy/resolve/src/recover.rs @@ -200,7 +200,7 @@ where // // Attempt recovery/backoff if the resolution fails. - match ready!(resolution.next_update_pin(cx)) { + match ready!(resolution.poll_next_update_unpin(cx)) { Ok(update) => { this.update_active(&update); return Poll::Ready(Some(Ok(update))); @@ -301,7 +301,7 @@ where } => match ready!(resolution .as_mut() .expect("illegal state") - .next_update_pin(cx)) + .poll_next_update_unpin(cx)) { Err(e) => State::Recover { error: Some(e.into()), From 02e7edb0a6b4f530c0296862ab9c5e63ab5872cb Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Fri, 24 Jul 2020 14:24:43 +0300 Subject: [PATCH 5/7] Avoid Ext trait and do not panic Signed-off-by: Zahari Dichev --- linkerd/proxy/api-resolve/src/resolve.rs | 7 +---- linkerd/proxy/core/src/resolve.rs | 17 ----------- linkerd/proxy/discover/src/from_resolve.rs | 35 ++++++++++++---------- linkerd/proxy/resolve/src/map_endpoint.rs | 28 +++++++++-------- linkerd/proxy/resolve/src/recover.rs | 22 +++++++++----- 5 files changed, 49 insertions(+), 60 deletions(-) diff --git a/linkerd/proxy/api-resolve/src/resolve.rs b/linkerd/proxy/api-resolve/src/resolve.rs index 13826e6b32..18823da0de 100644 --- a/linkerd/proxy/api-resolve/src/resolve.rs +++ b/linkerd/proxy/api-resolve/src/resolve.rs @@ -149,12 +149,7 @@ impl Stream for Resolution { None => {} // continue }, - None => { - return Poll::Ready(Some(Err(grpc::Status::new( - grpc::Code::Ok, - "end of stream", - )))) - } + None => return Poll::Ready(None), }; } } diff --git a/linkerd/proxy/core/src/resolve.rs b/linkerd/proxy/core/src/resolve.rs index 2446b193eb..16f9362fba 100644 --- a/linkerd/proxy/core/src/resolve.rs +++ b/linkerd/proxy/core/src/resolve.rs @@ -2,7 +2,6 @@ use futures::stream::TryStream; use linkerd2_error::Error; use std::future::Future; use std::net::SocketAddr; -use std::pin::Pin; use std::task::{Context, Poll}; /// Resolves `T`-typed names/addresses as an infinite stream of `Update`. @@ -80,19 +79,3 @@ where self.0.resolve(target) } } - -pub trait ResolutionStreamExt: TryStream, Error = T> { - fn poll_next_update(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll, T>> { - self.try_poll_next(cx) - .map(|result| result.expect("resolution stream never ends")) - } - - fn poll_next_update_unpin(&mut self, cx: &mut Context<'_>) -> Poll, T>> - where - Self: Unpin, - { - Pin::new(self).poll_next_update(cx) - } -} - -impl, Error = T>> ResolutionStreamExt for S {} diff --git a/linkerd/proxy/discover/src/from_resolve.rs b/linkerd/proxy/discover/src/from_resolve.rs index cc8bd11fa2..feb8370864 100644 --- a/linkerd/proxy/discover/src/from_resolve.rs +++ b/linkerd/proxy/discover/src/from_resolve.rs @@ -1,6 +1,6 @@ use futures::{ready, Stream, TryFuture, TryStream}; use indexmap::IndexSet; -use linkerd2_proxy_core::resolve::{ResolutionStreamExt, Resolve, Update}; +use linkerd2_proxy_core::resolve::{Resolve, Update}; use pin_project::pin_project; use std::collections::VecDeque; use std::future::Future; @@ -109,24 +109,27 @@ where return Poll::Ready(Some(Ok(change))); } - match ready!(this.resolution.poll_next_update(cx))? { - Update::Add(endpoints) => { - for (addr, endpoint) in endpoints.into_iter() { - this.active.insert(addr); - this.pending.push_back(Change::Insert(addr, endpoint)); + match ready!(this.resolution.try_poll_next(cx)) { + Some(update) => match update? { + Update::Add(endpoints) => { + for (addr, endpoint) in endpoints.into_iter() { + this.active.insert(addr); + this.pending.push_back(Change::Insert(addr, endpoint)); + } } - } - Update::Remove(addrs) => { - for addr in addrs.into_iter() { - if this.active.remove(&addr) { - this.pending.push_back(Change::Remove(addr)); + Update::Remove(addrs) => { + for addr in addrs.into_iter() { + if this.active.remove(&addr) { + this.pending.push_back(Change::Remove(addr)); + } } } - } - Update::DoesNotExist | Update::Empty => { - this.pending - .extend(this.active.drain(..).map(Change::Remove)); - } + Update::DoesNotExist | Update::Empty => { + this.pending + .extend(this.active.drain(..).map(Change::Remove)); + } + }, + None => return Poll::Ready(None), } } } diff --git a/linkerd/proxy/resolve/src/map_endpoint.rs b/linkerd/proxy/resolve/src/map_endpoint.rs index 61ddc8607c..5b31f74bf1 100644 --- a/linkerd/proxy/resolve/src/map_endpoint.rs +++ b/linkerd/proxy/resolve/src/map_endpoint.rs @@ -5,7 +5,6 @@ use futures::stream::TryStream; use futures::{ready, TryFuture}; use linkerd2_error::Error; use linkerd2_proxy_core::resolve; -use linkerd2_proxy_core::resolve::ResolutionStreamExt; use pin_project::pin_project; use std::future::Future; use std::net::SocketAddr; @@ -116,19 +115,22 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - let update = match ready!(this.resolution.poll_next_update(cx))? { - resolve::Update::Add(eps) => { - let mut update = Vec::new(); - for (a, ep) in eps.into_iter() { - let ep = this.map.map_endpoint(&this.target, a, ep); - update.push((a, ep)); + let update = match ready!(this.resolution.try_poll_next(cx)) { + Some(result) => match result? { + resolve::Update::Add(eps) => { + let mut update = Vec::new(); + for (a, ep) in eps.into_iter() { + let ep = this.map.map_endpoint(&this.target, a, ep); + update.push((a, ep)); + } + + resolve::Update::Add(update) } - - resolve::Update::Add(update) - } - resolve::Update::Remove(addrs) => resolve::Update::Remove(addrs), - resolve::Update::DoesNotExist => resolve::Update::DoesNotExist, - resolve::Update::Empty => resolve::Update::Empty, + resolve::Update::Remove(addrs) => resolve::Update::Remove(addrs), + resolve::Update::DoesNotExist => resolve::Update::DoesNotExist, + resolve::Update::Empty => resolve::Update::Empty, + }, + None => return Poll::Ready(None), }; Poll::Ready(Some(Ok(update))) } diff --git a/linkerd/proxy/resolve/src/recover.rs b/linkerd/proxy/resolve/src/recover.rs index cd59a1ff36..da38c855b5 100644 --- a/linkerd/proxy/resolve/src/recover.rs +++ b/linkerd/proxy/resolve/src/recover.rs @@ -4,7 +4,7 @@ use futures::stream::TryStream; use futures::{prelude::*, ready, FutureExt, Stream}; use indexmap::IndexMap; use linkerd2_error::{Error, Recover}; -use linkerd2_proxy_core::resolve::{self, ResolutionStreamExt, Update}; +use linkerd2_proxy_core::resolve::{self, Update}; use pin_project::pin_project; use std::future::Future; use std::net::SocketAddr; @@ -71,6 +71,8 @@ enum State { backoff: Option, }, + Eos, + Backoff(Option), } @@ -200,19 +202,22 @@ where // // Attempt recovery/backoff if the resolution fails. - match ready!(resolution.poll_next_update_unpin(cx)) { - Ok(update) => { + match ready!(resolution.try_poll_next_unpin(cx)) { + Some(Ok(update)) => { this.update_active(&update); return Poll::Ready(Some(Ok(update))); } - Err(e) => { + Some(Err(e)) => { this.inner.state = State::Recover { error: Some(e.into()), backoff: None, } } + None => return Poll::Ready(None), } } + + State::Eos => return Poll::Ready(None), // XXX(eliza): note that this match was originally an `if let`, // but that doesn't work with `#[project]` for some kinda reason _ => {} @@ -301,22 +306,23 @@ where } => match ready!(resolution .as_mut() .expect("illegal state") - .poll_next_update_unpin(cx)) + .try_poll_next_unpin(cx)) { - Err(e) => State::Recover { + Some(Err(e)) => State::Recover { error: Some(e.into()), backoff: backoff.take(), }, - Ok(initial) => { + Some(Ok(initial)) => { tracing::trace!("connected"); State::Connected { resolution: resolution.take().expect("illegal state"), initial: Some(initial), } } + None => State::Eos, }, - State::Connected { .. } => return Poll::Ready(Ok(())), + State::Connected { .. } | State::Eos => return Poll::Ready(Ok(())), // If any stage failed, try to recover. If the error is // recoverable, start (or continue) backing off... From c52a1f700d41692a3ec0e345039c53daf414b0de Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Fri, 7 Aug 2020 14:35:05 +0300 Subject: [PATCH 6/7] Do not expect a Some(_) Signed-off-by: Zahari Dichev --- linkerd/proxy/discover/src/make_endpoint.rs | 23 ++++++++++----------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/linkerd/proxy/discover/src/make_endpoint.rs b/linkerd/proxy/discover/src/make_endpoint.rs index 6d44e30fb7..ae67f80410 100644 --- a/linkerd/proxy/discover/src/make_endpoint.rs +++ b/linkerd/proxy/discover/src/make_endpoint.rs @@ -195,18 +195,17 @@ where // services. Don't process any updates until we can do so. ready!(this.make_endpoint.poll_ready(cx)).map_err(Into::into)?; - match ready!(this.discover.poll_discover(cx)) - .expect("XXX(eliza): can this ever be none???") - .map_err(Into::into)? - { - Change::Insert(key, target) => { - // Start building the service and continue. If a pending - // service exists for this addr, it will be canceled. - let fut = this.make_endpoint.call(target); - this.make_futures.push(key, fut); - } - Change::Remove(key) => { - this.pending_removals.push(key); + if let Some(change) = ready!(this.discover.poll_discover(cx)) { + match change.map_err(Into::into)? { + Change::Insert(key, target) => { + // Start building the service and continue. If a pending + // service exists for this addr, it will be canceled. + let fut = this.make_endpoint.call(target); + this.make_futures.push(key, fut); + } + Change::Remove(key) => { + this.pending_removals.push(key); + } } } } From e941ec98cc23a965e35bc2301afed4a052fc105c Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Sun, 9 Aug 2020 22:04:20 +0300 Subject: [PATCH 7/7] Fix hanging test Signed-off-by: Zahari Dichev --- linkerd/proxy/resolve/src/recover.rs | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/linkerd/proxy/resolve/src/recover.rs b/linkerd/proxy/resolve/src/recover.rs index da38c855b5..0db7e2b66b 100644 --- a/linkerd/proxy/resolve/src/recover.rs +++ b/linkerd/proxy/resolve/src/recover.rs @@ -11,6 +11,9 @@ use std::net::SocketAddr; use std::pin::Pin; use std::task::{Context, Poll}; +#[derive(Clone, Debug)] +pub struct Eos(()); + #[derive(Clone, Debug)] pub struct Resolve { resolve: R, @@ -71,8 +74,6 @@ enum State { backoff: Option, }, - Eos, - Backoff(Option), } @@ -216,8 +217,6 @@ where None => return Poll::Ready(None), } } - - State::Eos => return Poll::Ready(None), // XXX(eliza): note that this match was originally an `if let`, // but that doesn't work with `#[project]` for some kinda reason _ => {} @@ -319,10 +318,13 @@ where initial: Some(initial), } } - None => State::Eos, + None => State::Recover { + error: Some(Eos(()).into()), + backoff: backoff.take(), + }, }, - State::Connected { .. } | State::Eos => return Poll::Ready(Ok(())), + State::Connected { .. } => return Poll::Ready(Ok(())), // If any stage failed, try to recover. If the error is // recoverable, start (or continue) backing off... @@ -402,6 +404,14 @@ fn reconcile_after_connect( } } +impl std::fmt::Display for Eos { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "end of stream reached") + } +} + +impl std::error::Error for Eos {} + #[cfg(test)] mod tests { use super::*;