Skip to content

Commit

Permalink
Attempt to fix tests
Browse files Browse the repository at this point in the history
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
  • Loading branch information
zaharidichev committed Jul 29, 2020
1 parent fa3c340 commit 4c7e488
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 15 deletions.
15 changes: 9 additions & 6 deletions linkerd/app/integration/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,16 @@ impl pb::destination_server::Destination for Controller {
req: grpc::Request<pb::GetDestination>,
) -> Result<grpc::Response<Self::GetProfileStream>, grpc::Status> {
if let Ok(mut calls) = self.expect_profile_calls.lock() {
if let Some((dst, profile)) = calls.pop_front() {
if &dst == req.get_ref() {
return Ok(grpc::Response::new(Box::pin(profile.map(Ok))));
}
match calls.pop_front() {
Some((dst, profile)) => {
if &dst == req.get_ref() {
return Ok(grpc::Response::new(Box::pin(profile.map(Ok))));
}

calls.push_front((dst, profile));
return Err(grpc_unexpected_request());
calls.push_front((dst, profile));
return Err(grpc_unexpected_request());
}
None => return Ok(grpc::Response::new(Box::pin(futures::stream::empty()))),
}
}

Expand Down
20 changes: 11 additions & 9 deletions linkerd/proxy/discover/src/make_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,10 @@ where
cx: &mut Context<'_>,
) -> Poll<Option<Result<Change<D::Key, E::Response>, Error>>> {
if let Poll::Ready(key) = self.poll_removals(cx) {
return Poll::Ready(Some(Ok(Change::Remove(key?))));
return match key {
Some(k) => Poll::Ready(Some(Ok(Change::Remove(k?)))),
None => Poll::Ready(None),
};
}

if let Poll::Ready(Some(res)) = self.project().make_futures.poll_next(cx) {
Expand All @@ -182,32 +185,31 @@ where
fn poll_removals(
self: &mut Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<D::Key, Error>> {
) -> Poll<Option<Result<D::Key, Error>>> {
loop {
let mut this = self.as_mut().project();
if let Some(key) = this.pending_removals.pop() {
this.make_futures.remove(&key);
return Poll::Ready(Ok(key));
return Poll::Ready(Some(Ok(key)));
}

// Before polling the resolution, where we could potentially receive
// an `Add`, poll_ready to ensure that `make` is ready to build new
// services. Don't process any updates until we can do so.
ready!(this.make_endpoint.poll_ready(cx)).map_err(Into::into)?;

match ready!(this.discover.poll_discover(cx))
.expect("XXX(eliza): can this ever be none???")
.map_err(Into::into)?
{
Change::Insert(key, target) => {
match ready!(this.discover.poll_discover(cx)) {
Some(Ok(Change::Insert(key, target))) => {
// Start building the service and continue. If a pending
// service exists for this addr, it will be canceled.
let fut = this.make_endpoint.call(target);
this.make_futures.push(key, fut);
}
Change::Remove(key) => {
Some(Ok(Change::Remove(key))) => {
this.pending_removals.push(key);
}
Some(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
None => return Poll::Ready(None),
}
}
}
Expand Down

0 comments on commit 4c7e488

Please sign in to comment.