diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index cee3e77a03b..2db36399370 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -11,6 +11,17 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Affecting all Beats* - Update to Golang 1.12.1. {pull}11330[11330] +- Update to Golang 1.12.4. {pull}11782[11782] +- Update to ECS 1.0.1. {pull}12284[12284] {pull}12317[12317] +- Default of output.kafka.metadata.full is set to false by now. This reduced the amount of metadata to be queried from a kafka cluster. {pull}12738[12738] +- Fixed a crash under Windows when fetching processes information. {pull}12833[12833] +- Update to Golang 1.12.7. {pull}12931[12931] +- Remove `in_cluster` configuration parameter for Kuberentes, now in-cluster configuration is used only if no other kubeconfig is specified {pull}13051[13051] +- Disable Alibaba Cloud and Tencent Cloud metadata providers by default. {pull}13812[12812] +- Libbeat HTTP's Server can listen to a unix socket using the `unix:///tmp/hello.sock` syntax. {pull}13655[13655] +- Libbeat HTTP's Server can listen to a Windows named pipe using the `npipe:///hello` syntax. {pull}13655[13655] +- By default, all Beats-created files and folders will have a umask of 0027 (on POSIX systems). {pull}14119[14119] +- Fix memory leak in kubernetes autodiscover provider and add_kubernetes_metadata processor happening when pods are terminated without sending a delete event. {pull}14259[14259] *Auditbeat* diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index ca37803c7db..e8c197634cf 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -123,24 +123,49 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodis } watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - p.logger.Debugf("Watcher Pod add: %+v", obj) - p.emit(obj.(*kubernetes.Pod), "start") - }, - UpdateFunc: func(obj interface{}) { - p.logger.Debugf("Watcher Pod update: %+v", obj) - p.emit(obj.(*kubernetes.Pod), "stop") - p.emit(obj.(*kubernetes.Pod), "start") - }, - DeleteFunc: func(obj interface{}) { - p.logger.Debugf("Watcher Pod delete: %+v", obj) - time.AfterFunc(config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") }) - }, + AddFunc: p.handleAdd, + UpdateFunc: p.handleUpdate, + DeleteFunc: p.handleDelete, }) return p, nil } +// handleAdd emits a start event for the given pod +func (p *Provider) handleAdd(obj interface{}) { + p.logger.Debugf("Watcher Pod add: %+v", obj) + p.emit(obj.(*kubernetes.Pod), "start") +} + +// handleUpdate emits events for a given pod depending on the state of the pod, +// if it is terminating, a stop event is scheduled, if not, a stop and a start +// events are sent sequentially to recreate the resources assotiated to the pod. +func (p *Provider) handleUpdate(obj interface{}) { + pod := obj.(*kubernetes.Pod) + if pod.GetObjectMeta().GetDeletionTimestamp() != nil { + p.logger.Debugf("Watcher Pod update (terminating): %+v", obj) + // Pod is terminating, don't reload its configuration and ignore the event + // if some pod is still running, we will receive more events when containers + // terminate. + for _, container := range pod.Status.ContainerStatuses { + if container.State.Running != nil { + return + } + } + time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") }) + } else { + p.logger.Debugf("Watcher Pod update: %+v", obj) + p.emit(pod, "stop") + p.emit(pod, "start") + } +} + +// handleDelete emits a stop event for the given pod +func (p *Provider) handleDelete(obj interface{}) { + p.logger.Debugf("Watcher Pod delete: %+v", obj) + time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") }) +} + // Start for Runner interface. func (p *Provider) Start() { if err := p.watcher.Start(); err != nil { diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 33399f0d942..00ecf06d98d 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -109,11 +109,11 @@ func New(cfg *common.Config) (processors.Processor, error) { client, err := kubernetes.GetKubernetesClient(config.KubeConfig) if err != nil { if kubernetes.IsInCluster(config.KubeConfig) { - logp.Debug("kubernetes", "%v: could not create kubernetes client using in_cluster config", "add_kubernetes_metadata") + logp.Debug("kubernetes", "%v: could not create kubernetes client using in_cluster config: %v", "add_kubernetes_metadata", err) } else if config.KubeConfig == "" { - logp.Debug("kubernetes", "%v: could not create kubernetes client using config: %v", "add_kubernetes_metadata", os.Getenv("KUBECONFIG")) + logp.Debug("kubernetes", "%v: could not create kubernetes client using config: %v: %v", "add_kubernetes_metadata", os.Getenv("KUBECONFIG"), err) } else { - logp.Debug("kubernetes", "%v: could not create kubernetes client using config: %v", "add_kubernetes_metadata", config.KubeConfig) + logp.Debug("kubernetes", "%v: could not create kubernetes client using config: %v: %v", "add_kubernetes_metadata", config.KubeConfig, err) } return processor, nil } @@ -152,14 +152,19 @@ func New(cfg *common.Config) (processors.Processor, error) { watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - processor.addPod(obj.(*kubernetes.Pod)) + pod := obj.(*kubernetes.Pod) + logp.Debug("kubernetes", "%v: adding pod: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName()) + processor.addPod(pod) }, UpdateFunc: func(obj interface{}) { - processor.removePod(obj.(*kubernetes.Pod)) - processor.addPod(obj.(*kubernetes.Pod)) + pod := obj.(*kubernetes.Pod) + logp.Debug("kubernetes", "%v: updating pod: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName()) + processor.updatePod(pod) }, DeleteFunc: func(obj interface{}) { - processor.removePod(obj.(*kubernetes.Pod)) + pod := obj.(*kubernetes.Pod) + logp.Debug("kubernetes", "%v: removing pod: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName()) + processor.removePod(pod) }, }) @@ -198,6 +203,18 @@ func (k *kubernetesAnnotator) addPod(pod *kubernetes.Pod) { } } +func (k *kubernetesAnnotator) updatePod(pod *kubernetes.Pod) { + k.removePod(pod) + + // Add it again only if it is not being deleted + if pod.GetObjectMeta().GetDeletionTimestamp() != nil { + logp.Debug("kubernetes", "%v: removing pod being terminated: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName()) + return + } + + k.addPod(pod) +} + func (k *kubernetesAnnotator) removePod(pod *kubernetes.Pod) { indexes := k.indexers.GetIndexes(pod) for _, idx := range indexes {