New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix AsyncStream leak in kubernetes watch (#1694) #1714

Merged
merged 1 commit into from Nov 30, 2017

Conversation

Projects
None yet
5 participants
@sgrankin
Copy link
Contributor

sgrankin commented Nov 27, 2017

k8s watch returned an AsyncStream, the head of which was leaking (as
determined via heap dumps), leading to significant leaks over 24-48hrs.

Since the AsyncStream was not strictly necessary, rewrite watch as
an update function suitable for Var.async (as used by activity).

  • The change attempts to preserve previous bug fixes and comments
    documenting them.
  • The AsyncStream enrichments and their tests have been removed as they
    are no longer used.
  • A watch helper was added to ApiTest to hide the more complicated
    usage pattern around .watch.
  • Fix orphaned infinite-retry watches in ingress tests by adding
    no-data /watch/ handlers.

Fixes #1694

@siggy

This comment has been minimized.

Copy link
Member

siggy commented Nov 27, 2017

Ran this through our integration test suite. Results look good, memory is stable:
screen shot 2017-11-27 at 12 11 49 pm

@olix0r

olix0r approved these changes Nov 28, 2017

Copy link
Member

olix0r left a comment

Thank you for this high-quality PR!

I'd like to let @adleong take look at this before merging, since he's a little more familiar with this part of the code than I am; but the change looks excellent from my point of view.

* the watch to be terminated.
* @return An update function that may be passed to [[Var.async]] to create stream of [[Watch]] objects containing
* additions, modifications, and deletions to the items in the list being watched, and a Closable handle
* allowing for the watch to be terminated.

This comment has been minimized.

@olix0r

olix0r Nov 28, 2017

Member

nit: i suspect these comment lines are a big longer than the usual 100 char limit we like to apply.

Would you mind wrapping comments at 100 chars here & throughout? Thanks!

*/
def watch(
labelSelector: Option[String] = None,
fieldSelector: Option[String] = None,
resourceVersion: Option[String] = None
): (AsyncStream[W], Closable) = {
): (Activity.State[W] => Unit) => Closable = { (output: Activity.State[W] => Unit) =>

This comment has been minimized.

@olix0r

olix0r Nov 28, 2017

Member

nit: probably most idiomatic to call output state

@sgrankin sgrankin force-pushed the sgrankin:sgrankin/#1694-k8s-leak branch from ade9583 to caab312 Nov 28, 2017

@siggy

This comment has been minimized.

Copy link
Member

siggy commented Nov 28, 2017

memory usage flat after 24 hours. that drop at the end is me forcing a gc with jmap:
screen shot 2017-11-28 at 10 46 03 am

@hawkw hawkw added this to the 1.3.3 milestone Nov 28, 2017

@hawkw
Copy link
Member

hawkw left a comment

This looks great to me --- thank you so much for making this fix! I have a handful of style and documentation nits to pick, but feel free to ignore any of them.

.increasingOnly
}

def _processEventStream(

This comment has been minimized.

@hawkw

hawkw Nov 28, 2017

Member

A comment summarizing this function's behaviour might be nice.

previousEvent: Option[W] = None
): Future[Unit] = {
stream().uncons.flatMap {
case Some((w, ws)) =>

This comment has been minimized.

@hawkw

hawkw Nov 28, 2017

Member

Take it or leave it, but I think this might be a little clearer if we collapsed the nested match expression into one match with guards, like:

// special case to handle Kubernetes bug where "too old
// resource version" errors are returned with status code 200
// rather than status code 410.
// see https://github.com/kubernetes/kubernetes/issues/35068
// for details
case Some((e: Watch.Error[0], ws))  if e.status.code.contains(410) =>
  log.debug(
   "k8s returned 'too old resource version' error with " +
     "incorrect HTTP status code, restarting watch"
  )
  _resourceVersionTooOld()
  
case Some((event, ws)) =>
  import Ordering.Implicits._
  if (previousEvent.forall((_: W) < event)) {
    state(Activity.Ok(event))
    _processEventStream(ws, event.resourceVersion, Some(event))
  } else {
    // Ignore any events where we receive a resource version lower
    // than the last received resource version.
    _processEventStream(ws, resourceVersion, previousEvent)
  }

// if the stream ends (k8s will kill connections after ~30m), restart it.
case None if resourceVersion.isDefined => _watch(resourceVersion)

// In this case, we want to try loading the initial information instead before
// watching again.
case None => _resourceVersionTooOld()

This comment has been minimized.

@sgrankin

sgrankin Nov 29, 2017

Contributor

(Also rephrased the comments slightly for clarity)

import io.buoyant.test.FunSuite
import org.scalatest.prop.GeneratorDrivenPropertyChecks

class WatchableTest extends FunSuite

This comment has been minimized.

@hawkw

hawkw Nov 28, 2017

Member

Since IncreasingOnlyAsyncStream was removed, I'd like to see another test somewhere that asserts that the events on Watchable.Activity have monotonically-increasing resource versions even if the Kubernetes API sends events out of order...

This comment has been minimized.

@sgrankin

sgrankin Nov 29, 2017

Contributor

Good catch!

@sgrankin sgrankin force-pushed the sgrankin:sgrankin/#1694-k8s-leak branch 3 times, most recently from bed8946 to 9a22ffd Nov 29, 2017

@sgrankin

This comment has been minimized.

Copy link
Contributor

sgrankin commented Nov 29, 2017

Updated with CR fixes and rebased on latest master.
Note that io.buoyant.grpc.interop.NetworkedInteropTest seems to be flaky (failed, then passed on re-run).

@hawkw

This comment has been minimized.

Copy link
Member

hawkw commented Nov 29, 2017

@sgrankin yeah, that's one of a handful of integration tests that are known to be somewhat flaky on CI (#1504), please feel free to restart any builds failed by that test!

@hawkw

hawkw approved these changes Nov 29, 2017

Copy link
Member

hawkw left a comment

I'd like to hear from @adleong before merging this PR, but it looks good to me! I appreciate that you made all the style changes I requested so quickly. :)

// Special case to handle Kubernetes bug where "too old resource version" errors are
// returned with status code 200 rather than status code 410.
// see https://github.com/kubernetes/kubernetes/issues/35068 for details
case Some((e: Watch.Error[O], _)) if e.status.code.contains(410) =>

This comment has been minimized.

@hawkw

hawkw Nov 29, 2017

Member

Thanks for flattening the match arms like I asked, this looks great. The rephrased comments are much clearer too!

await(w.write(modified0))
poll { case None => }

// write an earlier event: no update since resource version is too low

This comment has been minimized.

@hawkw

hawkw Nov 29, 2017

Member

Thanks for adding this test!

@adleong
Copy link
Member

adleong left a comment

This is a great improvement! I've left just a few non-blocking stylistic comments/questions. If you have time to address these, that would be great. If not, we can go ahead and merge this PR and address any style issues in a follow-up. Either way, we should have this merged by the end of the week.

*/
def watch(
labelSelector: Option[String] = None,
fieldSelector: Option[String] = None,
resourceVersion: Option[String] = None
): (AsyncStream[W], Closable) = {
): (Activity.State[W] => Unit) => Closable = { (state: Activity.State[W] => Unit) =>

This comment has been minimized.

@adleong

adleong Nov 29, 2017

Member

Could this return type just be Activity[W] instead? I think this should be as simple as moving Activity(Var.async(...) { ... }) from the caller into this method. That would make this method signature a little easier to understand.

This comment has been minimized.

@sgrankin

sgrankin Nov 29, 2017

Contributor

IIRC I actually started this change with watch returning a Event[]. It was indeed cleaner, and worked in deployment, but the tests failed:

  • some of the existing tests write multiple events into the response to be processed before actually testing what came out on the other end.
  • the only way to consume a stream of events from a Var or Activity seems to be Var.changes or Activity.states (which then calls Var.changes)
  • in the observe method of the Var created in Var.async, I think multiple updates were managing to sneak in between the synchronized creation of the nested Var and the call to .observe which would register the Observer, so the first update would not necessarily be seen
  • and so the tests would not necessarily see the first event and would fail.

I thought this behavior of missing some initial events should be ok in the wild (we'd get the latest event eventually), but wanted to make sure the tests (that test the watch function directly) saw the complete stream of events, as they did previously with AsyncStream. I was also trying to avoid changing the logic of the tests. So I ended up with this function thus this function and the concurrent-queue machinery in the tests.

I don't see a way of extracting the complete stream of events out of a Var (otherwise we'd likely run into the same issue as with AsyncStream, right?). Anything I may be missing about Vars/Activities?

Actually... another option to improve the readability without changing the signature much (obvious in retrospect):

def watch(
    labelSelector: Option[String] = None,
    fieldSelector: Option[String] = None,
    resourceVersion: Option[String] = None,
    state: Activity.State[W] => Unit
  ) 

The watchable call-site doesn't actually benefit from being able to partially watch, and the ApiTest callsite is easily changed to work with this signature as well.

Thoughts? It'd still keep the weird state type but make the watch signature plainer, and the tests can still avoid dealing with Var quirks.

This comment has been minimized.

@sgrankin

sgrankin Nov 30, 2017

Contributor

per discussion, went with the l updated signature... and also changed it from Activity.State[W] => Unit to Updatable[Activity.State[W]]. It makes the intent clear and simplifies the non-test callsite.

def _processEventStream(
stream: () => AsyncStream[W],
resourceVersion: Option[String],
previousEvent: Option[W] = None

This comment has been minimized.

@adleong

adleong Nov 29, 2017

Member

consider calling this "largestVersion" or something since it may not actually be the previous event

This comment has been minimized.

@sgrankin

sgrankin Nov 30, 2017

Contributor

updated to 'largestVersion' and 'largestEvent'.

Aside: a good cleanup to do afterwards may be to get rid of the event argument here. I think the resource version comparison from ResourceVersionOrdering can be factored out and used directly with the resource version here, avoiding the need to keep a whole event around. (I think this may be the only use case of that Ordering type class ... I didn't do the change wanting to limit the diff size.)

import Ordering.Implicits._
// Register the update only if its resource version is larger than the largest version
// seen so far.
if (previousEvent.forall((_: W) < event)) {

This comment has been minimized.

@adleong

adleong Nov 29, 2017

Member

is the _: W type annotation necessary?

This comment has been minimized.

@sgrankin

sgrankin Nov 29, 2017

Contributor

It was in some other incarnation of this lambda, but it seems no longer (or was possibly an IntelliJ-only error). Will remove.

*/
def _processEventStream(
stream: () => AsyncStream[W],
resourceVersion: Option[String],

This comment has been minimized.

@adleong

adleong Nov 29, 2017

Member

I think this variable shadows the resourceVersion passed to watch which is a bit confusing.

@sgrankin
Copy link
Contributor

sgrankin left a comment

Thanks. I'll do the last round of cleanups tomorrow. Please LMK about your preference on the watch signature

*/
def watch(
labelSelector: Option[String] = None,
fieldSelector: Option[String] = None,
resourceVersion: Option[String] = None
): (AsyncStream[W], Closable) = {
): (Activity.State[W] => Unit) => Closable = { (state: Activity.State[W] => Unit) =>

This comment has been minimized.

@sgrankin

sgrankin Nov 29, 2017

Contributor

IIRC I actually started this change with watch returning a Event[]. It was indeed cleaner, and worked in deployment, but the tests failed:

  • some of the existing tests write multiple events into the response to be processed before actually testing what came out on the other end.
  • the only way to consume a stream of events from a Var or Activity seems to be Var.changes or Activity.states (which then calls Var.changes)
  • in the observe method of the Var created in Var.async, I think multiple updates were managing to sneak in between the synchronized creation of the nested Var and the call to .observe which would register the Observer, so the first update would not necessarily be seen
  • and so the tests would not necessarily see the first event and would fail.

I thought this behavior of missing some initial events should be ok in the wild (we'd get the latest event eventually), but wanted to make sure the tests (that test the watch function directly) saw the complete stream of events, as they did previously with AsyncStream. I was also trying to avoid changing the logic of the tests. So I ended up with this function thus this function and the concurrent-queue machinery in the tests.

I don't see a way of extracting the complete stream of events out of a Var (otherwise we'd likely run into the same issue as with AsyncStream, right?). Anything I may be missing about Vars/Activities?

Actually... another option to improve the readability without changing the signature much (obvious in retrospect):

def watch(
    labelSelector: Option[String] = None,
    fieldSelector: Option[String] = None,
    resourceVersion: Option[String] = None,
    state: Activity.State[W] => Unit
  ) 

The watchable call-site doesn't actually benefit from being able to partially watch, and the ApiTest callsite is easily changed to work with this signature as well.

Thoughts? It'd still keep the weird state type but make the watch signature plainer, and the tests can still avoid dealing with Var quirks.

import Ordering.Implicits._
// Register the update only if its resource version is larger than the largest version
// seen so far.
if (previousEvent.forall((_: W) < event)) {

This comment has been minimized.

@sgrankin

sgrankin Nov 29, 2017

Contributor

It was in some other incarnation of this lambda, but it seems no longer (or was possibly an IntelliJ-only error). Will remove.

@adleong

This comment has been minimized.

Copy link
Member

adleong commented Nov 29, 2017

Ah, that's super interesting! Yes, I think you are correct that it is a fundamental property of Var (and Event) that they do not keep history. With that in mind, your approach here makes sense. I do like the updated signature you propose (moving state into the param list).

@siggy

This comment has been minimized.

Copy link
Member

siggy commented Nov 30, 2017

Hi @sgrankin, just checking in to see how things are going. We'd love to get this merged for our 1.3.3 release this week.

@sgrankin

This comment has been minimized.

Copy link
Contributor

sgrankin commented Nov 30, 2017

Fix AsyncStream leak in kubernetes watch (#1694)
k8s watch returned an AsyncStream, the head of which was leaking (as
determined via heap dumps), leading to significant leaks over 24-48hrs.

Since the AsyncStream was not strictly necessary, rewrite `watch` as
an update function suitable for Var.async (as used by `activity`).
- The change attempts to preserve previous bug fixes and comments
  documenting them.
- The AsyncStream enrichments and their tests have been removed as they
  are no longer used.
- A `watch` helper was added to ApiTest to hide the more complicated
  usage pattern around .watch.
- Fix orphaned infinite-retry watches in ingress tests by adding
  no-data /watch/ handlers.

Fixes #1694

@sgrankin sgrankin force-pushed the sgrankin:sgrankin/#1694-k8s-leak branch from 9a22ffd to 3394389 Nov 30, 2017

@sgrankin

This comment has been minimized.

Copy link
Contributor

sgrankin commented Nov 30, 2017

@siggy: I'm done with all the changes. Thanks!

@siggy siggy merged commit 399a070 into linkerd:master Nov 30, 2017

1 check passed

ci/circleci Your tests passed on CircleCI!
Details

tbrooks8 pushed a commit to tbrooks8/linkerd that referenced this pull request Dec 20, 2018

Proxy init and sidecar containers auto-injection (linkerd#1714)
* Support auto sidecar-injection

1. Add proxy-injector deployment spec to cli/install/template.go
2. Inject the Linkerd CA bundle into the MutatingWebhookConfiguration
during the webhook's start-up process.
3. Add a new handler to the CA controller to create a new secret for the
webhook when a new MutatingWebhookConfiguration is created.
4. Declare a config map to store the proxy and proxy-init container
specs used during the auto-inject process.
5. Ignore namespace and pods that are labeled with
linkerd.io/auto-inject: disabled or linkerd.io/auto-inject: completed
6. Add new flag to `linkerd install` to enable/disable proxy
auto-injection

Proposed implementation for linkerd#561.

* Resolve missing packages errors
* Move the auto-inject label to the pod level
* PR review items
* Move proxy-injector to its own deployment
* Ignore pods that already have proxy injected

This ensures the webhook doesn't error out due to proxy that are injected using the  command

* PR review items on creating/updating the MWC on-start
* Replace API calls to ConfigMap with file reads
* Fixed post-rebase broken tests
* Don't mutate the auto-inject label

Since we started using healhcheck.HasExistingSidecars() to ensure pods with
existing proxies aren't mutated, we don't need to use the auto-inject label as
an indicator.

This resolves a bug which happens with the kubectl run command where the deployment
is also assigned the auto-inject label. The mutation causes the pod auto-inject
label to not match the deployment label, causing kubectl run to fail.

* Tidy up unit tests
* Include proxy resource requests in sidecar config map
* Fixes to broken YAML in CLI install config

The ignore inbound and outbound ports are changed to string type to
avoid broken YAML caused by the string conversion in the uint slice.

Also, parameterized the proxy bind timeout option in template.go.

Renamed the sidecar config map to
'linkerd-proxy-injector-webhook-config'.

Signed-off-by: ihcsim <ihcsim@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment