Skip to content

Commit

Permalink
perf: optimize pod add
Browse files Browse the repository at this point in the history
1. increase client-go limit and use protobuf
2. avoid duplicated path request
3. remove wait=sb
  • Loading branch information
oilbeater committed Nov 8, 2019
1 parent 2085334 commit 0859da1
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 75 deletions.
16 changes: 10 additions & 6 deletions pkg/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,22 @@ func (config *Configuration) initKubeClient() error {
return err
}
}
kubeClient, err := kubernetes.NewForConfig(cfg)
cfg.QPS = 1000
cfg.Burst = 2000
kubeOvnClient, err := clientset.NewForConfig(cfg)
if err != nil {
klog.Errorf("init kubernetes client failed %v", err)
klog.Errorf("init kubeovn client failed %v", err)
return err
}
config.KubeClient = kubeClient
config.KubeOvnClient = kubeOvnClient

kubeOvnClient, err := clientset.NewForConfig(cfg)
cfg.ContentType = "application/vnd.kubernetes.protobuf"
cfg.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json"
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
klog.Errorf("init kubeovn client failed %v", err)
klog.Errorf("init kubernetes client failed %v", err)
return err
}
config.KubeOvnClient = kubeOvnClient
config.KubeClient = kubeClient
return nil
}
113 changes: 69 additions & 44 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"reflect"
"strconv"
"strings"
"time"

kubeovnv1 "github.com/alauda/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/alauda/kube-ovn/pkg/ovs"
Expand Down Expand Up @@ -177,6 +178,7 @@ func (c *Controller) processNextAddPodWorkItem() bool {
if shutdown {
return false
}
now := time.Now()

// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
Expand Down Expand Up @@ -219,7 +221,8 @@ func (c *Controller) processNextAddPodWorkItem() bool {
utilruntime.HandleError(err)
return true
}

last := time.Since(now)
klog.Infof("take %d ms to deal with add pod", last.Milliseconds())
return true
}

Expand Down Expand Up @@ -284,6 +287,7 @@ func (c *Controller) processNextDeletePodWorkItem() bool {
return false
}

now := time.Now()
// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
// We call Done here so the workqueue knows we have finished
Expand Down Expand Up @@ -325,7 +329,8 @@ func (c *Controller) processNextDeletePodWorkItem() bool {
utilruntime.HandleError(err)
return true
}

last := time.Since(now)
klog.Infof("take %d ms to deal with delete pod", last.Milliseconds())
return true
}

Expand Down Expand Up @@ -438,31 +443,41 @@ func (c *Controller) handleAddPod(key string) error {
return err
}

op := "replace"
if len(pod.Annotations) == 0 {
op = "add"
}
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
pod.Annotations[util.IpAddressAnnotation] = nic.IpAddress
pod.Annotations[util.MacAddressAnnotation] = nic.MacAddress
pod.Annotations[util.CidrAnnotation] = subnet.Spec.CIDRBlock
pod.Annotations[util.GatewayAnnotation] = subnet.Spec.Gateway
pod.Annotations[util.LogicalSwitchAnnotation] = subnet.Name
if pod.Annotations == nil ||
pod.Annotations[util.IpAddressAnnotation] != nic.IpAddress ||
pod.Annotations[util.MacAddressAnnotation] != nic.MacAddress ||
pod.Annotations[util.CidrAnnotation] != subnet.Spec.CIDRBlock ||
pod.Annotations[util.GatewayAnnotation] != subnet.Spec.Gateway ||
pod.Annotations[util.LogicalSwitchAnnotation] != subnet.Name {

patchPayloadTemplate :=
`[{
op := "replace"
if len(pod.Annotations) == 0 {
op = "add"
}
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
pod.Annotations[util.IpAddressAnnotation] = nic.IpAddress
pod.Annotations[util.MacAddressAnnotation] = nic.MacAddress
pod.Annotations[util.CidrAnnotation] = subnet.Spec.CIDRBlock
pod.Annotations[util.GatewayAnnotation] = subnet.Spec.Gateway
pod.Annotations[util.LogicalSwitchAnnotation] = subnet.Name

patchPayloadTemplate :=
`[{
"op": "%s",
"path": "/metadata/annotations",
"value": %s
}]`

raw, _ := json.Marshal(pod.Annotations)
patchPayload := fmt.Sprintf(patchPayloadTemplate, op, raw)
if _, err = c.config.KubeClient.CoreV1().Pods(namespace).Patch(name, types.JSONPatchType, []byte(patchPayload)); err != nil {
klog.Errorf("patch pod %s/%s failed %v", name, namespace, err)
return err
}]`

raw, _ := json.Marshal(pod.Annotations)
patchPayload := fmt.Sprintf(patchPayloadTemplate, op, raw)
go func() {
if _, err = c.config.KubeClient.CoreV1().Pods(namespace).Patch(name, types.JSONPatchType, []byte(patchPayload)); err != nil {
klog.Errorf("patch pod %s/%s failed %v", name, namespace, err)
c.addPodQueue.AddRateLimited(key)
}
}()
}

