Skip to content

Commit

Permalink
feat: support pod snat
Browse files Browse the repository at this point in the history
  • Loading branch information
oilbeater committed Sep 8, 2020
1 parent 7a60b56 commit 2b2e7a9
Show file tree
Hide file tree
Showing 10 changed files with 382 additions and 31 deletions.
2 changes: 2 additions & 0 deletions dist/images/.dockerignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
install.sh
install-pre-1.16.sh
ovn-ic-db-docker.sh
cleanup.sh
kubectl-ko
*.yaml
20 changes: 19 additions & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,12 @@ type Controller struct {
updateNpQueue workqueue.RateLimitingInterface
deleteNpQueue workqueue.RateLimitingInterface

configMapsLister v1.ConfigMapLister
configMapsSynced cache.InformerSynced

recorder record.EventRecorder
informerFactory informers.SharedInformerFactory
cmInformerFactory informers.SharedInformerFactory
kubeovnInformerFactory kubeovninformer.SharedInformerFactory
elector *leaderelection.LeaderElector
}
Expand All @@ -105,6 +109,10 @@ func NewController(config *Configuration) *Controller {
kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
listOption.AllowWatchBookmarks = true
}))
cmInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(config.KubeClient, 0,
kubeinformers.WithTweakListOptions(func(listOption *metav1.ListOptions) {
listOption.AllowWatchBookmarks = true
}), kubeinformers.WithNamespace(config.PodNamespace))
kubeovnInformerFactory := kubeovninformer.NewSharedInformerFactoryWithOptions(config.KubeOvnClient, 0,
kubeovninformer.WithTweakListOptions(func(listOption *metav1.ListOptions) {
listOption.AllowWatchBookmarks = true
Expand All @@ -119,6 +127,7 @@ func NewController(config *Configuration) *Controller {
serviceInformer := informerFactory.Core().V1().Services()
endpointInformer := informerFactory.Core().V1().Endpoints()
npInformer := informerFactory.Networking().V1().NetworkPolicies()
configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()

controller := &Controller{
config: config,
Expand Down Expand Up @@ -173,9 +182,13 @@ func NewController(config *Configuration) *Controller {
updateNpQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateNp"),
deleteNpQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteNp"),

configMapsLister: configMapInformer.Lister(),
configMapsSynced: configMapInformer.Informer().HasSynced,

recorder: recorder,

informerFactory: informerFactory,
cmInformerFactory: cmInformerFactory,
kubeovnInformerFactory: kubeovnInformerFactory,
}

Expand Down Expand Up @@ -247,10 +260,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) {

// Wait for the caches to be synced before starting workers
c.informerFactory.Start(stopCh)
c.cmInformerFactory.Start(stopCh)
c.kubeovnInformerFactory.Start(stopCh)

klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.subnetSynced, c.ipSynced, c.vlanSynced, c.podsSynced, c.namespacesSynced, c.nodesSynced, c.serviceSynced, c.endpointsSynced, c.npsSynced); !ok {
if ok := cache.WaitForCacheSync(stopCh, c.subnetSynced, c.ipSynced, c.vlanSynced, c.podsSynced, c.namespacesSynced, c.nodesSynced, c.serviceSynced, c.endpointsSynced, c.npsSynced, c.configMapsSynced); !ok {
klog.Fatalf("failed to wait for caches to sync")
}

Expand Down Expand Up @@ -377,6 +391,10 @@ func (c *Controller) startWorkers(stopCh <-chan struct{}) {
c.resyncInterConnection()
}, 30*time.Second, stopCh)

go wait.Until(func() {
c.resyncExternalGateway()
}, 30*time.Second, stopCh)

go wait.Until(func() {
if err := c.markAndCleanLSP(); err != nil {
klog.Errorf("gc lsp error %v", err)
Expand Down
139 changes: 139 additions & 0 deletions pkg/controller/external-gw.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package controller

import (
"encoding/json"
"fmt"
"github.com/alauda/kube-ovn/pkg/util"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
"reflect"
"strings"
)

var (
exGwEnabled = "unknown"
lastExGwCM map[string]string = nil
)

func (c *Controller) resyncExternalGateway() {
cm, err := c.configMapsLister.ConfigMaps(c.config.PodNamespace).Get(util.ExternalGatewayConfig)
if err != nil && !k8serrors.IsNotFound(err) {
klog.Errorf("failed to get ovn-external-gw-config, %v", err)
return
}

if k8serrors.IsNotFound(err) || cm.Data["enable-external-gw"] == "false" {
if exGwEnabled == "false" {
return
}
klog.Info("start to remove ovn external gw")
if err := c.removeExternalGateway(); err != nil {
klog.Errorf("failed to remove ovn external gw, %v", err)
return
}
exGwEnabled = "false"
lastExGwCM = nil
klog.Info("finish remove ovn external gw")
return
} else {
if exGwEnabled == "true" && lastExGwCM != nil && reflect.DeepEqual(cm.Data, lastExGwCM) {
return
}
klog.Info("start to establish ovn external gw")
if err := c.establishExternalGateway(cm.Data); err != nil {
klog.Errorf("failed to establish ovn-external-gw, %v", err)
return
}
exGwEnabled = "true"
lastExGwCM = cm.Data
klog.Info("finish establishing ovn external gw")
}
}

func (c *Controller) removeExternalGateway() error {
sel, _ := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{util.ExGatewayLabel: "true"}})
nodes, err := c.nodesLister.List(sel)
if err != nil {
klog.Errorf("failed to list nodes, %v", err)
return err
}
for _, no := range nodes {
patchPayloadTemplate :=
`[{
"op": "%s",
"path": "/metadata/labels",
"value": %s
}]`
op := "replace"
if len(no.Labels) == 0 {
op = "add"
}
no.Labels[util.ExGatewayLabel] = "false"
raw, _ := json.Marshal(no.Labels)
patchPayload := fmt.Sprintf(patchPayloadTemplate, op, raw)
_, err = c.config.KubeClient.CoreV1().Nodes().Patch(no.Name, types.JSONPatchType, []byte(patchPayload))
if err != nil {
klog.Errorf("patch external gw node %s failed %v", no.Name, err)
return err
}
}

if err := c.ovnClient.DeleteGatewaySwitch(util.ExternalGatewaySwitch); err != nil {
klog.Errorf("failed to delete external gateway switch, %v", err)
return err
}
return nil
}

func (c *Controller) establishExternalGateway(config map[string]string) error {
chassises := []string{}
gwNodes := strings.Split(config["external-gw-nodes"], ",")
for _, gw := range gwNodes {
gw = strings.TrimSpace(gw)
node, err := c.nodesLister.Get(gw)
if err != nil {
klog.Errorf("failed to get gw node %s, %v", gw, err)
return err
}
patchPayloadTemplate :=
`[{
"op": "%s",
"path": "/metadata/labels",
"value": %s
}]`
op := "replace"
if len(node.Labels) == 0 {
op = "add"
}
node.Labels[util.ExGatewayLabel] = "true"
raw, _ := json.Marshal(node.Labels)
patchPayload := fmt.Sprintf(patchPayloadTemplate, op, raw)
_, err = c.config.KubeClient.CoreV1().Nodes().Patch(gw, types.JSONPatchType, []byte(patchPayload))
if err != nil {
klog.Errorf("patch external gw node %s failed %v", gw, err)
return err
}
chassisID, err := c.ovnClient.GetChassis(gw)
if err != nil {
klog.Errorf("failed to get external gw %s chassisID, %v", gw, err)
return err
}
if chassisID == "" {
return fmt.Errorf("no chassisID for external gw %s", gw)
}
chassises = append(chassises, chassisID)
}
if len(chassises) == 0 {
klog.Error("no available external gw")
return fmt.Errorf("no available external gw")
}

if err := c.ovnClient.CreateGatewaySwitch(util.ExternalGatewaySwitch, config["nic-ip"], config["nic-mac"], chassises); err != nil {
klog.Errorf("failed to create external gateway switch, %v", err)
return err
}

return nil
}
2 changes: 1 addition & 1 deletion pkg/controller/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (c *Controller) gcLogicalSwitch() error {
klog.Infof("ls in ovn %v", lss)
klog.Infof("subnet in kubernetes %v", subnetNames)
for _, ls := range lss {
if ls == "ts" {
if ls == util.InterconnectionSwitch || ls == util.ExternalGatewaySwitch {
continue
}
if !util.IsStringIn(ls, subnetNames) {
Expand Down
57 changes: 34 additions & 23 deletions pkg/controller/ovn-ic.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,40 @@ import (
)

var (
icEnabled = false
lastCM map[string]string = nil
icEnabled = "unknown"
lastICCM map[string]string = nil
)

func (c *Controller) resyncInterConnection() {
cm, err := c.config.KubeClient.CoreV1().ConfigMaps("kube-system").Get("ovn-ic-config", metav1.GetOptions{})
cm, err := c.configMapsLister.ConfigMaps(c.config.PodNamespace).Get(util.InterconnectionConfig)
if err != nil && !k8serrors.IsNotFound(err) {
klog.Errorf("failed to get ovn-ic-config, %v", err)
return
}

if k8serrors.IsNotFound(err) || cm.Data["enable-ic"] == "false" {
if icEnabled == false {
if icEnabled == "false" {
return
}
klog.Info("start to remove ovn-ic")
if err := c.removeInterConnection(cm.Data); err != nil {
azName := ""
if cm != nil {
azName = cm.Data["az-name"]
}
if err := c.removeInterConnection(azName); err != nil {
klog.Errorf("failed to remove ovn-ic, %v", err)
return
}
icEnabled = false
lastCM = nil
icEnabled = "false"
lastICCM = nil
klog.Info("finish removing ovn-ic")
return
} else {
if icEnabled && lastCM != nil && reflect.DeepEqual(cm.Data, lastCM) {
if icEnabled == "true" && lastICCM != nil && reflect.DeepEqual(cm.Data, lastICCM) {
return
}

if err := c.removeInterConnection(cm.Data); err != nil {
if err := c.removeInterConnection(cm.Data["az-name"]); err != nil {
klog.Errorf("failed to remove ovn-ic, %v", err)
return
}
Expand All @@ -53,14 +58,15 @@ func (c *Controller) resyncInterConnection() {
klog.Errorf("failed to establish ovn-ic, %v", err)
return
}
icEnabled = true
lastCM = cm.Data
icEnabled = "true"
lastICCM = cm.Data
klog.Info("finish establishing ovn-ic")
return
}
}

func (c *Controller) removeInterConnection(config map[string]string) error {
sel, _ := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{util.ICGatewayAnnotation: "true"}})
func (c *Controller) removeInterConnection(azName string) error {
sel, _ := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{util.ICGatewayLabel: "true"}})
nodes, err := c.nodesLister.List(sel)
if err != nil {
klog.Errorf("failed to list nodes, %v", err)
Expand All @@ -77,19 +83,21 @@ func (c *Controller) removeInterConnection(config map[string]string) error {
if len(no.Labels) == 0 {
op = "add"
}
no.Labels[util.ICGatewayAnnotation] = "false"
no.Labels[util.ICGatewayLabel] = "false"
raw, _ := json.Marshal(no.Labels)
patchPayload := fmt.Sprintf(patchPayloadTemplate, op, raw)
_, err = c.config.KubeClient.CoreV1().Nodes().Patch(no.Name, types.JSONPatchType, []byte(patchPayload))
if err != nil {
klog.Errorf("patch gw node %s failed %v", no.Name, err)
klog.Errorf("patch ic gw node %s failed %v", no.Name, err)
return err
}
}

if err := c.ovnClient.DeleteICLogicalRouterPort(config["az-name"]); err != nil {
klog.Errorf("failed to delete ovn-ic lrp, %v", err)
return err
if azName != "" {
if err := c.ovnClient.DeleteICLogicalRouterPort(azName); err != nil {
klog.Errorf("failed to delete ovn-ic lrp, %v", err)
return err
}
}

if err := c.stopOVNIC(); err != nil {
Expand Down Expand Up @@ -126,7 +134,7 @@ func (c *Controller) establishInterConnection(config map[string]string) error {
gw = strings.TrimSpace(gw)
node, err := c.nodesLister.Get(gw)
if err != nil {
klog.Errorf("failed to get gw %s, %v", gw, err)
klog.Errorf("failed to get gw node %s, %v", gw, err)
return err
}
patchPayloadTemplate :=
Expand All @@ -139,7 +147,7 @@ func (c *Controller) establishInterConnection(config map[string]string) error {
if len(node.Labels) == 0 {
op = "add"
}
node.Labels[util.ICGatewayAnnotation] = "true"
node.Labels[util.ICGatewayLabel] = "true"
raw, _ := json.Marshal(node.Labels)
patchPayload := fmt.Sprintf(patchPayloadTemplate, op, raw)
_, err = c.config.KubeClient.CoreV1().Nodes().Patch(gw, types.JSONPatchType, []byte(patchPayload))
Expand All @@ -157,13 +165,16 @@ func (c *Controller) establishInterConnection(config map[string]string) error {
}
chassises = append(chassises, chassisID)
}

if len(chassises) == 0 {
klog.Error("no available ic gw")
return fmt.Errorf("noavailable ic gw")
}
if err := c.waitTsReady(); err != nil {
klog.Errorf("failed to wait ts ready, %v", err)
return err
}

subnet, err := c.acquireLrpAddress("ts")
subnet, err := c.acquireLrpAddress(util.InterconnectionSwitch)
if err != nil {
klog.Errorf("failed to acquire lrp address, %v", err)
return err
Expand Down Expand Up @@ -224,7 +235,7 @@ func (c *Controller) stopOVNIC() error {
func (c *Controller) waitTsReady() error {
retry := 6
for retry > 0 {
exists, err := c.ovnClient.LogicalSwitchExists("ts")
exists, err := c.ovnClient.LogicalSwitchExists(util.InterconnectionSwitch)
if err != nil {
klog.Errorf("failed to list logical switch, %v", err)
return err
Expand Down

0 comments on commit 2b2e7a9

Please sign in to comment.