Skip to content

Commit

Permalink
core: add context parameter to k8sutil service
Browse files Browse the repository at this point in the history
This commit adds context parameter to k8sutil service functions. By
this, we can handle cancellation during API call of service resource.

Signed-off-by: Yuichiro Ueno <y1r.ueno@gmail.com>
  • Loading branch information
y1r committed Dec 11, 2021
1 parent 3309e9a commit 03e976b
Show file tree
Hide file tree
Showing 9 changed files with 15 additions and 18 deletions.
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/cluster_external.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (c *ClusterController) configureExternalClusterMonitoring(context *clusterd
return err
}
logger.Info("creating mgr external monitoring service")
_, err = k8sutil.CreateOrUpdateService(context.Clientset, cluster.Namespace, service)
_, err = k8sutil.CreateOrUpdateService(cluster.ClusterInfo.Context, context.Clientset, cluster.Namespace, service)
if err != nil && !kerrors.IsAlreadyExists(err) {
return errors.Wrap(err, "failed to create or update mgr service")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/mgr/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (c *Cluster) configureDashboardService(activeDaemon string) error {
}
if c.spec.Dashboard.Enabled {
// expose the dashboard service
if _, err := k8sutil.CreateOrUpdateService(c.context.Clientset, c.clusterInfo.Namespace, dashboardService); err != nil {
if _, err := k8sutil.CreateOrUpdateService(c.clusterInfo.Context, c.context.Clientset, c.clusterInfo.Namespace, dashboardService); err != nil {
return errors.Wrap(err, "failed to configure dashboard svc")
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/mgr/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (c *Cluster) reconcileServices(activeDaemon string) error {
if err != nil {
return err
}
if _, err := k8sutil.CreateOrUpdateService(c.context.Clientset, c.clusterInfo.Namespace, service); err != nil {
if _, err := k8sutil.CreateOrUpdateService(c.clusterInfo.Context, c.context.Clientset, c.clusterInfo.Namespace, service); err != nil {
return errors.Wrap(err, "failed to create mgr metrics service")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cluster/mon/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (c *Cluster) createService(mon *monConfig) (string, error) {
}
}

s, err := k8sutil.CreateOrUpdateService(c.context.Clientset, c.Namespace, svcDef)
s, err := k8sutil.CreateOrUpdateService(c.ClusterInfo.Context, c.context.Clientset, c.Namespace, svcDef)
if err != nil {
return "", errors.Wrapf(err, "failed to create service for mon %s", mon.DaemonName)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/cr_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (o *Operator) startCRDManager(context context.Context, mgrErrorCh chan erro
return
}
if isPresent {
err := createWebhookService(o.context)
err := createWebhookService(context, o.context)
if err != nil {
mgrErrorCh <- errors.Wrap(err, "failed to create admission webhook service")
return
Expand Down
6 changes: 3 additions & 3 deletions pkg/operator/ceph/csi/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func (r *ReconcileCSI) startDrivers(ver *version.Info, ownerInfo *k8sutil.OwnerI
if err != nil {
return errors.Wrapf(err, "failed to set owner reference to rbd service %q", rbdService)
}
_, err = k8sutil.CreateOrUpdateService(r.context.Clientset, r.opConfig.OperatorNamespace, rbdService)
_, err = k8sutil.CreateOrUpdateService(r.opManagerContext, r.context.Clientset, r.opConfig.OperatorNamespace, rbdService)
if err != nil {
return errors.Wrapf(err, "failed to create rbd service %q", rbdService.Name)
}
Expand Down Expand Up @@ -525,7 +525,7 @@ func (r *ReconcileCSI) startDrivers(ver *version.Info, ownerInfo *k8sutil.OwnerI
if err != nil {
return errors.Wrapf(err, "failed to set owner reference to cephfs service %q", cephfsService)
}
_, err = k8sutil.CreateOrUpdateService(r.context.Clientset, r.opConfig.OperatorNamespace, cephfsService)
_, err = k8sutil.CreateOrUpdateService(r.opManagerContext, r.context.Clientset, r.opConfig.OperatorNamespace, cephfsService)
if err != nil {
return errors.Wrapf(err, "failed to create cephfs service %q", cephfsService.Name)
}
Expand Down Expand Up @@ -587,7 +587,7 @@ func (r *ReconcileCSI) deleteCSIDriverResources(ver *version.Info, daemonset, de
succeeded = false
}

err = k8sutil.DeleteService(r.context.Clientset, r.opConfig.OperatorNamespace, service)
err = k8sutil.DeleteService(r.opManagerContext, r.context.Clientset, r.opConfig.OperatorNamespace, service)
if err != nil {
logger.Errorf("failed to delete the %q. %v", service, err)
succeeded = false
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ func (c *clusterConfig) reconcileService(cephObjectStore *cephv1.CephObjectStore
return "", errors.Wrapf(err, "failed to set owner reference to ceph object store service %q", service.Name)
}

svc, err := k8sutil.CreateOrUpdateService(c.context.Clientset, cephObjectStore.Namespace, service)
svc, err := k8sutil.CreateOrUpdateService(c.clusterInfo.Context, c.context.Clientset, cephObjectStore.Namespace, service)
if err != nil {
return "", errors.Wrapf(err, "failed to create or update object store %q service", cephObjectStore.Name)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/operator/ceph/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func isSecretPresent(ctx context.Context, context *clusterd.Context) (bool, erro
return true, nil
}

func createWebhookService(context *clusterd.Context) error {
func createWebhookService(ctx context.Context, context *clusterd.Context) error {
webhookService := corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: admissionControllerAppName,
Expand All @@ -93,7 +93,7 @@ func createWebhookService(context *clusterd.Context) error {
},
}

_, err := k8sutil.CreateOrUpdateService(context.Clientset, namespace, &webhookService)
_, err := k8sutil.CreateOrUpdateService(ctx, context.Clientset, namespace, &webhookService)
if err != nil {
return err
}
Expand Down
11 changes: 4 additions & 7 deletions pkg/operator/k8sutil/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ import (

// CreateOrUpdateService creates a service or updates the service declaratively if it already exists.
func CreateOrUpdateService(
clientset kubernetes.Interface, namespace string, serviceDefinition *v1.Service,
ctx context.Context, clientset kubernetes.Interface, namespace string, serviceDefinition *v1.Service,
) (*v1.Service, error) {
ctx := context.TODO()
name := serviceDefinition.Name
logger.Debugf("creating service %s", name)

Expand All @@ -39,7 +38,7 @@ func CreateOrUpdateService(
if !errors.IsAlreadyExists(err) {
return nil, fmt.Errorf("failed to create service %s. %+v", name, err)
}
s, err = UpdateService(clientset, namespace, serviceDefinition)
s, err = UpdateService(ctx, clientset, namespace, serviceDefinition)
if err != nil {
return nil, fmt.Errorf("failed to update service %s. %+v", name, err)
}
Expand All @@ -52,9 +51,8 @@ func CreateOrUpdateService(
// UpdateService updates a service declaratively. If the service does not exist this is considered
// an error condition.
func UpdateService(
clientset kubernetes.Interface, namespace string, serviceDefinition *v1.Service,
ctx context.Context, clientset kubernetes.Interface, namespace string, serviceDefinition *v1.Service,
) (*v1.Service, error) {
ctx := context.TODO()
name := serviceDefinition.Name
logger.Debugf("updating service %s", name)
existing, err := clientset.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
Expand All @@ -69,8 +67,7 @@ func UpdateService(
}

// DeleteService deletes a Service and returns the error if any
func DeleteService(clientset kubernetes.Interface, namespace, name string) error {
ctx := context.TODO()
func DeleteService(ctx context.Context, clientset kubernetes.Interface, namespace, name string) error {
err := clientset.CoreV1().Services(namespace).Delete(ctx, name, metav1.DeleteOptions{})
if err != nil {
if errors.IsNotFound(err) {
Expand Down

0 comments on commit 03e976b

Please sign in to comment.