Skip to content

Commit

Permalink
feat: validate namespace/pod annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
oilbeater committed May 15, 2019
1 parent 3fba272 commit 71c15d6
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 98 deletions.
2 changes: 1 addition & 1 deletion docs/high-available.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# High available for ovn db

ovs support clustered database. If want to use high-available database in kube-ovn,
modifie ovn-central deployment in yamls/ovn.yaml.
modify ovn-central deployment in yamls/ovn.yaml.

Change the replicas to 3, and add NODE_IPS environment var.
```yaml
Expand Down
4 changes: 2 additions & 2 deletions docs/install.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ For high-available ovn db, see [high available](high-available.md)

### Controller Configuration

```bash
```bash
--default-cidr: Default CIDR for Namespaces with no logical switch annotation, default: 10.16.0.0/16
--node-switch-cidr: The CIDR for the Node switch. Default: 100.64.0.0/16
```
```

## To uninstall

Expand Down
2 changes: 1 addition & 1 deletion docs/subnet.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ metadata:
ovn.kubernetes.io/cidr: 10.17.0.0/16
ovn.kubernetes.io/gateway: 10.17.0.1
ovn.kubernetes.io/logical_switch: ovn-subnet
ovn.kubernetes.io/exclude_ips: 10.17.0.0..10.17.0.10
ovn.kubernetes.io/exclude_ips: "192.168.0.4,192.168.0.30..192.168.0.60,192.168.0.110..192.168.0.120"
name: ovn-subnet
```