// In case update event might lost during leader election
Expand Down Expand Up @@ -565,31 +580,41 @@ func (c *Controller) handleAddIpPoolPod(key string) error {
return err
}

op := "replace"
if len(pod.Annotations) == 0 {
op = "add"
}
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
pod.Annotations[util.IpAddressAnnotation] = nic.IpAddress
pod.Annotations[util.MacAddressAnnotation] = nic.MacAddress
pod.Annotations[util.CidrAnnotation] = subnet.Spec.CIDRBlock
pod.Annotations[util.GatewayAnnotation] = subnet.Spec.Gateway
pod.Annotations[util.LogicalSwitchAnnotation] = subnet.Name
if pod.Annotations == nil ||
pod.Annotations[util.IpAddressAnnotation] != nic.IpAddress ||
pod.Annotations[util.MacAddressAnnotation] != nic.MacAddress ||
pod.Annotations[util.CidrAnnotation] != subnet.Spec.CIDRBlock ||
pod.Annotations[util.GatewayAnnotation] != subnet.Spec.Gateway ||
pod.Annotations[util.LogicalSwitchAnnotation] != subnet.Name {

patchPayloadTemplate :=
`[{
op := "replace"
if len(pod.Annotations) == 0 {
op = "add"
}
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
pod.Annotations[util.IpAddressAnnotation] = nic.IpAddress
pod.Annotations[util.MacAddressAnnotation] = nic.MacAddress
pod.Annotations[util.CidrAnnotation] = subnet.Spec.CIDRBlock
pod.Annotations[util.GatewayAnnotation] = subnet.Spec.Gateway
pod.Annotations[util.LogicalSwitchAnnotation] = subnet.Name

patchPayloadTemplate :=
`[{
"op": "%s",
"path": "/metadata/annotations",
"value": %s
}]`

raw, _ := json.Marshal(pod.Annotations)
patchPayload := fmt.Sprintf(patchPayloadTemplate, op, raw)
if _, err = c.config.KubeClient.CoreV1().Pods(namespace).Patch(name, types.JSONPatchType, []byte(patchPayload)); err != nil {
klog.Errorf("patch pod %s/%s failed %v", name, namespace, err)
return err
}]`

raw, _ := json.Marshal(pod.Annotations)
patchPayload := fmt.Sprintf(patchPayloadTemplate, op, raw)
go func() {
if _, err = c.config.KubeClient.CoreV1().Pods(namespace).Patch(name, types.JSONPatchType, []byte(patchPayload)); err != nil {
klog.Errorf("patch pod %s/%s failed %v", name, namespace, err)
c.addPodQueue.AddRateLimited(key)
}
}()
}

// In case update event might lost during leader election
Expand Down
16 changes: 10 additions & 6 deletions pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,20 +144,24 @@ func (config *Configuration) initKubeClient() error {
return err
}
}
cfg.QPS = 1000
cfg.Burst = 2000

kubeClient, err := kubernetes.NewForConfig(cfg)
kubeOvnClient, err := clientset.NewForConfig(cfg)
if err != nil {
klog.Errorf("init kubernetes client failed %v", err)
klog.Errorf("init kubeovn client failed %v", err)
return err
}
config.KubeClient = kubeClient
config.KubeOvnClient = kubeOvnClient

kubeOvnClient, err := clientset.NewForConfig(cfg)
cfg.ContentType = "application/vnd.kubernetes.protobuf"
cfg.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json"
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
klog.Errorf("init kubeovn client failed %v", err)
klog.Errorf("init kubernetes client failed %v", err)
return err
}
config.KubeOvnClient = kubeOvnClient
config.KubeClient = kubeClient
return nil
}

