address feedback, include namespaces in the count
Signed-off-by: Pranav Gaikwad <pgaikwad@redhat.com>
pranavgaikwad committed Feb 12, 2021
1 parent 7994c83 commit b527a8a
Showing 3 changed files with 108 additions and 108 deletions.
126 changes: 56 additions & 70 deletions pkg/restore/restore.go
@@ -421,6 +421,7 @@ func (ctx *restoreContext) execute() (Result, Result) {
}
}()

// i: iteration counter; totalItems: count of items previously discovered
totalItems, i, existingNamespaces := 0, 0, sets.NewString()

for _, selectedResource := range selectedResourceCollection {
@@ -438,11 +439,20 @@ func (ctx *restoreContext) execute() (Result, Result) {
if namespace != "" && !existingNamespaces.Has(selectedItem.targetNamespace) {
logger := ctx.log.WithField("namespace", namespace)
ns := getNamespace(logger, archive.GetItemFilePath(ctx.restoreDir, "namespaces", "", namespace), selectedItem.targetNamespace)
if _, err := kube.EnsureNamespaceExistsAndIsReady(ns, ctx.namespaceClient, ctx.resourceTerminatingTimeout); err != nil {
if _, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady(ns, ctx.namespaceClient, ctx.resourceTerminatingTimeout); err != nil {
errs.AddVeleroError(err)
continue
} else {
// add the newly created namespace to the list of restored items
if nsCreated {
itemKey := velero.ResourceIdentifier{
GroupResource: kuberesource.Namespaces,
Namespace: ns.Namespace,
Name: ns.Name,
}
ctx.restoredItems[itemKey] = struct{}{}
}
}

// keep track of namespaces that we know exist so we don't
// have to try to create them multiple times
existingNamespaces.Insert(selectedItem.targetNamespace)
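With this change, a namespace is recorded in `ctx.restoredItems` only when `EnsureNamespaceExistsAndIsReady` actually created it, so pre-existing namespaces never inflate the restored-item count. A minimal sketch of that bookkeeping, with a locally defined `resourceIdentifier` standing in for `velero.ResourceIdentifier`:

```go
package main

import "fmt"

// resourceIdentifier is a stand-in for velero.ResourceIdentifier,
// which keys restored items by group/resource, namespace, and name.
type resourceIdentifier struct {
	groupResource string
	namespace     string
	name          string
}

func main() {
	restoredItems := map[resourceIdentifier]struct{}{}

	// Simulate two ensure calls: the first creates the namespace,
	// the second finds it already present in the cluster.
	for _, created := range []bool{true, false} {
		if created {
			key := resourceIdentifier{groupResource: "namespaces", name: "app-ns"}
			restoredItems[key] = struct{}{} // count only namespaces we created
		}
	}

	fmt.Println(len(restoredItems)) // 1: the pre-existing case added nothing
}
```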
@@ -453,8 +463,13 @@ func (ctx *restoreContext) execute() (Result, Result) {
continue
}
w, e := ctx.restoreItem(obj, groupResource, selectedItem.targetNamespace)
actualTotalItems := len(ctx.restoredItems) + (totalItems - (i + 1))
i++
// totalItems keeps the count of items that were previously known.
// Plugins may restore additional items, which we pick up by looking
// at restoredItems. Previously known items must not be counted twice:
// they appear in both restoredItems and totalItems.
actualTotalItems := len(ctx.restoredItems) + (totalItems - i)
update <- progressUpdate{
totalItems: actualTotalItems,
itemsRestored: len(ctx.restoredItems),
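The new progress math adds plugin-restored extras on top of the originally discovered total: after `i++`, `len(ctx.restoredItems)` covers the `i` items processed so far plus anything plugins added, so subtracting `i` from `totalItems` before adding avoids double counting. A small worked sketch with illustrative numbers (it simplifies by ignoring created namespaces):

```go
package main

import "fmt"

func main() {
	totalItems := 10 // items discovered up front in the backup
	restored := 0    // stands in for len(ctx.restoredItems)

	for i := 1; i <= 3; i++ {
		restored++ // the item itself
		if i == 2 {
			restored++ // an extra item restored by a plugin
		}
		// previously known items appear in both counts, so subtract
		// the i already-processed ones from totalItems before adding
		actualTotal := restored + (totalItems - i)
		fmt.Printf("i=%d itemsRestored=%d totalItems=%d\n", i, restored, actualTotal)
	}
	// Output shows the total growing from 10 to 11 once the
	// plugin-restored extra item is observed.
}
```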
@@ -708,54 +723,6 @@ func (ctx *restoreContext) crdAvailable(name string, crdClient client.Dynamic) (
return available, err
}

// restoreResource restores the specified cluster or namespace scoped resource. If namespace is
// empty we are restoring a cluster level resource, otherwise into the specified namespace.
func (ctx *restoreContext) restoreResource(resource, targetNamespace, originalNamespace string, items []string) (Result, Result) {
warnings, errs := Result{}, Result{}

if targetNamespace == "" && boolptr.IsSetToFalse(ctx.restore.Spec.IncludeClusterResources) {
ctx.log.Infof("Skipping resource %s because it's cluster-scoped", resource)
return warnings, errs
}

if targetNamespace == "" && !boolptr.IsSetToTrue(ctx.restore.Spec.IncludeClusterResources) && !ctx.namespaceIncludesExcludes.IncludeEverything() {
ctx.log.Infof("Skipping resource %s because it's cluster-scoped and only specific namespaces are included in the restore", resource)
return warnings, errs
}

if targetNamespace != "" {
ctx.log.Infof("Restoring resource '%s' into namespace '%s'", resource, targetNamespace)
} else {
ctx.log.Infof("Restoring cluster level resource '%s'", resource)
}

if len(items) == 0 {
return warnings, errs
}

groupResource := schema.ParseGroupResource(resource)

for _, item := range items {
itemPath := archive.GetItemFilePath(ctx.restoreDir, resource, originalNamespace, item)

obj, err := archive.Unmarshal(ctx.fileSystem, itemPath)
if err != nil {
errs.Add(targetNamespace, fmt.Errorf("error decoding %q: %v", strings.Replace(itemPath, ctx.restoreDir+"/", "", -1), err))
continue
}

if !ctx.selector.Matches(labels.Set(obj.GetLabels())) {
continue
}

w, e := ctx.restoreItem(obj, groupResource, targetNamespace)
warnings.Merge(&w)
errs.Merge(&e)
}

return warnings, errs
}

func (ctx *restoreContext) getResourceClient(groupResource schema.GroupResource, obj *unstructured.Unstructured, namespace string) (client.Dynamic, error) {
key := resourceClientKey{
resource: groupResource.WithVersion(obj.GroupVersionKind().Version),
@@ -828,9 +795,19 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
// which the resource is being restored into exists.
// This is the *remapped* namespace that we are ensuring exists.
nsToEnsure := getNamespace(ctx.log, archive.GetItemFilePath(ctx.restoreDir, "namespaces", "", obj.GetNamespace()), namespace)
if _, err := kube.EnsureNamespaceExistsAndIsReady(nsToEnsure, ctx.namespaceClient, ctx.resourceTerminatingTimeout); err != nil {
if _, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady(nsToEnsure, ctx.namespaceClient, ctx.resourceTerminatingTimeout); err != nil {
errs.AddVeleroError(err)
return warnings, errs
} else {
// add the newly created namespace to the list of restored items
if nsCreated {
itemKey := velero.ResourceIdentifier{
GroupResource: kuberesource.Namespaces,
Namespace: nsToEnsure.Namespace,
Name: nsToEnsure.Name,
}
ctx.restoredItems[itemKey] = struct{}{}
}
}
} else {
if boolptr.IsSetToFalse(ctx.restore.Spec.IncludeClusterResources) {
@@ -1472,20 +1449,28 @@ func isCompleted(obj *unstructured.Unstructured, groupResource schema.GroupResou
return false, nil
}

// restoreableResource represents a map of individual items of each resource
// identifier, grouped by their original namespaces.
type restoreableResource struct {
resource string
selectedItemsByNamespace map[string][]restoreableItem
totalItems int
}

// restoreableItem represents an item to be restored in its target namespace;
// it contains enough information to restore the item.
type restoreableItem struct {
path string
targetNamespace string
name string
}
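To illustrate how these two types fit together, here is a sketch that builds a `restoreableResource` by grouping items under their original namespaces. The field names mirror the structs above; the paths and namespace names are purely illustrative:

```go
package main

import "fmt"

type restoreableItem struct {
	path            string
	targetNamespace string
	name            string
}

type restoreableResource struct {
	resource                 string
	selectedItemsByNamespace map[string][]restoreableItem
	totalItems               int
}

func main() {
	res := restoreableResource{
		resource:                 "pods",
		selectedItemsByNamespace: map[string][]restoreableItem{},
	}

	// Group each selected item under its *original* namespace; the
	// targetNamespace may differ when a namespace mapping is in play.
	for _, it := range []restoreableItem{
		{path: "resources/pods/namespaces/src/web.json", targetNamespace: "dst", name: "web"},
		{path: "resources/pods/namespaces/src/db.json", targetNamespace: "dst", name: "db"},
	} {
		res.selectedItemsByNamespace["src"] = append(res.selectedItemsByNamespace["src"], it)
		res.totalItems++
	}

	fmt.Println(res.totalItems, len(res.selectedItemsByNamespace["src"])) // 2 2
}
```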

func (ctx *restoreContext) getOrderedResourceCollection(backupResources map[string]*archive.ResourceItems) (restoreResourceCollection []restoreableResource, warnings Result, errs Result) {
// getOrderedResourceCollection iterates over the ordered list of resource identifiers, applies the resource
// include/exclude criteria and Kubernetes label selectors, and builds the list of resources to restore, preserving the original order.
func (ctx *restoreContext) getOrderedResourceCollection(backupResources map[string]*archive.ResourceItems) ([]restoreableResource, Result, Result) {
var warnings, errs Result
processedResources := sets.NewString()
restoreResourceCollection := make([]restoreableResource, 0)
// Iterate through an ordered list of resources to restore, checking each one to see if it should be restored.
// Note that resources *may* be in this list twice, i.e. once due to being a prioritized resource, and once due
// to being in the backup tarball. We can't de-dupe this upfront, because it's possible that items in the prioritized
Expand Down Expand Up @@ -1533,7 +1518,7 @@ func (ctx *restoreContext) getOrderedResourceCollection(backupResources map[stri
}

// iterate through each namespace that contains instances of the resource and
// restore them
// append them to the list of to-be-restored resources
for namespace, items := range resourceList.ItemsByNamespace {
if namespace != "" && !ctx.namespaceIncludesExcludes.ShouldInclude(namespace) {
ctx.log.Infof("Skipping namespace %s", namespace)
@@ -1547,6 +1532,16 @@ func (ctx *restoreContext) getOrderedResourceCollection(backupResources map[stri
targetNamespace = target
}

if targetNamespace == "" && boolptr.IsSetToFalse(ctx.restore.Spec.IncludeClusterResources) {
ctx.log.Infof("Skipping resource %s because it's cluster-scoped", resource)
continue
}

if targetNamespace == "" && !boolptr.IsSetToTrue(ctx.restore.Spec.IncludeClusterResources) && !ctx.namespaceIncludesExcludes.IncludeEverything() {
ctx.log.Infof("Skipping resource %s because it's cluster-scoped and only specific namespaces are included in the restore", resource)
continue
}

res, w, e := ctx.getSelectedRestoreableItems(groupResource.String(), targetNamespace, namespace, items)
restoreResourceCollection = append(restoreResourceCollection, res)

@@ -1557,34 +1552,25 @@ func (ctx *restoreContext) getOrderedResourceCollection(backupResources map[stri
// record that we've restored the resource
processedResources.Insert(groupResource.String())
}
return
return restoreResourceCollection, warnings, errs
}
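The two skips moved into this function implement Velero's three-state `IncludeClusterResources` flag (true, false, or unset). A compact sketch of that decision, with small local helpers standing in for the `boolptr` package:

```go
package main

import "fmt"

// isSetToTrue/isSetToFalse stand in for velero's boolptr helpers:
// a nil pointer means the user left IncludeClusterResources unset.
func isSetToTrue(b *bool) bool  { return b != nil && *b }
func isSetToFalse(b *bool) bool { return b != nil && !*b }

// skipClusterScoped mirrors the two checks above: cluster-scoped
// resources (empty target namespace) are skipped when the flag is
// explicitly false, or when it is unset and only specific namespaces
// are included in the restore.
func skipClusterScoped(targetNamespace string, include *bool, includeEverything bool) bool {
	if targetNamespace != "" {
		return false // namespaced resources are never skipped here
	}
	if isSetToFalse(include) {
		return true
	}
	return !isSetToTrue(include) && !includeEverything
}

func main() {
	t, f := true, false
	fmt.Println(skipClusterScoped("", &f, true))    // true: explicitly excluded
	fmt.Println(skipClusterScoped("", nil, true))   // false: unset, all namespaces included
	fmt.Println(skipClusterScoped("", nil, false))  // true: unset, namespace filter active
	fmt.Println(skipClusterScoped("", &t, false))   // false: explicitly included
	fmt.Println(skipClusterScoped("ns1", &f, false)) // false: namespaced resource
}
```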

func (ctx *restoreContext) getSelectedRestoreableItems(resource, targetNamespace, originalNamespace string, items []string) (res restoreableResource, warnings Result, errs Result) {
// getSelectedRestoreableItems applies Kubernetes label selectors to the individual items of each resource
// type to build the list of items that will actually be restored.
func (ctx *restoreContext) getSelectedRestoreableItems(resource, targetNamespace, originalNamespace string, items []string) (restoreableResource, Result, Result) {
var res restoreableResource
warnings, errs := Result{}, Result{}
res.resource = resource
if res.selectedItemsByNamespace == nil {
res.selectedItemsByNamespace = make(map[string][]restoreableItem)
}
if targetNamespace == "" && boolptr.IsSetToFalse(ctx.restore.Spec.IncludeClusterResources) {
ctx.log.Infof("Skipping resource %s because it's cluster-scoped", resource)
return
}

if targetNamespace == "" && !boolptr.IsSetToTrue(ctx.restore.Spec.IncludeClusterResources) && !ctx.namespaceIncludesExcludes.IncludeEverything() {
ctx.log.Infof("Skipping resource %s because it's cluster-scoped and only specific namespaces are included in the restore", resource)
return
}

if targetNamespace != "" {
ctx.log.Infof("Resource '%s' will be restored into namespace '%s'", resource, targetNamespace)
} else {
ctx.log.Infof("Resource '%s' will be restored at cluster scope", resource)
}

if len(items) == 0 {
return
}

for _, item := range items {
itemPath := archive.GetItemFilePath(ctx.restoreDir, resource, originalNamespace, item)

@@ -1606,5 +1592,5 @@ func (ctx *restoreContext) getSelectedRestoreableItems(resource, targetNamespace
res.selectedItemsByNamespace[originalNamespace] = append(res.selectedItemsByNamespace[originalNamespace], selectedItem)
res.totalItems++
}
return
return res, warnings, errs
}
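Both refactored functions also drop Go named result parameters with bare returns in favor of explicit return values, which makes what leaves the function obvious at each return site. A minimal illustration of the two styles, with hypothetical function names:

```go
package main

import "fmt"

// before: named results and a bare return force the reader to track
// mutations of the result variables across the whole function body.
func namedStyle() (res string, count int) {
	res = "pods"
	count = 2
	return // implicitly returns res, count
}

// after: explicit returns state exactly what the function hands back.
func explicitStyle() (string, int) {
	res, count := "pods", 2
	return res, count
}

func main() {
	a, b := namedStyle()
	c, d := explicitStyle()
	fmt.Println(a, b, c, d)
}
```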
24 changes: 15 additions & 9 deletions pkg/util/kube/utils.go
@@ -40,12 +40,16 @@ func NamespaceAndName(objMeta metav1.Object) string {
return fmt.Sprintf("%s/%s", objMeta.GetNamespace(), objMeta.GetName())
}

// EnsureNamespaceExistsAndIsReady attempts to create the provided Kubernetes namespace. It returns two values:
// a bool indicating whether or not the namespace is ready, and an error if the create failed
// EnsureNamespaceExistsAndIsReady attempts to create the provided Kubernetes namespace.
// It returns three values: a bool indicating whether or not the namespace is ready,
// a bool indicating whether or not the namespace was created, and an error if the creation failed
// for a reason other than that the namespace already exists. Note that in the case where the
// namespace already exists and is not ready, this function will return (false, nil).
// namespace already exists and is not ready, this function will return (false, false, nil).
// If the namespace exists and is marked for deletion, this function will wait up to the timeout for it to fully delete.
func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client corev1client.NamespaceInterface, timeout time.Duration) (bool, error) {
func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client corev1client.NamespaceInterface, timeout time.Duration) (bool, bool, error) {
// nsCreated tells whether the namespace was created by this method;
// it is required to keep track of the number of restored items.
var nsCreated bool
var ready bool
err := wait.PollImmediate(time.Second, timeout, func() (bool, error) {
clusterNS, err := client.Get(context.TODO(), namespace.Name, metav1.GetOptions{})
@@ -72,26 +76,28 @@ func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client core

// err will be set if we timed out or encountered issues retrieving the namespace.
if err != nil {
return false, errors.Wrapf(err, "error getting namespace %s", namespace.Name)
return false, nsCreated, errors.Wrapf(err, "error getting namespace %s", namespace.Name)
}

// In the case the namespace already exists and isn't marked for deletion, assume it's ready for use.
if ready {
return true, nil
return true, nsCreated, nil
}

clusterNS, err := client.Create(context.TODO(), namespace, metav1.CreateOptions{})
if apierrors.IsAlreadyExists(err) {
if clusterNS != nil && (clusterNS.GetDeletionTimestamp() != nil || clusterNS.Status.Phase == corev1api.NamespaceTerminating) {
// Somehow created after all our polling and marked for deletion; return an error
return false, errors.Errorf("namespace %s created and marked for termination after timeout", namespace.Name)
return false, nsCreated, errors.Errorf("namespace %s created and marked for termination after timeout", namespace.Name)
}
} else if err != nil {
return false, errors.Wrapf(err, "error creating namespace %s", namespace.Name)
return false, nsCreated, errors.Wrapf(err, "error creating namespace %s", namespace.Name)
} else {
nsCreated = true
}

// The namespace was created successfully.
return true, nil
return true, nsCreated, nil
}
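Callers now destructure three values and only record the namespace as a restored item when the second return is true. A caller-side sketch against a hand-written stub with the same return shape (the real function takes a `*corev1api.Namespace`, a `NamespaceInterface`, and a timeout):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ensureNamespace is a local stub mirroring the return shape of
// kube.EnsureNamespaceExistsAndIsReady: (ready, created, err).
func ensureNamespace(name string, timeout time.Duration) (bool, bool, error) {
	switch name {
	case "existing":
		return true, false, nil // ready, but we did not create it
	case "fresh":
		return true, true, nil // created by this call
	default:
		return false, false, errors.New("namespace is terminating")
	}
}

func main() {
	for _, ns := range []string{"existing", "fresh", "stuck"} {
		ready, created, err := ensureNamespace(ns, 30*time.Second)
		if err != nil {
			fmt.Println(ns, "error:", err)
			continue
		}
		// Only namespaces we actually created count as restored items.
		fmt.Println(ns, "ready:", ready, "count as restored:", created)
	}
}
```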

// GetVolumeDirectory gets the name of the directory on the host, under /var/lib/kubelet/pods/<podUID>/volumes/,
66 changes: 37 additions & 29 deletions pkg/util/kube/utils_test.go
@@ -44,46 +44,53 @@ func TestNamespaceAndName(t *testing.T) {

func TestEnsureNamespaceExistsAndIsReady(t *testing.T) {
tests := []struct {
name string
expectNSFound bool
nsPhase corev1.NamespacePhase
nsDeleting bool
expectCreate bool
alreadyExists bool
expectedResult bool
name string
expectNSFound bool
nsPhase corev1.NamespacePhase
nsDeleting bool
expectCreate bool
alreadyExists bool
expectedResult bool
expectedCreatedResult bool
}{
{
name: "namespace found, not deleting",
expectNSFound: true,
expectedResult: true,
name: "namespace found, not deleting",
expectNSFound: true,
expectedResult: true,
expectedCreatedResult: false,
},
{
name: "namespace found, terminating phase",
expectNSFound: true,
nsPhase: corev1.NamespaceTerminating,
expectedResult: false,
name: "namespace found, terminating phase",
expectNSFound: true,
nsPhase: corev1.NamespaceTerminating,
expectedResult: false,
expectedCreatedResult: false,
},
{
name: "namespace found, deletiontimestamp set",
expectNSFound: true,
nsDeleting: true,
expectedResult: false,
name: "namespace found, deletiontimestamp set",
expectNSFound: true,
nsDeleting: true,
expectedResult: false,
expectedCreatedResult: false,
},
{
name: "namespace not found, successfully created",
expectCreate: true,
expectedResult: true,
name: "namespace not found, successfully created",
expectCreate: true,
expectedResult: true,
expectedCreatedResult: true,
},
{
name: "namespace not found initially, create returns already exists error, returned namespace is ready",
alreadyExists: true,
expectedResult: true,
name: "namespace not found initially, create returns already exists error, returned namespace is ready",
alreadyExists: true,
expectedResult: true,
expectedCreatedResult: false,
},
{
name: "namespace not found initially, create returns already exists error, returned namespace is terminating",
alreadyExists: true,
nsPhase: corev1.NamespaceTerminating,
expectedResult: false,
name: "namespace not found initially, create returns already exists error, returned namespace is terminating",
alreadyExists: true,
nsPhase: corev1.NamespaceTerminating,
expectedResult: false,
expectedCreatedResult: false,
},
}

@@ -122,9 +129,10 @@ func TestEnsureNamespaceExistsAndIsReady(t *testing.T) {
nsClient.On("Create", namespace).Return(namespace, nil)
}

result, _ := EnsureNamespaceExistsAndIsReady(namespace, nsClient, timeout)
result, nsCreated, _ := EnsureNamespaceExistsAndIsReady(namespace, nsClient, timeout)

assert.Equal(t, test.expectedResult, result)
assert.Equal(t, test.expectedCreatedResult, nsCreated)
})
}

