Skip to content

Commit

Permalink
Rebuild dynamic configuration on pod events
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinpollet committed May 7, 2020
1 parent 50641c4 commit d0ea25c
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 100 deletions.
38 changes: 24 additions & 14 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
splitlister "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/listers/split/v1alpha2"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)

// PortMapper is capable of storing and retrieving a port mapping for a given service.
Expand Down Expand Up @@ -64,9 +66,9 @@ type Config struct {
// Controller hold controller configuration.
type Controller struct {
cfg Config
handler *Handler
handler cache.ResourceEventHandler
serviceManager ServiceManager
configRefreshChan chan string
configRefreshChan chan struct{}
provider *provider.Provider
ignoredResources k8s.IgnoreWrapper
tcpStateTable PortMapper
Expand Down Expand Up @@ -135,9 +137,12 @@ func (c *Controller) init() {
c.ServiceLister = c.kubernetesFactory.Core().V1().Services().Lister()
c.serviceManager = NewShadowServiceManager(c.logger, c.ServiceLister, c.cfg.Namespace, c.tcpStateTable, c.udpStateTable, c.cfg.DefaultMode, c.cfg.MinHTTPPort, c.cfg.MaxHTTPPort, c.clients.GetKubernetesClient())

// configRefreshChan is used to trigger configuration refreshes and deploys.
c.configRefreshChan = make(chan string)
c.handler = NewHandler(c.logger, c.ignoredResources, c.serviceManager, c.configRefreshChan)
// configRefreshChan is used to trigger configuration refreshes.
c.configRefreshChan = make(chan struct{})
c.handler = cache.FilteringResourceEventHandler{
FilterFunc: c.isWatchedResource,
Handler: NewHandler(c.logger, c.serviceManager, c.configRefreshChan),
}

// Create listers and register the event handler to informers that are not ACL related.
c.PodLister = c.kubernetesFactory.Core().V1().Pods().Lister()
Expand All @@ -146,7 +151,6 @@ func (c *Controller) init() {

c.kubernetesFactory.Core().V1().Services().Informer().AddEventHandler(c.handler)
c.kubernetesFactory.Core().V1().Endpoints().Informer().AddEventHandler(c.handler)
c.kubernetesFactory.Core().V1().Pods().Informer().AddEventHandler(c.handler)
c.splitFactory.Split().V1alpha2().TrafficSplits().Informer().AddEventHandler(c.handler)

// Create SharedInformers, listers and register the event handler for ACL related resources.
Expand All @@ -159,6 +163,7 @@ func (c *Controller) init() {
c.TCPRouteLister = c.specsFactory.Specs().V1alpha1().TCPRoutes().Lister()

c.accessFactory.Access().V1alpha1().TrafficTargets().Informer().AddEventHandler(c.handler)
c.kubernetesFactory.Core().V1().Pods().Informer().AddEventHandler(c.handler)
c.specsFactory.Specs().V1alpha1().HTTPRouteGroups().Informer().AddEventHandler(c.handler)
c.specsFactory.Specs().V1alpha1().TCPRoutes().Informer().AddEventHandler(c.handler)
}
Expand All @@ -182,7 +187,6 @@ func (c *Controller) init() {
MaxHTTPPort: c.cfg.MaxHTTPPort,
ACL: c.cfg.ACLEnabled,
DefaultTrafficType: c.cfg.DefaultMode,
MaeshNamespace: c.cfg.Namespace,
}

c.provider = provider.New(c.tcpStateTable, c.udpStateTable, annotations.BuildMiddlewares, providerCfg, c.logger)
Expand Down Expand Up @@ -214,7 +218,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {
case <-stopCh:
c.logger.Info("Shutting down workers")
return nil
case message := <-c.configRefreshChan:
case <-c.configRefreshChan:
// Reload the configuration.
topo, err := c.topologyBuilder.Build(c.ignoredResources)
if err != nil {
Expand All @@ -224,15 +228,14 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {

conf := c.provider.BuildConfig(topo)

if message == k8s.ConfigMessageChanForce || !reflect.DeepEqual(c.lastConfiguration.Get(), conf) {
if !reflect.DeepEqual(c.lastConfiguration.Get(), conf) {
c.lastConfiguration.Set(conf)

// Configuration successfully created, enable readiness in the api.
c.api.EnableReadiness()
}
case <-timer.C:
rawCfg := c.lastConfiguration.Get()
if rawCfg == nil {
if rawCfg := c.lastConfiguration.Get(); rawCfg == nil {
break
}

Expand Down Expand Up @@ -312,7 +315,14 @@ func (c *Controller) createMeshServices() error {
return nil
}

// isMeshPod checks if the pod is a mesh pod. Can be modified to use multiple metrics if needed.
func isMeshPod(pod *corev1.Pod) bool {
return pod.Labels["component"] == "maesh-mesh"
// isWatchedResource returns true if the given resource is not ignored, false otherwise.
func (c *Controller) isWatchedResource(obj interface{}) bool {
accessor, err := meta.Accessor(obj)
if err != nil {
return false
}

pMeta := meta.AsPartialObjectMetadata(accessor)

return !c.ignoredResources.IsIgnored(pMeta.ObjectMeta)
}
94 changes: 15 additions & 79 deletions pkg/controller/handler.go
Original file line number Diff line number Diff line change
@@ -1,75 +1,43 @@
package controller

import (
"github.com/containous/maesh/pkg/k8s"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
)

// Handler is an implementation of a ResourceEventHandler.
type Handler struct {
log logrus.FieldLogger
ignored k8s.IgnoreWrapper
configRefreshChan chan string
configRefreshChan chan struct{}
serviceManager ServiceManager
}

// NewHandler creates a handler.
func NewHandler(log logrus.FieldLogger, ignored k8s.IgnoreWrapper, serviceManager ServiceManager, configRefreshChan chan string) *Handler {
h := &Handler{
func NewHandler(log logrus.FieldLogger, serviceManager ServiceManager, configRefreshChan chan struct{}) *Handler {
return &Handler{
log: log,
ignored: ignored,
configRefreshChan: configRefreshChan,
serviceManager: serviceManager,
}

if err := h.Init(); err != nil {
log.Errorln("Could not initialize MeshControllerHandler")
}

return h
}

// Init handles any handler initialization.
func (h *Handler) Init() error {
h.log.Debugln("MeshControllerHandler.Init")

return nil
}

// OnAdd executed when an object is added.
// OnAdd is called when an object is added.
func (h *Handler) OnAdd(obj interface{}) {
// assert the type to an object to pull out relevant data
switch obj := obj.(type) {
case *corev1.Service:
if h.ignored.IsIgnored(obj.ObjectMeta) {
return
}

// If the created object is a service we have to create a corresponding shadow service.
if obj, isService := obj.(*corev1.Service); isService {
if err := h.serviceManager.Create(obj); err != nil {
h.log.Errorf("Could not create mesh service: %v", err)
}
case *corev1.Endpoints:
return
case *corev1.Pod:
if !isMeshPod(obj) {
return
}
}

// Trigger a configuration rebuild.
h.configRefreshChan <- k8s.ConfigMessageChanRebuild
h.configRefreshChan <- struct{}{}
}

// OnUpdate executed when an object is updated.
// OnUpdate is called when an object is updated.
func (h *Handler) OnUpdate(oldObj, newObj interface{}) {
// Assert the type to an object to pull out relevant data.
switch obj := newObj.(type) {
case *corev1.Service:
if h.ignored.IsIgnored(obj.ObjectMeta) {
return
}

// If the updated object is a service we have to update the corresponding shadow service.
if obj, isService := newObj.(*corev1.Service); isService {
oldSvc, ok := oldObj.(*corev1.Service)
if !ok {
h.log.Errorf("Old object is not a kubernetes Service")
Expand All @@ -81,55 +49,23 @@ func (h *Handler) OnUpdate(oldObj, newObj interface{}) {
}

h.log.Debugf("MeshControllerHandler ObjectUpdated with type: *corev1.Service: %s/%s", obj.Namespace, obj.Name)
case *corev1.Endpoints:
// We can use the same ignore for services and endpoints.
if h.ignored.IsIgnored(obj.ObjectMeta) {
return
}

h.log.Debugf("MeshControllerHandler ObjectUpdated with type: *corev1.Endpoints: %s/%s", obj.Namespace, obj.Name)
case *corev1.Pod:
if !isMeshPod(obj) {
// We don't track updates of user pods, updates are done through endpoints.
return
}

h.log.Debugf("MeshControllerHandler ObjectUpdated with type: *corev1.Pod: %s/%s", obj.Namespace, obj.Name)
// Since this is a mesh pod update, trigger a force deploy.
h.configRefreshChan <- k8s.ConfigMessageChanForce

return
}

// Trigger a configuration rebuild.
h.configRefreshChan <- k8s.ConfigMessageChanRebuild
h.configRefreshChan <- struct{}{}
}

// OnDelete executed when an object is deleted.
// OnDelete is called when an object is deleted.
func (h *Handler) OnDelete(obj interface{}) {
// Assert the type to an object to pull out relevant data.
switch obj := obj.(type) {
case *corev1.Service:
if h.ignored.IsIgnored(obj.ObjectMeta) {
return
}

// If the deleted object is a service we have to delete the corresponding shadow service.
if obj, isService := obj.(*corev1.Service); isService {
h.log.Debugf("MeshControllerHandler ObjectDeleted with type: *corev1.Service: %s/%s", obj.Namespace, obj.Name)

if err := h.serviceManager.Delete(obj); err != nil {
h.log.Errorf("Could not delete mesh service: %v", err)
}
case *corev1.Endpoints:
// We can use the same ignore for services and endpoints.
if h.ignored.IsIgnored(obj.ObjectMeta) {
return
}

h.log.Debugf("MeshController ObjectDeleted with type: *corev1.Endpoints: %s/%s", obj.Namespace, obj.Name)
case *corev1.Pod:
return
}

// Trigger a configuration rebuild.
h.configRefreshChan <- k8s.ConfigMessageChanRebuild
h.configRefreshChan <- struct{}{}
}
5 changes: 0 additions & 5 deletions pkg/k8s/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,4 @@ const (
TCPStateConfigMapName string = "tcp-state-table"
// UDPStateConfigMapName UDP config map name.
UDPStateConfigMapName string = "udp-state-table"

// ConfigMessageChanRebuild rebuild.
ConfigMessageChanRebuild string = "rebuild"
// ConfigMessageChanForce force.
ConfigMessageChanForce string = "force"
)
1 change: 0 additions & 1 deletion pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type Config struct {
MaxHTTPPort int32
ACL bool
DefaultTrafficType string
MaeshNamespace string
}

// Provider holds the configuration for generating dynamic configuration from a kubernetes cluster state.
Expand Down
1 change: 0 additions & 1 deletion pkg/provider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ func TestProvider_BuildConfig(t *testing.T) {
MaxHTTPPort: 10010,
ACL: test.acl,
DefaultTrafficType: defaultTrafficType,
MaeshNamespace: "maesh",
}

tcpStateTable := func(port mk8s.ServiceWithPort) (int32, bool) {
Expand Down

0 comments on commit d0ea25c

Please sign in to comment.