Skip to content

Commit

Permalink
ha for kube-ovn-controller
Browse files Browse the repository at this point in the history
  • Loading branch information
halfcrazy committed Apr 22, 2019
1 parent b7d0f59 commit 12a4bec
Show file tree
Hide file tree
Showing 19 changed files with 970 additions and 21 deletions.
6 changes: 5 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/alauda/kube-ovn/pkg/controller"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/klog"
"k8s.io/sample-controller/pkg/signals"
)
Expand Down Expand Up @@ -50,9 +49,7 @@ func main() {
os.Exit(1)
}

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(config.KubeClient, time.Second*30)
ctl := controller.NewController(config, kubeInformerFactory)
kubeInformerFactory.Start(stopCh)
ctl := controller.NewController(config)
ctl.Run(stopCh)
}

Expand Down
2 changes: 1 addition & 1 deletion dist/images/start-controller.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env bash
set -euo pipefail
export OVN_NB_DAEMON=$(ovn-nbctl --db=tcp:${OVN_NB_SERVICE_HOST}:${OVN_NB_SERVICE_PORT} --pidfile --detach)
./kube-ovn-controller --ovn-nb-host=${OVN_NB_SERVICE_HOST} --ovn-nb-port=${OVN_NB_SERVICE_PORT} $@
exec ./kube-ovn-controller --ovn-nb-host=${OVN_NB_SERVICE_HOST} --ovn-nb-port=${OVN_NB_SERVICE_PORT} $@
6 changes: 6 additions & 0 deletions pkg/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"flag"
"os"

"github.com/spf13/pflag"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -30,6 +31,9 @@ type Configuration struct {

ClusterTcpLoadBalancer string
ClusterUdpLoadBalancer string

PodName string
PodNamespace string
}

// TODO: validate configuration
Expand Down Expand Up @@ -83,6 +87,8 @@ func ParseFlags() (*Configuration, error) {
NodeSwitchGateway: *argNodeSwitchGateway,
ClusterTcpLoadBalancer: *argClusterTcpLoadBalancer,
ClusterUdpLoadBalancer: *argClusterUdpLoadBalancer,
PodName: os.Getenv("POD_NAME"),
PodNamespace: os.Getenv("KUBE_NAMESPACE"),
}
err := config.initKubeClient()
if err != nil {
Expand Down
40 changes: 36 additions & 4 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
Expand Down Expand Up @@ -61,13 +63,14 @@ type Controller struct {
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder

informerFactory informers.SharedInformerFactory

elector *leaderelection.LeaderElector
}

// NewController returns a new ovn controller
func NewController(
config *Configuration,
informerFactory informers.SharedInformerFactory) *Controller {

func NewController(config *Configuration) *Controller {
// Create event broadcaster
// Add ovn-controller types to the default Kubernetes Scheme so Events can be
// logged for ovn-controller types.
Expand All @@ -78,6 +81,8 @@ func NewController(
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

informerFactory := kubeinformers.NewSharedInformerFactory(config.KubeClient, time.Second*30)

podInformer := informerFactory.Core().V1().Pods()
namespaceInformer := informerFactory.Core().V1().Namespaces()
nodeInformer := informerFactory.Core().V1().Nodes()
Expand Down Expand Up @@ -116,6 +121,8 @@ func NewController(
updateEndpointQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateEndpoint"),

recorder: recorder,

informerFactory: informerFactory,
}

podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -161,6 +168,23 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {
// Start the informer factories to begin populating the informer caches
klog.Info("Starting OVN controller")

// leader election
elector := setupLeaderElection(&leaderElectionConfig{
Client: c.config.KubeClient,
ElectionID: "ovn-config",
PodName: c.config.PodName,
PodNamespace: c.config.PodNamespace,
})
c.elector = elector
for {
klog.Info("waiting for a leader")
if c.hasLeader() {
break
}
time.Sleep(1 * time.Second)
}
c.informerFactory.Start(stopCh)

// Wait for the caches to be synced before starting workers
klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.podsSynced, c.namespacesSynced, c.nodesSynced, c.serviceSynced, c.endpointsSynced); !ok {
Expand Down Expand Up @@ -192,3 +216,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {

return nil
}

func (c *Controller) isLeader() bool {
return c.elector.IsLeader()
}

func (c *Controller) hasLeader() bool {
return c.elector.GetLeader() != ""
}
114 changes: 114 additions & 0 deletions pkg/controller/election.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package controller

import (
"context"
"os"
"time"

"k8s.io/klog"

apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
)

const ovnLeaderElector = "ovn-controller-leader-elector"

type leaderElectionConfig struct {
PodName string
PodNamespace string

Client clientset.Interface

ElectionID string

OnStartedLeading func(chan struct{})
OnStoppedLeading func()
OnNewLeader func(identity string)
}

func setupLeaderElection(config *leaderElectionConfig) *leaderelection.LeaderElector {
var elector *leaderelection.LeaderElector

// start a new context
ctx := context.Background()

var cancelContext context.CancelFunc

var newLeaderCtx = func(ctx context.Context) context.CancelFunc {
// allow to cancel the context in case we stop being the leader
leaderCtx, cancel := context.WithCancel(ctx)
go elector.Run(leaderCtx)
return cancel
}

var stopCh chan struct{}
callbacks := leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
klog.Infof("I am the new leader")
stopCh = make(chan struct{})

if config.OnStartedLeading != nil {
config.OnStartedLeading(stopCh)
}
},
OnStoppedLeading: func() {
klog.Info("I am not leader anymore")
close(stopCh)

// cancel the context
cancelContext()

cancelContext = newLeaderCtx(ctx)

if config.OnStoppedLeading != nil {
config.OnStoppedLeading()
}
},
OnNewLeader: func(identity string) {
klog.Infof("new leader elected: %v", identity)
if config.OnNewLeader != nil {
config.OnNewLeader(identity)
}
},
}

broadcaster := record.NewBroadcaster()
hostname, _ := os.Hostname()

recorder := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
Component: ovnLeaderElector,
Host: hostname,
})

lock := resourcelock.ConfigMapLock{
ConfigMapMeta: metav1.ObjectMeta{Namespace: config.PodNamespace, Name: config.ElectionID},
Client: config.Client.CoreV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: config.PodName,
EventRecorder: recorder,
},
}

var err error
ttl := 8 * time.Second

elector, err = leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: &lock,
LeaseDuration: ttl,
RenewDeadline: ttl / 2,
RetryPeriod: ttl / 4,

Callbacks: callbacks,
})
if err != nil {
klog.Fatalf("unexpected error starting leader election: %v", err)
}

