Skip to content

Commit

Permalink
fix: 5ms is too short for eip and nats creation (#1781)
Browse files Browse the repository at this point in the history
* fix: 20ms is too short for eip and nats creation

* fix: 20ms is too short for eip and nats creation
  • Loading branch information
bobz965 committed Aug 4, 2022
1 parent 80425b7 commit 9efd4bb
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 41 deletions.
5 changes: 4 additions & 1 deletion pkg/controller/config.go
Expand Up @@ -25,6 +25,7 @@ type Configuration struct {
OvnSbAddr string
OvnTimeout int
CustCrdRetryMaxDelay int
CustCrdRetryMinDelay int
KubeConfigFile string
KubeRestConfig *rest.Config

Expand Down Expand Up @@ -96,7 +97,8 @@ func ParseFlags() (*Configuration, error) {
argOvnNbAddr = pflag.String("ovn-nb-addr", "", "ovn-nb address")
argOvnSbAddr = pflag.String("ovn-sb-addr", "", "ovn-sb address")
argOvnTimeout = pflag.Int("ovn-timeout", 60, "")
argCustCrdRetryMaxDelay = pflag.Int("cust-crd-retry-max-delay", 20, "The max delay between custom crd two retries")
argCustCrdRetryMinDelay = pflag.Int("cust-crd-retry-min-delay", 2, "The min delay seconds between custom crd two retries")
argCustCrdRetryMaxDelay = pflag.Int("cust-crd-retry-max-delay", 20, "The max delay seconds between custom crd two retries")
argKubeConfigFile = pflag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information. If not set use the inCluster token.")

argDefaultLogicalSwitch = pflag.String("default-ls", util.DefaultSubnet, "The default logical switch name")
Expand Down Expand Up @@ -169,6 +171,7 @@ func ParseFlags() (*Configuration, error) {
OvnNbAddr: *argOvnNbAddr,
OvnSbAddr: *argOvnSbAddr,
OvnTimeout: *argOvnTimeout,
CustCrdRetryMinDelay: *argCustCrdRetryMinDelay,
CustCrdRetryMaxDelay: *argCustCrdRetryMaxDelay,
KubeConfigFile: *argKubeConfigFile,
DefaultLogicalSwitch: *argDefaultLogicalSwitch,
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller.go
Expand Up @@ -185,7 +185,7 @@ func NewController(config *Configuration) *Controller {
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeFactoryClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
custCrdRateLimiter := workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
workqueue.NewItemExponentialFailureRateLimiter(time.Duration(config.CustCrdRetryMinDelay)*time.Second, time.Duration(config.CustCrdRetryMaxDelay)*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)

Expand Down
45 changes: 17 additions & 28 deletions pkg/controller/vpc_nat_gateway.go
Expand Up @@ -25,10 +25,10 @@ import (
)

var (
vpcNatImage = ""
vpcNatEnabled = "unknown"
VpcNatCmVersion = ""
createAt = ""
vpcNatImage = ""
vpcNatEnabled = "unknown"
VpcNatCmVersion = ""
NAT_GW_CREATED_AT = ""
)

const (
Expand Down Expand Up @@ -267,7 +267,7 @@ func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error {
if needToCreate {
_, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace).
Create(context.Background(), newSts, metav1.CreateOptions{})

// if pod create successfully, will add initVpcNatGatewayQueue, then syncVpcNatGwRules
if err != nil {
klog.Errorf("failed to create statefulset '%s', err: %v", newSts.Name, err)
return err
Expand All @@ -282,17 +282,6 @@ func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error {
return err
}
}

pod, err := c.getNatGwPod(key)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
if _, ok := pod.Annotations[util.VpcNatGatewayInitAnnotation]; ok {
return c.syncVpcNatGwRules(key)
}
return nil
}

Expand Down Expand Up @@ -358,8 +347,8 @@ func (c *Controller) handleInitVpcNatGw(key string) error {
if _, hasInit := pod.Annotations[util.VpcNatGatewayInitAnnotation]; hasInit {
return nil
}
createAt = pod.CreationTimestamp.Format("2006-01-02T15:04:05")
klog.V(3).Infof("nat gw pod '%s' inited at %s", key, createAt)
NAT_GW_CREATED_AT = pod.CreationTimestamp.Format("2006-01-02T15:04:05")
klog.V(3).Infof("nat gw pod '%s' inited at %s", key, NAT_GW_CREATED_AT)
if err = c.execNatGwRules(pod, natGwInit, []string{v4Cidr}); err != nil {
klog.Errorf("failed to init vpc nat gateway, %v", err)
return err
Expand Down Expand Up @@ -401,9 +390,9 @@ func (c *Controller) handleUpdateVpcFloatingIp(natGwKey string) error {
}

for _, fip := range fips.Items {
if fip.Status.Redo != createAt {
if fip.Status.Redo != NAT_GW_CREATED_AT {
klog.V(3).Infof("redo fip %s", fip.Name)
if err = c.redoFip(fip.Name, createAt, false); err != nil {
if err = c.redoFip(fip.Name, NAT_GW_CREATED_AT, false); err != nil {
klog.Errorf("failed to update eip '%s' to make sure applied, %v", fip.Spec.EIP, err)
return err
}
Expand Down Expand Up @@ -433,9 +422,9 @@ func (c *Controller) handleUpdateVpcEip(natGwKey string) error {
return err
}
for _, eip := range eips.Items {
if eip.Spec.NatGwDp == natGwKey && eip.Status.Redo != createAt {
if eip.Spec.NatGwDp == natGwKey && eip.Status.Redo != NAT_GW_CREATED_AT {
klog.V(3).Infof("redo eip %s", eip.Name)
if err = c.patchEipStatus(eip.Name, "", createAt, "", false); err != nil {
if err = c.patchEipStatus(eip.Name, "", NAT_GW_CREATED_AT, "", false); err != nil {
klog.Errorf("failed to update eip '%s' to make sure applied, %v", eip.Name, err)
return err
}
Expand Down Expand Up @@ -465,9 +454,9 @@ func (c *Controller) handleUpdateVpcSnat(natGwKey string) error {
return err
}
for _, snat := range snats.Items {
if snat.Status.Redo != createAt {
if snat.Status.Redo != NAT_GW_CREATED_AT {
klog.V(3).Infof("redo snat %s", snat.Name)
if err = c.redoSnat(snat.Name, createAt, false); err != nil {
if err = c.redoSnat(snat.Name, NAT_GW_CREATED_AT, false); err != nil {
klog.Errorf("failed to update eip '%s' to make sure applied, %v", snat.Spec.EIP, err)
return err
}
Expand Down Expand Up @@ -498,9 +487,9 @@ func (c *Controller) handleUpdateVpcDnat(natGwKey string) error {
return err
}
for _, dnat := range dnats.Items {
if dnat.Status.Redo != createAt {
if dnat.Status.Redo != NAT_GW_CREATED_AT {
klog.V(3).Infof("redo dnat %s", dnat.Name)
if err = c.redoDnat(dnat.Name, createAt, false); err != nil {
if err = c.redoDnat(dnat.Name, NAT_GW_CREATED_AT, false); err != nil {
klog.Errorf("failed to update dnat '%s' to make sure applied, %v", dnat.Name, err)
return err
}
Expand Down Expand Up @@ -768,7 +757,7 @@ func (c *Controller) checkVpcExternalNet() (err error) {
}

func (c *Controller) initCreateAt(key string) (err error) {
if createAt != "" {
if NAT_GW_CREATED_AT != "" {
return nil
}
pod, err := c.getNatGwPod(key)
Expand All @@ -778,6 +767,6 @@ func (c *Controller) initCreateAt(key string) (err error) {
}
return err
}
createAt = pod.CreationTimestamp.Format("2006-01-02T15:04:05")
NAT_GW_CREATED_AT = pod.CreationTimestamp.Format("2006-01-02T15:04:05")
return nil
}
10 changes: 7 additions & 3 deletions pkg/controller/vpc_nat_gw_eip.go
Expand Up @@ -227,7 +227,7 @@ func (c *Controller) handleAddIptablesEip(key string) error {
}
return err
}
if cachedEip.Spec.MacAddress != "" {
if cachedEip.Status.Ready && cachedEip.Status.IP != "" {
// already ok
return nil
}
Expand Down Expand Up @@ -261,7 +261,10 @@ func (c *Controller) handleAddIptablesEip(key string) error {
klog.Errorf("failed to update eip %s, %v", key, err)
return err
}
if err = c.patchEipStatus(key, v4ip, "", "", true); err != nil {
if err := c.initCreateAt(eip.Spec.NatGwDp); err != nil {
klog.Errorf("failed to init nat gw pod '%s' create at, %v", eip.Spec.NatGwDp, err)
}
if err = c.patchEipStatus(key, v4ip, NAT_GW_CREATED_AT, "", true); err != nil {
klog.Errorf("failed to patch status for eip %s, %v", key, err)
return err
}
Expand Down Expand Up @@ -484,7 +487,8 @@ func (c *Controller) handleUpdateIptablesEip(key string) error {
}

// redo
if eip.Status.Redo != "" &&
if !eip.Status.Ready &&
eip.Status.Redo != "" &&
eip.Status.IP != "" &&
eip.DeletionTimestamp.IsZero() {
eipV4Cidr, err := c.getEipV4Cidr(eip.Status.IP)
Expand Down
32 changes: 24 additions & 8 deletions pkg/controller/vpc_nat_gw_nat.go
Expand Up @@ -520,6 +520,10 @@ func (c *Controller) handleAddIptablesFip(key string) error {
}
return err
}
if cachedFip.Status.Ready && cachedFip.Status.V4ip != "" {
// already ok
return nil
}
fip := cachedFip.DeepCopy()
klog.V(3).Infof("handle add fip %s", key)
// get eip
Expand Down Expand Up @@ -562,7 +566,10 @@ func (c *Controller) handleAddIptablesFip(key string) error {
klog.Errorf("failed to patch status for eip %s, %v", key, err)
return err
}
if err = c.patchFipStatus(key, eip.Spec.V4ip, eip.Spec.V6ip, eip.Spec.NatGwDp, "", true); err != nil {
if err := c.initCreateAt(eip.Spec.NatGwDp); err != nil {
klog.Errorf("failed to init nat gw pod '%s' create at, %v", eip.Spec.NatGwDp, err)
}
if err = c.patchFipStatus(key, eip.Spec.V4ip, eip.Spec.V6ip, eip.Spec.NatGwDp, NAT_GW_CREATED_AT, true); err != nil {
klog.Errorf("failed to patch status for fip %s, %v", key, err)
return err
}
Expand Down Expand Up @@ -652,7 +659,8 @@ func (c *Controller) handleUpdateIptablesFip(key string) error {
return nil
}
// redo
if fip.Status.Redo != "" &&
if !fip.Status.Ready &&
fip.Status.Redo != "" &&
fip.Status.V4ip != "" &&
fip.DeletionTimestamp.IsZero() {
klog.V(3).Infof("reapply fip '%s' in pod ", key)
Expand Down Expand Up @@ -694,7 +702,7 @@ func (c *Controller) handleAddIptablesDnatRule(key string) error {
}
return err
}
if cachedDnat.Status.V4ip != "" {
if cachedDnat.Status.Ready && cachedDnat.Status.V4ip != "" {
// already ok
return nil
}
Expand Down Expand Up @@ -738,7 +746,10 @@ func (c *Controller) handleAddIptablesDnatRule(key string) error {
klog.Errorf("failed to patch status for eip %s, %v", key, err)
return err
}
if err = c.patchDnatStatus(key, eip.Spec.V4ip, eip.Spec.V6ip, eip.Spec.NatGwDp, "", true); err != nil {
if err := c.initCreateAt(eip.Spec.NatGwDp); err != nil {
klog.Errorf("failed to init nat gw pod '%s' create at, %v", eip.Spec.NatGwDp, err)
}
if err = c.patchDnatStatus(key, eip.Spec.V4ip, eip.Spec.V6ip, eip.Spec.NatGwDp, NAT_GW_CREATED_AT, true); err != nil {
klog.Errorf("failed to patch status for dnat %s, %v", key, err)
return err
}
Expand Down Expand Up @@ -831,7 +842,8 @@ func (c *Controller) handleUpdateIptablesDnatRule(key string) error {
return nil
}
// redo
if dnat.Status.Redo != "" &&
if !dnat.Status.Ready &&
dnat.Status.Redo != "" &&
dnat.Status.V4ip != "" &&
dnat.DeletionTimestamp.IsZero() {
klog.V(3).Infof("reapply dnat in pod for %s", key)
Expand Down Expand Up @@ -875,7 +887,7 @@ func (c *Controller) handleAddIptablesSnatRule(key string) error {
}
return err
}
if cachedSnat.Status.V4ip != "" {
if cachedSnat.Status.Ready && cachedSnat.Status.V4ip != "" {
// already ok
return nil
}
Expand Down Expand Up @@ -919,7 +931,10 @@ func (c *Controller) handleAddIptablesSnatRule(key string) error {
klog.Errorf("failed to patch status for eip %s, %v", key, err)
return err
}
if err = c.patchSnatStatus(key, eip.Spec.V4ip, eip.Spec.V6ip, eip.Spec.NatGwDp, "", true); err != nil {
if err := c.initCreateAt(eip.Spec.NatGwDp); err != nil {
klog.Errorf("failed to init nat gw pod '%s' create at, %v", eip.Spec.NatGwDp, err)
}
if err = c.patchSnatStatus(key, eip.Spec.V4ip, eip.Spec.V6ip, eip.Spec.NatGwDp, NAT_GW_CREATED_AT, true); err != nil {
klog.Errorf("failed to update status for snat %s, %v", key, err)
return err
}
Expand Down Expand Up @@ -1008,7 +1023,8 @@ func (c *Controller) handleUpdateIptablesSnatRule(key string) error {
return nil
}
// redo
if snat.Status.Redo != "" &&
if !snat.Status.Ready &&
snat.Status.Redo != "" &&
snat.Status.V4ip != "" &&
snat.DeletionTimestamp.IsZero() {
if err = c.createSnatInPod(snat.Status.NatGwDp, snat.Status.V4ip, v4Cidr); err != nil {
Expand Down

0 comments on commit 9efd4bb

Please sign in to comment.