Skip to content

Commit

Permalink
Route creation reconciler loop.
Browse files Browse the repository at this point in the history
  • Loading branch information
cjcullen committed May 13, 2015
1 parent 106ecf9 commit 366beaa
Show file tree
Hide file tree
Showing 14 changed files with 499 additions and 95 deletions.
3 changes: 2 additions & 1 deletion cluster/gce/util.sh
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ function kube-down {
--project "${PROJECT}" \
--quiet \
"${NODE_INSTANCE_PREFIX}-group")
echo "$deleteCmdOutput"
if [[ "$deleteCmdOutput" != "" ]]; then
# Managed instance group deletion is done asyncronously, we must wait for it to complete, or subsequent steps fail
deleteCmdOperationId=$(echo $deleteCmdOutput | grep "Operation:" | sed "s/.*Operation:\s//" | sed "s/\s.*//" | sed "s/ //g")
Expand Down Expand Up @@ -719,7 +720,7 @@ function kube-down {
# Delete routes.
local -a routes
routes=( $(gcloud compute routes list --project "${PROJECT}" \
--regexp "${NODE_INSTANCE_PREFIX}-.+" | awk 'NR >= 2 { print $1 }') )
--regexp "${INSTANCE_PREFIX}-.{8}-.{4}-.{4}-.{4}-.{12}" | awk 'NR >= 2 { print $1 }') )
routes+=("${MASTER_NAME}")
while (( "${#routes[@]}" > 0 )); do
echo Deleting routes "${routes[*]::10}"
Expand Down
10 changes: 10 additions & 0 deletions cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
clientcmdapi "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/nodecontroller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/routecontroller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/servicecontroller"
replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
Expand Down Expand Up @@ -241,6 +242,15 @@ func (s *CMServer) Run(_ []string) error {
glog.Errorf("Failed to start service controller: %v", err)
}

if s.AllocateNodeCIDRs {
routes, ok := cloud.Routes()
if !ok {
glog.Fatal("Cloud provider must support routes if allocate-node-cidrs is set")
}
routeController := routecontroller.New(routes, kubeClient, s.ClusterName, (*net.IPNet)(&s.ClusterCIDR))
routeController.Run(s.NodeSyncPeriod)
}

resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod)

Expand Down
13 changes: 5 additions & 8 deletions pkg/cloudprovider/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,11 @@ func (aws *AWSCloud) Zones() (cloudprovider.Zones, bool) {
return aws, true
}

// Routes returns an implementation of Routes for Amazon Web Services.
func (aws *AWSCloud) Routes() (cloudprovider.Routes, bool) {
return nil, false
}

// NodeAddresses is an implementation of Instances.NodeAddresses.
func (aws *AWSCloud) NodeAddresses(name string) ([]api.NodeAddress, error) {
instance, err := aws.getInstancesByDnsName(name)
Expand Down Expand Up @@ -973,11 +978,3 @@ func (aws *AWSCloud) DeleteVolume(volumeName string) error {
}
return awsDisk.delete()
}

func (v *AWSCloud) Configure(name string, spec *api.NodeSpec) error {
return nil
}

func (v *AWSCloud) Release(name string) error {
return nil
}
23 changes: 19 additions & 4 deletions pkg/cloudprovider/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Interface interface {
Zones() (Zones, bool)
// Clusters returns a clusters interface. Also returns true if the interface is supported, false otherwise.
Clusters() (Clusters, bool)
// Routes returns a routes interface along with whether the interface is supported.
Routes() (Routes, bool)
}

// Clusters is an abstract, pluggable interface for clusters of containers.
Expand Down Expand Up @@ -80,10 +82,23 @@ type Instances interface {
List(filter string) ([]string, error)
// GetNodeResources gets the resources for a particular node
GetNodeResources(name string) (*api.NodeResources, error)
// Configure the specified instance using the spec
Configure(name string, spec *api.NodeSpec) error
// Delete all the configuration related to the instance, including other cloud resources
Release(name string) error
}

// Route is a representation of an advanced routing rule.
type Route struct {
Name string
TargetInstance string
DestinationCIDR string
Description string
}

// Routes is an abstract, pluggable interface for advanced routing rules.
type Routes interface {
ListRoutes(filter string) ([]*Route, error)
// Create the described route
CreateRoute(route *Route) error
// Delete the specified route
DeleteRoute(name string) error
}

// Zone represents the location of a particular machine.
Expand Down
49 changes: 42 additions & 7 deletions pkg/cloudprovider/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ limitations under the License.
package fake_cloud

