Skip to content

Commit

Permalink
cmd/contour: add service status informer (#2386)
Browse files Browse the repository at this point in the history
Updates #403

Add informer to watch envoy's service document and pass the
status.loadbalancer stanza to the ingress status updater.

cmd/contour: add IngressStatusWriter

Wire up envoy service name and namespace to flags, defaults match the
example deployment.

examples/contour: add ingress status subresource to Contour's role

Signed-off-by: Dave Cheney <dave@cheney.net>
  • Loading branch information
davecheney authored and youngnick committed Mar 31, 2020
1 parent d4fb68d commit fc79b07
Show file tree
Hide file tree
Showing 8 changed files with 405 additions and 10 deletions.
94 changes: 94 additions & 0 deletions cmd/contour/ingressstatus.go
@@ -0,0 +1,94 @@
// Copyright © 2019 VMware
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"sync"

"github.com/projectcontour/contour/internal/k8s"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
)

// ingressStatusWriter manages the lifetime of StatusLoadBalancerUpdaters.
//
// The theory of operation of the ingressStatusWriter is as follows:
// 1. On startup the ingressStatusWriter waits to be elected leader.
// 2. Once elected leader, the ingressStatusWriter waits to receive a
// v1.LoadBalancerStatus value.
// 3. Once a v1.LoadBalancerStatus value has been received, any existing informer
// is stopped and a new informer started in its place.
// 4. Each informer is connected to a k8s.StatusLoadBalancerUpdater which reacts to
// OnAdd events for networking.k8s.io/ingress.v1beta1 objects. For each OnAdd
// the object is patched with the v1.LoadBalancerStatus value obtained on creation.
// OnUpdate and OnDelete events are ignored.If a new v1.LoadBalancerStatus value
// is been received, operation restarts at step 3.
// 5. If the worker is stopped, any existing informer is stopped before the worker stops.
type ingressStatusWriter struct {
log logrus.FieldLogger
clients *k8s.Clients
isLeader chan struct{}
lbStatus chan v1.LoadBalancerStatus
}

func (isw *ingressStatusWriter) Start(stop <-chan struct{}) error {

// await leadership election
isw.log.Info("awaiting leadership election")
select {
case <-stop:
// asked to stop before elected leader
return nil
case <-isw.isLeader:
isw.log.Info("elected leader")
}

var shutdown chan struct{}
var stopping sync.WaitGroup
for {
select {
case <-stop:
// stop existing informer and shut down
if shutdown != nil {
close(shutdown)
}
stopping.Wait()
return nil
case lbs := <-isw.lbStatus:
// stop existing informer
if shutdown != nil {
close(shutdown)
}
stopping.Wait()

// create informer for the new LoadBalancerStatus
factory := isw.clients.NewInformerFactory()
inf := factory.Networking().V1beta1().Ingresses().Informer()
log := isw.log.WithField("context", "IngressStatusLoadBalancerUpdater")
inf.AddEventHandler(&k8s.StatusLoadBalancerUpdater{
Client: isw.clients.ClientSet(),
Logger: log,
Status: lbs,
})

shutdown = make(chan struct{})
stopping.Add(1)
fn := startInformer(factory, log)
go func() {
defer stopping.Done()
fn(shutdown)
}()
}
}
}
26 changes: 23 additions & 3 deletions cmd/contour/serve.go
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/sirupsen/logrus"
"gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
coreinformers "k8s.io/client-go/informers"
)

Expand Down Expand Up @@ -104,6 +105,8 @@ func registerServe(app *kingpin.Application) (*kingpin.CmdClause, *serveContext)
serve.Flag("envoy-service-https-address", "Kubernetes Service address for HTTPS requests.").StringVar(&ctx.httpsAddr)
serve.Flag("envoy-service-http-port", "Kubernetes Service port for HTTP requests.").IntVar(&ctx.httpPort)
serve.Flag("envoy-service-https-port", "Kubernetes Service port for HTTPS requests.").IntVar(&ctx.httpsPort)
serve.Flag("envoy-service-name", "Envoy Service Name.").StringVar(&ctx.envoyServiceName)
serve.Flag("envoy-service-namespace", "Envoy Service Namespace.").StringVar(&ctx.envoyServiceNamespace)
serve.Flag("use-proxy-protocol", "Use PROXY protocol for all listeners.").BoolVar(&ctx.useProxyProto)

serve.Flag("accesslog-format", "Format for Envoy access logs.").StringVar(&ctx.AccessLogFormat)
Expand Down Expand Up @@ -265,7 +268,24 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
// step 10. register leadership election
eventHandler.IsLeader = setupLeadershipElection(&g, log, ctx, clients, eventHandler.UpdateNow)

// step 12. create grpc handler and register with workgroup.
// step 11. set up ingress status writer
isw := ingressStatusWriter{
log: log.WithField("context", "ingressStatusWriter"),
clients: clients,
isLeader: eventHandler.IsLeader,
lbStatus: make(chan v1.LoadBalancerStatus, 1),
}
g.Add(isw.Start)

// step 12. register an informer to watch envoy's service.
ssw := &k8s.ServiceStatusLoadBalancerWatcher{
ServiceName: ctx.envoyServiceName,
LBStatus: isw.lbStatus,
}
factory := clients.NewInformerFactoryForNamespace(ctx.envoyServiceNamespace)
factory.Core().V1().Services().Informer().AddEventHandler(ssw)
g.Add(startInformer(factory, log.WithField("context", "serviceStatusLoadBalancerWatcher")))

