Skip to content

Commit

Permalink
Merge pull request coredns#56 from sgreene570/bump-for-kube-1.21
Browse files Browse the repository at this point in the history
Bug 1947478: Upstream cherry-picks and downstream dep bumps for kube 1.21 compatibility
  • Loading branch information
openshift-merge-robot committed May 14, 2021
2 parents 1cc720f + 1af2087 commit 842f354
Show file tree
Hide file tree
Showing 1,351 changed files with 107,428 additions and 9,964 deletions.
12 changes: 6 additions & 6 deletions go.mod
@@ -1,6 +1,6 @@
module github.com/coredns/coredns

go 1.13
go 1.16

require (
github.com/Azure/azure-sdk-for-go v40.6.0+incompatible
Expand All @@ -23,13 +23,13 @@ require (
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.15.0
go.etcd.io/etcd v0.5.0-alpha.5.0.20200306183522-221f0cc107cb
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad
golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073
google.golang.org/api v0.29.0
google.golang.org/grpc v1.29.1
gopkg.in/DataDog/dd-trace-go.v1 v1.28.0
k8s.io/api v0.20.2
k8s.io/apimachinery v0.20.2
k8s.io/client-go v0.20.2
k8s.io/api v0.21.0
k8s.io/apimachinery v0.21.0
k8s.io/client-go v0.21.0
k8s.io/klog v1.0.0
)
77 changes: 52 additions & 25 deletions go.sum

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions plugin/kubernetes/README.md
Expand Up @@ -101,6 +101,20 @@ kubernetes [ZONES...] {

Enabling zone transfer is done by using the *transfer* plugin.

## Startup

When CoreDNS starts with the *kubernetes* plugin enabled, it will delay serving DNS for up to 5 seconds
until it can connect to the Kubernetes API and synchronize all object watches. If this cannot happen within
5 seconds, then CoreDNS will start serving DNS while the *kubernetes* plugin continues to try to connect
and synchronize all object watches. CoreDNS will answer SERVFAIL to any request made for a Kubernetes record
that has not yet been synchronized.

## Monitoring Kubernetes Endpoints

By default, the *kubernetes* plugin watches Endpoints via the `discovery.EndpointSlices` API. However, the
`api.Endpoints` API is used instead if the Kubernetes version does not enable the `EndpointSliceProxying`
feature gate by default (i.e., Kubernetes versions earlier than 1.19).

## Ready

This plugin reports readiness to the ready plugin. This will happen after it has synced to the
Expand Down
109 changes: 82 additions & 27 deletions plugin/kubernetes/controller.go
Expand Up @@ -11,7 +11,8 @@ import (
"github.com/coredns/coredns/plugin/kubernetes/object"

api "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
discovery "k8s.io/api/discovery/v1"
discoveryV1beta1 "k8s.io/api/discovery/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -59,6 +60,10 @@ type dnsControl struct {
selector labels.Selector
namespaceSelector labels.Selector

// epLock is used to lock reads of epLister and epController while they are being replaced
// with the api.Endpoints Lister/Controller on k8s systems that don't use discovery.EndpointSlices
epLock sync.RWMutex

svcController cache.Controller
podController cache.Controller
epController cache.Controller
Expand All @@ -83,7 +88,6 @@ type dnsControl struct {
type dnsControlOpts struct {
initPodCache bool
initEndpointsCache bool
useEndpointSlices bool
ignoreEmptyService bool

// Label handling.
Expand Down Expand Up @@ -132,32 +136,18 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
}

if opts.initEndpointsCache {
var (
apiObj runtime.Object
listWatch cache.ListWatch
to object.ToFunc
latency *object.EndpointLatencyRecorder
)
if opts.useEndpointSlices {
apiObj = &discovery.EndpointSlice{}
listWatch.ListFunc = endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
listWatch.WatchFunc = endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
to = object.EndpointSliceToEndpoints
latency = dns.EndpointSliceLatencyRecorder()
} else {
apiObj = &api.Endpoints{}
listWatch.ListFunc = endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
listWatch.WatchFunc = endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
to = object.ToEndpoints
latency = dns.EndpointsLatencyRecorder()
}
dns.epLock.Lock()
dns.epLister, dns.epController = object.NewIndexerInformer(
&listWatch,
apiObj,
&cache.ListWatch{
ListFunc: endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
WatchFunc: endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
},
&discovery.EndpointSlice{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
object.DefaultProcessor(to, latency),
object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder()),
)
dns.epLock.Unlock()
}

dns.nsLister, dns.nsController = cache.NewInformer(
Expand All @@ -172,6 +162,42 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
return &dns
}

// WatchEndpoints replaces the endpoint Lister and Controller with ones that watch
// api.Endpoints (converted to object.Endpoints via object.ToEndpoints) instead of
// the default discovery.EndpointSlice. This is used in older k8s clusters where
// discovery.EndpointSlice is not fully supported.
// This can be removed when all supported k8s versions fully support EndpointSlice.
func (dns *dnsControl) WatchEndpoints(ctx context.Context) {
	dns.epLock.Lock()
	// defer guarantees the lock is released on every exit path, including a
	// panic inside NewIndexerInformer.
	defer dns.epLock.Unlock()
	dns.epLister, dns.epController = object.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc:  endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
			WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
		},
		&api.Endpoints{},
		cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
		cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
		object.DefaultProcessor(object.ToEndpoints, dns.EndpointsLatencyRecorder()),
	)
}

// WatchEndpointSliceV1beta1 replaces the endpoint Lister and Controller with ones
// that watch discovery/v1beta1 EndpointSlices (converted to object.Endpoints via
// object.EndpointSliceV1beta1ToEndpoints) instead of the default v1 API. This is
// used on clusters that only serve EndpointSlice at v1beta1.
func (dns *dnsControl) WatchEndpointSliceV1beta1(ctx context.Context) {
	dns.epLock.Lock()
	// defer guarantees the lock is released on every exit path, including a
	// panic inside NewIndexerInformer.
	defer dns.epLock.Unlock()
	dns.epLister, dns.epController = object.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc:  endpointSliceListFuncV1beta1(ctx, dns.client, api.NamespaceAll, dns.selector),
			WatchFunc: endpointSliceWatchFuncV1beta1(ctx, dns.client, api.NamespaceAll, dns.selector),
		},
		&discoveryV1beta1.EndpointSlice{},
		cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
		cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
		object.DefaultProcessor(object.EndpointSliceV1beta1ToEndpoints, dns.EndpointSliceLatencyRecorder()),
	)
}