import (
"fmt"
"net"
"regexp"
"sync"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
Expand All @@ -39,7 +41,7 @@ type FakeUpdateBalancerCall struct {
Hosts []string
}

// FakeCloud is a test-double implementation of Interface, TCPLoadBalancer and Instances. It is useful for testing.
// FakeCloud is a test-double implementation of Interface, TCPLoadBalancer, Instances, and Routes. It is useful for testing.
type FakeCloud struct {
Exists bool
Err error
Expand All @@ -53,6 +55,8 @@ type FakeCloud struct {
ExternalIP net.IP
Balancers []FakeBalancer
UpdateCalls []FakeUpdateBalancerCall
RouteMap map[string]*cloudprovider.Route
Lock sync.Mutex
cloudprovider.Zone
}

Expand Down Expand Up @@ -94,6 +98,10 @@ func (f *FakeCloud) Zones() (cloudprovider.Zones, bool) {
return f, true
}

func (f *FakeCloud) Routes() (cloudprovider.Routes, bool) {
return f, true
}

// GetTCPLoadBalancer is a stub implementation of TCPLoadBalancer.GetTCPLoadBalancer.
func (f *FakeCloud) GetTCPLoadBalancer(name, region string) (endpoint string, exists bool, err error) {
return f.ExternalIP.String(), f.Exists, f.Err
Expand Down Expand Up @@ -160,12 +168,39 @@ func (f *FakeCloud) GetNodeResources(name string) (*api.NodeResources, error) {
return f.NodeResources, f.Err
}

func (f *FakeCloud) Configure(name string, spec *api.NodeSpec) error {
f.addCall("configure")
return f.Err
func (f *FakeCloud) ListRoutes(filter string) ([]*cloudprovider.Route, error) {
f.addCall("list-routes")
f.Lock.Lock()
defer f.Lock.Unlock()
var routes []*cloudprovider.Route
for _, route := range f.RouteMap {
if match, _ := regexp.MatchString(filter, route.Name); match {
routes = append(routes, route)
}
}
return routes, f.Err
}

func (f *FakeCloud) Release(name string) error {
f.addCall("release")
return f.Err
func (f *FakeCloud) CreateRoute(route *cloudprovider.Route) error {
f.addCall("create-route")
f.Lock.Lock()
defer f.Lock.Unlock()
if _, exists := f.RouteMap[route.Name]; exists {
f.Err = fmt.Errorf("route with name %q already exists")
return f.Err
}
f.RouteMap[route.Name] = route
return nil
}

func (f *FakeCloud) DeleteRoute(name string) error {
f.addCall("delete-route")
f.Lock.Lock()
defer f.Lock.Unlock()
if _, exists := f.RouteMap[name]; !exists {
f.Err = fmt.Errorf("no route found with name %q", name)
return f.Err
}
delete(f.RouteMap, name)
return nil
}
44 changes: 37 additions & 7 deletions pkg/cloudprovider/gce/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ func (gce *GCECloud) Zones() (cloudprovider.Zones, bool) {
return gce, true
}

// Routes returns an implementation of Routes for Google Compute Engine.
func (gce *GCECloud) Routes() (cloudprovider.Routes, bool) {
return gce, true
}

func makeHostLink(projectID, zone, host string) string {
host = canonicalizeInstanceName(host)
return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/zones/%s/instances/%s",
Expand Down Expand Up @@ -560,14 +565,36 @@ func getMetadataValue(metadata *compute.Metadata, key string) (string, bool) {
return "", false
}

func (gce *GCECloud) Configure(name string, spec *api.NodeSpec) error {
instanceName := canonicalizeInstanceName(name)
func (gce *GCECloud) ListRoutes(filter string) ([]*cloudprovider.Route, error) {
listCall := gce.service.Routes.List(gce.projectID)
if len(filter) > 0 {
listCall = listCall.Filter("name eq " + filter)
}
res, err := listCall.Do()
if err != nil {
return nil, err
}
var routes []*cloudprovider.Route
for _, r := range res.Items {
if path.Base(r.Network) != gce.networkName {
continue
}
target := path.Base(r.NextHopInstance)
routes = append(routes, &cloudprovider.Route{r.Name, target, r.DestRange, r.Description})
}
return routes, nil
}

func (gce *GCECloud) CreateRoute(route *cloudprovider.Route) error {
instanceName := canonicalizeInstanceName(route.TargetInstance)

// TODO (cjcullen): Remove the metadata-setting once kubelet can configure cbr0.
instance, err := gce.service.Instances.Get(gce.projectID, gce.zone, instanceName).Do()
if err != nil {
return err
}
if currentValue, ok := getMetadataValue(instance.Metadata, podCIDRMetadataKey); ok {
if currentValue == spec.PodCIDR {
if currentValue == route.DestinationCIDR {
// IP range already set to proper value.
return nil
}
Expand All @@ -577,7 +604,7 @@ func (gce *GCECloud) Configure(name string, spec *api.NodeSpec) error {
instance.Metadata.Items = append(instance.Metadata.Items,
&compute.MetadataItems{
Key: podCIDRMetadataKey,
Value: spec.PodCIDR,
Value: route.DestinationCIDR,
})
setMetadataCall := gce.service.Instances.SetMetadata(gce.projectID, gce.zone, instanceName, instance.Metadata)
setMetadataOp, err := setMetadataCall.Do()
Expand All @@ -588,12 +615,15 @@ func (gce *GCECloud) Configure(name string, spec *api.NodeSpec) error {
if err != nil {
return err
}
// End of chunk that can come out once we no longer use metadata for CIDRs

insertCall := gce.service.Routes.Insert(gce.projectID, &compute.Route{
Name: instanceName,
DestRange: spec.PodCIDR,
Name: route.Name,
DestRange: route.DestinationCIDR,
NextHopInstance: fmt.Sprintf("zones/%s/instances/%s", gce.zone, instanceName),
Network: fmt.Sprintf("global/networks/%s", gce.networkName),
Priority: 1000,
Description: route.Description,
})
insertOp, err := insertCall.Do()
if err != nil {
Expand All @@ -602,7 +632,7 @@ func (gce *GCECloud) Configure(name string, spec *api.NodeSpec) error {
return gce.waitForGlobalOp(insertOp)
}

func (gce *GCECloud) Release(name string) error {
func (gce *GCECloud) DeleteRoute(name string) error {
instanceName := canonicalizeInstanceName(name)
deleteCall := gce.service.Routes.Delete(gce.projectID, instanceName)
deleteOp, err := deleteCall.Do()
Expand Down
37 changes: 1 addition & 36 deletions pkg/cloudprovider/nodecontroller/nodecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,32 +177,6 @@ func (nc *NodeController) reconcilePodCIDRs(newNodes, registeredNodes *api.NodeL
return newNodes
}

func (nc *NodeController) configureNodeCIDR(node *api.Node) {
instances, ok := nc.cloud.Instances()
if !ok {
glog.Errorf("Error configuring node %s: CloudProvider does not support Instances()", node.Name)
return
}
err := instances.Configure(node.Name, &node.Spec)
if err != nil {
glog.Errorf("Error configuring node %s: %s", node.Name, err)
// The newly assigned CIDR was not properly configured, so don't save it in the API server.
node.Spec.PodCIDR = ""
}
}

func (nc *NodeController) unassignNodeCIDR(nodeName string) {
instances, ok := nc.cloud.Instances()
if !ok {
glog.Errorf("Error deconfiguring node %s: CloudProvider does not support Instances()", nodeName)
return
}
err := instances.Release(nodeName)
if err != nil {
glog.Errorf("Error deconfiguring node %s: %s", nodeName, err)
}
}

// Run creates initial node list and start syncing instances from cloudprovider, if any.
// It also starts syncing or monitoring cluster node status.
// 1. registerNodes() is called only once to register all initial nodes (from cloudprovider
Expand Down Expand Up @@ -274,9 +248,6 @@ func (nc *NodeController) registerNodes(nodes *api.NodeList, retryCount int, ret
go func(n *api.Node) {
defer wg.Done()
for i := 0; i < retryCount; i++ {
if nc.isRunningCloudProvider() && nc.allocateNodeCIDRs {
nc.configureNodeCIDR(n)
}
_, err := nc.kubeClient.Nodes().Create(n)
if err == nil || apierrors.IsAlreadyExists(err) {
glog.Infof("Registered node in registry: %v", n.Name)
Expand All @@ -292,6 +263,7 @@ func (nc *NodeController) registerNodes(nodes *api.NodeList, retryCount int, ret
}
}
wg.Wait()

if int32(toRegister.Len()) != atomic.LoadInt32(&successfullyRegistered) {
return ErrRegistration
} else {
Expand Down Expand Up @@ -340,9 +312,6 @@ func (nc *NodeController) syncCloudNodes() error {
return
}
node.Status.Addresses = nodeList.Items[0].Status.Addresses
if nc.allocateNodeCIDRs {
nc.configureNodeCIDR(node)
}
glog.Infof("Create node in registry: %s", node.Name)
_, err = nc.kubeClient.Nodes().Create(node)
if err != nil {
Expand All @@ -361,9 +330,6 @@ func (nc *NodeController) syncCloudNodes() error {
for nodeID := range nodeMap {
go func(nodeID string) {
defer wg.Done()
if nc.allocateNodeCIDRs {
nc.unassignNodeCIDR(nodeID)
}
glog.Infof("Delete node from registry: %s", nodeID)
err = nc.kubeClient.Nodes().Delete(nodeID)
if err != nil {
Expand All @@ -373,7 +339,6 @@ func (nc *NodeController) syncCloudNodes() error {
}(nodeID)
}
wg.Wait()

return nil
}

Expand Down
12 changes: 4 additions & 8 deletions pkg/cloudprovider/openstack/openstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,14 +389,6 @@ func (i *Instances) GetNodeResources(name string) (*api.NodeResources, error) {
return rsrc, nil
}

func (i *Instances) Configure(name string, spec *api.NodeSpec) error {
return nil
}

func (i *Instances) Release(name string) error {
return nil
}

func (os *OpenStack) Clusters() (cloudprovider.Clusters, bool) {
return nil, false
}
Expand Down Expand Up @@ -669,3 +661,7 @@ func (os *OpenStack) GetZone() (cloudprovider.Zone, error) {

return cloudprovider.Zone{Region: os.region}, nil
}

func (os *OpenStack) Routes() (cloudprovider.Routes, bool) {
return nil, false
}

0 comments on commit 366beaa

Please sign in to comment.