Expand Down
8 changes: 5 additions & 3 deletions pkg/controller/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (c *Controller) handleAddNamespace(key string) error {
ls := ns.Annotations[util.LogicalSwitchAnnotation]
cidr := ns.Annotations[util.CidrAnnotation]
gateway := ns.Annotations[util.GatewayAnnotation]
excludeIps := ns.Annotations[util.ExcludeIpsAnnotation]
excludeIps := strings.Replace(ns.Annotations[util.ExcludeIpsAnnotation], ",", " ", -1)
private := ns.Annotations[util.PrivateSwitchAnnotation]
allow := ns.Annotations[util.AllowAccessAnnotation]

Expand All @@ -204,8 +204,10 @@ func (c *Controller) handleAddNamespace(key string) error {
}
}
if !exist {
if cidr == "" || gateway == "" {
return fmt.Errorf("cidr and gateway are required for namespace %s", key)
if err := util.ValidateLogicalSwitch(ns.Annotations); err != nil {
klog.Errorf("validate namespace %s failed, %v", key, err)
c.recorder.Eventf(ns, v1.EventTypeWarning, "ValidateLogicalSwitchFailed", err.Error())
return err
}
if excludeIps == "" {
excludeIps = gateway
Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,12 @@ func (c *Controller) handleAddPod(key string) error {
ls = c.config.DefaultLogicalSwitch
}

if err := util.ValidatePodNetwork(pod.Annotations); err != nil {
klog.Errorf("validate pod %s/%s failed, %v", namespace, name, err)
c.recorder.Eventf(pod, v1.EventTypeWarning, "ValidatePodNetworkFailed", err.Error())
return err
}

// pod address info may already exist in ovn
ip := pod.Annotations[util.IpAddressAnnotation]
mac := pod.Annotations[util.MacAddressAnnotation]
Expand Down
136 changes: 47 additions & 89 deletions pkg/controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"strings"

"github.com/alauda/kube-ovn/pkg/util"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -202,16 +201,6 @@ func (c *Controller) handleUpdateService(key string) error {
return err
}

if !svc.DeletionTimestamp.IsZero() {
if containsString(svc.Finalizers, util.ServiceAnnotation) {
svc.SetFinalizers(removeString(svc.Finalizers, util.ServiceAnnotation))
_, err = c.config.KubeClient.CoreV1().Services(namespace).Update(svc)
if err != nil {
return err
}
}
}

ip := svc.Spec.ClusterIP
if ip == "" || ip == v1.ClusterIPNone {
return nil
Expand All @@ -227,92 +216,71 @@ func (c *Controller) handleUpdateService(key string) error {
udpVips = append(udpVips, fmt.Sprintf("%s:%d", ip, port.Port))
}
}
// for service update
klog.Infof("update service %s/%s", namespace, name)
lbUuid, err := c.ovnClient.FindLoadbalancer(c.config.ClusterTcpLoadBalancer)
if err != nil {
klog.Errorf("failed to get lb %v", err)
}
vips, err := c.ovnClient.GetLoadBalancerVips(lbUuid)
if err != nil {
klog.Errorf("failed to get tcp lb vips %v", err)
return err
}
klog.Infof("exist tcp vips are %v", vips)
for _, vip := range tcpVips {
if _, ok := vips[vip]; !ok {
klog.Infof("add vip %s to tcp lb", vip)
c.updateEndpointQueue.AddRateLimited(key)
break
}
}

if !svc.DeletionTimestamp.IsZero() {
// for service deletion
klog.Infof("delete service %s/%s", namespace, name)
for _, vip := range tcpVips {
for vip := range vips {
if strings.HasPrefix(vip, ip) && !containsString(tcpVips, vip) {
klog.Infof("remove stall vip %s", vip)
err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpLoadBalancer)
if err != nil {
klog.Errorf("failed to delete vip %s from tcp lb, %v", vip, err)
klog.Errorf("failed to delete vip %s from tcp lb %v", vip, err)
return err
}
}
}

lbUuid, err = c.ovnClient.FindLoadbalancer(c.config.ClusterUdpLoadBalancer)
if err != nil {
klog.Errorf("failed to get lb %v", err)
}
vips, err = c.ovnClient.GetLoadBalancerVips(lbUuid)
if err != nil {
klog.Errorf("failed to get udp lb vips %v", err)
return err
}
klog.Infof("exist udp vips are %v", vips)
for _, vip := range udpVips {
if _, ok := vips[vip]; !ok {
klog.Infof("add vip %s to udp lb", vip)
c.updateEndpointQueue.AddRateLimited(key)
break
}
}

for _, vip := range udpVips {
for vip := range vips {
if strings.HasPrefix(vip, ip) && !containsString(udpVips, vip) {
klog.Infof("remove stall vip %s", vip)
err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterUdpLoadBalancer)
if err != nil {
klog.Errorf("failed to delete vip %s from udp lb, %v", vip, err)
klog.Errorf("failed to delete vip %s from udp lb %v", vip, err)
return err
}
}
} else {
// for service update
klog.Infof("update service %s/%s", namespace, name)
lbUuid, err := c.ovnClient.FindLoadbalancer(c.config.ClusterTcpLoadBalancer)
if err != nil {
klog.Errorf("failed to get lb %v", err)
}
vips, err := c.ovnClient.GetLoadBalancerVips(lbUuid)
if err != nil {
klog.Errorf("failed to get tcp lb vips %v", err)
return err
}
klog.Infof("exist tcp vips are %v", vips)
for _, vip := range tcpVips {
if _, ok := vips[vip]; !ok {
klog.Infof("add vip %s to tcp lb", vip)
c.updateEndpointQueue.AddRateLimited(key)
break
}
}

for vip := range vips {
if strings.HasPrefix(vip, ip) && !containsString(tcpVips, vip) {
klog.Infof("remove stall vip %s", vip)
err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpLoadBalancer)
if err != nil {
klog.Errorf("failed to delete vip %s from tcp lb %v", vip, err)
return err
}
}
}

lbUuid, err = c.ovnClient.FindLoadbalancer(c.config.ClusterUdpLoadBalancer)
if err != nil {
klog.Errorf("failed to get lb %v", err)
}
vips, err = c.ovnClient.GetLoadBalancerVips(lbUuid)
if err != nil {
klog.Errorf("failed to get udp lb vips %v", err)
return err
}
klog.Infof("exist udp vips are %v", vips)
for _, vip := range udpVips {
if _, ok := vips[vip]; !ok {
klog.Infof("add vip %s to udp lb", vip)
c.updateEndpointQueue.AddRateLimited(key)
break
}
}

for vip := range vips {
if strings.HasPrefix(vip, ip) && !containsString(udpVips, vip) {
klog.Infof("remove stall vip %s", vip)
err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterUdpLoadBalancer)
if err != nil {
klog.Errorf("failed to delete vip %s from udp lb %v", vip, err)
return err
}
}
}
}

return nil
}

//
// Helper functions to check and remove string from a slice of strings.
// Helper functions to check string from a slice of strings.
//
func containsString(slice []string, s string) bool {
for _, item := range slice {
Expand All @@ -322,13 +290,3 @@ func containsString(slice []string, s string) bool {
}
return false
}

func removeString(slice []string, s string) (result []string) {
for _, item := range slice {
if item == s {
continue
}
result = append(result, item)
}
return
}
18 changes: 18 additions & 0 deletions pkg/daemon/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"net"
"time"

"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"

"github.com/alauda/kube-ovn/pkg/ovs"
"k8s.io/apimachinery/pkg/api/errors"

Expand All @@ -19,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
Expand All @@ -38,12 +42,19 @@ type Controller struct {
podsSynced cache.InformerSynced
podQueue workqueue.RateLimitingInterface

recorder record.EventRecorder

ipSetsMgr *ipsets.IPSets
iptablesMgr *iptables.IPTables
}

// NewController init a daemon controller
func NewController(config *Configuration, informerFactory informers.SharedInformerFactory) (*Controller, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: config.NodeName})

namespaceInformer := informerFactory.Core().V1().Namespaces()
podInformer := informerFactory.Core().V1().Pods()
iptablesMgr, err := iptables.New()
Expand All @@ -63,6 +74,8 @@ func NewController(config *Configuration, informerFactory informers.SharedInform
podsSynced: podInformer.Informer().HasSynced,
podQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Pod"),

recorder: recorder,

ipSetsMgr: ipsetsMgr,
iptablesMgr: iptablesMgr,
}
Expand Down Expand Up @@ -301,6 +314,11 @@ func (c *Controller) handlePod(key string) error {
}
return err
}
if err := util.ValidatePodNetwork(pod.Annotations); err != nil {
klog.Errorf("validate pod %s/%s failed, %v", namespace, name, err)
c.recorder.Eventf(pod, v1.EventTypeWarning, "ValidatePodNetworkFailed", err.Error())
return err
}
return ovs.SetPodBandwidth(pod.Name, pod.Namespace, pod.Annotations[util.IngressRateAnnotation], pod.Annotations[util.EgressRateAnnotation])
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/util/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ const (
LogicalSwitchAnnotation = "ovn.kubernetes.io/logical_switch"
ExcludeIpsAnnotation = "ovn.kubernetes.io/exclude_ips"

ServiceAnnotation = "ovn.kubernetes.io/service"

ProtocolTCP = "tcp"
ProtocolUDP = "udp"

Expand Down

0 comments on commit 71c15d6

Please sign in to comment.