Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #88209: Fix route conflicted operations when updating multiple routes #88223

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions staging/src/k8s.io/legacy-cloud-providers/azure/azure.go
Expand Up @@ -215,6 +215,7 @@ type Cloud struct {
kubeClient clientset.Interface
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
routeUpdater *delayedRouteUpdater

vmCache *timedCache
lbCache *timedCache
Expand Down Expand Up @@ -495,6 +496,10 @@ func (az *Cloud) InitializeCloudFromConfig(config *Config, fromSecret bool) erro
return err
}

// start delayed route updater.
az.routeUpdater = newDelayedRouteUpdater(az, routeUpdateInterval)
go az.routeUpdater.run()

return nil
}

Expand Down
201 changes: 184 additions & 17 deletions staging/src/k8s.io/legacy-cloud-providers/azure/azure_routes.go
Expand Up @@ -19,6 +19,9 @@ package azure
import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2018-07-01/network"
"github.com/Azure/go-autorest/autorest/to"
Expand All @@ -28,6 +31,164 @@ import (
"k8s.io/klog"
)

// routeUpdateInterval is how long the delayed route updater pauses between
// two consecutive route reconciling passes.
var routeUpdateInterval = 30 * time.Second

// routeOperation names the kind of change a queued route update requests.
type routeOperation string

// Supported route operations.
const (
	// routeOperationAdd asks for the route to be present in the route table.
	routeOperationAdd = routeOperation("add")
	// routeOperationDelete asks for the route to be removed from the route table.
	routeOperationDelete = routeOperation("delete")
)

// delayedRouteOperation defines a delayed route operation which is used in delayedRouteUpdater.
// It pairs one requested route change with the channel on which the outcome
// of that change is delivered back to the requester.
type delayedRouteOperation struct {
// route is the route the operation applies to.
route network.Route
// operation is the requested change (add or delete).
operation routeOperation
// result carries the outcome of the operation (nil on success); wait() receives from it.
result chan error
}

// wait blocks until an outcome is delivered on the operation's result channel
// and returns that outcome (nil when the operation succeeded).
func (op *delayedRouteOperation) wait() error {
	outcome := <-op.result
	return outcome
}

// delayedRouteUpdater defines a delayed route updater, which batches all the
// route updating operations within "interval" period.
// Operations are queued by addRouteOperation and applied together by
// updateRoutes, which run() invokes once per interval.
// Example usage:
// op, err := updater.addRouteOperation(routeOperationAdd, route)
// err = op.wait()
type delayedRouteUpdater struct {
// az is the cloud whose route table is read and updated.
az *Cloud
// interval is how long run() sleeps between two updateRoutes passes.
interval time.Duration

// lock is held by addRouteOperation and updateRoutes while they touch routesToUpdate.
lock sync.Mutex
// routesToUpdate holds the queued operations that have not been applied yet.
routesToUpdate []*delayedRouteOperation
}

// newDelayedRouteUpdater builds a delayedRouteUpdater bound to the given cloud
// that reconciles routes every "interval". The returned updater starts with an
// empty operation queue; the caller is responsible for starting run().
func newDelayedRouteUpdater(az *Cloud, interval time.Duration) *delayedRouteUpdater {
	updater := new(delayedRouteUpdater)
	updater.az = az
	updater.interval = interval
	updater.routesToUpdate = []*delayedRouteOperation{}
	return updater
}

// run is the updater's reconciling loop: it applies the queued operations,
// sleeps for the configured interval, and repeats. It never returns, so it is
// meant to be started in its own goroutine.
func (d *delayedRouteUpdater) run() {
	for ; ; time.Sleep(d.interval) {
		d.updateRoutes()
	}
}

// updateRoutes applies every queued route operation to the route table in one
// pass and writes the table back only if something actually changed.
//
// The updater lock is held for the whole call. When there is at least one
// queued operation, the same outcome (nil on success, otherwise the first
// error hit) is sent to every queued operation's result channel on return,
// and the queue is reset. The route table is created first if it does not
// exist yet.
func (d *delayedRouteUpdater) updateRoutes() {
	d.lock.Lock()
	defer d.lock.Unlock()

	// No need to do any updating.
	if len(d.routesToUpdate) == 0 {
		return
	}

	var err error
	defer func() {
		// Notify all the goroutines. err is read here, at return time, so it
		// reflects whichever step below failed (or nil).
		for _, rt := range d.routesToUpdate {
			rt.result <- err
		}
		// Clear all the jobs.
		d.routesToUpdate = make([]*delayedRouteOperation, 0)
	}()

	var routeTable network.RouteTable
	var existsRouteTable bool
	routeTable, existsRouteTable, err = d.az.getRouteTable(cacheReadTypeDefault)
	if err != nil {
		klog.Errorf("getRouteTable() failed with error: %v", err)
		return
	}

	// Create the route table if it doesn't exist yet.
	if !existsRouteTable {
		err = d.az.createRouteTable()
		if err != nil {
			klog.Errorf("createRouteTable() failed with error: %v", err)
			return
		}

		routeTable, _, err = d.az.getRouteTable(cacheReadTypeDefault)
		if err != nil {
			klog.Errorf("getRouteTable() failed with error: %v", err)
			return
		}
	}

	// reconcile routes.
	dirty := false
	// Work on a private copy: the in-place deletions below would otherwise
	// rewrite the backing array behind routeTable.Routes, which may be shared
	// with whatever getRouteTable(cacheReadTypeDefault) hands out (e.g. a
	// cache entry), corrupting it even when nothing is written back.
	routes := []network.Route{}
	if routeTable.Routes != nil {
		routes = append(routes, *routeTable.Routes...)
	}
	for _, rt := range d.routesToUpdate {
		routeMatch := false
		for i, existingRoute := range routes {
			if strings.EqualFold(to.String(existingRoute.Name), to.String(rt.route.Name)) {
				// delete the name-matched routes here (missing routes would be added later if the operation is add).
				routes = append(routes[:i], routes[i+1:]...)
				if existingRoute.RoutePropertiesFormat != nil &&
					rt.route.RoutePropertiesFormat != nil &&
					strings.EqualFold(to.String(existingRoute.AddressPrefix), to.String(rt.route.AddressPrefix)) &&
					strings.EqualFold(to.String(existingRoute.NextHopIPAddress), to.String(rt.route.NextHopIPAddress)) {
					routeMatch = true
				}
				if rt.operation == routeOperationDelete {
					dirty = true
				}
				// routes was modified while being ranged over; stop scanning.
				break
			}
		}

		// Add missing routes if the operation is add. An identical existing
		// route was removed above and is re-appended here, which leaves the
		// table content unchanged and therefore does not mark it dirty.
		if rt.operation == routeOperationAdd {
			routes = append(routes, rt.route)
			if !routeMatch {
				dirty = true
			}
		}
	}

	if dirty {
		routeTable.Routes = &routes
		err = d.az.CreateOrUpdateRouteTable(routeTable)
		if err != nil {
			klog.Errorf("CreateOrUpdateRouteTable() failed with error: %v", err)
			return
		}
	}
}

