Skip to content

Commit

Permalink
Merge b984c6a into cbb703c
Browse files Browse the repository at this point in the history
  • Loading branch information
aledbf committed Oct 1, 2017
2 parents cbb703c + b984c6a commit 1f0a6d9
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 74 deletions.
52 changes: 48 additions & 4 deletions core/pkg/ingress/controller/backend_ssl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ import (
"github.com/golang/glog"

apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/client-go/tools/cache"

"k8s.io/ingress/core/pkg/ingress"
"k8s.io/ingress/core/pkg/ingress/annotations/class"
"k8s.io/ingress/core/pkg/ingress/annotations/parser"
"k8s.io/ingress/core/pkg/net/ssl"
)

Expand All @@ -53,15 +56,21 @@ func (ic *GenericController) syncSecret(key string) {
glog.Infof("updating secret %v in the local store", key)
ic.sslCertTracker.Update(key, cert)
// we need to force the sync of the secret to disk
ic.syncSecret(key)
// this update must trigger an update
// (like an update event from a change in Ingress)
ic.syncIngress("secret-update")
if ic.isInitialSyncDone() {
// this update must trigger an update
// (like an update event from a change in Ingress)
ic.syncQueue.Enqueue(&extensions.Ingress{})
}
return
}

glog.Infof("adding secret %v to the local store", key)
ic.sslCertTracker.Add(key, cert)
if ic.isInitialSyncDone() {
// this update must trigger an update
// (like an update event from a change in Ingress)
ic.syncQueue.Enqueue(&extensions.Ingress{})
}
}

// getPemCertificate receives a secret, and creates a ingress.SSLCert as return.
Expand Down Expand Up @@ -120,6 +129,41 @@ func (ic *GenericController) getPemCertificate(secretName string) (*ingress.SSLC
return s, nil
}

// checkMissingSecrets verify if one or more ingress rules contains a reference
// to a secret that is not present in the local secret store.
// In this case we call syncSecret.
func (ic *GenericController) checkMissingSecrets() {
for _, key := range ic.listers.Ingress.ListKeys() {
if obj, exists, _ := ic.listers.Ingress.GetByKey(key); exists {
ing := obj.(*extensions.Ingress)

if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
continue
}

for _, tls := range ing.Spec.TLS {
if tls.SecretName == "" {
continue
}

key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName)
if _, ok := ic.sslCertTracker.Get(key); !ok {
ic.syncSecret(key)
}
}

key, _ := parser.GetStringAnnotation("ingress.kubernetes.io/auth-tls-secret", ing)
if key == "" {
continue
}

if _, ok := ic.sslCertTracker.Get(key); !ok {
ic.syncSecret(key)
}
}
}
}