func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorder {
return &object.EndpointLatencyRecorder{
ServiceFunc: func(o meta.Object) []*object.Service {
Expand Down Expand Up @@ -254,13 +280,21 @@ func podListFunc(ctx context.Context, c kubernetes.Interface, ns string, s label
return c.CoreV1().Pods(ns).List(ctx, opts)
}
}
// endpointSliceListFuncV1beta1 builds the ListFunc used to list EndpointSlices
// through the discovery/v1beta1 API, restricting the result to the label
// selector s when one is provided.
func endpointSliceListFuncV1beta1(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
	return func(options meta.ListOptions) (runtime.Object, error) {
		if s != nil {
			options.LabelSelector = s.String()
		}
		return c.DiscoveryV1beta1().EndpointSlices(ns).List(ctx, options)
	}
}

// endpointSliceListFunc builds the ListFunc used to list EndpointSlices through
// the discovery/v1 API, restricting the result to the label selector s when one
// is provided.
func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
	return func(options meta.ListOptions) (runtime.Object, error) {
		if s != nil {
			options.LabelSelector = s.String()
		}
		return c.DiscoveryV1().EndpointSlices(ns).List(ctx, options)
	}
}

Expand Down Expand Up @@ -304,7 +338,7 @@ func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labe
}
}

func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
func endpointSliceWatchFuncV1beta1(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
Expand All @@ -313,6 +347,15 @@ func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns stri
}
}

// endpointSliceWatchFunc builds the WatchFunc used to watch EndpointSlices
// through the discovery/v1 API, restricting the watch to the label selector s
// when one is provided.
func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
	return func(opts meta.ListOptions) (watch.Interface, error) {
		if s != nil {
			opts.LabelSelector = s.String()
		}
		return c.DiscoveryV1().EndpointSlices(ns).Watch(ctx, opts)
	}
}

func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
Expand Down Expand Up @@ -351,7 +394,11 @@ func (dns *dnsControl) Stop() error {
func (dns *dnsControl) Run() {
go dns.svcController.Run(dns.stopCh)
if dns.epController != nil {
go dns.epController.Run(dns.stopCh)
go func() {
dns.epLock.RLock()
dns.epController.Run(dns.stopCh)
dns.epLock.RUnlock()
}()
}
if dns.podController != nil {
go dns.podController.Run(dns.stopCh)
Expand All @@ -365,7 +412,9 @@ func (dns *dnsControl) HasSynced() bool {
a := dns.svcController.HasSynced()
b := true
if dns.epController != nil {
dns.epLock.RLock()
b = dns.epController.HasSynced()
dns.epLock.RUnlock()
}
c := true
if dns.podController != nil {
Expand All @@ -388,6 +437,8 @@ func (dns *dnsControl) ServiceList() (svcs []*object.Service) {
}

func (dns *dnsControl) EndpointsList() (eps []*object.Endpoints) {
dns.epLock.RLock()
defer dns.epLock.RUnlock()
os := dns.epLister.List()
for _, o := range os {
ep, ok := o.(*object.Endpoints)
Expand Down Expand Up @@ -446,6 +497,8 @@ func (dns *dnsControl) SvcIndexReverse(ip string) (svcs []*object.Service) {
}

func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
dns.epLock.RLock()
defer dns.epLock.RUnlock()
os, err := dns.epLister.ByIndex(epNameNamespaceIndex, idx)
if err != nil {
return nil
Expand All @@ -461,6 +514,8 @@ func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
}

func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) {
dns.epLock.RLock()
defer dns.epLock.RUnlock()
os, err := dns.epLister.ByIndex(epIPIndex, ip)
if err != nil {
return nil
Expand Down

0 comments on commit 842f354

Please sign in to comment.