// addRouteOperation queues the given operation for the given route and hands
// back the queued entry so the caller can wait() for its outcome. The returned
// error is always nil.
func (d *delayedRouteUpdater) addRouteOperation(operation routeOperation, route network.Route) (*delayedRouteOperation, error) {
	pending := new(delayedRouteOperation)
	pending.route = route
	pending.operation = operation
	pending.result = make(chan error)

	// Only the queue itself is shared state, so the lock covers just the append.
	d.lock.Lock()
	d.routesToUpdate = append(d.routesToUpdate, pending)
	d.lock.Unlock()

	return pending, nil
}

// ListRoutes lists all managed routes that belong to the specified clusterName
func (az *Cloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) {
klog.V(10).Infof("ListRoutes: START clusterName=%q", clusterName)
Expand Down Expand Up @@ -86,16 +247,6 @@ func processRoutes(routeTable network.RouteTable, exists bool, err error) ([]*cl
return kubeRoutes, nil
}

// createRouteTableIfNotExists looks the route table up and creates it when the
// lookup reports that it does not exist. A lookup failure is logged and
// returned as-is; clusterName and kubeRoute are only used for that log line.
func (az *Cloud) createRouteTableIfNotExists(clusterName string, kubeRoute *cloudprovider.Route) error {
	_, found, err := az.getRouteTable(cacheReadTypeDefault)
	if err != nil {
		klog.V(2).Infof("createRouteTableIfNotExists error: couldn't get routetable. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
		return err
	}
	if found {
		// Nothing to do: the table is already there.
		return nil
	}
	return az.createRouteTable()
}

func (az *Cloud) createRouteTable() error {
routeTable := network.RouteTable{
Name: to.StringPtr(az.RouteTableName),
Expand Down Expand Up @@ -132,10 +283,6 @@ func (az *Cloud) CreateRoute(ctx context.Context, clusterName string, nameHint s
return nil
}

klog.V(2).Infof("CreateRoute: creating route. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
if err := az.createRouteTableIfNotExists(clusterName, kubeRoute); err != nil {
return err
}
targetIP, _, err := az.getIPForMachine(kubeRoute.TargetNode)
if err != nil {
return err
Expand All @@ -151,9 +298,17 @@ func (az *Cloud) CreateRoute(ctx context.Context, clusterName string, nameHint s
},
}

klog.V(3).Infof("CreateRoute: creating route: instance=%q cidr=%q", kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
err = az.CreateOrUpdateRoute(route)
klog.V(2).Infof("CreateRoute: creating route for clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
op, err := az.routeUpdater.addRouteOperation(routeOperationAdd, route)
if err != nil {
klog.Errorf("CreateRoute failed for node %q with error: %v", kubeRoute.TargetNode, err)
return err
}

// Wait for operation complete.
err = op.wait()
if err != nil {
klog.Errorf("CreateRoute failed for node %q with error: %v", kubeRoute.TargetNode, err)
return err
}

Expand Down Expand Up @@ -181,8 +336,20 @@ func (az *Cloud) DeleteRoute(ctx context.Context, clusterName string, kubeRoute
klog.V(2).Infof("DeleteRoute: deleting route. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR)

routeName := mapNodeNameToRouteName(kubeRoute.TargetNode)
err = az.DeleteRouteWithName(routeName)
route := network.Route{
Name: to.StringPtr(routeName),
RoutePropertiesFormat: &network.RoutePropertiesFormat{},
}
op, err := az.routeUpdater.addRouteOperation(routeOperationDelete, route)
if err != nil {
klog.Errorf("DeleteRoute failed for node %q with error: %v", kubeRoute.TargetNode, err)
return err
}

// Wait for operation complete.
err = op.wait()
if err != nil {
klog.Errorf("DeleteRoute failed for node %q with error: %v", kubeRoute.TargetNode, err)
return err
}

Expand Down