Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Resolution trait #606

Merged
merged 7 commits into from
Aug 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 7 additions & 13 deletions linkerd/proxy/api-resolve/src/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,10 @@ where
}
}

impl resolve::Resolution for Resolution {
type Endpoint = Metadata;
type Error = grpc::Status;
impl Stream for Resolution {
type Item = Result<resolve::Update<Metadata>, grpc::Status>;

fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Update<Self::Endpoint>, Self::Error>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
match ready!(this.inner.as_mut().poll_next(cx)) {
Expand All @@ -126,7 +122,7 @@ impl resolve::Resolution for Resolution {
.collect::<Vec<_>>();
if !addr_metas.is_empty() {
debug!(endpoints = %addr_metas.len(), "Add");
return Poll::Ready(Ok(Update::Add(addr_metas)));
return Poll::Ready(Some(Ok(Update::Add(addr_metas))));
}
}

Expand All @@ -137,7 +133,7 @@ impl resolve::Resolution for Resolution {
.collect::<Vec<_>>();
if !sock_addrs.is_empty() {
debug!(endpoints = %sock_addrs.len(), "Remove");
return Poll::Ready(Ok(Update::Remove(sock_addrs)));
return Poll::Ready(Some(Ok(Update::Remove(sock_addrs))));
}
}

Expand All @@ -148,14 +144,12 @@ impl resolve::Resolution for Resolution {
} else {
Update::DoesNotExist
};
return Poll::Ready(Ok(update.into()));
return Poll::Ready(Some(Ok(update.into())));
}

None => {} // continue
},
None => {
return Poll::Ready(Err(grpc::Status::new(grpc::Code::Ok, "end of stream")))
}
None => return Poll::Ready(None),
};
}
}
Expand Down
2 changes: 1 addition & 1 deletion linkerd/proxy/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

pub mod resolve;

pub use self::resolve::{Resolution, Resolve};
pub use self::{resolve::Resolve, resolve::Update};
33 changes: 6 additions & 27 deletions linkerd/proxy/core/src/resolve.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use futures::stream::TryStream;
use linkerd2_error::Error;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Resolves `T`-typed names/addresses as a `Resolution`.
/// Resolves `T`-typed names/addresses as an infinite stream of `Update<Self::Endpoint>`.
pub trait Resolve<T> {
type Endpoint;
type Error: Into<Error>;
type Resolution: Resolution<Endpoint = Self::Endpoint>;
type Resolution: TryStream<Ok = Update<Self::Endpoint>, Error = Self::Error>;
type Future: Future<Output = Result<Self::Resolution, Self::Error>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
Expand All @@ -23,27 +23,6 @@ pub trait Resolve<T> {
}
}

/// An infinite stream of endpoint updates.
pub trait Resolution {
type Endpoint;
type Error: Into<Error>;

fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Update<Self::Endpoint>, Self::Error>>;

fn poll_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Update<Self::Endpoint>, Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll(cx)
}
}

#[derive(Clone, Debug)]
pub struct Service<S>(S);

