Skip to content

Commit

Permalink
api: WaitForUpdatesEx & DestroyPropertyFilter
Browse files Browse the repository at this point in the history
This patch introduces Ex variants for many of the property
collector types/methods to take advantage of the remote API
DestroyPropertyFilter. Now it is possible to use a single
property collector for waiting for updates and remove
previously added property filters.

BREAKING: The semantics around the helper functions in the
          property package have changed. Please review any
          code that calls this package to ensure it is
          compatible with the new behaviors.
  • Loading branch information
akutz committed Jan 10, 2024
1 parent 2ed0a9d commit 95aa257
Show file tree
Hide file tree
Showing 24 changed files with 1,035 additions and 330 deletions.
2 changes: 1 addition & 1 deletion find/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ func (f *Finder) networkByID(ctx context.Context, path string) (object.NetworkRe
}
defer v.Destroy(ctx)

filter := property.Filter{
filter := property.Match{
"config.logicalSwitchUuid": path,
"config.segmentId": path,
}
Expand Down
6 changes: 3 additions & 3 deletions govc/object/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type collect struct {
kind kinds
wait time.Duration

filter property.Filter
filter property.Match
obj string
}

Expand Down Expand Up @@ -263,7 +263,7 @@ func (cmd *collect) match(update types.ObjectUpdate) bool {
}

for _, c := range update.ChangeSet {
if cmd.filter.MatchProperty(types.DynamicProperty{Name: c.Name, Val: c.Val}) {
if cmd.filter.Property(types.DynamicProperty{Name: c.Name, Val: c.Val}) {
return true
}
}
Expand All @@ -279,7 +279,7 @@ func (cmd *collect) toFilter(f *flag.FlagSet, props []string) ([]string, error)
return props, nil
}

cmd.filter = property.Filter{props[0][1:]: props[1]}
cmd.filter = property.Match{props[0][1:]: props[1]}

return cmd.filter.Keys(), nil
}
Expand Down
4 changes: 2 additions & 2 deletions govc/object/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ Examples:
}

