From aa65a7974a6e6b10d68d75c8da32952a8880c02e Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Wed, 25 May 2016 14:39:03 +0200 Subject: [PATCH] Spread creating routes over time and retry on failures --- pkg/controller/route/routecontroller.go | 39 ++++++++++++++++++++----- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/pkg/controller/route/routecontroller.go b/pkg/controller/route/routecontroller.go index 1aeaed2d742d6..68a2ac8fe5a57 100644 --- a/pkg/controller/route/routecontroller.go +++ b/pkg/controller/route/routecontroller.go @@ -30,6 +30,14 @@ import ( "k8s.io/kubernetes/pkg/util/wait" ) +const ( + // Maximal number of concurrent CreateRoute API calls. + // TODO: This should be per-provider. + maxConcurrentRouteCreations int = 200 + // Maximum number of retries of route creations. + maxRetries int = 5 +) + type RouteController struct { routes cloudprovider.Routes kubeClient clientset.Interface @@ -50,6 +58,11 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, clusterNam } func (rc *RouteController) Run(syncPeriod time.Duration) { + // TODO: If we do just the full Resync every 5 minutes (default value) + // that means that we may wait up to 5 minutes before even starting + // creating a route for it. This is bad. + // We should have a watch on node and if we observe a new node (with CIDR?) + // trigger reconciliation for that node. go wait.NonSlidingUntil(func() { if err := rc.reconcileNodeRoutes(); err != nil { glog.Errorf("Couldn't reconcile node routes: %v", err) @@ -79,7 +92,10 @@ func (rc *RouteController) reconcile(nodes []api.Node, routes []*cloudprovider.R for _, route := range routes { routeMap[route.TargetInstance] = route } + wg := sync.WaitGroup{} + rateLimiter := make(chan struct{}, maxConcurrentRouteCreations) + for _, node := range nodes { // Skip if the node hasn't been assigned a CIDR yet. if node.Spec.PodCIDR == "" { @@ -96,14 +112,23 @@ func (rc *RouteController) reconcile(nodes []api.Node, routes []*cloudprovider.R nameHint := string(node.UID) wg.Add(1) glog.Infof("Creating route for node %s %s with hint %s", node.Name, route.DestinationCIDR, nameHint) - go func(nodeName string, nameHint string, route *cloudprovider.Route, startTime time.Time) { - if err := rc.routes.CreateRoute(rc.clusterName, nameHint, route); err != nil { - glog.Errorf("Could not create route %s %s for node %s after %v: %v", nameHint, route.DestinationCIDR, nodeName, time.Now().Sub(startTime), err) - } else { - glog.Infof("Created route for node %s %s with hint %s after %v", nodeName, route.DestinationCIDR, nameHint, time.Now().Sub(startTime)) + go func(nodeName string, nameHint string, route *cloudprovider.Route) { + defer wg.Done() + for i := 0; i < maxRetries; i++ { + startTime := time.Now() + // Ensure that we don't have more than maxConcurrentRouteCreations + // CreateRoute calls in flight. + rateLimiter <- struct{}{} + err := rc.routes.CreateRoute(rc.clusterName, nameHint, route) + <-rateLimiter + if err != nil { + glog.Errorf("Could not create route %s %s for node %s after %v: %v", nameHint, route.DestinationCIDR, nodeName, time.Now().Sub(startTime), err) + } else { + glog.Infof("Created route for node %s %s with hint %s after %v", nodeName, route.DestinationCIDR, nameHint, time.Now().Sub(startTime)) + return + } } - wg.Done() - }(node.Name, nameHint, route, time.Now()) + }(node.Name, nameHint, route) } nodeCIDRs[node.Name] = node.Spec.PodCIDR }