Skip to content

Commit

Permalink
Fix cleanup when autodiscover pods are terminated (elastic#14259)
Browse files Browse the repository at this point in the history
In some cases pods termination doesn't originate a delete event in the
API watchers. Detect termination also by checking if a deletion
timestamp exists in update events.

(cherry picked from commit 1c36118)
  • Loading branch information
jsoriano committed Oct 28, 2019
1 parent 17a527d commit 819ec04
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 20 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -11,6 +11,17 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Affecting all Beats*

- Update to Golang 1.12.1. {pull}11330[11330]
- Update to Golang 1.12.4. {pull}11782[11782]
- Update to ECS 1.0.1. {pull}12284[12284] {pull}12317[12317]
- Default of output.kafka.metadata.full is set to false by now. This reduced the amount of metadata to be queried from a kafka cluster. {pull}12738[12738]
- Fixed a crash under Windows when fetching processes information. {pull}12833[12833]
- Update to Golang 1.12.7. {pull}12931[12931]
- Remove `in_cluster` configuration parameter for Kuberentes, now in-cluster configuration is used only if no other kubeconfig is specified {pull}13051[13051]
- Disable Alibaba Cloud and Tencent Cloud metadata providers by default. {pull}13812[12812]
- Libbeat HTTP's Server can listen to a unix socket using the `unix:///tmp/hello.sock` syntax. {pull}13655[13655]
- Libbeat HTTP's Server can listen to a Windows named pipe using the `npipe:///hello` syntax. {pull}13655[13655]
- By default, all Beats-created files and folders will have a umask of 0027 (on POSIX systems). {pull}14119[14119]
- Fix memory leak in kubernetes autodiscover provider and add_kubernetes_metadata processor happening when pods are terminated without sending a delete event. {pull}14259[14259]

*Auditbeat*

Expand Down
51 changes: 38 additions & 13 deletions libbeat/autodiscover/providers/kubernetes/kubernetes.go
Expand Up @@ -123,24 +123,49 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodis
}

watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
p.logger.Debugf("Watcher Pod add: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "start")
},
UpdateFunc: func(obj interface{}) {
p.logger.Debugf("Watcher Pod update: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "stop")
p.emit(obj.(*kubernetes.Pod), "start")
},
DeleteFunc: func(obj interface{}) {
p.logger.Debugf("Watcher Pod delete: %+v", obj)
time.AfterFunc(config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") })
},
AddFunc: p.handleAdd,
UpdateFunc: p.handleUpdate,
DeleteFunc: p.handleDelete,
})

return p, nil
}

// handleAdd emits a start event for the given pod
func (p *Provider) handleAdd(obj interface{}) {
p.logger.Debugf("Watcher Pod add: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "start")
}

// handleUpdate emits events for a given pod depending on the state of the pod,
// if it is terminating, a stop event is scheduled, if not, a stop and a start
// events are sent sequentially to recreate the resources assotiated to the pod.
func (p *Provider) handleUpdate(obj interface{}) {
pod := obj.(*kubernetes.Pod)
if pod.GetObjectMeta().GetDeletionTimestamp() != nil {
p.logger.Debugf("Watcher Pod update (terminating): %+v", obj)
// Pod is terminating, don't reload its configuration and ignore the event
// if some pod is still running, we will receive more events when containers
// terminate.
for _, container := range pod.Status.ContainerStatuses {
if container.State.Running != nil {
return
}
}
time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") })
} else {
p.logger.Debugf("Watcher Pod update: %+v", obj)
p.emit(pod, "stop")
p.emit(pod, "start")
}
}

// handleDelete emits a stop event for the given pod
func (p *Provider) handleDelete(obj interface{}) {
p.logger.Debugf("Watcher Pod delete: %+v", obj)
time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") })
}

// Start for Runner interface.
func (p *Provider) Start() {
if err := p.watcher.Start(); err != nil {
Expand Down
31 changes: 24 additions & 7 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
Expand Up @@ -109,11 +109,11 @@ func New(cfg *common.Config) (processors.Processor, error) {
client, err := kubernetes.GetKubernetesClient(config.KubeConfig)
if err != nil {
if kubernetes.IsInCluster(config.KubeConfig) {
logp.Debug("kubernetes", "%v: could not create kubernetes client using in_cluster config", "add_kubernetes_metadata")
logp.Debug("kubernetes", "%v: could not create kubernetes client using in_cluster config: %v", "add_kubernetes_metadata", err)
} else if config.KubeConfig == "" {
logp.Debug("kubernetes", "%v: could not create kubernetes client using config: %v", "add_kubernetes_metadata", os.Getenv("KUBECONFIG"))
logp.Debug("kubernetes", "%v: could not create kubernetes client using config: %v: %v", "add_kubernetes_metadata", os.Getenv("KUBECONFIG"), err)
} else {
logp.Debug("kubernetes", "%v: could not create kubernetes client using config: %v", "add_kubernetes_metadata", config.KubeConfig)
logp.Debug("kubernetes", "%v: could not create kubernetes client using config: %v: %v", "add_kubernetes_metadata", config.KubeConfig, err)
}
return processor, nil
}
Expand Down Expand Up @@ -152,14 +152,19 @@ func New(cfg *common.Config) (processors.Processor, error) {

watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
processor.addPod(obj.(*kubernetes.Pod))
pod := obj.(*kubernetes.Pod)
logp.Debug("kubernetes", "%v: adding pod: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName())
processor.addPod(pod)
},
UpdateFunc: func(obj interface{}) {
processor.removePod(obj.(*kubernetes.Pod))
processor.addPod(obj.(*kubernetes.Pod))
pod := obj.(*kubernetes.Pod)
logp.Debug("kubernetes", "%v: updating pod: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName())
processor.updatePod(pod)
},
DeleteFunc: func(obj interface{}) {
processor.removePod(obj.(*kubernetes.Pod))
pod := obj.(*kubernetes.Pod)
logp.Debug("kubernetes", "%v: removing pod: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName())
processor.removePod(pod)
},
})

Expand Down Expand Up @@ -198,6 +203,18 @@ func (k *kubernetesAnnotator) addPod(pod *kubernetes.Pod) {
}
}

func (k *kubernetesAnnotator) updatePod(pod *kubernetes.Pod) {
k.removePod(pod)

// Add it again only if it is not being deleted
if pod.GetObjectMeta().GetDeletionTimestamp() != nil {
logp.Debug("kubernetes", "%v: removing pod being terminated: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName())
return
}

k.addPod(pod)
}

func (k *kubernetesAnnotator) removePod(pod *kubernetes.Pod) {
indexes := k.indexers.GetIndexes(pod)
for _, idx := range indexes {
Expand Down

0 comments on commit 819ec04

Please sign in to comment.