// rootMatch returns true if the root object path should be printed
func (cmd *find) rootMatch(ctx context.Context, root object.Reference, client *vim25.Client, filter property.Filter) bool {
func (cmd *find) rootMatch(ctx context.Context, root object.Reference, client *vim25.Client, filter property.Match) bool {
ref := root.Reference()

if !cmd.kind.wanted(ref.Type) {
Expand Down Expand Up @@ -285,7 +285,7 @@ func (cmd *find) Run(ctx context.Context, f *flag.FlagSet) error {
}
}

filter := property.Filter{}
filter := property.Match{}

if len(props)%2 != 0 {
return flag.ErrHelp
Expand Down
2 changes: 1 addition & 1 deletion object/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func ExampleCustomFieldsManager_Set() {
}

// filter used to find objects with "backup=true"
filter := property.Filter{"customValue": &types.CustomFieldStringValue{
filter := property.Match{"customValue": &types.CustomFieldStringValue{
CustomFieldValue: types.CustomFieldValue{Key: field.Key},
Value: "true",
}}
Expand Down
44 changes: 40 additions & 4 deletions object/task.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
Copyright (c) 2015 VMware, Inc. All Rights Reserved.
Copyright (c) 2015-2024 VMware, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -18,6 +18,7 @@ package object

import (
"context"
"fmt"

"github.com/vmware/govmomi/property"
"github.com/vmware/govmomi/task"
Expand All @@ -43,18 +44,53 @@ func NewTask(c *vim25.Client, ref types.ManagedObjectReference) *Task {
return &t
}

// Deprecated: Please use WaitEx instead.
func (t *Task) Wait(ctx context.Context) error {
_, err := t.WaitForResult(ctx, nil)
return err
}

func (t *Task) WaitForResult(ctx context.Context, s ...progress.Sinker) (*types.TaskInfo, error) {
// Deprecated: Please use WaitForResultEx instead.
func (t *Task) WaitForResult(ctx context.Context, s ...progress.Sinker) (taskInfo *types.TaskInfo, result error) {
var pr progress.Sinker
if len(s) == 1 {
pr = s[0]
}
p, err := property.DefaultCollector(t.c).Create(ctx)
if err != nil {
return nil, err
}

// Attempt to destroy the collector using the background context, as the
// specified context may have timed out or have been canceled.
defer func() {
if err := p.Destroy(context.Background()); err != nil {
if result == nil {
result = err
} else {
result = fmt.Errorf(
"destroy property collector failed with %s after failing to wait for updates: %w",
err,
result)
}
}
}()

return task.WaitEx(ctx, t.Reference(), p, pr)
}

func (t *Task) WaitEx(ctx context.Context) error {
_, err := t.WaitForResultEx(ctx, nil)
return err
}

func (t *Task) WaitForResultEx(ctx context.Context, s ...progress.Sinker) (*types.TaskInfo, error) {
var pr progress.Sinker
if len(s) == 1 {
pr = s[0]
}
p := property.DefaultCollector(t.c)
return task.Wait(ctx, t.Reference(), p, pr)
return task.WaitEx(ctx, t.Reference(), p, pr)
}

func (t *Task) Cancel(ctx context.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion pbm/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestClient(t *testing.T) {
} else {
var cluster mo.ClusterComputeResource

err = v.RetrieveWithFilter(ctx, kind, []string{"datastore"}, &cluster, property.Filter{"name": clusterName})
err = v.RetrieveWithFilter(ctx, kind, []string{"datastore"}, &cluster, property.Match{"name": clusterName})
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pbm/simulator/simulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestSimulator(t *testing.T) {
} else {
var cluster mo.ClusterComputeResource

err = v.RetrieveWithFilter(ctx, kind, []string{"datastore"}, &cluster, property.Filter{"name": clusterName})
err = v.RetrieveWithFilter(ctx, kind, []string{"datastore"}, &cluster, property.Match{"name": clusterName})
if err != nil {
t.Fatal(err)
}
Expand Down
115 changes: 105 additions & 10 deletions property/collector.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2015-2023 VMware, Inc. All Rights Reserved.
Copyright (c) 2015-2024 VMware, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,8 @@ package property
import (
"context"
"errors"
"fmt"
"sync"

"github.com/vmware/govmomi/vim25"
"github.com/vmware/govmomi/vim25/methods"
Expand All @@ -27,11 +29,19 @@ import (
"github.com/vmware/govmomi/vim25/types"
)

// ErrConcurrentCollector is returned from WaitForUpdates, WaitForUpdatesEx,
// or CheckForUpdates if any of those calls are unable to obtain an exclusive
// lock for the property collector.
var ErrConcurrentCollector = fmt.Errorf(
"only one goroutine may invoke WaitForUpdates, WaitForUpdatesEx, " +
"or CheckForUpdates on a given PropertyCollector")

// Collector models the PropertyCollector managed object.
//
// For more information, see:
// http://pubs.vmware.com/vsphere-60/index.jsp?topic=%2Fcom.vmware.wssdk.apiref.doc%2Fvmodl.query.PropertyCollector.html
type Collector struct {
mu sync.Mutex
roundTripper soap.RoundTripper
reference types.ManagedObjectReference
}
Expand All @@ -46,7 +56,7 @@ func DefaultCollector(c *vim25.Client) *Collector {
return &p
}

func (p Collector) Reference() types.ManagedObjectReference {
func (p *Collector) Reference() types.ManagedObjectReference {
return p.reference
}

Expand Down Expand Up @@ -85,18 +95,28 @@ func (p *Collector) Destroy(ctx context.Context) error {
return nil
}

func (p *Collector) CreateFilter(ctx context.Context, req types.CreateFilter) error {
func (p *Collector) CreateFilter(ctx context.Context, req types.CreateFilter) (*Filter, error) {
req.This = p.Reference()

_, err := methods.CreateFilter(ctx, p.roundTripper, &req)
resp, err := methods.CreateFilter(ctx, p.roundTripper, &req)
if err != nil {
return err
return nil, err
}

return nil
return &Filter{roundTripper: p.roundTripper, reference: resp.Returnval}, nil
}

func (p *Collector) WaitForUpdates(ctx context.Context, version string, opts ...*types.WaitOptions) (*types.UpdateSet, error) {
// Deprecated: Please use WaitForUpdatesEx instead.
func (p *Collector) WaitForUpdates(
ctx context.Context,
version string,
opts ...*types.WaitOptions) (*types.UpdateSet, error) {

if !p.mu.TryLock() {
return nil, ErrConcurrentCollector
}
defer p.mu.Unlock()

req := types.WaitForUpdatesEx{
This: p.Reference(),
Version: version,
Expand Down Expand Up @@ -187,8 +207,15 @@ func (p *Collector) Retrieve(ctx context.Context, objs []types.ManagedObjectRefe
return mo.LoadObjectContent(res.Returnval, dst)
}

// RetrieveWithFilter populates dst as Retrieve does, but only for entities matching the given filter.
func (p *Collector) RetrieveWithFilter(ctx context.Context, objs []types.ManagedObjectReference, ps []string, dst interface{}, filter Filter) error {
// RetrieveWithFilter populates dst as Retrieve does, but only for entities
// that match the specified filter.
func (p *Collector) RetrieveWithFilter(
ctx context.Context,
objs []types.ManagedObjectReference,
ps []string,
dst interface{},
filter Match) error {

if len(filter) == 0 {
return p.Retrieve(ctx, objs, ps, dst)
}
Expand All @@ -200,7 +227,7 @@ func (p *Collector) RetrieveWithFilter(ctx context.Context, objs []types.Managed
return err
}

objs = filter.MatchObjectContent(content)
objs = filter.ObjectContent(content)

if len(objs) == 0 {
return nil
Expand All @@ -214,3 +241,71 @@ func (p *Collector) RetrieveOne(ctx context.Context, obj types.ManagedObjectRefe
var objs = []types.ManagedObjectReference{obj}
return p.Retrieve(ctx, objs, ps, dst)
}

// WaitForUpdatesEx waits for any of the specified properties of the specified
// managed object to change. It calls the specified function for every update it
// receives. If this function returns false, it continues waiting for
// subsequent updates. If this function returns true, it stops waiting and
// returns.
//
// If the Context is canceled, a call to CancelWaitForUpdates() is made and its
// error value is returned.
//
// By default, ObjectUpdate.MissingSet faults are not propagated to the returned
// error, set WaitFilter.PropagateMissing=true to enable MissingSet fault
// propagation.
func (p *Collector) WaitForUpdatesEx(
ctx context.Context,
opts WaitOptions,
onUpdatesFn func([]types.ObjectUpdate) bool) error {

if !p.mu.TryLock() {
return ErrConcurrentCollector
}
defer p.mu.Unlock()

req := types.WaitForUpdatesEx{
This: p.Reference(),
Options: opts.Options,
}

for {
res, err := methods.WaitForUpdatesEx(ctx, p.roundTripper, &req)
if err != nil {
if ctx.Err() == context.Canceled {
return p.CancelWaitForUpdates(context.Background())
}
return err
}

set := res.Returnval
if set == nil {
if req.Options != nil && req.Options.MaxWaitSeconds != nil {
return nil // WaitOptions.MaxWaitSeconds exceeded
}
// Retry if the result came back empty
continue
}

req.Version = set.Version
opts.Truncated = false
if set.Truncated != nil {
opts.Truncated = *set.Truncated
}

for _, fs := range set.FilterSet {
if opts.PropagateMissing {
for i := range fs.ObjectSet {
for _, p := range fs.ObjectSet[i].MissingSet {
// Same behavior as mo.ObjectContentToType()
return soap.WrapVimFault(p.Fault.Fault)
}
}
}

if onUpdatesFn(fs.ObjectSet) {
return nil
}
}
}
}

0 comments on commit 95aa257

Please sign in to comment.