g.Add(func(stop <-chan struct{}) error {
log := log.WithField("context", "grpc")

Expand Down Expand Up @@ -306,7 +326,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
return s.Serve(l)
})

// step 13. Setup SIGTERM handler
// step 14. Setup SIGTERM handler
g.Add(func(stop <-chan struct{}) error {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
Expand All @@ -319,7 +339,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
return nil
})

// step 14. GO!
// GO!
return g.Run()
}

Expand Down
10 changes: 10 additions & 0 deletions cmd/contour/servecontext.go
Expand Up @@ -109,6 +109,14 @@ type serveContext struct {
// If the value is true, Contour will register for all the service-apis types
// (GatewayClass, Gateway, HTTPRoute, TCPRoute, and any more as they are added)
UseExperimentalServiceAPITypes bool `yaml:"-"`

// envoy service details

// Namespace of the envoy service
envoyServiceNamespace string `yaml:"-"`

// Name of the envoy service
envoyServiceName string `yaml:"-"`
}

// newServeContext returns a serveContext initialized to defaults.
Expand Down Expand Up @@ -165,6 +173,8 @@ func newServeContext() *serveContext {
Name: "leader-elect",
},
UseExperimentalServiceAPITypes: false,
envoyServiceName: "envoy",
envoyServiceNamespace: "projectcontour",
}
}

Expand Down
6 changes: 4 additions & 2 deletions examples/contour/02-rbac.yaml
Expand Up @@ -43,7 +43,7 @@ rules:
- list
- watch
- apiGroups:
- extensions
- "networking.k8s.io"
resources:
- ingresses
verbs:
Expand All @@ -53,11 +53,13 @@ rules:
- apiGroups:
- "networking.k8s.io"
resources:
- ingresses
- "ingresses/status"
verbs:
- get
- list
- watch
- patch
- post
- apiGroups: ["contour.heptio.com"]
resources: ["ingressroutes", "tlscertificatedelegations"]
verbs:
Expand Down
6 changes: 1 addition & 5 deletions examples/contour/03-envoy.yaml
@@ -1,16 +1,12 @@
---
apiVersion: apps/v1
kind: DaemonSet
kind: Deployment
metadata:
labels:
app: envoy
name: envoy
namespace: projectcontour
spec:
updateStrategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 10%
selector:
matchLabels:
app: envoy
Expand Down
2 changes: 2 additions & 0 deletions hack/release/prepare-release.go
@@ -1,3 +1,5 @@
// +build none

package main

import (
Expand Down
110 changes: 110 additions & 0 deletions internal/k8s/ingressstatus.go
@@ -0,0 +1,110 @@
// Copyright © 2020 VMware
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package k8s

import (
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/api/networking/v1beta1"
clientset "k8s.io/client-go/kubernetes"
)

// StatusLoadbalancerUpdater observes informer OnAdd events and
// updates the ingress.status.loadBalancer field on all Ingress
// objects that match the ingress class (if used).
type StatusLoadBalancerUpdater struct {
Client clientset.Interface
Logger logrus.FieldLogger
Status v1.LoadBalancerStatus
}

func (s *StatusLoadBalancerUpdater) OnAdd(obj interface{}) {
ing := obj.(*v1beta1.Ingress).DeepCopy()

// TODO(dfc) check ingress class

ing.Status.LoadBalancer = s.Status
_, err := s.Client.NetworkingV1beta1().Ingresses(ing.GetNamespace()).UpdateStatus(ing)
if err != nil {
s.Logger.
WithField("name", ing.GetName()).
WithField("namespace", ing.GetNamespace()).
WithError(err).Error("unable to update status")
}
}

func (s *StatusLoadBalancerUpdater) OnUpdate(oldObj, newObj interface{}) {
// Ignoring OnUpdate allows us to avoid the message generated
// from the status update.

// TODO(dfc) handle these cases:
// - OnUpdate transitions from an ingress class which is out of scope
// to one in scope.
// - OnUpdate transitions from an ingress class in scope to one out
// of scope.
}

func (s *StatusLoadBalancerUpdater) OnDelete(obj interface{}) {
// we don't need to update the status on resources that
// have been deleted.
}

// ServiceStatusLoadBalancerWatcher implements ResourceEventHandler and
// watches for changes to the status.loadbalancer field
type ServiceStatusLoadBalancerWatcher struct {
ServiceName string
LBStatus chan v1.LoadBalancerStatus
}

func (s *ServiceStatusLoadBalancerWatcher) OnAdd(obj interface{}) {
svc, ok := obj.(*v1.Service)
if !ok {
// not a service
return
}
if svc.Name != s.ServiceName {
return
}
s.notify(svc.Status.LoadBalancer)
}

func (s *ServiceStatusLoadBalancerWatcher) OnUpdate(oldObj, newObj interface{}) {
svc, ok := newObj.(*v1.Service)
if !ok {
// not a service
return
}
if svc.Name != s.ServiceName {
return
}
s.notify(svc.Status.LoadBalancer)
}

func (s *ServiceStatusLoadBalancerWatcher) OnDelete(obj interface{}) {
svc, ok := obj.(*v1.Service)
if !ok {
// not a service
return
}
if svc.Name != s.ServiceName {
return
}
s.notify(v1.LoadBalancerStatus{
Ingress: nil,
})
}

func (s *ServiceStatusLoadBalancerWatcher) notify(lbstatus v1.LoadBalancerStatus) {
s.LBStatus <- lbstatus
}

0 comments on commit fc79b07

Please sign in to comment.