Skip to content

Commit

Permalink
Prevent multiple restores
Browse files Browse the repository at this point in the history
  • Loading branch information
mrajashree committed Sep 28, 2020
1 parent 309ca7f commit 3ca1f3f
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 4 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ require (
github.com/sirupsen/logrus v1.5.0
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
gopkg.in/yaml.v2 v2.2.8
k8s.io/api v0.18.0
k8s.io/apiextensions-apiserver v0.18.0
k8s.io/apimachinery v0.18.0
k8s.io/apiserver v0.18.0
k8s.io/client-go v0.18.0
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89
)
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
k8sv1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
)

const (
Expand Down Expand Up @@ -85,6 +86,11 @@ func main() {
logrus.Fatalf("Error getting clientSet: %s", err.Error())
}

k8sclient, err := kubernetes.NewForConfig(restKubeConfig)
if err != nil {
logrus.Fatalf("Error getting kubernetes client: %s", err.Error())
}

dynamicInterace, err := dynamic.NewForConfig(restKubeConfig)
if err != nil {
logrus.Fatalf("Error generating dynamic client: %s", err.Error())
Expand Down Expand Up @@ -146,6 +152,7 @@ func main() {
restore.Register(ctx, backups.Resources().V1().Restore(),
backups.Resources().V1().Backup(),
core.Core().V1().Secret(),
k8sclient.CoordinationV1().Leases(ChartNamespace),
clientSet, dynamicInterace, sharedClientFactory, restmapper, defaultMountPath, defaultS3)

if err := start.All(ctx, 2, backups); err != nil {
Expand Down
136 changes: 132 additions & 4 deletions pkg/controllers/restore/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package restore

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/rancher/wrangler/pkg/genericcondition"
"github.com/sirupsen/logrus"

coordinationv1 "k8s.io/api/coordination/v1"
apiext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -29,14 +31,18 @@ import (
"k8s.io/apiserver/pkg/storage/value"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
coordinationclientv1 "k8s.io/client-go/kubernetes/typed/coordination/v1"
"k8s.io/client-go/util/retry"
"k8s.io/utils/pointer"
)

const (
	metadataMapKey  = "metadata"        // top-level metadata key in unstructured objects
	ownerRefsMapKey = "ownerReferences" // ownerReferences key under metadata
	clusterScoped   = "clusterscoped"
	namespaceScoped = "namespaceScoped"
	// lockInfoAnnotation stores a JSON-encoded leaseLockInfo on the lease
	// so the current holder can be resolved back to a Restore CR by name.
	lockInfoAnnotation = "restore.controller.io/lock-info"
	// leaseName is the fixed name of the coordination/v1 Lease used to
	// serialize restore processing (one restore at a time).
	leaseName = "restore-controller"
)

type handler struct {
Expand All @@ -51,6 +57,12 @@ type handler struct {
restmapper meta.RESTMapper
defaultBackupMountPath string
defaultS3BackupLocation *v1.S3ObjectStore
kubernetesLeaseClient coordinationclientv1.LeaseInterface
}

// leaseLockInfo is JSON-serialized into the lease's lock-info annotation
// (see setLeaseHolderAnnotation) so the lease holder can be identified
// both by UID and by Restore CR name.
type leaseLockInfo struct {
UID string // UID of the Restore CR holding the lease
Name string // Name of the Restore CR holding the lease
}

type ObjectsFromBackupCR struct {
Expand Down Expand Up @@ -82,6 +94,7 @@ func Register(
restores restoreControllers.RestoreController,
backups restoreControllers.BackupController,
secrets v1core.SecretController,
leaseClient coordinationclientv1.LeaseInterface,
clientSet *clientset.Clientset,
dynamicInterface dynamic.Interface,
sharedClientFactory lasso.SharedClientFactory,
Expand All @@ -101,6 +114,7 @@ func Register(
restmapper: restmapper,
defaultBackupMountPath: defaultLocalBackupLocation,
defaultS3BackupLocation: defaultS3,
kubernetesLeaseClient: leaseClient,
}

// Register handlers
Expand All @@ -115,6 +129,12 @@ func (h *handler) OnRestoreChange(_ string, restore *v1.Restore) (*v1.Restore, e
return restore, nil
}

if _, err := h.Lock(restore); err != nil {
return restore, err
}
defer h.Unlock(string(restore.UID))

logrus.Infof("Processing Restore CR %v", restore.Name)
var backupSource string
backupName := restore.Spec.BackupFilename
logrus.Infof("Restoring from backup %v", restore.Spec.BackupFilename)
Expand Down Expand Up @@ -700,3 +720,111 @@ func (h *handler) setReconcilingCondition(restore *v1.Restore, originalErr error

return restore, err
}

// Lock acquires the shared coordination/v1 Lease so that only one Restore
// CR is processed at a time. It returns the restore's UID as the lock ID
// on success.
//
// Rules:
//   - If the lease does not exist, it is created with this restore as holder.
//   - If this restore already holds the lease, Lock is idempotent.
//   - If the recorded holder's Restore CR no longer exists, or the holder
//     annotation is empty, the lease is taken over.
//   - Otherwise another restore is in flight and an error is returned.
//
// NOTE(review): Get-then-Create/Update is not atomic; two controllers racing
// here can both attempt the write. The apiserver rejects the loser
// (AlreadyExists/Conflict), which surfaces as an error to the caller.
func (h *handler) Lock(restore *v1.Restore) (string, error) {
	lease, err := h.kubernetesLeaseClient.Get(h.ctx, leaseName, k8sv1.GetOptions{})
	if err != nil {
		if !apierrors.IsNotFound(err) {
			return "", err
		}
		// Lease does not exist yet: create it with this restore as holder.
		lease = &coordinationv1.Lease{
			ObjectMeta: k8sv1.ObjectMeta{
				Name:      leaseName,
				Namespace: util.ChartNamespace,
			},
			Spec: coordinationv1.LeaseSpec{
				HolderIdentity: pointer.StringPtr(string(restore.UID)),
			},
		}
		if err := h.setLeaseHolderAnnotation(restore, lease); err != nil {
			return "", err
		}
		if _, err := h.kubernetesLeaseClient.Create(h.ctx, lease, k8sv1.CreateOptions{}); err != nil {
			return "", err
		}
		return string(restore.UID), nil
	}

	if lease.Spec.HolderIdentity != nil {
		if *lease.Spec.HolderIdentity == string(restore.UID) {
			// Re-entrant call: this restore already holds the lock.
			return string(restore.UID), nil
		}
		// Check whether there is a current holder recorded on the lease.
		holderIdentity, err := h.getHolderIdentity(lease)
		if err != nil {
			return "", err
		}
		if holderIdentity == "" {
			// Holder annotation was cleared; the lease is effectively free.
			return h.updateLeaseHolderIdentity(restore, lease)
		}

		// Check whether the recorded holder still exists as a Restore CR.
		if _, err := h.restores.Get(holderIdentity, k8sv1.GetOptions{}); err != nil {
			if apierrors.IsNotFound(err) {
				// Holder CR no longer exists in the cluster; take over
				// the stale lease.
				return h.updateLeaseHolderIdentity(restore, lease)
			}
			// Fix: surface lookup failures instead of mislabeling them
			// as "restore is being processed".
			return "", fmt.Errorf("error checking lease holder %v: %v", holderIdentity, err)
		}
		return "", fmt.Errorf("restore %v is being processed", *lease.Spec.HolderIdentity)
	}
	return h.updateLeaseHolderIdentity(restore, lease)
}

// updateLeaseHolderIdentity records the given restore as the lease holder
// (both in spec.holderIdentity and in the lock-info annotation) and pushes
// the change to the apiserver. It returns the restore's UID on success.
func (h *handler) updateLeaseHolderIdentity(restore *v1.Restore, lease *coordinationv1.Lease) (string, error) {
	uid := string(restore.UID)
	lease.Spec.HolderIdentity = pointer.StringPtr(uid)
	if err := h.setLeaseHolderAnnotation(restore, lease); err != nil {
		return "", err
	}
	if _, err := h.kubernetesLeaseClient.Update(h.ctx, lease, k8sv1.UpdateOptions{}); err != nil {
		return "", err
	}
	return uid, nil
}

// setLeaseHolderAnnotation stores the restore's UID and name as a JSON
// leaseLockInfo payload in the lease's lock-info annotation, so Lock can
// later resolve the current holder back to its Restore CR.
func (h *handler) setLeaseHolderAnnotation(restore *v1.Restore, lease *coordinationv1.Lease) error {
	payload, err := json.Marshal(leaseLockInfo{
		UID:  string(restore.UID),
		Name: restore.Name,
	})
	if err != nil {
		return err
	}
	if lease.Annotations == nil {
		lease.Annotations = map[string]string{}
	}
	lease.Annotations[lockInfoAnnotation] = string(payload)
	return nil
}

// getHolderIdentity returns the name of the Restore CR recorded in the
// lease's lock-info annotation, or "" when no holder is recorded.
func (h *handler) getHolderIdentity(lease *coordinationv1.Lease) (string, error) {
	raw := lease.Annotations[lockInfoAnnotation]
	if raw == "" {
		return "", nil
	}
	var info leaseLockInfo
	if err := json.Unmarshal([]byte(raw), &info); err != nil {
		return "", fmt.Errorf("err getting holder info: %v", err)
	}
	return info.Name, nil
}

// Unlock releases the restore lease when it is held by the given lock ID
// (the restore's UID). It is the counterpart of Lock; it errors if the
// lease is already free or held by someone else.
func (h *handler) Unlock(id string) error {
	lease, err := h.kubernetesLeaseClient.Get(h.ctx, leaseName, k8sv1.GetOptions{})
	if err != nil {
		return err
	}
	if lease.Spec.HolderIdentity == nil {
		return fmt.Errorf("controller is already unlocked")
	}
	if *lease.Spec.HolderIdentity != id {
		return fmt.Errorf("lock ID %q does not match existing lock", id)
	}
	lease.Spec.HolderIdentity = nil
	// Fix: delete is safe on a nil map, whereas assigning "" panics if the
	// lease somehow has no annotations; it also removes the stale key
	// rather than leaving an empty value behind. getHolderIdentity treats
	// a missing key and "" identically, so readers see no difference.
	delete(lease.Annotations, lockInfoAnnotation)
	_, err = h.kubernetesLeaseClient.Update(h.ctx, lease, k8sv1.UpdateOptions{})
	return err
}

0 comments on commit 3ca1f3f

Please sign in to comment.