Skip to content

Commit

Permalink
Prevent multiple restores
Browse files Browse the repository at this point in the history
  • Loading branch information
mrajashree committed Sep 29, 2020
1 parent 309ca7f commit 566438a
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 4 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ require (
github.com/sirupsen/logrus v1.5.0
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
gopkg.in/yaml.v2 v2.2.8
k8s.io/api v0.18.0
k8s.io/apiextensions-apiserver v0.18.0
k8s.io/apimachinery v0.18.0
k8s.io/apiserver v0.18.0
k8s.io/client-go v0.18.0
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89
)
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
k8sv1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
)

const (
Expand Down Expand Up @@ -85,6 +86,11 @@ func main() {
logrus.Fatalf("Error getting clientSet: %s", err.Error())
}

k8sclient, err := kubernetes.NewForConfig(restKubeConfig)
if err != nil {
logrus.Fatalf("Error getting kubernetes client: %s", err.Error())
}

dynamicInterace, err := dynamic.NewForConfig(restKubeConfig)
if err != nil {
logrus.Fatalf("Error generating dynamic client: %s", err.Error())
Expand Down Expand Up @@ -146,6 +152,7 @@ func main() {
restore.Register(ctx, backups.Resources().V1().Restore(),
backups.Resources().V1().Backup(),
core.Core().V1().Secret(),
k8sclient.CoordinationV1().Leases(ChartNamespace),
clientSet, dynamicInterace, sharedClientFactory, restmapper, defaultMountPath, defaultS3)

if err := start.All(ctx, 2, backups); err != nil {
Expand Down
92 changes: 88 additions & 4 deletions pkg/controllers/restore/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/rancher/wrangler/pkg/genericcondition"
"github.com/sirupsen/logrus"

coordinationv1 "k8s.io/api/coordination/v1"
apiext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -29,14 +30,18 @@ import (
"k8s.io/apiserver/pkg/storage/value"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
coordinationclientv1 "k8s.io/client-go/kubernetes/typed/coordination/v1"
"k8s.io/client-go/util/retry"
"k8s.io/utils/pointer"
)

const (
metadataMapKey = "metadata"
ownerRefsMapKey = "ownerReferences"
clusterScoped = "clusterscoped"
namespaceScoped = "namespaceScoped"
metadataMapKey = "metadata"
ownerRefsMapKey = "ownerReferences"
clusterScoped = "clusterscoped"
namespaceScoped = "namespaceScoped"
lockInfoAnnotation = "restore.controller.io/lock-info"
leaseName = "restore-controller"
)

type handler struct {
Expand All @@ -51,6 +56,12 @@ type handler struct {
restmapper meta.RESTMapper
defaultBackupMountPath string
defaultS3BackupLocation *v1.S3ObjectStore
kubernetesLeaseClient coordinationclientv1.LeaseInterface
}

// leaseLockInfo pairs a UID with a Name.
// NOTE(review): presumably this describes the Restore CR that holds the restore
// lease (cf. lockInfoAnnotation), but nothing in the visible code references it — confirm.
type leaseLockInfo struct {
	// UID is the unique identifier half of the pair.
	UID string
	// Name is the name half of the pair.
	Name string
}

type ObjectsFromBackupCR struct {
Expand Down Expand Up @@ -82,6 +93,7 @@ func Register(
restores restoreControllers.RestoreController,
backups restoreControllers.BackupController,
secrets v1core.SecretController,
leaseClient coordinationclientv1.LeaseInterface,
clientSet *clientset.Clientset,
dynamicInterface dynamic.Interface,
sharedClientFactory lasso.SharedClientFactory,
Expand All @@ -101,6 +113,7 @@ func Register(
restmapper: restmapper,
defaultBackupMountPath: defaultLocalBackupLocation,
defaultS3BackupLocation: defaultS3,
kubernetesLeaseClient: leaseClient,
}

// Register handlers
Expand All @@ -115,6 +128,12 @@ func (h *handler) OnRestoreChange(_ string, restore *v1.Restore) (*v1.Restore, e
return restore, nil
}

if err := h.Lock(restore); err != nil {
return restore, err
}
defer h.Unlock(fmt.Sprintf("%s:%s", restore.Name, string(restore.UID)))

logrus.Infof("Processing Restore CR %v", restore.Name)
var backupSource string
backupName := restore.Spec.BackupFilename
logrus.Infof("Restoring from backup %v", restore.Spec.BackupFilename)
Expand Down Expand Up @@ -700,3 +719,68 @@ func (h *handler) setReconcilingCondition(restore *v1.Restore, originalErr error

return restore, err
}

// Lock tries to make the given Restore CR the holder of the lease named leaseName,
// so that only one restore is processed at a time.
//
// The holder is recorded in the lease's Spec.HolderIdentity as "<name>:<uid>".
// The outcome depends on the current state of the lease:
//   - the lease does not exist: it is created with this Restore as holder;
//   - the lease has no holder, or a malformed holder identity: this Restore becomes holder;
//   - the lease is already held by this very Restore (same name and UID): the call
//     succeeds without touching the lease, so a lock left behind by an earlier
//     run of the same Restore does not block it forever;
//   - the recorded holder no longer exists, or its name now belongs to a Restore
//     with a different UID: the lock is stale and this Restore takes it over;
//   - the recorded holder still exists: an error is returned.
//
// Errors from the lease client or from looking up the recorded holder are returned
// unchanged, so they are not mistaken for "another restore is in progress".
func (h *handler) Lock(restore *v1.Restore) error {
	identity := fmt.Sprintf("%s:%s", restore.Name, string(restore.UID))

	lease, err := h.kubernetesLeaseClient.Get(h.ctx, leaseName, k8sv1.GetOptions{})
	if err != nil {
		if !apierrors.IsNotFound(err) {
			return err
		}
		// No lease yet: create it with this Restore as the holder.
		lease = &coordinationv1.Lease{
			ObjectMeta: k8sv1.ObjectMeta{
				Name:      leaseName,
				Namespace: util.ChartNamespace,
			},
			Spec: coordinationv1.LeaseSpec{
				HolderIdentity: pointer.StringPtr(identity),
			},
		}
		_, err = h.kubernetesLeaseClient.Create(h.ctx, lease, k8sv1.CreateOptions{})
		return err
	}

	if lease.Spec.HolderIdentity == nil {
		return h.updateLeaseHolderIdentity(restore, lease)
	}

	if *lease.Spec.HolderIdentity == identity {
		// This Restore already holds the lease; treat the lock as re-entrant.
		return nil
	}

	// Check that the recorded holder still exists in the cluster; if its Restore CR
	// was deleted (or deleted and recreated), the lease is stale and can be taken over.
	split := strings.SplitN(*lease.Spec.HolderIdentity, ":", 2)
	if len(split) != 2 {
		logrus.Errorf("Invalid holderIdentity on lease, resetting to current Restore")
		return h.updateLeaseHolderIdentity(restore, lease)
	}
	holderName, holderUID := split[0], split[1]

	holder, err := h.restores.Get(holderName, k8sv1.GetOptions{})
	switch {
	case apierrors.IsNotFound(err):
		// The holder's Restore CR no longer exists; take over the lease.
		return h.updateLeaseHolderIdentity(restore, lease)
	case err != nil:
		// Unknown state of the holder: surface the lookup failure instead of guessing.
		return err
	case string(holder.UID) != holderUID:
		// A different Restore CR now carries the holder's name, so the recorded
		// holder is gone. The lease must go to the Restore that is asking for it,
		// because the caller proceeds with the restore when nil is returned.
		return h.updateLeaseHolderIdentity(restore, lease)
	}
	return fmt.Errorf("restore %v is in progress", holderName)
}

// updateLeaseHolderIdentity sets the lease's Spec.HolderIdentity to "<name>:<uid>"
// of the given Restore CR (mutating the passed-in lease object) and writes the
// lease back through the lease client. It returns the error of that update, if any.
func (h *handler) updateLeaseHolderIdentity(restore *v1.Restore, lease *coordinationv1.Lease) error {
	holder := fmt.Sprintf("%s:%s", restore.Name, string(restore.UID))
	lease.Spec.HolderIdentity = pointer.StringPtr(holder)

	if _, err := h.kubernetesLeaseClient.Update(h.ctx, lease, k8sv1.UpdateOptions{}); err != nil {
		return err
	}
	return nil
}

// Unlock clears the holder of the lease named leaseName, provided the lease is
// currently held by id (a "<name>:<uid>" string as written by Lock).
//
// It returns nil when the lease has no holder, an error when the lease is held
// by a different identity, and otherwise the result of fetching or updating
// the lease through the lease client.
func (h *handler) Unlock(id string) error {
	current, getErr := h.kubernetesLeaseClient.Get(h.ctx, leaseName, k8sv1.GetOptions{})
	if getErr != nil {
		return getErr
	}

	holder := current.Spec.HolderIdentity
	switch {
	case holder == nil:
		// Nothing to release.
		return nil
	case *holder != id:
		// Held by someone else: leave the lease untouched.
		return fmt.Errorf("lease holder ID %q does not match existing restore CR", id)
	}

	current.Spec.HolderIdentity = nil
	_, updateErr := h.kubernetesLeaseClient.Update(h.ctx, current, k8sv1.UpdateOptions{})
	return updateErr
}

0 comments on commit 566438a

Please sign in to comment.