Expand Down
33 changes: 17 additions & 16 deletions pkg/ovs/ovn-nbctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (c Client) ovnNbCommand(cmdArgs ...string) (string, error) {

// DeletePort delete logical switch port in ovn
func (c Client) DeletePort(port string) error {
_, err := c.ovnNbCommand(WaitSb, IfExists, "lsp-del", port)
_, err := c.ovnNbCommand(IfExists, "lsp-del", port)
if err != nil {
return fmt.Errorf("failed to delete port %s, %v", port, err)
}
Expand All @@ -37,7 +37,7 @@ func (c Client) DeletePort(port string) error {
// CreatePort create logical switch port in ovn
func (c Client) CreatePort(ls, port, ip, cidr, mac string) (*nic, error) {
if ip == "" && mac == "" {
_, err := c.ovnNbCommand(WaitSb, MayExist, "lsp-add", ls, port,
_, err := c.ovnNbCommand(MayExist, "lsp-add", ls, port,
"--", "set", "logical_switch_port", port, "addresses=dynamic")
if err != nil {
klog.Errorf("create port %s failed %v", port, err)
Expand All @@ -60,7 +60,7 @@ func (c Client) CreatePort(ls, port, ip, cidr, mac string) (*nic, error) {
// in this way we can deal with static ip with/without mask
ip = strings.Split(ip, "/")[0]

_, err := c.ovnNbCommand(WaitSb, MayExist, "lsp-add", ls, port, "--",
_, err := c.ovnNbCommand(MayExist, "lsp-add", ls, port, "--",
"lsp-set-addresses", port, fmt.Sprintf("%s %s", mac, ip))
if err != nil {
klog.Errorf("create port %s failed %v", port, err)
Expand Down Expand Up @@ -98,13 +98,13 @@ func (c Client) CreateLogicalSwitch(ls, protocol, subnet, gateway string, exclud
var err error
switch protocol {
case kubeovnv1.ProtocolIPv4:
_, err = c.ovnNbCommand(WaitSb, MayExist, "ls-add", ls, "--",
_, err = c.ovnNbCommand(MayExist, "ls-add", ls, "--",
"set", "logical_switch", ls, fmt.Sprintf("other_config:subnet=%s", subnet), "--",
"set", "logical_switch", ls, fmt.Sprintf("other_config:gateway=%s", gateway), "--",
"set", "logical_switch", ls, fmt.Sprintf("other_config:exclude_ips=%s", strings.Join(excludeIps, " ")), "--",
"acl-add", ls, "to-lport", util.NodeAllowPriority, fmt.Sprintf("ip4.src==%s", c.NodeSwitchCIDR), "allow-related")
case kubeovnv1.ProtocolIPv6:
_, err = c.ovnNbCommand(WaitSb, MayExist, "ls-add", ls, "--",
_, err = c.ovnNbCommand(MayExist, "ls-add", ls, "--",
"set", "logical_switch", ls, fmt.Sprintf("other_config:ipv6_prefix=%s", strings.Split(subnet, "/")[0]), "--",
"set", "logical_switch", ls, fmt.Sprintf("other_config:gateway=%s", gateway), "--",
"set", "logical_switch", ls, fmt.Sprintf("other_config:exclude_ips=%s", strings.Join(excludeIps, " ")), "--",
Expand Down Expand Up @@ -215,13 +215,13 @@ func (c Client) ListLogicalRouter() ([]string, error) {

// DeleteLogicalSwitch delete logical switch and related router port
func (c Client) DeleteLogicalSwitch(ls string) error {
_, err := c.ovnNbCommand(WaitSb, IfExists, "lrp-del", fmt.Sprintf("%s-%s", c.ClusterRouter, ls))
_, err := c.ovnNbCommand(IfExists, "lrp-del", fmt.Sprintf("%s-%s", c.ClusterRouter, ls))
if err != nil {
klog.Errorf("failed to del lrp %s-%s, %v", c.ClusterRouter, ls, err)
return err
}

_, err = c.ovnNbCommand(WaitSb, IfExists, "ls-del", ls)
_, err = c.ovnNbCommand(IfExists, "ls-del", ls)
if err != nil {
klog.Errorf("failed to del ls %s, %v", ls, err)
return err
Expand All @@ -231,15 +231,15 @@ func (c Client) DeleteLogicalSwitch(ls string) error {

// CreateLogicalRouter create logical router in ovn
func (c Client) CreateLogicalRouter(lr string) error {
_, err := c.ovnNbCommand(WaitSb, MayExist, "lr-add", lr)
_, err := c.ovnNbCommand(MayExist, "lr-add", lr)
return err
}

func (c Client) createRouterPort(ls, lr, ip, mac string) error {
klog.Infof("add %s to %s with ip: %s, mac :%s", ls, lr, ip, mac)
lsTolr := fmt.Sprintf("%s-%s", ls, lr)
lrTols := fmt.Sprintf("%s-%s", lr, ls)
_, err := c.ovnNbCommand(WaitSb, MayExist, "lsp-add", ls, lsTolr, "--",
_, err := c.ovnNbCommand(MayExist, "lsp-add", ls, lsTolr, "--",
"set", "logical_switch_port", lsTolr, "type=router", "--",
"set", "logical_switch_port", lsTolr, fmt.Sprintf("addresses=\"%s\"", mac), "--",
"set", "logical_switch_port", lsTolr, fmt.Sprintf("options:router-port=%s", lrTols))
Expand All @@ -248,7 +248,7 @@ func (c Client) createRouterPort(ls, lr, ip, mac string) error {
return err
}

_, err = c.ovnNbCommand(WaitSb, MayExist, "lrp-add", lr, lrTols, mac, ip)
_, err = c.ovnNbCommand(MayExist, "lrp-add", lr, lrTols, mac, ip)
if err != nil {
klog.Errorf("failed to create router port %s %v", lrTols, err)
return err
Expand All @@ -261,13 +261,13 @@ func (c Client) AddStaticRouter(policy, cidr, nextHop, router string) error {
if policy == "" {
policy = PolicyDstIP
}
_, err := c.ovnNbCommand(WaitSb, MayExist, fmt.Sprintf("%s=%s", Policy, policy), "lr-route-add", router, cidr, nextHop)
_, err := c.ovnNbCommand(MayExist, fmt.Sprintf("%s=%s", Policy, policy), "lr-route-add", router, cidr, nextHop)
return err
}

// DeleteStaticRouter delete a static route rule in ovn
func (c Client) DeleteStaticRouter(cidr, router string) error {
_, err := c.ovnNbCommand(WaitSb, IfExists, "lr-route-del", router, cidr)
_, err := c.ovnNbCommand(IfExists, "lr-route-del", router, cidr)
return err
}

Expand All @@ -287,18 +287,18 @@ func (c Client) CreateLoadBalancer(lb, protocol string) error {

// CreateLoadBalancerRule create loadbalancer rul in ovn
func (c Client) CreateLoadBalancerRule(lb, vip, ips string) error {
_, err := c.ovnNbCommand(WaitSb, MayExist, "lb-add", lb, vip, ips)
_, err := c.ovnNbCommand(MayExist, "lb-add", lb, vip, ips)
return err
}

func (c Client) addLoadBalancerToLogicalSwitch(lb, ls string) error {
_, err := c.ovnNbCommand(WaitSb, MayExist, "ls-lb-add", ls, lb)
_, err := c.ovnNbCommand(MayExist, "ls-lb-add", ls, lb)
return err
}

// DeleteLoadBalancerVip delete a vip rule from loadbalancer
func (c Client) DeleteLoadBalancerVip(vip, lb string) error {
_, err := c.ovnNbCommand(WaitSb, IfExists, "lb-del", lb, vip)
_, err := c.ovnNbCommand(IfExists, "lb-del", lb, vip)
return err
}

Expand Down Expand Up @@ -391,7 +391,8 @@ func (c Client) GetLogicalSwitchPortAddress(port string) ([]string, error) {
}

func (c Client) GetLogicalSwitchPortDynamicAddress(port string) ([]string, error) {
output, err := c.ovnNbCommand("get", "logical_switch_port", port, "dynamic-addresses")
output, err := c.ovnNbCommand("wait-until", "logical_switch_port", port, "dynamic_addresses!=[]", "--",
"get", "logical_switch_port", port, "dynamic-addresses")
if err != nil {
klog.Errorf("get port %s dynamic_addresses failed %v", port, err)
return nil, err
Expand Down
1 change: 0 additions & 1 deletion pkg/ovs/ovn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ const (
OvnNbCtl = "ovn-nbctl"
MayExist = "--may-exist"
IfExists = "--if-exists"
WaitSb = "--wait=sb"
Policy = "--policy"
PolicyDstIP = "dst-ip"
PolicySrcIP = "src-ip"
Expand Down
4 changes: 4 additions & 0 deletions pkg/pinger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func (config *Configuration) initKubeClient() error {
}
}
cfg.Timeout = 15 * time.Second
cfg.QPS = 1000
cfg.Burst = 2000
cfg.ContentType = "application/vnd.kubernetes.protobuf"
cfg.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json"
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
klog.Errorf("init kubernetes client failed %v", err)
Expand Down
4 changes: 2 additions & 2 deletions yamls/ovn.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,10 @@ spec:
fieldPath: status.podIP
resources:
requests:
cpu: 200m
cpu: 500m
memory: 300Mi
limits:
cpu: 400m
cpu: 1
memory: 800Mi
volumeMounts:
- mountPath: /run/openvswitch
Expand Down

0 comments on commit 0859da1

Please sign in to comment.