Expand All @@ -57,13 +36,13 @@ pub enum Update<T> {

// === impl Resolve ===

impl<S, T, R> Resolve<T> for S
impl<S, T, R, E> Resolve<T> for S
where
S: tower::Service<T, Response = R>,
S::Error: Into<Error>,
R: Resolution,
R: TryStream<Ok = Update<E>, Error = S::Error>,
{
type Endpoint = <R as Resolution>::Endpoint;
type Endpoint = E;
type Error = S::Error;
type Resolution = S::Response;
type Future = S::Future;
Expand Down
76 changes: 44 additions & 32 deletions linkerd/proxy/discover/src/from_resolve.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures::{ready, Stream, TryFuture};
use futures::{ready, Stream, TryFuture, TryStream};
use indexmap::IndexSet;
use linkerd2_proxy_core::resolve::{Resolution, Resolve, Update};
use linkerd2_proxy_core::resolve::{Resolve, Update};
use pin_project::pin_project;
use std::collections::VecDeque;
use std::future::Future;
Expand All @@ -10,45 +10,50 @@ use std::task::{Context, Poll};
use tower::discover::Change;

#[derive(Clone, Debug)]
pub struct FromResolve<R> {
pub struct FromResolve<R, E> {
resolve: R,
_marker: std::marker::PhantomData<fn(E)>,
}

#[pin_project]
#[derive(Debug)]
pub struct DiscoverFuture<F> {
pub struct DiscoverFuture<F, E> {
#[pin]
future: F,
_marker: std::marker::PhantomData<fn(E)>,
}

/// Observes an `R`-typed resolution stream, using an `M`-typed endpoint stack to
/// build a service for each endpoint.
#[pin_project]
pub struct Discover<R: Resolution> {
pub struct Discover<R: TryStream, E> {
#[pin]
resolution: R,
active: IndexSet<SocketAddr>,
pending: VecDeque<Change<SocketAddr, R::Endpoint>>,
pending: VecDeque<Change<SocketAddr, E>>,
}

// === impl FromResolve ===

impl<R> FromResolve<R> {
impl<R, E> FromResolve<R, E> {
pub fn new<T>(resolve: R) -> Self
where
R: Resolve<T>,
{
Self { resolve }
Self {
resolve,
_marker: std::marker::PhantomData,
}
}
}

impl<T, R> tower::Service<T> for FromResolve<R>
impl<T, R, E> tower::Service<T> for FromResolve<R, E>
where
R: Resolve<T> + Clone,
{
type Response = Discover<R::Resolution>;
type Response = Discover<R::Resolution, E>;
type Error = R::Error;
type Future = DiscoverFuture<R::Future>;
type Future = DiscoverFuture<R::Future, E>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Expand All @@ -59,18 +64,19 @@ where
fn call(&mut self, target: T) -> Self::Future {
Self::Future {
future: self.resolve.resolve(target),
_marker: std::marker::PhantomData,
}
}
}

// === impl DiscoverFuture ===

impl<F> Future for DiscoverFuture<F>
impl<F, E> Future for DiscoverFuture<F, E>
where
F: TryFuture,
F::Ok: Resolution,
F::Ok: TryStream,
{
type Output = Result<Discover<F::Ok>, F::Error>;
type Output = Result<Discover<F::Ok, E>, F::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let resolution = ready!(self.project().future.try_poll(cx))?;
Expand All @@ -80,7 +86,7 @@ where

// === impl Discover ===

impl<R: Resolution> Discover<R> {
impl<R: TryStream, E> Discover<R, E> {
pub fn new(resolution: R) -> Self {
Self {
resolution,
Expand All @@ -90,8 +96,11 @@ impl<R: Resolution> Discover<R> {
}
}

impl<R: Resolution> Stream for Discover<R> {
type Item = Result<Change<SocketAddr, R::Endpoint>, R::Error>;
impl<R, E> Stream for Discover<R, E>
where
R: TryStream<Ok = Update<E>>,
{
type Item = Result<Change<SocketAddr, E>, R::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
Expand All @@ -100,24 +109,27 @@ impl<R: Resolution> Stream for Discover<R> {
return Poll::Ready(Some(Ok(change)));
}

match ready!(this.resolution.poll(cx))? {
Update::Add(endpoints) => {
for (addr, endpoint) in endpoints.into_iter() {
this.active.insert(addr);
this.pending.push_back(Change::Insert(addr, endpoint));
match ready!(this.resolution.try_poll_next(cx)) {
Some(update) => match update? {
Update::Add(endpoints) => {
for (addr, endpoint) in endpoints.into_iter() {
this.active.insert(addr);
this.pending.push_back(Change::Insert(addr, endpoint));
}
}
}
Update::Remove(addrs) => {
for addr in addrs.into_iter() {
if this.active.remove(&addr) {
this.pending.push_back(Change::Remove(addr));
Update::Remove(addrs) => {
for addr in addrs.into_iter() {
if this.active.remove(&addr) {
this.pending.push_back(Change::Remove(addr));
}
}
}
}
Update::DoesNotExist | Update::Empty => {
this.pending
.extend(this.active.drain(..).map(Change::Remove));
}
Update::DoesNotExist | Update::Empty => {
this.pending
.extend(this.active.drain(..).map(Change::Remove));
}
},
None => return Poll::Ready(None),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions linkerd/proxy/discover/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ where
T: fmt::Display,
R: Resolve<T> + Send + Clone + 'static,
R::Error: Into<Error>,
R::Endpoint: fmt::Debug + Clone + PartialEq + Send,
R::Endpoint: fmt::Debug + Clone + PartialEq + Send + 'static,
R::Resolution: Send + 'static,
R::Future: Send + 'static,
M: tower::Service<R::Endpoint> + Clone + Send + 'static,
M::Error: Into<Error>,
M::Response: Send + 'static,
M::Future: Send + 'static,
{
type Service = Buffer<MakeEndpoint<FromResolve<R>, M>>;
type Service = Buffer<MakeEndpoint<FromResolve<R, R::Endpoint>, M>>;

fn layer(&self, make_endpoint: M) -> Self::Service {
let make_discover =
Expand Down
23 changes: 11 additions & 12 deletions linkerd/proxy/discover/src/make_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,18 +195,17 @@ where
// services. Don't process any updates until we can do so.
ready!(this.make_endpoint.poll_ready(cx)).map_err(Into::into)?;

match ready!(this.discover.poll_discover(cx))
.expect("XXX(eliza): can this ever be none???")
.map_err(Into::into)?
{
Change::Insert(key, target) => {
// Start building the service and continue. If a pending
// service exists for this addr, it will be canceled.
let fut = this.make_endpoint.call(target);
this.make_futures.push(key, fut);
}
Change::Remove(key) => {
this.pending_removals.push(key);
if let Some(change) = ready!(this.discover.poll_discover(cx)) {
match change.map_err(Into::into)? {
Change::Insert(key, target) => {
// Start building the service and continue. If a pending
// service exists for this addr, it will be canceled.
let fut = this.make_endpoint.call(target);
this.make_futures.push(key, fut);
}
Change::Remove(key) => {
this.pending_removals.push(key);
}
}
}
}
Expand Down
15 changes: 6 additions & 9 deletions linkerd/proxy/resolve/src/make_unpin.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use futures::stream::{Stream, TryStream};
use futures::TryFuture;
use linkerd2_proxy_core::resolve::{self, Resolution, Update};
use linkerd2_proxy_core::resolve;
use pin_project::pin_project;
use std::future::Future;
use std::pin::Pin;
Expand Down Expand Up @@ -38,15 +39,11 @@ impl<T> MakeUnpin<T> {
}
}

impl<T: Resolution> Resolution for MakeUnpin<T> {
type Endpoint = T::Endpoint;
type Error = T::Error;
impl<T: TryStream> Stream for MakeUnpin<T> {
type Item = Result<T::Ok, T::Error>;

fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Update<Self::Endpoint>, Self::Error>> {
self.project().0.as_mut().as_mut().poll(cx)
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().0.as_mut().as_mut().try_poll_next(cx)
}
}

Expand Down
Loading