Skip to content

Commit

Permalink
pkg/k8s: fix service update bug fix
Browse files Browse the repository at this point in the history
When a service changes its ports, we should remove the services from the
datapath as well. The same goes with the endpoints, if an endpoint
disappears or changes its ports we should make sure we remove
underlaying backends.

Signed-off-by: André Martins <andre@cilium.io>
  • Loading branch information
aanm committed Nov 8, 2019
1 parent b071e1f commit bfcf829
Show file tree
Hide file tree
Showing 3 changed files with 703 additions and 31 deletions.
17 changes: 11 additions & 6 deletions pkg/k8s/service_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ type ServiceEvent struct {
// Service is the service structure
Service *Service

// OldService is the service structure
OldService *Service

// Endpoints is the endpoints structured correlated with the service
Endpoints *Endpoints

Expand Down Expand Up @@ -121,7 +124,8 @@ func (s *ServiceCache) UpdateService(k8sSvc *types.Service, swg *lock.StoppableW
s.mutex.Lock()
defer s.mutex.Unlock()

if oldService, ok := s.services[svcID]; ok {
oldService, ok := s.services[svcID]
if ok {
if oldService.DeepEquals(newService) {
return svcID
}
Expand All @@ -134,11 +138,12 @@ func (s *ServiceCache) UpdateService(k8sSvc *types.Service, swg *lock.StoppableW
if serviceReady {
swg.Add()
s.Events <- ServiceEvent{
Action: UpdateService,
ID: svcID,
Service: newService,
Endpoints: endpoints,
SWG: swg,
Action: UpdateService,
ID: svcID,
Service: newService,
OldService: oldService,
Endpoints: endpoints,
SWG: swg,
}
}

Expand Down
85 changes: 60 additions & 25 deletions pkg/k8s/watchers/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,14 +395,15 @@ func (k *K8sWatcher) k8sServiceHandler() {
})

scopedLog.WithFields(logrus.Fields{
"action": event.Action.String(),
"service": event.Service.String(),
"endpoints": event.Endpoints.String(),
"action": event.Action.String(),
"service": event.Service.String(),
"old-service": event.OldService.String(),
"endpoints": event.Endpoints.String(),
}).Debug("Kubernetes service definition changed")

switch event.Action {
case k8s.UpdateService:
if err := k.addK8sSVCs(event.ID, svc, event.Endpoints); err != nil {
if err := k.addK8sSVCs(event.ID, event.OldService, svc, event.Endpoints); err != nil {
scopedLog.WithError(err).Error("Unable to add/update service to implement k8s event")
}

Expand Down Expand Up @@ -541,27 +542,8 @@ func genCartesianProduct(
return svcs
}

func (k *K8sWatcher) addK8sSVCs(svcID k8s.ServiceID, svc *k8s.Service, endpoints *k8s.Endpoints) error {
// If east-west load balancing is disabled, we should not sync(add or delete)
// K8s service to a cilium service.
if option.Config.DisableK8sServices {
return nil
}

// Headless services do not need any datapath implementation
if svc.IsHeadless {
return nil
}

scopedLog := log.WithFields(logrus.Fields{
logfields.K8sSvcName: svcID.Name,
logfields.K8sNamespace: svcID.Namespace,
})

var (
svcs []loadbalancer.SVC
)

// datapathSVCs returns all services that should be set in the datapath.
func datapathSVCs(svc *k8s.Service, endpoints *k8s.Endpoints) (svcs []loadbalancer.SVC) {
uniqPorts := svc.UniquePorts()

clusterIPPorts := map[loadbalancer.FEPortName]*loadbalancer.L4Addr{}
Expand All @@ -586,6 +568,59 @@ func (k *K8sWatcher) addK8sSVCs(svcID k8s.ServiceID, svc *k8s.Service, endpoints
svcs = append(svcs, dpSVC...)
}
}
return svcs
}

// hashSVCMap returns a mapping of all frontend's hash to the its corresponded
// value.
func hashSVCMap(svcs []loadbalancer.SVC) map[string]loadbalancer.L3n4Addr {
m := map[string]loadbalancer.L3n4Addr{}
for _, svc := range svcs {
m[svc.Frontend.L3n4Addr.Hash()] = svc.Frontend.L3n4Addr
}
return m
}

func (k *K8sWatcher) addK8sSVCs(svcID k8s.ServiceID, oldSvc, svc *k8s.Service, endpoints *k8s.Endpoints) error {
// If east-west load balancing is disabled, we should not sync(add or delete)
// K8s service to a cilium service.
if option.Config.DisableK8sServices {
return nil
}

// Headless services do not need any datapath implementation
if svc.IsHeadless {
return nil
}

scopedLog := log.WithFields(logrus.Fields{
logfields.K8sSvcName: svcID.Name,
logfields.K8sNamespace: svcID.Namespace,
})

svcs := datapathSVCs(svc, endpoints)
svcMap := hashSVCMap(svcs)

if oldSvc != nil {
// If we have oldService then we need to detect which frontends
// are no longer in the updated service and delete them in the datapath.

oldSVCs := datapathSVCs(oldSvc, endpoints)
oldSVCMap := hashSVCMap(oldSVCs)

for svcHash, oldSvc := range oldSVCMap {
if _, ok := svcMap[svcHash]; !ok {
if found, err := k.svcManager.DeleteService(oldSvc); err != nil {
scopedLog.WithError(err).WithField(logfields.Object, logfields.Repr(oldSvc)).
Warn("Error deleting service by frontend")
} else if !found {
scopedLog.WithField(logfields.Object, logfields.Repr(oldSvc)).Warn("service not found")
} else {
scopedLog.Debugf("# cilium lb delete-service %s %d 0", oldSvc.IP, oldSvc.Port)
}
}
}
}

for _, dpSvc := range svcs {
if _, _, err := k.svcManager.UpsertService(dpSvc.Frontend, dpSvc.Backends, dpSvc.Type, svcID.Name, svcID.Namespace); err != nil {
Expand Down
Loading

0 comments on commit bfcf829

Please sign in to comment.