cancelContext = newLeaderCtx(ctx)
return elector
}
13 changes: 11 additions & 2 deletions pkg/controller/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,33 @@ package controller

import (
"fmt"
"k8s.io/api/core/v1"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
"strings"
)

func (c *Controller) enqueueAddEndpoint(obj interface{}) {
if !c.isLeader() {
return
}
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
klog.V(5).Infof("enqueue add endpoint %s", key)
c.updateEndpointQueue.AddRateLimited(key)
}

func (c *Controller) enqueueUpdateEndpoint(old, new interface{}) {
if !c.isLeader() {
return
}
oldEp := old.(*v1.Endpoints)
newEp := new.(*v1.Endpoints)
if oldEp.ResourceVersion == newEp.ResourceVersion {
Expand All @@ -37,6 +45,7 @@ func (c *Controller) enqueueUpdateEndpoint(old, new interface{}) {
utilruntime.HandleError(err)
return
}
klog.V(5).Infof("enqueue update endpoint %s", key)
c.updateEndpointQueue.AddRateLimited(key)
}

Expand Down
20 changes: 18 additions & 2 deletions pkg/controller/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,54 @@ package controller

import (
"fmt"
"strings"

"github.com/alauda/kube-ovn/pkg/util"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
"strings"
)

func (c *Controller) enqueueAddNamespace(obj interface{}) {
if !c.isLeader() {
return
}
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
klog.V(5).Infof("enqueue add namespace %s", key)
c.addNamespaceQueue.AddRateLimited(key)
}

func (c *Controller) enqueueDeleteNamespace(obj interface{}) {
if !c.isLeader() {
return
}
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
klog.V(5).Infof("enqueue delete namespace %s", key)
c.deleteNamespaceQueue.AddRateLimited(key)
}

func (c *Controller) enqueueUpdateNamespace(old, new interface{}) {
if !c.isLeader() {
return
}
oldNs := old.(*v1.Namespace)
newNs := new.(*v1.Namespace)
if oldNs.ResourceVersion == newNs.ResourceVersion {
return
}
if oldNs.Annotations[util.PrivateSwitchAnnotation] != newNs.Annotations[util.PrivateSwitchAnnotation] ||
oldNs.Annotations[util.AllowAccessAnnotation] != newNs.Annotations[util.AllowAccessAnnotation] {
var key string
Expand All @@ -43,6 +58,7 @@ func (c *Controller) enqueueUpdateNamespace(old, new interface{}) {
utilruntime.HandleError(err)
return
}
klog.V(5).Infof("enqueue update namespace %s", key)
c.updateNamespaceQueue.AddRateLimited(key)
}
}
Expand Down
Loading

0 comments on commit 12a4bec

Please sign in to comment.