Skip to content

Commit

Permalink
Cleanup TCPPortMapping when updating/deleting a shadow service
Browse files Browse the repository at this point in the history
  • Loading branch information
jspdown committed Mar 4, 2020
1 parent ffb9836 commit b27daf7
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 170 deletions.
5 changes: 3 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ import (
type TCPPortMapper interface {
Find(svc k8s.ServiceWithPort) (int32, bool)
Add(svc *k8s.ServiceWithPort) (int32, error)
Remove(svc k8s.ServiceWithPort) (int32, error)
}

// ServiceManager is capable of managing kubernetes services.
type ServiceManager interface {
Create(userSvc *corev1.Service) error
Update(userSvc *corev1.Service) (*corev1.Service, error)
Delete(serviceName, serviceNamespace string) error
Update(oldUserSvc, newUserSvc *corev1.Service) (*corev1.Service, error)
Delete(userSvc *corev1.Service) error
}

// Controller hold controller configuration.
Expand Down
12 changes: 9 additions & 3 deletions pkg/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,21 @@ func (h *Handler) OnAdd(obj interface{}) {
}

// OnUpdate executed when an object is updated.
func (h *Handler) OnUpdate(_, newObj interface{}) {
func (h *Handler) OnUpdate(oldObj, newObj interface{}) {
// Assert the type to an object to pull out relevant data.
switch obj := newObj.(type) {
case *corev1.Service:
if h.ignored.IsIgnored(obj.ObjectMeta) {
return
}

if _, err := h.serviceManager.Update(obj); err != nil {
oldSvc, ok := oldObj.(*corev1.Service)
if !ok {
log.Errorf("Old object is not a kubernetes Service")
return
}

if _, err := h.serviceManager.Update(oldSvc, obj); err != nil {
log.Errorf("Could not update mesh service: %v", err)
}

Expand Down Expand Up @@ -108,7 +114,7 @@ func (h *Handler) OnDelete(obj interface{}) {

log.Debugf("MeshControllerHandler ObjectDeleted with type: *corev1.Service: %s/%s", obj.Namespace, obj.Name)

if err := h.serviceManager.Delete(obj.Name, obj.Namespace); err != nil {
if err := h.serviceManager.Delete(obj); err != nil {
log.Errorf("Could not delete mesh service: %v", err)
}
case *corev1.Endpoints:
Expand Down
168 changes: 87 additions & 81 deletions pkg/controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,47 +42,24 @@ func NewShadowServiceManager(lister listers.ServiceLister, namespace string, tcp

// Create creates a new shadow service based on the given service.
func (s *ShadowServiceManager) Create(userSvc *corev1.Service) error {
meshSvcName := s.userServiceToMeshServiceName(userSvc.Name, userSvc.Namespace)
log.Debugf("Creating mesh service: %s", meshSvcName)
name := s.getShadowServiceName(userSvc.Name, userSvc.Namespace)

_, err := s.lister.Services(s.namespace).Get(meshSvcName)
if !kerrors.IsNotFound(err) {
// nil will be return if the service already exists.
return err
}

var ports []corev1.ServicePort
log.Debugf("Creating mesh service: %s", name)

svcMode := userSvc.Annotations[k8s.AnnotationServiceType]
if svcMode == "" {
svcMode = s.defaultMode
_, err := s.lister.Services(s.namespace).Get(name)
if err == nil {
return nil
}

for id, sp := range userSvc.Spec.Ports {
if sp.Protocol != corev1.ProtocolTCP {
log.Warnf("Unsupported port type: %s, skipping port %s on service %s/%s", sp.Protocol, sp.Name, userSvc.Namespace, userSvc.Name)
continue
}

var targetPort int32

targetPort, err = s.getTargetPort(svcMode, id, userSvc.Name, userSvc.Namespace, sp.Port)
if err != nil {
log.Errorf("Unable to find available %s port: %v, skipping port %s on service %s/%s", sp.Name, err, sp.Name, userSvc.Namespace, userSvc.Name)
continue
}

ports = append(ports, corev1.ServicePort{
Name: sp.Name,
Port: sp.Port,
Protocol: sp.Protocol,
TargetPort: intstr.FromInt(int(targetPort)),
})
if !kerrors.IsNotFound(err) {
return fmt.Errorf("unable to get shadow service %q: %w", name, err)
}

ports := s.getShadowServicePorts(userSvc)

svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: meshSvcName,
Name: name,
Namespace: s.namespace,
Labels: map[string]string{
"app": "maesh",
Expand All @@ -103,51 +80,22 @@ func (s *ShadowServiceManager) Create(userSvc *corev1.Service) error {
return nil
}

// Update updates shadow service based on an update made on the given service.
func (s *ShadowServiceManager) Update(userSvc *v1.Service) (*v1.Service, error) {
meshSvcName := s.userServiceToMeshServiceName(userSvc.Name, userSvc.Namespace)
// Update updates the shadow service associated with the old user service following the content of the new user service.
func (s *ShadowServiceManager) Update(oldUserSvc *v1.Service, newUserSvc *v1.Service) (*v1.Service, error) {
name := s.getShadowServiceName(newUserSvc.Name, newUserSvc.Namespace)

s.cleanupPortMapping(oldUserSvc, newUserSvc)

var updatedSvc *corev1.Service

retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
service, err := s.lister.Services(s.namespace).Get(meshSvcName)
svc, err := s.lister.Services(s.namespace).Get(name)
if err != nil {
return err
}

var ports []corev1.ServicePort

svcMode := userSvc.Annotations[k8s.AnnotationServiceType]
if svcMode == "" {
svcMode = s.defaultMode
}

for id, sp := range userSvc.Spec.Ports {
if sp.Protocol != corev1.ProtocolTCP {
log.Warnf("Unsupported port type: %s, skipping port %s on service %s/%s", sp.Protocol, sp.Name, userSvc.Namespace, userSvc.Name)
continue
}

var targetPort int32

targetPort, err = s.getTargetPort(svcMode, id, userSvc.Name, userSvc.Namespace, sp.Port)
if err != nil {
log.Errorf("Unable to find available %s port: %v, skipping port %s on service %s/%s", sp.Name, err, sp.Name, userSvc.Namespace, userSvc.Name)
continue
}

meshPort := corev1.ServicePort{
Name: sp.Name,
Port: sp.Port,
Protocol: sp.Protocol,
TargetPort: intstr.FromInt(int(targetPort)),
}

ports = append(ports, meshPort)
return fmt.Errorf("unable to get shadow service %q: %w", name, err)
}

newSvc := service.DeepCopy()
newSvc.Spec.Ports = ports
newSvc := svc.DeepCopy()
newSvc.Spec.Ports = s.getShadowServicePorts(newUserSvc)

if updatedSvc, err = s.kubeClient.CoreV1().Services(s.namespace).Update(newSvc); err != nil {
return fmt.Errorf("unable to update kubernetes service: %w", err)
Expand All @@ -157,35 +105,93 @@ func (s *ShadowServiceManager) Update(userSvc *v1.Service) (*v1.Service, error)
})

if retryErr != nil {
return nil, fmt.Errorf("unable to update service %q: %v", meshSvcName, retryErr)
return nil, fmt.Errorf("unable to update service %q: %v", name, retryErr)
}

log.Debugf("Updated service: %s/%s", s.namespace, meshSvcName)
log.Debugf("Updated service: %s/%s", s.namespace, name)

return updatedSvc, nil
}

// Delete deletes a shadow service based on the given service.
func (s *ShadowServiceManager) Delete(svcName, svcNamespace string) error {
meshSvcName := s.userServiceToMeshServiceName(svcName, svcNamespace)
// Delete deletes a shadow service associated with the given user service.
func (s *ShadowServiceManager) Delete(userSvc *v1.Service) error {
name := s.getShadowServiceName(userSvc.Name, userSvc.Namespace)

s.cleanupPortMapping(userSvc, nil)

_, err := s.lister.Services(s.namespace).Get(meshSvcName)
_, err := s.lister.Services(s.namespace).Get(name)
if err != nil {
return err
}

// Service exists, delete
if err := s.kubeClient.CoreV1().Services(s.namespace).Delete(meshSvcName, &metav1.DeleteOptions{}); err != nil {
if err := s.kubeClient.CoreV1().Services(s.namespace).Delete(name, &metav1.DeleteOptions{}); err != nil {
return err
}

log.Debugf("Deleted service: %s/%s", s.namespace, meshSvcName)
log.Debugf("Deleted service: %s/%s", s.namespace, name)

return nil
}

func (s *ShadowServiceManager) cleanupPortMapping(oldUserSvc *corev1.Service, newUserSvc *corev1.Service) {
for _, old := range oldUserSvc.Spec.Ports {
var found bool

if newUserSvc != nil {
for _, new := range newUserSvc.Spec.Ports {
if old.Port == new.Port {
found = true
break
}
}
}

if !found {
_, err := s.tcpStateTable.Remove(k8s.ServiceWithPort{
Namespace: oldUserSvc.Namespace,
Name: oldUserSvc.Name,
Port: old.Port,
})
if err != nil {
log.Warnf("Unable to remove port mapping for %s/%s on port %d", oldUserSvc.Namespace, oldUserSvc.Name, old.Port)
}
}
}
}

func (s *ShadowServiceManager) getShadowServicePorts(svc *corev1.Service) []corev1.ServicePort {
var ports []corev1.ServicePort

svcMode := svc.Annotations[k8s.AnnotationServiceType]
if svcMode == "" {
svcMode = s.defaultMode
}

for i, sp := range svc.Spec.Ports {
if sp.Protocol != corev1.ProtocolTCP {
log.Warnf("Unsupported port type: %s, skipping port %s on service %s/%s", sp.Protocol, sp.Name, svc.Namespace, svc.Name)
continue
}

targetPort, err := s.getTargetPort(svcMode, i, svc.Name, svc.Namespace, sp.Port)
if err != nil {
log.Errorf("Unable to find available %s port: %v, skipping port %s on service %s/%s", sp.Name, err, sp.Name, svc.Namespace, svc.Name)
continue
}

ports = append(ports, corev1.ServicePort{
Name: sp.Name,
Port: sp.Port,
Protocol: sp.Protocol,
TargetPort: intstr.FromInt(int(targetPort)),
})
}

return ports
}

// userServiceToMeshServiceName converts a User service with a namespace to a mesh service name.
func (s *ShadowServiceManager) userServiceToMeshServiceName(name string, namespace string) string {
func (s *ShadowServiceManager) getShadowServiceName(name string, namespace string) string {
return fmt.Sprintf("%s-%s-6d61657368-%s", s.namespace, name, namespace)
}

Expand Down

0 comments on commit b27daf7

Please sign in to comment.