Skip to content

Commit

Permalink
Headless service port consolidation (#1936)
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Valdivia <18384552+dvaldivia@users.noreply.github.com>
  • Loading branch information
dvaldivia committed Jan 11, 2024
1 parent 796ce19 commit 60981d2
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 56 deletions.
63 changes: 7 additions & 56 deletions pkg/controller/main-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ import (
minioscheme "github.com/minio/operator/pkg/client/clientset/versioned/scheme"
informers "github.com/minio/operator/pkg/client/informers/externalversions/minio.min.io/v2"
stsInformers "github.com/minio/operator/pkg/client/informers/externalversions/sts.min.io/v1alpha1"
"github.com/minio/operator/pkg/resources/services"
"github.com/minio/operator/pkg/resources/statefulsets"
)

Expand Down Expand Up @@ -915,59 +914,11 @@ func (c *Controller) syncHandler(key string) (Result, error) {
return WrapResult(Result{}, err)
}

// Handle the Internal Headless Service for Tenant StatefulSet
hlSvc, err := c.serviceLister.Services(tenant.Namespace).Get(tenant.MinIOHLServiceName())
// Check MinIO Headless Service used for internode communication
err = c.checkMinIOHLSvc(ctx, tenant, nsName)
if err != nil {
if k8serrors.IsNotFound(err) {
if tenant, err = c.updateTenantStatus(ctx, tenant, StatusProvisioningHLService, 0); err != nil {
return WrapResult(Result{}, err)
}
klog.V(2).Infof("Creating a new Headless Service for cluster %q", nsName)
// Create the headless service for the tenant
hlSvc = services.NewHeadlessForMinIO(tenant)
_, err = c.kubeClientSet.CoreV1().Services(tenant.Namespace).Create(ctx, hlSvc, cOpts)
if err != nil {
return WrapResult(Result{}, err)
}
c.recorder.Event(tenant, corev1.EventTypeNormal, "SvcCreated", "Headless Service created")
} else {
return WrapResult(Result{}, err)
}
} else {
existingPorts := hlSvc.Spec.Ports
sftpPortFound := false
for _, port := range existingPorts {
if port.Name == miniov2.MinIOServiceSFTPPortName {
sftpPortFound = true
break
}
}
var newPorts []corev1.ServicePort
if tenant.Spec.Features != nil && tenant.Spec.Features.EnableSFTP != nil && *tenant.Spec.Features.EnableSFTP {
if !sftpPortFound {
newPorts = existingPorts
newPorts = append(newPorts, corev1.ServicePort{Port: miniov2.MinIOSFTPPort, Name: miniov2.MinIOServiceSFTPPortName})
hlSvc.Spec.Ports = newPorts
_, err := c.kubeClientSet.CoreV1().Services(tenant.Namespace).Update(ctx, hlSvc, metav1.UpdateOptions(cOpts))
if err != nil {
return WrapResult(Result{}, err)
}
}
} else {
if sftpPortFound {
for _, port := range existingPorts {
if port.Name == miniov2.MinIOServiceSFTPPortName {
continue
}
newPorts = append(newPorts, port)
}
hlSvc.Spec.Ports = newPorts
_, err := c.kubeClientSet.CoreV1().Services(tenant.Namespace).Update(ctx, hlSvc, metav1.UpdateOptions(cOpts))
if err != nil {
return WrapResult(Result{}, err)
}
}
}
klog.V(2).Infof("error consolidating headless service: %s", err.Error())
return WrapResult(Result{}, err)
}

// List all MinIO Tenants in this namespace.
Expand Down Expand Up @@ -1087,7 +1038,7 @@ func (c *Controller) syncHandler(key string) (Result, error) {
SkipEnvVars: skipEnvVars,
Pool: &pool,
PoolStatus: &tenant.Status.Pools[i],
ServiceName: hlSvc.Name,
ServiceName: tenant.MinIOHLServiceName(),
HostsTemplate: c.hostsTemplate,
OperatorVersion: c.operatorVersion,
OperatorCATLS: operatorCATLSExists,
Expand Down Expand Up @@ -1297,7 +1248,7 @@ func (c *Controller) syncHandler(key string) (Result, error) {
SkipEnvVars: skipEnvVars,
Pool: &pool,
PoolStatus: &tenant.Status.Pools[i],
ServiceName: hlSvc.Name,
ServiceName: tenant.MinIOHLServiceName(),
HostsTemplate: c.hostsTemplate,
OperatorVersion: c.operatorVersion,
OperatorCATLS: operatorCATLSExists,
Expand Down Expand Up @@ -1347,7 +1298,7 @@ func (c *Controller) syncHandler(key string) (Result, error) {
SkipEnvVars: skipEnvVars,
Pool: &pool,
PoolStatus: &tenant.Status.Pools[i],
ServiceName: hlSvc.Name,
ServiceName: tenant.MinIOHLServiceName(),
HostsTemplate: c.hostsTemplate,
OperatorVersion: c.operatorVersion,
OperatorCATLS: operatorCATLSExists,
Expand Down
51 changes: 51 additions & 0 deletions pkg/controller/minio-services.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,54 @@ func minioSvcMatchesSpecification(svc *v1.Service, expectedSvc *v1.Service) (boo
}
return true, nil
}

// checkMinIOHLSvc validates the existence of the MinIO headless service and validate its status against what
// the specification states
func (c *Controller) checkMinIOHLSvc(ctx context.Context, tenant *miniov2.Tenant, nsName types.NamespacedName) error {
// Handle the Internal Headless Service for Tenant StatefulSet
hlSvc, err := c.serviceLister.Services(tenant.Namespace).Get(tenant.MinIOHLServiceName())
if err != nil {
if k8serrors.IsNotFound(err) {
if tenant, err = c.updateTenantStatus(ctx, tenant, StatusProvisioningHLService, 0); err != nil {
return err
}
klog.V(2).Infof("Creating a new Headless Service for cluster %q", nsName)
// Create the headless service for the tenant
hlSvc = services.NewHeadlessForMinIO(tenant)
_, err = c.kubeClientSet.CoreV1().Services(tenant.Namespace).Create(ctx, hlSvc, metav1.CreateOptions{})
if err != nil {
return err
}
c.recorder.Event(tenant, corev1.EventTypeNormal, "SvcCreated", "Headless Service created")
} else {
return err
}
}
// compare the current version of the service to what we expect
expectedHlSvc := services.NewHeadlessForMinIO(tenant)
// does the current service matches our specification?
minioSvcMatchesSpec, err := minioSvcMatchesSpecification(hlSvc, expectedHlSvc)

// check the specification of the MinIO ClusterIP service
if !minioSvcMatchesSpec {
if err != nil {
klog.Infof("Headless Services don't match: %s", err)
}

// impose what we care about
hlSvc.ObjectMeta.Annotations = expectedHlSvc.ObjectMeta.Annotations
hlSvc.ObjectMeta.Labels = expectedHlSvc.ObjectMeta.Labels
hlSvc.Spec.Ports = expectedHlSvc.Spec.Ports

// update the selector
hlSvc.Spec.Selector = expectedHlSvc.Spec.Selector

_, err = c.kubeClientSet.CoreV1().Services(tenant.Namespace).Update(ctx, hlSvc, metav1.UpdateOptions{})
if err != nil {
return err
}
c.recorder.Event(tenant, corev1.EventTypeNormal, "Updated", "Headless Service Updated")

}
return err
}

0 comments on commit 60981d2

Please sign in to comment.