Add Ingress status management #2386

Merged · 2 commits · Apr 1, 2020
99 changes: 99 additions & 0 deletions cmd/contour/ingressstatus.go
@@ -0,0 +1,99 @@
// Copyright © 2020 VMware
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"sync"

"github.com/projectcontour/contour/internal/k8s"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
)

// loadBalancerStatusWriter manages the lifetime of IngressStatusUpdaters.
//
// The theory of operation of the loadBalancerStatusWriter is as follows:
// 1. On startup the loadBalancerStatusWriter waits to be elected leader.
// 2. Once elected leader, the loadBalancerStatusWriter waits to receive a
// v1.LoadBalancerStatus value.
// 3. Once a v1.LoadBalancerStatus value has been received, any existing informer
// is stopped and a new informer started in its place. This ensures that all existing
// Ingress objects will have OnAdd events fired to the new event handler.
// 4. Each informer is connected to a k8s.IngressStatusUpdater which reacts to
// OnAdd events for networking.k8s.io/ingress.v1beta1 objects. For each OnAdd
// the object is patched with the v1.LoadBalancerStatus value obtained on creation.
// OnUpdate and OnDelete events are ignored. If a new v1.LoadBalancerStatus value
// is received, operation restarts at step 3.
// 5. If the worker is stopped, any existing informer is stopped before the worker stops.
type loadBalancerStatusWriter struct {
log logrus.FieldLogger
clients *k8s.Clients
isLeader chan struct{}
lbStatus chan v1.LoadBalancerStatus
}

func (isw *loadBalancerStatusWriter) Start(stop <-chan struct{}) error {

// Await leadership election.
isw.log.Info("awaiting leadership election")
select {
case <-stop:
// We were asked to stop before elected leader.
return nil
case <-isw.isLeader:
isw.log.Info("elected leader")
}

var shutdown chan struct{}
var ingressInformers sync.WaitGroup
for {
select {
case <-stop:
// Use the shutdown channel to stop the existing informer and shut down.
if shutdown != nil {
close(shutdown)
}
ingressInformers.Wait()
return nil
case lbs := <-isw.lbStatus:
// Stop the existing informer.
if shutdown != nil {
close(shutdown)
}
ingressInformers.Wait()

isw.log.Info("Received a new address for status.loadBalancer")

// Create new informer for the new LoadBalancerStatus
factory := isw.clients.NewInformerFactory()
Member:

Should we reuse the informers created earlier? What if the informer would fail to create?

Member Author:

The reason to throw away the old informer is to ensure that the new informer does a full resync, so that all the existing Ingress objects that are in scope will get updated (via OnAdd calls).

I think that if the informer failed to create, there would be a panic somewhere, because this method doesn't return an error for us to check.
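The resync behaviour described above can be sketched against client-go's fake clientset; this is an illustrative aside, not part of the change, and the Ingress name is invented. A freshly started informer delivers OnAdd for every object that already exists, which is what lets the updater stamp the new address onto pre-existing Ingresses:

package main

import (
	"fmt"
	"time"

	"k8s.io/api/networking/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Seed the fake API server with an Ingress that existed before the informer started.
	client := fake.NewSimpleClientset(&v1beta1.Ingress{
		ObjectMeta: metav1.ObjectMeta{Name: "kuard", Namespace: "default"},
	})

	factory := informers.NewSharedInformerFactory(client, 0)
	inf := factory.Networking().V1beta1().Ingresses().Informer()
	inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			// Fires for pre-existing objects as well as new ones.
			fmt.Println("OnAdd:", obj.(*v1beta1.Ingress).Name)
		},
	})

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	cache.WaitForCacheSync(stop, inf.HasSynced)
	time.Sleep(100 * time.Millisecond) // allow the handler goroutine to drain
}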

inf := factory.Networking().V1beta1().Ingresses().Informer()
log := isw.log.WithField("context", "IngressStatusUpdater")
inf.AddEventHandler(&k8s.IngressStatusUpdater{
Client: isw.clients.ClientSet(),
Logger: log,
Status: lbs,
})

shutdown = make(chan struct{})
ingressInformers.Add(1)
fn := startInformer(factory, log)
go func() {
defer ingressInformers.Done()
if err := fn(shutdown); err != nil {
return
}
}()
}
}
}
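For review context, a minimal sketch of what the k8s.IngressStatusUpdater registered above does when OnAdd fires, driven by client-go's fake clientset. This snippet is not part of the change; the Ingress name and address are invented, and it assumes it lives inside the Contour module so the internal package is importable.

package main

import (
	"fmt"

	"github.com/projectcontour/contour/internal/k8s"
	"github.com/sirupsen/logrus"
	v1 "k8s.io/api/core/v1"
	"k8s.io/api/networking/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	ing := &v1beta1.Ingress{ObjectMeta: metav1.ObjectMeta{Name: "kuard", Namespace: "default"}}
	client := fake.NewSimpleClientset(ing)

	updater := &k8s.IngressStatusUpdater{
		Client: client,
		Logger: logrus.New(),
		Status: v1.LoadBalancerStatus{
			Ingress: []v1.LoadBalancerIngress{{IP: "203.0.113.10"}},
		},
	}

	// The informer calls this for every Ingress on a full resync.
	updater.OnAdd(ing)

	// Context-less Get matches the client-go vintage this PR uses.
	got, err := client.NetworkingV1beta1().Ingresses("default").Get("kuard", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println(got.Status.LoadBalancer.Ingress) // the Ingress now carries the address
}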
26 changes: 23 additions & 3 deletions cmd/contour/serve.go
@@ -34,6 +34,7 @@ import (
"github.com/sirupsen/logrus"
"gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
coreinformers "k8s.io/client-go/informers"
)

@@ -104,6 +105,8 @@ func registerServe(app *kingpin.Application) (*kingpin.CmdClause, *serveContext)
serve.Flag("envoy-service-https-address", "Kubernetes Service address for HTTPS requests.").StringVar(&ctx.httpsAddr)
serve.Flag("envoy-service-http-port", "Kubernetes Service port for HTTP requests.").IntVar(&ctx.httpPort)
serve.Flag("envoy-service-https-port", "Kubernetes Service port for HTTPS requests.").IntVar(&ctx.httpsPort)
serve.Flag("envoy-service-name", "Envoy Service Name.").StringVar(&ctx.envoyServiceName)
serve.Flag("envoy-service-namespace", "Envoy Service Namespace.").StringVar(&ctx.envoyServiceNamespace)
serve.Flag("use-proxy-protocol", "Use PROXY protocol for all listeners.").BoolVar(&ctx.useProxyProto)

serve.Flag("accesslog-format", "Format for Envoy access logs.").StringVar(&ctx.AccessLogFormat)
@@ -265,7 +268,24 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
// step 10. register leadership election
eventHandler.IsLeader = setupLeadershipElection(&g, log, ctx, clients, eventHandler.UpdateNow)

// step 12. create grpc handler and register with workgroup.
// step 11. set up ingress load balancer status writer
lbsw := loadBalancerStatusWriter{
log: log.WithField("context", "loadBalancerStatusWriter"),
clients: clients,
isLeader: eventHandler.IsLeader,
lbStatus: make(chan v1.LoadBalancerStatus, 1),
}
g.Add(lbsw.Start)

// step 12. register an informer to watch envoy's service.
ssw := &k8s.ServiceStatusLoadBalancerWatcher{
ServiceName: ctx.envoyServiceName,
LBStatus: lbsw.lbStatus,
}
factory := clients.NewInformerFactoryForNamespace(ctx.envoyServiceNamespace)
factory.Core().V1().Services().Informer().AddEventHandler(ssw)
g.Add(startInformer(factory, log.WithField("context", "serviceStatusLoadBalancerWatcher")))

g.Add(func(stop <-chan struct{}) error {
log := log.WithField("context", "grpc")

@@ -306,7 +326,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
return s.Serve(l)
})

// step 13. Setup SIGTERM handler
// step 14. Setup SIGTERM handler
g.Add(func(stop <-chan struct{}) error {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
@@ -319,7 +339,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
return nil
})

// step 14. GO!
// GO!
return g.Run()
}

10 changes: 10 additions & 0 deletions cmd/contour/servecontext.go
@@ -109,6 +109,14 @@ type serveContext struct {
// If the value is true, Contour will register for all the service-apis types
// (GatewayClass, Gateway, HTTPRoute, TCPRoute, and any more as they are added)
UseExperimentalServiceAPITypes bool `yaml:"-"`

// envoy service details

// Namespace of the envoy service
envoyServiceNamespace string `yaml:"-"`

// Name of the envoy service
envoyServiceName string `yaml:"-"`
}

// newServeContext returns a serveContext initialized to defaults.
@@ -165,6 +173,8 @@ func newServeContext() *serveContext {
Name: "leader-elect",
},
UseExperimentalServiceAPITypes: false,
envoyServiceName: "envoy",
envoyServiceNamespace: getEnv("CONTOUR_NAMESPACE", "projectcontour"),
}
}

7 changes: 5 additions & 2 deletions examples/contour/02-rbac.yaml
@@ -43,7 +43,7 @@ rules:
- list
- watch
- apiGroups:
- extensions
- "networking.k8s.io"
resources:
- ingresses
verbs:
@@ -53,11 +53,14 @@ rules:
- apiGroups:
- "networking.k8s.io"
resources:
- ingresses
- "ingresses/status"
verbs:
- get
- list
- watch
- patch
- post
- update
- apiGroups: ["contour.heptio.com"]
resources: ["ingressroutes", "tlscertificatedelegations"]
verbs:
7 changes: 5 additions & 2 deletions examples/render/contour.yaml
@@ -1429,7 +1429,7 @@ rules:
- list
- watch
- apiGroups:
- extensions
- "networking.k8s.io"
resources:
- ingresses
verbs:
@@ -1439,11 +1439,14 @@ rules:
- apiGroups:
- "networking.k8s.io"
resources:
- ingresses
- "ingresses/status"
verbs:
- get
- list
- watch
- patch
- post
- update
- apiGroups: ["contour.heptio.com"]
resources: ["ingressroutes", "tlscertificatedelegations"]
verbs:
2 changes: 2 additions & 0 deletions hack/release/prepare-release.go
@@ -1,3 +1,5 @@
// +build none

package main

import (
2 changes: 1 addition & 1 deletion internal/contour/handler.go
@@ -223,7 +223,7 @@ func (e *EventHandler) updateDAG() {
e.Metrics.SetIngressRouteMetric(metrics)
e.Metrics.SetHTTPProxyMetric(proxymetrics)
default:
e.Debug("skipping status update: not the leader")
e.Debug("skipping metrics and CRD status update, not leader")
}
}

110 changes: 110 additions & 0 deletions internal/k8s/ingressstatus.go
@@ -0,0 +1,110 @@
// Copyright © 2020 VMware
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package k8s

import (
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/api/networking/v1beta1"
clientset "k8s.io/client-go/kubernetes"
)

// IngressStatusUpdater observes informer OnAdd events and
// updates the ingress.status.loadBalancer field on all Ingress
// objects that match the ingress class (if used).
type IngressStatusUpdater struct {
Client clientset.Interface
Logger logrus.FieldLogger
Status v1.LoadBalancerStatus
}

func (s *IngressStatusUpdater) OnAdd(obj interface{}) {
ing := obj.(*v1beta1.Ingress).DeepCopy()

// TODO(dfc) check ingress class

ing.Status.LoadBalancer = s.Status
_, err := s.Client.NetworkingV1beta1().Ingresses(ing.GetNamespace()).UpdateStatus(ing)
if err != nil {
s.Logger.
WithField("name", ing.GetName()).
WithField("namespace", ing.GetNamespace()).
WithError(err).Error("unable to update status")
}
}

func (s *IngressStatusUpdater) OnUpdate(oldObj, newObj interface{}) {
// Ignoring OnUpdate allows us to avoid the message generated
// from the status update.

// TODO(dfc) handle these cases:
// - OnUpdate transitions from an ingress class which is out of scope
// to one in scope.
// - OnUpdate transitions from an ingress class in scope to one out
// of scope.
}

func (s *IngressStatusUpdater) OnDelete(obj interface{}) {
// we don't need to update the status on resources that
// have been deleted.
}

// ServiceStatusLoadBalancerWatcher implements ResourceEventHandler and
// watches for changes to the status.loadbalancer field
type ServiceStatusLoadBalancerWatcher struct {
ServiceName string
LBStatus chan v1.LoadBalancerStatus
}

func (s *ServiceStatusLoadBalancerWatcher) OnAdd(obj interface{}) {
svc, ok := obj.(*v1.Service)
if !ok {
// not a service
return
}
if svc.Name != s.ServiceName {
return
}
s.notify(svc.Status.LoadBalancer)
Contributor:

Nit: Is a method useful here? We could just

s.LBStatus <- svc.Status.LoadBalancer

Contributor:

Maybe this could boil down to:

if svc, ok := obj.(*v1.Service); ok && svc.Name == s.ServiceName {
        s.LBStatus <- svc.Status.LoadBalancer
}

Member Author:

Agreed about the method, it's not really necessary for now, unless we end up wanting to do something more clever than just send down the channel.

For the code though, I feel like having individual clauses is more verbose, but far easier to read. For lack of a better word, I'd call it more idiomatic.

}

func (s *ServiceStatusLoadBalancerWatcher) OnUpdate(oldObj, newObj interface{}) {
svc, ok := newObj.(*v1.Service)
if !ok {
// not a service
return
}
if svc.Name != s.ServiceName {
return
}
s.notify(svc.Status.LoadBalancer)
}

func (s *ServiceStatusLoadBalancerWatcher) OnDelete(obj interface{}) {
svc, ok := obj.(*v1.Service)
if !ok {
// not a service
return
}
if svc.Name != s.ServiceName {
return
}
s.notify(v1.LoadBalancerStatus{
Ingress: nil,
})
}

func (s *ServiceStatusLoadBalancerWatcher) notify(lbstatus v1.LoadBalancerStatus) {
s.LBStatus <- lbstatus
}