Skip to content

Commit

Permalink
Add context to crd utils
Browse files Browse the repository at this point in the history
Signed-off-by: Tamal Saha <tamal@appscode.com>
  • Loading branch information
tamalsaha committed May 21, 2020
1 parent 59478af commit 723f4de
Show file tree
Hide file tree
Showing 12 changed files with 281 additions and 256 deletions.
45 changes: 21 additions & 24 deletions apis/invoker.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package apis

import (
"context"
"fmt"

"stash.appscode.dev/apimachinery/apis/stash/v1alpha1"
Expand Down Expand Up @@ -73,7 +74,7 @@ func ExtractBackupInvokerInfo(stashClient cs.Interface, invokerType, invokerName
switch invokerType {
case v1beta1.ResourceKindBackupBatch:
// get BackupBatch
backupBatch, err := stashClient.StashV1beta1().BackupBatches(namespace).Get(invokerName, metav1.GetOptions{})
backupBatch, err := stashClient.StashV1beta1().BackupBatches(namespace).Get(context.TODO(), invokerName, metav1.GetOptions{})
if err != nil {
return invoker, err
}
Expand Down Expand Up @@ -114,23 +115,21 @@ func ExtractBackupInvokerInfo(stashClient cs.Interface, invokerType, invokerName
})
}
invoker.AddFinalizer = func() error {
_, _, err := v1beta1_util.PatchBackupBatch(stashClient.StashV1beta1(), backupBatch, func(in *v1beta1.BackupBatch) *v1beta1.BackupBatch {
_, _, err := v1beta1_util.PatchBackupBatch(context.TODO(), stashClient.StashV1beta1(), backupBatch, func(in *v1beta1.BackupBatch) *v1beta1.BackupBatch {
in.ObjectMeta = core_util.AddFinalizer(in.ObjectMeta, v1beta1.StashKey)
return in

})
}, metav1.PatchOptions{})
return err
}
invoker.RemoveFinalizer = func() error {
_, _, err := v1beta1_util.PatchBackupBatch(stashClient.StashV1beta1(), backupBatch, func(in *v1beta1.BackupBatch) *v1beta1.BackupBatch {
_, _, err := v1beta1_util.PatchBackupBatch(context.TODO(), stashClient.StashV1beta1(), backupBatch, func(in *v1beta1.BackupBatch) *v1beta1.BackupBatch {
in.ObjectMeta = core_util.RemoveFinalizer(in.ObjectMeta, v1beta1.StashKey)
return in

})
}, metav1.PatchOptions{})
return err
}
invoker.HasCondition = func(target *v1beta1.TargetRef, condType v1beta1.BackupInvokerCondition) (bool, error) {
backupBatch, err := stashClient.StashV1beta1().BackupBatches(namespace).Get(invokerName, metav1.GetOptions{})
backupBatch, err := stashClient.StashV1beta1().BackupBatches(namespace).Get(context.TODO(), invokerName, metav1.GetOptions{})
if err != nil {
return false, err
}
Expand All @@ -140,7 +139,7 @@ func ExtractBackupInvokerInfo(stashClient cs.Interface, invokerType, invokerName
return kmapi.HasCondition(backupBatch.Status.Conditions, string(condType)), nil
}
invoker.GetCondition = func(target *v1beta1.TargetRef, condType v1beta1.BackupInvokerCondition) (int, *kmapi.Condition, error) {
backupBatch, err := stashClient.StashV1beta1().BackupBatches(namespace).Get(invokerName, metav1.GetOptions{})
backupBatch, err := stashClient.StashV1beta1().BackupBatches(namespace).Get(context.TODO(), invokerName, metav1.GetOptions{})
if err != nil {
return -1, nil, err
}
Expand All @@ -153,18 +152,18 @@ func ExtractBackupInvokerInfo(stashClient cs.Interface, invokerType, invokerName

}
invoker.SetCondition = func(target *v1beta1.TargetRef, condition kmapi.Condition) error {
_, err = v1beta1_util.UpdateBackupBatchStatus(stashClient.StashV1beta1(), backupBatch.ObjectMeta, func(in *v1beta1.BackupBatchStatus) *v1beta1.BackupBatchStatus {
_, err = v1beta1_util.UpdateBackupBatchStatus(context.TODO(), stashClient.StashV1beta1(), backupBatch.ObjectMeta, func(in *v1beta1.BackupBatchStatus) *v1beta1.BackupBatchStatus {
if target != nil {
in.MemberConditions = setMemberCondition(in.MemberConditions, *target, condition)
} else {
in.Conditions = kmapi.SetCondition(in.Conditions, condition)
}
return in
})
}, metav1.UpdateOptions{})
return err
}
invoker.IsConditionTrue = func(target *v1beta1.TargetRef, condType v1beta1.BackupInvokerCondition) (bool, error) {
backupBatch, err := stashClient.StashV1beta1().BackupBatches(namespace).Get(invokerName, metav1.GetOptions{})
backupBatch, err := stashClient.StashV1beta1().BackupBatches(namespace).Get(context.TODO(), invokerName, metav1.GetOptions{})
if err != nil {
return false, err
}
Expand All @@ -175,7 +174,7 @@ func ExtractBackupInvokerInfo(stashClient cs.Interface, invokerType, invokerName
}
case v1beta1.ResourceKindBackupConfiguration:
// get BackupConfiguration
backupConfig, err := stashClient.StashV1beta1().BackupConfigurations(namespace).Get(invokerName, metav1.GetOptions{})
backupConfig, err := stashClient.StashV1beta1().BackupConfigurations(namespace).Get(context.TODO(), invokerName, metav1.GetOptions{})
if err != nil {
return invoker, err
}
Expand Down Expand Up @@ -213,45 +212,43 @@ func ExtractBackupInvokerInfo(stashClient cs.Interface, invokerType, invokerName
Hooks: backupConfig.Spec.Hooks,
})
invoker.AddFinalizer = func() error {
_, _, err := v1beta1_util.PatchBackupConfiguration(stashClient.StashV1beta1(), backupConfig, func(in *v1beta1.BackupConfiguration) *v1beta1.BackupConfiguration {
_, _, err := v1beta1_util.PatchBackupConfiguration(context.TODO(), stashClient.StashV1beta1(), backupConfig, func(in *v1beta1.BackupConfiguration) *v1beta1.BackupConfiguration {
in.ObjectMeta = core_util.AddFinalizer(in.ObjectMeta, v1beta1.StashKey)
return in

})
}, metav1.PatchOptions{})
return err
}
invoker.RemoveFinalizer = func() error {
_, _, err := v1beta1_util.PatchBackupConfiguration(stashClient.StashV1beta1(), backupConfig, func(in *v1beta1.BackupConfiguration) *v1beta1.BackupConfiguration {
_, _, err := v1beta1_util.PatchBackupConfiguration(context.TODO(), stashClient.StashV1beta1(), backupConfig, func(in *v1beta1.BackupConfiguration) *v1beta1.BackupConfiguration {
in.ObjectMeta = core_util.RemoveFinalizer(in.ObjectMeta, v1beta1.StashKey)
return in

})
}, metav1.PatchOptions{})
return err
}
invoker.HasCondition = func(target *v1beta1.TargetRef, condType v1beta1.BackupInvokerCondition) (bool, error) {
backupConfig, err := stashClient.StashV1beta1().BackupConfigurations(namespace).Get(invokerName, metav1.GetOptions{})
backupConfig, err := stashClient.StashV1beta1().BackupConfigurations(namespace).Get(context.TODO(), invokerName, metav1.GetOptions{})
if err != nil {
return false, err
}
return kmapi.HasCondition(backupConfig.Status.Conditions, string(condType)), nil
}
invoker.GetCondition = func(target *v1beta1.TargetRef, condType v1beta1.BackupInvokerCondition) (int, *kmapi.Condition, error) {
backupConfig, err := stashClient.StashV1beta1().BackupConfigurations(namespace).Get(invokerName, metav1.GetOptions{})
backupConfig, err := stashClient.StashV1beta1().BackupConfigurations(namespace).Get(context.TODO(), invokerName, metav1.GetOptions{})
if err != nil {
return -1, nil, err
}
idx, cond := kmapi.GetCondition(backupConfig.Status.Conditions, string(condType))
return idx, cond, nil
}
invoker.SetCondition = func(target *v1beta1.TargetRef, condition kmapi.Condition) error {
_, err = v1beta1_util.UpdateBackupConfigurationStatus(stashClient.StashV1beta1(), backupConfig.ObjectMeta, func(in *v1beta1.BackupConfigurationStatus) *v1beta1.BackupConfigurationStatus {
_, err = v1beta1_util.UpdateBackupConfigurationStatus(context.TODO(), stashClient.StashV1beta1(), backupConfig.ObjectMeta, func(in *v1beta1.BackupConfigurationStatus) *v1beta1.BackupConfigurationStatus {
in.Conditions = kmapi.SetCondition(in.Conditions, condition)
return in
})
}, metav1.UpdateOptions{})
return err
}
invoker.IsConditionTrue = func(target *v1beta1.TargetRef, condType v1beta1.BackupInvokerCondition) (bool, error) {
backupConfig, err := stashClient.StashV1beta1().BackupConfigurations(namespace).Get(invokerName, metav1.GetOptions{})
backupConfig, err := stashClient.StashV1beta1().BackupConfigurations(namespace).Get(context.TODO(), invokerName, metav1.GetOptions{})
if err != nil {
return false, err
}
Expand Down
61 changes: 22 additions & 39 deletions client/clientset/versioned/typed/stash/v1alpha1/util/recovery.go
Expand Up @@ -17,8 +17,8 @@ limitations under the License.
package util

import (
"context"
"fmt"
"time"

api "stash.appscode.dev/apimachinery/apis/stash/v1alpha1"
cs "stash.appscode.dev/apimachinery/client/clientset/versioned/typed/stash/v1alpha1"
Expand All @@ -32,29 +32,32 @@ import (
kutil "kmodules.xyz/client-go"
)

func CreateOrPatchRecovery(c cs.StashV1alpha1Interface, meta metav1.ObjectMeta, transform func(alert *api.Recovery) *api.Recovery) (*api.Recovery, kutil.VerbType, error) {
cur, err := c.Recoveries(meta.Namespace).Get(meta.Name, metav1.GetOptions{})
func CreateOrPatchRecovery(ctx context.Context, c cs.StashV1alpha1Interface, meta metav1.ObjectMeta, transform func(in *api.Recovery) *api.Recovery, opts metav1.PatchOptions) (*api.Recovery, kutil.VerbType, error) {
cur, err := c.Recoveries(meta.Namespace).Get(ctx, meta.Name, metav1.GetOptions{})
if kerr.IsNotFound(err) {
glog.V(3).Infof("Creating Recovery %s/%s.", meta.Namespace, meta.Name)
out, err := c.Recoveries(meta.Namespace).Create(transform(&api.Recovery{
out, err := c.Recoveries(meta.Namespace).Create(ctx, transform(&api.Recovery{
TypeMeta: metav1.TypeMeta{
Kind: "Recovery",
Kind: api.ResourceKindRecovery,
APIVersion: api.SchemeGroupVersion.String(),
},
ObjectMeta: meta,
}))
}), metav1.CreateOptions{
DryRun: opts.DryRun,
FieldManager: opts.FieldManager,
})
return out, kutil.VerbCreated, err
} else if err != nil {
return nil, kutil.VerbUnchanged, err
}
return PatchRecovery(c, cur, transform)
return PatchRecovery(ctx, c, cur, transform, opts)
}

func PatchRecovery(c cs.StashV1alpha1Interface, cur *api.Recovery, transform func(*api.Recovery) *api.Recovery) (*api.Recovery, kutil.VerbType, error) {
return PatchRecoveryObject(c, cur, transform(cur.DeepCopy()))
func PatchRecovery(ctx context.Context, c cs.StashV1alpha1Interface, cur *api.Recovery, transform func(*api.Recovery) *api.Recovery, opts metav1.PatchOptions) (*api.Recovery, kutil.VerbType, error) {
return PatchRecoveryObject(ctx, c, cur, transform(cur.DeepCopy()), opts)
}

func PatchRecoveryObject(c cs.StashV1alpha1Interface, cur, mod *api.Recovery) (*api.Recovery, kutil.VerbType, error) {
func PatchRecoveryObject(ctx context.Context, c cs.StashV1alpha1Interface, cur, mod *api.Recovery, opts metav1.PatchOptions) (*api.Recovery, kutil.VerbType, error) {
curJson, err := json.Marshal(cur)
if err != nil {
return nil, kutil.VerbUnchanged, err
Expand All @@ -73,19 +76,19 @@ func PatchRecoveryObject(c cs.StashV1alpha1Interface, cur, mod *api.Recovery) (*
return cur, kutil.VerbUnchanged, nil
}
glog.V(3).Infof("Patching Recovery %s/%s with %s.", cur.Namespace, cur.Name, string(patch))
out, err := c.Recoveries(cur.Namespace).Patch(cur.Name, types.MergePatchType, patch)
out, err := c.Recoveries(cur.Namespace).Patch(ctx, cur.Name, types.MergePatchType, patch, opts)
return out, kutil.VerbPatched, err
}

func TryUpdateRecovery(c cs.StashV1alpha1Interface, meta metav1.ObjectMeta, transform func(*api.Recovery) *api.Recovery) (result *api.Recovery, err error) {
func TryUpdateRecovery(ctx context.Context, c cs.StashV1alpha1Interface, meta metav1.ObjectMeta, transform func(*api.Recovery) *api.Recovery, opts metav1.UpdateOptions) (result *api.Recovery, err error) {
attempt := 0
err = wait.PollImmediate(kutil.RetryInterval, kutil.RetryTimeout, func() (bool, error) {
attempt++
cur, e2 := c.Recoveries(meta.Namespace).Get(meta.Name, metav1.GetOptions{})
cur, e2 := c.Recoveries(meta.Namespace).Get(ctx, meta.Name, metav1.GetOptions{})
if kerr.IsNotFound(e2) {
return false, e2
} else if e2 == nil {
result, e2 = c.Recoveries(cur.Namespace).Update(transform(cur.DeepCopy()))
result, e2 = c.Recoveries(cur.Namespace).Update(ctx, transform(cur.DeepCopy()), opts)
return e2 == nil, nil
}
glog.Errorf("Attempt %d failed to update Recovery %s/%s due to %v.", attempt, cur.Namespace, cur.Name, e2)
Expand All @@ -98,32 +101,12 @@ func TryUpdateRecovery(c cs.StashV1alpha1Interface, meta metav1.ObjectMeta, tran
return
}

func SetRecoveryStats(c cs.StashV1alpha1Interface, recovery metav1.ObjectMeta, path string, d time.Duration, phase api.RecoveryPhase) (*api.Recovery, error) {
out, err := UpdateRecoveryStatus(c, recovery, func(in *api.RecoveryStatus) *api.RecoveryStatus {
found := false
for _, stats := range in.Stats {
if stats.Path == path {
found = true
stats.Duration = d.String()
stats.Phase = phase
}
}
if !found {
in.Stats = append(in.Stats, api.RestoreStats{
Path: path,
Duration: d.String(),
Phase: phase,
})
}
return in
})
return out, err
}

func UpdateRecoveryStatus(
ctx context.Context,
c cs.StashV1alpha1Interface,
meta metav1.ObjectMeta,
transform func(*api.RecoveryStatus) *api.RecoveryStatus,
opts metav1.UpdateOptions,
) (result *api.Recovery, err error) {
apply := func(x *api.Recovery) *api.Recovery {
out := &api.Recovery{
Expand All @@ -136,16 +119,16 @@ func UpdateRecoveryStatus(
}

attempt := 0
cur, err := c.Recoveries(meta.Namespace).Get(meta.Name, metav1.GetOptions{})
cur, err := c.Recoveries(meta.Namespace).Get(ctx, meta.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
err = wait.PollImmediate(kutil.RetryInterval, kutil.RetryTimeout, func() (bool, error) {
attempt++
var e2 error
result, e2 = c.Recoveries(meta.Namespace).UpdateStatus(apply(cur))
result, e2 = c.Recoveries(meta.Namespace).UpdateStatus(ctx, apply(cur), opts)
if kerr.IsConflict(e2) {
latest, e3 := c.Recoveries(meta.Namespace).Get(meta.Name, metav1.GetOptions{})
latest, e3 := c.Recoveries(meta.Namespace).Get(ctx, meta.Name, metav1.GetOptions{})
switch {
case e3 == nil:
cur = latest
Expand Down
38 changes: 22 additions & 16 deletions client/clientset/versioned/typed/stash/v1alpha1/util/repository.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package util

import (
"context"
"fmt"

api "stash.appscode.dev/apimachinery/apis/stash/v1alpha1"
Expand All @@ -31,29 +32,32 @@ import (
kutil "kmodules.xyz/client-go"
)

func CreateOrPatchRepository(c cs.StashV1alpha1Interface, meta metav1.ObjectMeta, transform func(alert *api.Repository) *api.Repository) (*api.Repository, kutil.VerbType, error) {
cur, err := c.Repositories(meta.Namespace).Get(meta.Name, metav1.GetOptions{})
func CreateOrPatchRepository(ctx context.Context, c cs.StashV1alpha1Interface, meta metav1.ObjectMeta, transform func(in *api.Repository) *api.Repository, opts metav1.PatchOptions) (*api.Repository, kutil.VerbType, error) {
cur, err := c.Repositories(meta.Namespace).Get(ctx, meta.Name, metav1.GetOptions{})
if kerr.IsNotFound(err) {
glog.V(3).Infof("Creating Repository %s/%s.", meta.Namespace, meta.Name)
out, err := c.Repositories(meta.Namespace).Create(transform(&api.Repository{
out, err := c.Repositories(meta.Namespace).Create(ctx, transform(&api.Repository{
TypeMeta: metav1.TypeMeta{
Kind: "Repository",
Kind: api.ResourceKindRepository,
APIVersion: api.SchemeGroupVersion.String(),
},
ObjectMeta: meta,
}))
}), metav1.CreateOptions{
DryRun: opts.DryRun,
FieldManager: opts.FieldManager,
})
return out, kutil.VerbCreated, err
} else if err != nil {
return nil, kutil.VerbUnchanged, err
}
return PatchRepository(c, cur, transform)
return PatchRepository(ctx, c, cur, transform, opts)
}

func PatchRepository(c cs.StashV1alpha1Interface, cur *api.Repository, transform func(*api.Repository) *api.Repository) (*api.Repository, kutil.VerbType, error) {
return PatchRepositoryObject(c, cur, transform(cur.DeepCopy()))
func PatchRepository(ctx context.Context, c cs.StashV1alpha1Interface, cur *api.Repository, transform func(*api.Repository) *api.Repository, opts metav1.PatchOptions) (*api.Repository, kutil.VerbType, error) {
return PatchRepositoryObject(ctx, c, cur, transform(cur.DeepCopy()), opts)
}

func PatchRepositoryObject(c cs.StashV1alpha1Interface, cur, mod *api.Repository) (*api.Repository, kutil.VerbType, error) {
func PatchRepositoryObject(ctx context.Context, c cs.StashV1alpha1Interface, cur, mod *api.Repository, opts metav1.PatchOptions) (*api.Repository, kutil.VerbType, error) {
curJson, err := json.Marshal(cur)
if err != nil {
return nil, kutil.VerbUnchanged, err
Expand All @@ -72,19 +76,19 @@ func PatchRepositoryObject(c cs.StashV1alpha1Interface, cur, mod *api.Repository
return cur, kutil.VerbUnchanged, nil
}
glog.V(3).Infof("Patching Repository %s/%s with %s.", cur.Namespace, cur.Name, string(patch))
out, err := c.Repositories(cur.Namespace).Patch(cur.Name, types.MergePatchType, patch)
out, err := c.Repositories(cur.Namespace).Patch(ctx, cur.Name, types.MergePatchType, patch, opts)
return out, kutil.VerbPatched, err
}

func TryUpdateRepository(c cs.StashV1alpha1Interface, meta metav1.ObjectMeta, transform func(*api.Repository) *api.Repository) (result *api.Repository, err error) {
func TryUpdateRepository(ctx context.Context, c cs.StashV1alpha1Interface, meta metav1.ObjectMeta, transform func(*api.Repository) *api.Repository, opts metav1.UpdateOptions) (result *api.Repository, err error) {
attempt := 0
err = wait.PollImmediate(kutil.RetryInterval, kutil.RetryTimeout, func() (bool, error) {
attempt++
cur, e2 := c.Repositories(meta.Namespace).Get(meta.Name, metav1.GetOptions{})
cur, e2 := c.Repositories(meta.Namespace).Get(ctx, meta.Name, metav1.GetOptions{})
if kerr.IsNotFound(e2) {
return false, e2
} else if e2 == nil {
result, e2 = c.Repositories(cur.Namespace).Update(transform(cur.DeepCopy()))
result, e2 = c.Repositories(cur.Namespace).Update(ctx, transform(cur.DeepCopy()), opts)
return e2 == nil, nil
}
glog.Errorf("Attempt %d failed to update Repository %s/%s due to %v.", attempt, cur.Namespace, cur.Name, e2)
Expand All @@ -98,9 +102,11 @@ func TryUpdateRepository(c cs.StashV1alpha1Interface, meta metav1.ObjectMeta, tr
}

func UpdateRepositoryStatus(
ctx context.Context,
c cs.StashV1alpha1Interface,
meta metav1.ObjectMeta,
transform func(*api.RepositoryStatus) *api.RepositoryStatus,
opts metav1.UpdateOptions,
) (result *api.Repository, err error) {
apply := func(x *api.Repository) *api.Repository {
out := &api.Repository{
Expand All @@ -113,16 +119,16 @@ func UpdateRepositoryStatus(
}

attempt := 0
cur, err := c.Repositories(meta.Namespace).Get(meta.Name, metav1.GetOptions{})
cur, err := c.Repositories(meta.Namespace).Get(ctx, meta.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
err = wait.PollImmediate(kutil.RetryInterval, kutil.RetryTimeout, func() (bool, error) {
attempt++
var e2 error
result, e2 = c.Repositories(meta.Namespace).UpdateStatus(apply(cur))
result, e2 = c.Repositories(meta.Namespace).UpdateStatus(ctx, apply(cur), opts)
if kerr.IsConflict(e2) {
latest, e3 := c.Repositories(meta.Namespace).Get(meta.Name, metav1.GetOptions{})
latest, e3 := c.Repositories(meta.Namespace).Get(ctx, meta.Name, metav1.GetOptions{})
switch {
case e3 == nil:
cur = latest
Expand Down

0 comments on commit 723f4de

Please sign in to comment.