// sslCertTracker holds a store of referenced Secrets in Ingress rules
type sslCertTracker struct {
cache.ThreadSafeStore
Expand Down
16 changes: 6 additions & 10 deletions core/pkg/ingress/controller/backend_ssl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
testclient "k8s.io/client-go/kubernetes/fake"
cache_client "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/flowcontrol"

"k8s.io/ingress/core/pkg/ingress"
"k8s.io/ingress/core/pkg/ingress/store"
Expand Down Expand Up @@ -102,21 +103,16 @@ func buildControllerForBackendSSL() cache_client.Controller {
}

func buildGenericControllerForBackendSSL() *GenericController {
return &GenericController{
gc := &GenericController{
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1),
cfg: &Configuration{
Client: buildSimpleClientSetForBackendSSL(),
},
listers: buildListers(),

ingController: buildControllerForBackendSSL(),
endpController: buildControllerForBackendSSL(),
svcController: buildControllerForBackendSSL(),
nodeController: buildControllerForBackendSSL(),
secrController: buildControllerForBackendSSL(),
mapController: buildControllerForBackendSSL(),

listers: buildListers(),
sslCertTracker: newSSLCertTracker(),
}

return gc
}

func buildCrtKeyAndCA() ([]byte, []byte, []byte, error) {
Expand Down
83 changes: 32 additions & 51 deletions core/pkg/ingress/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ import (
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"

Expand Down Expand Up @@ -79,14 +78,8 @@ var (
type GenericController struct {
cfg *Configuration

ingController cache.Controller
endpController cache.Controller
svcController cache.Controller
nodeController cache.Controller
secrController cache.Controller
mapController cache.Controller

listers *ingress.StoreLister
listers *ingress.StoreLister
cacheController *cacheController

annotations annotationExtractor

Expand All @@ -113,6 +106,7 @@ type GenericController struct {
runningConfig *ingress.Configuration

forceReload int32
initialSync int32
}

// Configuration contains all the settings required by an Ingress controller
Expand Down Expand Up @@ -165,12 +159,11 @@ func newIngressController(config *Configuration) *GenericController {
Component: "ingress-controller",
}),
sslCertTracker: newSSLCertTracker(),
listers: &ingress.StoreLister{},
}

ic.syncQueue = task.NewTaskQueue(ic.syncIngress)

ic.createListers(config.DisableNodeList)
ic.listers, ic.cacheController = ic.createListers(config.DisableNodeList)

if config.UpdateStatus {
ic.syncStatus = status.NewStatusSyncer(status.Config{
Expand Down Expand Up @@ -682,25 +675,24 @@ func (ic *GenericController) getBackendServers(ingresses []*extensions.Ingress)
return aUpstreams, aServers
}


// GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret
func (ic GenericController) GetAuthCertificate(secretName string) (*resolver.AuthSSLCert, error) {
if _, exists := ic.sslCertTracker.Get(secretName); !exists {
ic.syncSecret(secretName)
func (ic GenericController) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) {
if _, exists := ic.sslCertTracker.Get(name); !exists {
ic.syncSecret(name)
}

_, err := ic.listers.Secret.GetByName(secretName)
_, err := ic.listers.Secret.GetByName(name)
if err != nil {
return &resolver.AuthSSLCert{}, fmt.Errorf("unexpected error: %v", err)
}

bc, exists := ic.sslCertTracker.Get(secretName)
bc, exists := ic.sslCertTracker.Get(name)
if !exists {
return &resolver.AuthSSLCert{}, fmt.Errorf("secret %v does not exist", secretName)
return &resolver.AuthSSLCert{}, fmt.Errorf("secret %v does not exist", name)
}
cert := bc.(*ingress.SSLCert)
return &resolver.AuthSSLCert{
Secret: secretName,
Secret: name,
CAFileName: cert.CAFileName,
PemSHA: cert.PemSHA,
}, nil
Expand Down Expand Up @@ -1213,55 +1205,44 @@ func (ic GenericController) Stop() error {
func (ic *GenericController) Start() {
glog.Infof("starting Ingress controller")

go ic.ingController.Run(ic.stopCh)
go ic.endpController.Run(ic.stopCh)
go ic.svcController.Run(ic.stopCh)
go ic.nodeController.Run(ic.stopCh)
go ic.secrController.Run(ic.stopCh)
go ic.mapController.Run(ic.stopCh)

// Wait for all involved caches to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(ic.stopCh,
ic.ingController.HasSynced,
ic.svcController.HasSynced,
ic.endpController.HasSynced,
ic.secrController.HasSynced,
ic.mapController.HasSynced,
ic.nodeController.HasSynced,
) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
}
ic.cacheController.Run(ic.stopCh)

// initial sync of secrets to avoid unnecessary reloads
for _, key := range ic.listers.Ingress.ListKeys() {
if obj, exists, _ := ic.listers.Ingress.GetByKey(key); exists {
ing := obj.(*extensions.Ingress)
createDefaultSSLCertificate()

if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
continue
}
time.Sleep(5 * time.Second)
// initial sync of secrets to avoid unnecessary reloads
glog.Info("running initial sync of secret")
for _, obj := range ic.listers.Ingress.List() {
ing := obj.(*extensions.Ingress)

ic.readSecrets(ing)
if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
continue
}
}

createDefaultSSLCertificate()
ic.readSecrets(ing)
}

go ic.syncQueue.Run(time.Second, ic.stopCh)

if ic.syncStatus != nil {
go ic.syncStatus.Run(ic.stopCh)
}

time.Sleep(5 * time.Second)
go wait.Until(ic.checkMissingSecrets, 30*time.Second, ic.stopCh)

// force initial sync
atomic.StoreInt32(&ic.initialSync, 1)
ic.syncQueue.Enqueue(&extensions.Ingress{})

<-ic.stopCh
}

func (ic *GenericController) isInitialSyncDone() bool {
return atomic.LoadInt32(&ic.initialSync) != 0
}

func (ic *GenericController) isForceReload() bool {
return atomic.LoadInt32(&ic.forceReload) != 0
}
Expand Down
57 changes: 49 additions & 8 deletions core/pkg/ingress/controller/listers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,46 @@ import (
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
fcache "k8s.io/client-go/tools/cache/testing"

"k8s.io/ingress/core/pkg/ingress"
"k8s.io/ingress/core/pkg/ingress/annotations/class"
"k8s.io/ingress/core/pkg/ingress/annotations/parser"
)

func (ic *GenericController) createListers(disableNodeLister bool) {
type cacheController struct {
Ingress cache.Controller
Endpoint cache.Controller
Service cache.Controller
Node cache.Controller
Secret cache.Controller
Configmap cache.Controller
}

func (c *cacheController) Run(stopCh chan struct{}) {
go c.Ingress.Run(stopCh)
go c.Endpoint.Run(stopCh)
go c.Service.Run(stopCh)
go c.Node.Run(stopCh)
go c.Secret.Run(stopCh)
go c.Configmap.Run(stopCh)

// Wait for all involved caches to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(stopCh,
c.Ingress.HasSynced,
c.Endpoint.HasSynced,
c.Service.HasSynced,
c.Node.HasSynced,
c.Secret.HasSynced,
c.Configmap.HasSynced,
) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
}
}

func (ic *GenericController) createListers(disableNodeLister bool) (*ingress.StoreLister, *cacheController) {
// from here to the end of the method all the code is just boilerplate
// required to watch Ingress, Secrets, ConfigMaps and Endoints.
// This is used to detect new content, updates or removals and act accordingly
Expand All @@ -45,7 +77,9 @@ func (ic *GenericController) createListers(disableNodeLister bool) {
return
}
ic.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
ic.syncQueue.Enqueue(obj)
if ic.isInitialSyncDone() {
ic.syncQueue.Enqueue(obj)
}
},
DeleteFunc: func(obj interface{}) {
delIng, ok := obj.(*extensions.Ingress)
Expand Down Expand Up @@ -113,6 +147,7 @@ func (ic *GenericController) createListers(disableNodeLister bool) {
}
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
ic.sslCertTracker.DeleteAll(key)
ic.syncQueue.Enqueue(key)
},
}

Expand Down Expand Up @@ -165,23 +200,27 @@ func (ic *GenericController) createListers(disableNodeLister bool) {
watchNs = ic.cfg.Namespace
}

ic.listers.Ingress.Store, ic.ingController = cache.NewInformer(
lister := &ingress.StoreLister{}

controller := &cacheController{}

lister.Ingress.Store, controller.Ingress = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.ExtensionsV1beta1().RESTClient(), "ingresses", ic.cfg.Namespace, fields.Everything()),
&extensions.Ingress{}, ic.cfg.ResyncPeriod, ingEventHandler)

ic.listers.Endpoint.Store, ic.endpController = cache.NewInformer(
lister.Endpoint.Store, controller.Endpoint = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "endpoints", ic.cfg.Namespace, fields.Everything()),
&apiv1.Endpoints{}, ic.cfg.ResyncPeriod, eventHandler)

ic.listers.Secret.Store, ic.secrController = cache.NewInformer(
lister.Secret.Store, controller.Secret = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()),
&apiv1.Secret{}, ic.cfg.ResyncPeriod, secrEventHandler)

ic.listers.ConfigMap.Store, ic.mapController = cache.NewInformer(
lister.ConfigMap.Store, controller.Configmap = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()),
&apiv1.ConfigMap{}, ic.cfg.ResyncPeriod, mapEventHandler)

ic.listers.Service.Store, ic.svcController = cache.NewInformer(
lister.Service.Store, controller.Service = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "services", ic.cfg.Namespace, fields.Everything()),
&apiv1.Service{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})

Expand All @@ -191,7 +230,9 @@ func (ic *GenericController) createListers(disableNodeLister bool) {
} else {
nodeListerWatcher = cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything())
}
ic.listers.Node.Store, ic.nodeController = cache.NewInformer(
lister.Node.Store, controller.Node = cache.NewInformer(
nodeListerWatcher,
&apiv1.Node{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})

return lister, controller
}
2 changes: 1 addition & 1 deletion examples/custom-controller/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (n DummyController) ConfigureFlags(*pflag.FlagSet) {
func (n DummyController) OverrideFlags(*pflag.FlagSet) {
}

func (n DummyController) SetListers(lister ingress.StoreLister) {
func (n DummyController) SetListers(lister *ingress.StoreLister) {

}

Expand Down

0 comments on commit 1f0a6d9

Please sign in to comment.