Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update master service ports and type via controller. #15791

Merged
merged 1 commit into from
Oct 30, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
128 changes: 94 additions & 34 deletions pkg/master/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ func (c *Controller) Start() {
// If we fail to repair node ports apiserver is useless. We should restart and retry.
glog.Fatalf("Unable to perform initial service nodePort check: %v", err)
}
if err := c.UpdateKubernetesService(); err != nil {
// Service definition is reconciled during first run to correct port and type per expectations.
if err := c.UpdateKubernetesService(true); err != nil {
glog.Errorf("Unable to perform initial Kubernetes service initialization: %v", err)
}

Expand All @@ -97,14 +98,17 @@ func (c *Controller) Start() {
// RunKubernetesService periodically updates the kubernetes service
func (c *Controller) RunKubernetesService(ch chan struct{}) {
util.Until(func() {
if err := c.UpdateKubernetesService(); err != nil {
// Service definition is not reconciled after first
// run, ports and type will be corrected only during
// start.
if err := c.UpdateKubernetesService(false); err != nil {
util.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err))
}
}, c.EndpointInterval, ch)
}

// UpdateKubernetesService attempts to update the default Kube service.
func (c *Controller) UpdateKubernetesService() error {
func (c *Controller) UpdateKubernetesService(reconcile bool) error {
// Update service & endpoint records.
// TODO: when it becomes possible to change this stuff,
// stop polling and start watching.
Expand All @@ -114,11 +118,11 @@ func (c *Controller) UpdateKubernetesService() error {
}
if c.ServiceIP != nil {
servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.KubernetesServiceNodePort, "https", c.ExtraServicePorts)
if err := c.CreateMasterServiceIfNeeded("kubernetes", c.ServiceIP, servicePorts, serviceType); err != nil {
if err := c.CreateOrUpdateMasterServiceIfNeeded("kubernetes", c.ServiceIP, servicePorts, serviceType, reconcile); err != nil {
return err
}
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
if err := c.SetEndpoints("kubernetes", c.PublicIP, endpointPorts); err != nil {
if err := c.ReconcileEndpoints("kubernetes", c.PublicIP, endpointPorts, reconcile); err != nil {
return err
}
}
Expand Down Expand Up @@ -179,10 +183,17 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string, extraEndp

// CreateMasterServiceIfNeeded will create the specified service if it
// doesn't already exist.
func (c *Controller) CreateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []api.ServicePort, serviceType api.ServiceType) error {
func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []api.ServicePort, serviceType api.ServiceType, reconcile bool) error {
ctx := api.NewDefaultContext()
if _, err := c.ServiceRegistry.GetService(ctx, serviceName); err == nil {
if s, err := c.ServiceRegistry.GetService(ctx, serviceName); err == nil {
// The service already exists.
if reconcile {
if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated {
glog.Warningf("Resetting master service %q to %#v", serviceName, svc)
_, err := c.ServiceRegistry.UpdateService(ctx, svc)
return err
}
}
return nil
}
svc := &api.Service{
Expand Down Expand Up @@ -211,20 +222,20 @@ func (c *Controller) CreateMasterServiceIfNeeded(serviceName string, serviceIP n
return err
}

// SetEndpoints sets the endpoints for the given apiserver service (ro or rw).
// SetEndpoints expects that the endpoints objects it manages will all be
// managed only by SetEndpoints; therefore, to understand this, you need only
// ReconcileEndpoints sets the endpoints for the given apiserver service (ro or rw).
// ReconcileEndpoints expects that the endpoints objects it manages will all be
// managed only by ReconcileEndpoints; therefore, to understand this, you need only
// understand the requirements and the body of this function.
//
// Requirements:
// * All apiservers MUST use the same ports for their {rw, ro} services.
// * All apiservers MUST use SetEndpoints and only SetEndpoints to manage the
// * All apiservers MUST use ReconcileEndpoints and only ReconcileEndpoints to manage the
// endpoints for their {rw, ro} services.
// * All apiservers MUST know and agree on the number of apiservers expected
// to be running (c.masterCount).
// * SetEndpoints is called periodically from all apiservers.
// * ReconcileEndpoints is called periodically from all apiservers.
//
func (c *Controller) SetEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error {
func (c *Controller) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
ctx := api.NewDefaultContext()
e, err := c.EndpointRegistry.GetEndpoints(ctx, serviceName)
if err != nil {
Expand All @@ -238,7 +249,7 @@ func (c *Controller) SetEndpoints(serviceName string, ip net.IP, endpointPorts [

// First, determine if the endpoint is in the format we expect (one
// subset, ports matching endpointPorts, N IP addresses).
formatCorrect, ipCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, c.MasterCount)
formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, c.MasterCount, reconcilePorts)
if !formatCorrect {
// Something is egregiously wrong, just re-make the endpoints record.
e.Subsets = []api.EndpointSubset{{
Expand All @@ -247,7 +258,11 @@ func (c *Controller) SetEndpoints(serviceName string, ip net.IP, endpointPorts [
}}
glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
return c.EndpointRegistry.UpdateEndpoints(ctx, e)
} else if !ipCorrect {
}
if ipCorrect && portsCorrect {
return nil
}
if !ipCorrect {
// We *always* add our own IP address.
e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, api.EndpointAddress{IP: ip.String()})

Expand All @@ -271,38 +286,83 @@ func (c *Controller) SetEndpoints(serviceName string, ip net.IP, endpointPorts [
}
}
}
return c.EndpointRegistry.UpdateEndpoints(ctx, e)
}
// We didn't make any changes, no need to actually call update.
return nil
if !portsCorrect {
// Reset ports.
e.Subsets[0].Ports = endpointPorts
}
glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
return c.EndpointRegistry.UpdateEndpoints(ctx, e)
}

// Determine if the endpoint is in the format SetEndpoints expect (one subset,
// Determine if the endpoint is in the format ReconcileEndpoints expect (one subset,
// correct ports, N IP addresses); and if the specified IP address is present and
// the correct number of ip addresses are found.
func checkEndpointSubsetFormat(e *api.Endpoints, ip string, ports []api.EndpointPort, count int) (formatCorrect, ipCorrect bool) {
func checkEndpointSubsetFormat(e *api.Endpoints, ip string, ports []api.EndpointPort, count int, reconcilePorts bool) (formatCorrect bool, ipCorrect bool, portsCorrect bool) {
if len(e.Subsets) != 1 {
return false, false
return false, false, false
}
sub := &e.Subsets[0]
if len(sub.Ports) != len(ports) {
return false, false
}
for _, port := range ports {
contains := false
for _, subPort := range sub.Ports {
if port == subPort {
contains = true
}
portsCorrect = true
if reconcilePorts {
if len(sub.Ports) != len(ports) {
portsCorrect = false
}
if !contains {
return false, false
for i, port := range ports {
if len(sub.Ports) <= i || port != sub.Ports[i] {
portsCorrect = false
break
}
}
}
for _, addr := range sub.Addresses {
if addr.IP == ip {
return true, len(sub.Addresses) == count
ipCorrect = len(sub.Addresses) == count
break
}
}
return true, ipCorrect, portsCorrect
}

// * getMasterServiceUpdateIfNeeded sets service attributes for the
// given apiserver service.
// * getMasterServiceUpdateIfNeeded expects that the service object it
// manages will be managed only by getMasterServiceUpdateIfNeeded;
// therefore, to understand this, you need only understand the
// requirements and the body of this function.
// * getMasterServiceUpdateIfNeeded ensures that the correct ports are
// are set.
//
// Requirements:
// * All apiservers MUST use getMasterServiceUpdateIfNeeded and only
// getMasterServiceUpdateIfNeeded to manage service attributes
// * updateMasterService is called periodically from all apiservers.
func getMasterServiceUpdateIfNeeded(svc *api.Service, servicePorts []api.ServicePort, serviceType api.ServiceType) (s *api.Service, updated bool) {
// Determine if the service is in the format we expect
// (servicePorts are present and service type matches)
formatCorrect := checkServiceFormat(svc, servicePorts, serviceType)
if formatCorrect {
return svc, false
}
svc.Spec.Ports = servicePorts
svc.Spec.Type = serviceType
return svc, true
}

// Determine if the service is in the correct format
// getMasterServiceUpdateIfNeeded expects (servicePorts are correct
// and service type matches).
func checkServiceFormat(s *api.Service, ports []api.ServicePort, serviceType api.ServiceType) (formatCorrect bool) {
if s.Spec.Type != serviceType {
return false
}
if len(ports) != len(s.Spec.Ports) {
return false
}
for i, port := range ports {
if port != s.Spec.Ports[i] {
return false
}
}
return true, false
return true
}