Skip to content

Commit

Permalink
feat: gateway ha
Browse files Browse the repository at this point in the history
  • Loading branch information
oilbeater committed Sep 24, 2019
1 parent b61c171 commit 4246cb7
Show file tree
Hide file tree
Showing 9 changed files with 365 additions and 61 deletions.
2 changes: 1 addition & 1 deletion docs/kubectl-plugin.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ To enable kubectl plugin, kubectl version of 1.12 or later is recommended. You c

1. Get the `kubectl-ko` file
```bash
wget https://github.com/alauda/kube-ovn/blob/master/dist/images/kubectl-ko
wget https://raw.githubusercontent.com/alauda/kube-ovn/master/dist/images/kubectl-ko
```

2. Move the file to one of $PATH directories
Expand Down
2 changes: 1 addition & 1 deletion docs/subnet.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,5 @@ For a distributed Gateway, outgoing traffic from Pods within the OVN network to
For a centralized gateway, outgoing traffic from Pods within the OVN network to external destinations will go through the Gateway Node for the Namespace.

- `gatewayType`: `distributed` or `centralized`, default is `distributed`.
- `gatewayNode`: when `gatewayType` is `centralized` used this field to specify which node act as the namespace gateway.
- `gatewayNode`: when `gatewayType` is `centralized`, use this field to specify which node acts as the namespace gateway. This field can be a comma-separated string, like `node1,node2`, and kube-ovn will automatically apply an active-backup failover strategy.
- `natOutgoing`: `true` or `false`, whether the Pod IP needs to be masqueraded when traffic goes through the gateway. When `false`, the Pod IP is exposed to the external network directly. Default is `false`.
8 changes: 4 additions & 4 deletions pkg/apis/kubeovn/v1/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,22 @@ func (m *SubnetStatus) ConditionReason(ctype ConditionType) string {
return ""
}

// Ready - shortcut to set ready contition to true
// Ready - shortcut to set ready condition to true
// It delegates to SetCondition with the Ready condition type, forwarding reason and message unchanged.
func (m *SubnetStatus) Ready(reason, message string) {
m.SetCondition(Ready, reason, message)
}

// NotReady - shortcut to set ready contition to false
// NotReady - shortcut to set ready condition to false
// It delegates to ClearCondition with the Ready condition type, forwarding reason and message unchanged.
func (m *SubnetStatus) NotReady(reason, message string) {
m.ClearCondition(Ready, reason, message)
}

// Validated - shortcut to set validated contition to true
// Validated - shortcut to set validated condition to true
// It delegates to SetCondition with the Validated condition type, forwarding reason and message unchanged.
func (m *SubnetStatus) Validated(reason, message string) {
m.SetCondition(Validated, reason, message)
}

// NotValidated - shortcut to set validated contition to false
// NotValidated - shortcut to set validated condition to false
// It delegates to ClearCondition with the Validated condition type, forwarding reason and message unchanged.
func (m *SubnetStatus) NotValidated(reason, message string) {
m.ClearCondition(Validated, reason, message)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/apis/kubeovn/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ type SubnetStatus struct {
// +patchStrategy=merge
Conditions []SubnetCondition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`

AvailableIPs uint64 `json:"availableIPs"`
UsingIPs uint64 `json:"usingIPs"`
AvailableIPs uint64 `json:"availableIPs"`
UsingIPs uint64 `json:"usingIPs"`
ActivateGateway string `json:"activateGateway"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
9 changes: 9 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Controller struct {
subnetSynced cache.InformerSynced
addSubnetQueue workqueue.RateLimitingInterface
deleteSubnetQueue workqueue.RateLimitingInterface
deleteRouteQueue workqueue.RateLimitingInterface
updateSubnetQueue workqueue.RateLimitingInterface
updateSubnetStatusQueue workqueue.RateLimitingInterface

Expand All @@ -59,6 +60,7 @@ type Controller struct {
nodesLister v1.NodeLister
nodesSynced cache.InformerSynced
addNodeQueue workqueue.RateLimitingInterface
updateNodeQueue workqueue.RateLimitingInterface
deleteNodeQueue workqueue.RateLimitingInterface

servicesLister v1.ServiceLister
Expand Down Expand Up @@ -117,6 +119,7 @@ func NewController(config *Configuration) *Controller {
subnetSynced: subnetInformer.Informer().HasSynced,
addSubnetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddSubnet"),
deleteSubnetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteSubnet"),
deleteRouteQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteRoute"),
updateSubnetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSubnet"),
updateSubnetStatusQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSubnetStatus"),

Expand All @@ -136,6 +139,7 @@ func NewController(config *Configuration) *Controller {
nodesLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
addNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddNode"),
updateNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateNode"),
deleteNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteNode"),

servicesLister: serviceInformer.Lister(),
Expand Down Expand Up @@ -173,6 +177,7 @@ func NewController(config *Configuration) *Controller {

nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueAddNode,
UpdateFunc: controller.enqueueUpdateNode,
DeleteFunc: controller.enqueueDeleteNode,
})

Expand Down Expand Up @@ -223,9 +228,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {
defer c.addSubnetQueue.ShutDown()
defer c.updateSubnetQueue.ShutDown()
defer c.deleteSubnetQueue.ShutDown()
defer c.deleteRouteQueue.ShutDown()
defer c.updateSubnetStatusQueue.ShutDown()

defer c.addNodeQueue.ShutDown()
defer c.updateNodeQueue.ShutDown()
defer c.deleteNodeQueue.ShutDown()

defer c.deleteTcpServiceQueue.ShutDown()
Expand Down Expand Up @@ -279,9 +286,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {

go wait.Until(c.runDeleteSubnetWorker, time.Second, stopCh)
go wait.Until(c.runUpdateSubnetWorker, time.Second, stopCh)
go wait.Until(c.runDeleteRouteWorker, time.Second, stopCh)
go wait.Until(c.runUpdateSubnetStatusWorker, time.Second, stopCh)

go wait.Until(c.runAddNodeWorker, time.Second, stopCh)
go wait.Until(c.runUpdateNodeWorker, time.Second, stopCh)
go wait.Until(c.runDeleteNodeWorker, time.Second, stopCh)

go wait.Until(c.runUpdateServiceWorker, time.Second, stopCh)
Expand Down
110 changes: 110 additions & 0 deletions pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
Expand All @@ -31,6 +32,35 @@ func (c *Controller) enqueueAddNode(obj interface{}) {
c.addNodeQueue.AddRateLimited(key)
}

// nodeReady reports whether node carries a NodeReady condition whose status
// is ConditionTrue. A node with no such condition is treated as not ready.
func nodeReady(node *v1.Node) bool {
	conditions := node.Status.Conditions
	for i := range conditions {
		if conditions[i].Type != v1.NodeReady {
			continue
		}
		if conditions[i].Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}

// enqueueUpdateNode is the node informer's update handler. It only acts on
// the leader, and only queues the node when its readiness (as reported by
// nodeReady) differs between the old and new object; all other updates are
// ignored.
func (c *Controller) enqueueUpdateNode(oldObj, newObj interface{}) {
	if !c.isLeader() {
		return
	}

	previous := oldObj.(*v1.Node)
	current := newObj.(*v1.Node)

	// Readiness did not flip, so there is nothing to reconcile.
	if nodeReady(previous) == nodeReady(current) {
		return
	}

	key, err := cache.MetaNamespaceKeyFunc(newObj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	klog.V(3).Infof("enqueue update node %s", key)
	c.updateNodeQueue.AddRateLimited(key)
}

func (c *Controller) enqueueDeleteNode(obj interface{}) {
if !c.isLeader() {
return
Expand All @@ -50,6 +80,11 @@ func (c *Controller) runAddNodeWorker() {
}
}

// runUpdateNodeWorker keeps processing items from the update-node queue until
// processNextUpdateNodeWorkItem reports that it should stop.
func (c *Controller) runUpdateNodeWorker() {
	for {
		if !c.processNextUpdateNodeWorkItem() {
			return
		}
	}
}

func (c *Controller) runDeleteNodeWorker() {
for c.processNextDeleteNodeWorkItem() {
}
Expand Down Expand Up @@ -86,6 +121,37 @@ func (c *Controller) processNextAddNodeWorkItem() bool {
return true
}

// processNextUpdateNodeWorkItem takes one item off updateNodeQueue and hands
// it to handleUpdateNode. It returns false only when the queue has been shut
// down; every other outcome (success, bad item, handler error) returns true
// so the worker loop continues.
func (c *Controller) processNextUpdateNodeWorkItem() bool {
	item, quit := c.updateNodeQueue.Get()
	if quit {
		return false
	}

	process := func(item interface{}) error {
		// Always mark the item as done, whatever the outcome.
		defer c.updateNodeQueue.Done(item)

		key, isString := item.(string)
		if !isString {
			// A non-string item can never succeed; drop it instead of retrying.
			c.updateNodeQueue.Forget(item)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", item))
			return nil
		}

		if syncErr := c.handleUpdateNode(key); syncErr != nil {
			c.updateNodeQueue.AddRateLimited(key)
			return fmt.Errorf("error syncing '%s': %s, requeuing", key, syncErr.Error())
		}

		c.updateNodeQueue.Forget(item)
		return nil
	}

	if err := process(item); err != nil {
		utilruntime.HandleError(err)
	}
	return true
}

func (c *Controller) processNextDeleteNodeWorkItem() bool {
obj, shutdown := c.deleteNodeQueue.Get()

Expand Down Expand Up @@ -259,6 +325,50 @@ func (c *Controller) handleDeleteNode(key string) error {
return nil
}

// handleUpdateNode reconciles centralized gateways after the node identified
// by key changed readiness.
//
// A node that no longer exists in the lister is ignored. When the node is
// ready, every subnet that has no active gateway recorded in its status and
// lists this node in Spec.GatewayNode is reconciled. When the node is not
// ready, every subnet whose recorded active gateway is this node is
// reconciled. The first reconcile error aborts the pass and is returned.
func (c *Controller) handleUpdateNode(key string) error {
	node, err := c.nodesLister.Get(key)
	if err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}

	subnets, err := c.subnetsLister.List(labels.Everything())
	if err != nil {
		klog.Errorf("failed to get subnets %v", err)
		return err
	}

	ready := nodeReady(node)
	for _, subnet := range subnets {
		var affected bool
		if ready {
			// A ready node can take over subnets that currently have no active gateway.
			affected = subnet.Status.ActivateGateway == "" && gatewayContains(subnet.Spec.GatewayNode, node.Name)
		} else {
			// A not-ready node must be replaced wherever it is the active gateway.
			affected = subnet.Status.ActivateGateway == node.Name
		}
		if !affected {
			continue
		}
		if err := c.reconcileCentralizedGateway(subnet); err != nil {
			return err
		}
	}
	return nil
}

// gatewayContains reports whether gateway appears in gatewayNodeStr, a
// comma-separated list of node names. Whitespace around each list entry is
// trimmed before comparing; gateway itself is compared as given.
func gatewayContains(gatewayNodeStr, gateway string) bool {
	candidates := strings.Split(gatewayNodeStr, ",")
	for i := range candidates {
		if strings.TrimSpace(candidates[i]) == gateway {
			return true
		}
	}
	return false
}

func getNodeInternalIP(node *v1.Node) string {
var nodeAddr string
for _, addr := range node.Status.Addresses {
Expand Down
19 changes: 1 addition & 18 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,7 @@ func (c *Controller) handleUpdatePod(key string) error {
}
}

switch subnet.Spec.GatewayType {
case "", kubeovnv1.GWDistributedType:
if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType {
node, err := c.nodesLister.Get(pod.Spec.NodeName)
if err != nil {
klog.Errorf("get node %s failed %v", pod.Spec.NodeName, err)
Expand All @@ -666,22 +665,6 @@ func (c *Controller) handleUpdatePod(key string) error {
if err := c.ovnClient.AddStaticRouter(ovs.PolicySrcIP, pod.Status.PodIP, nodeTunlIPAddr.String(), c.config.ClusterRouter); err != nil {
return errors.Annotate(err, "add static route failed")
}
case kubeovnv1.GWCentralizedType:
node, err := c.nodesLister.Get(subnet.Spec.GatewayNode)
if err != nil {
klog.Errorf("get node %s failed %v", pod.Spec.NodeName, err)
return err
}
nodeTunlIPAddr, err := getNodeTunlIP(node)
if err != nil {
return err
}
if err := c.ovnClient.DeleteStaticRouter(pod.Status.PodIP, c.config.ClusterRouter); err != nil {
return errors.Annotate(err, "del static route failed")
}
if err := c.ovnClient.AddStaticRouter(ovs.PolicySrcIP, pod.Status.PodIP, nodeTunlIPAddr.String(), c.config.ClusterRouter); err != nil {
return errors.Annotate(err, "add static route failed")
}
}
return nil
}
Expand Down

0 comments on commit 4246cb7

Please sign in to comment.