diff --git a/Gopkg.lock b/Gopkg.lock index 2d9883f04bed8..76a36e7cb2a8a 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1064,7 +1064,7 @@ version = "v1.0.0" [[projects]] - digest = "1:f9fe29bf856d49f9a51d6001588cb5ee5d65c8a7ff5e8b0dd5423c3a510f0833" + digest = "1:6af52ce6dae9a912aa3113f247a63cd82599760ddc328a6721c3ef0426d31ca2" name = "github.com/vmware/govmomi" packages = [ ".", @@ -1090,8 +1090,8 @@ "vim25/xml", ] pruneopts = "" - revision = "e3a01f9611c32b2362366434bcd671516e78955d" - version = "v0.18.0" + revision = "3617f28d167d448f93f282a867870f109516d2a5" + version = "v0.19.0" [[projects]] digest = "1:c1855527c165f0224708fbc7d76843b4b20bcb74b328f212f8d0e9c855d4c49d" diff --git a/Gopkg.toml b/Gopkg.toml index 3e430b4c318a9..b875ec2081f56 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -228,7 +228,7 @@ [[constraint]] name = "github.com/vmware/govmomi" - version = "0.18.0" + version = "0.19.0" [[constraint]] name = "github.com/Azure/go-autorest" diff --git a/plugins/inputs/vsphere/README.md b/plugins/inputs/vsphere/README.md index 7ba323bc73e9e..4bccbb2c880e8 100644 --- a/plugins/inputs/vsphere/README.md +++ b/plugins/inputs/vsphere/README.md @@ -122,17 +122,17 @@ vm_metric_exclude = [ "*" ] ## Clusters # cluster_metric_include = [] ## if omitted or empty, all metrics are collected # cluster_metric_exclude = [] ## Nothing excluded by default - # cluster_instances = true ## true by default + # cluster_instances = false ## false by default ## Datastores # datastore_metric_include = [] ## if omitted or empty, all metrics are collected # datastore_metric_exclude = [] ## Nothing excluded by default - # datastore_instances = false ## false by default for Datastores only + # datastore_instances = false ## false by default ## Datacenters datacenter_metric_include = [] ## if omitted or empty, all metrics are collected datacenter_metric_exclude = [ "*" ] ## Datacenters are not collected by default. - # datacenter_instances = false ## false by default for Datastores only + # datacenter_instances = false ## false by default ## Plugin Settings ## separator character to use for measurement and field names (default: "_") diff --git a/plugins/inputs/vsphere/client.go b/plugins/inputs/vsphere/client.go index ebad2bea79d30..8b1c4866ac9df 100644 --- a/plugins/inputs/vsphere/client.go +++ b/plugins/inputs/vsphere/client.go @@ -3,6 +3,7 @@ package vsphere import ( "context" "crypto/tls" + "fmt" "log" "net/url" "strconv" @@ -18,6 +19,7 @@ import ( "github.com/vmware/govmomi/vim25" "github.com/vmware/govmomi/vim25/methods" "github.com/vmware/govmomi/vim25/soap" + "github.com/vmware/govmomi/vim25/types" ) // The highest number of metrics we can query for, no matter what settings @@ -76,7 +78,7 @@ func (cf *ClientFactory) GetClient(ctx context.Context) (*Client, error) { ctx2, cancel2 := context.WithTimeout(ctx, cf.parent.Timeout.Duration) defer cancel2() if cf.client.Client.SessionManager.Login(ctx2, url.UserPassword(cf.parent.Username, cf.parent.Password)) != nil { - return nil, err + return nil, fmt.Errorf("Renewing authentication failed: %v", err) } } @@ -205,6 +207,8 @@ func (c *Client) close() { // GetServerTime returns the time at the vCenter server func (c *Client) GetServerTime(ctx context.Context) (time.Time, error) { + ctx, cancel := context.WithTimeout(ctx, c.Timeout) + defer cancel() t, err := methods.GetCurrentTime(ctx, c.Client) if err != nil { return time.Time{}, err @@ -235,7 +239,7 @@ func (c *Client) GetMaxQueryMetrics(ctx context.Context) (int, error) { // Fall through version-based inference if value isn't usable } } else { - log.Println("I! [input.vsphere] Option query for maxQueryMetrics failed. Using default") + log.Println("D! [input.vsphere] Option query for maxQueryMetrics failed. Using default") } // No usable maxQueryMetrics setting. Infer based on version @@ -255,3 +259,38 @@ func (c *Client) GetMaxQueryMetrics(ctx context.Context) (int, error) { } return 256, nil } + +// QueryMetrics wraps performance.Query to give it proper timeouts +func (c *Client) QueryMetrics(ctx context.Context, pqs []types.PerfQuerySpec) ([]performance.EntityMetric, error) { + ctx1, cancel1 := context.WithTimeout(ctx, c.Timeout) + defer cancel1() + metrics, err := c.Perf.Query(ctx1, pqs) + if err != nil { + return nil, err + } + + ctx2, cancel2 := context.WithTimeout(ctx, c.Timeout) + defer cancel2() + return c.Perf.ToMetricSeries(ctx2, metrics) +} + +// CounterInfoByName wraps performance.CounterInfoByName to give it proper timeouts +func (c *Client) CounterInfoByName(ctx context.Context) (map[string]*types.PerfCounterInfo, error) { + ctx1, cancel1 := context.WithTimeout(ctx, c.Timeout) + defer cancel1() + return c.Perf.CounterInfoByName(ctx1) +} + +// CounterInfoByKey wraps performance.CounterInfoByKey to give it proper timeouts +func (c *Client) CounterInfoByKey(ctx context.Context) (map[int32]*types.PerfCounterInfo, error) { + ctx1, cancel1 := context.WithTimeout(ctx, c.Timeout) + defer cancel1() + return c.Perf.CounterInfoByKey(ctx1) +} + +// ListResources wraps property.Collector.Retrieve to give it proper timeouts +func (c *Client) ListResources(ctx context.Context, root *view.ContainerView, kind []string, ps []string, dst interface{}) error { + ctx1, cancel1 := context.WithTimeout(ctx, c.Timeout) + defer cancel1() + return root.Retrieve(ctx1, kind, ps, dst) +} diff --git a/plugins/inputs/vsphere/endpoint.go b/plugins/inputs/vsphere/endpoint.go index dbc67dd959366..27aca331c9e02 100644 --- a/plugins/inputs/vsphere/endpoint.go +++ b/plugins/inputs/vsphere/endpoint.go @@ -2,8 +2,10 @@ package vsphere import ( "context" + "errors" "fmt" "log" + "math/rand" "net/url" "regexp" "strconv" @@ -24,15 +26,19 @@ import ( var isolateLUN = regexp.MustCompile(".*/([^/]+)/?$") -const metricLookback = 3 +const metricLookback = 3 // Number of time periods to look back at for non-realtime metrics + +const rtMetricLookback = 3 // Number of time periods to look back at for realtime metrics + +const maxSampleConst = 10 // Absolute maximim number of samples regardless of period + +const maxMetadataSamples = 100 // Number of resources to sample for metric metadata // Endpoint is a high-level representation of a connected vCenter endpoint. It is backed by the lower // level Client type. type Endpoint struct { Parent *VSphere URL *url.URL - lastColls map[string]time.Time - instanceInfo map[string]resourceInfo resourceKinds map[string]resourceKind hwMarks *TSCache lun2ds map[string]string @@ -52,8 +58,14 @@ type resourceKind struct { sampling int32 objects objectMap filters filter.Filter + include []string + simple bool + metrics performance.MetricList collectInstances bool - getObjects func(context.Context, *Endpoint, *view.ContainerView) (objectMap, error) + parent string + getObjects func(context.Context, *Client, *Endpoint, *view.ContainerView) (objectMap, error) + latestSample time.Time + lastColl time.Time } type metricEntry struct { @@ -74,33 +86,22 @@ type objectRef struct { dcname string } -type resourceInfo struct { - name string - metrics performance.MetricList - parentRef *types.ManagedObjectReference -} - -type metricQRequest struct { - res *resourceKind - obj objectRef -} - -type metricQResponse struct { - obj objectRef - metrics *performance.MetricList +func (e *Endpoint) getParent(obj *objectRef, res *resourceKind) (*objectRef, bool) { + if pKind, ok := e.resourceKinds[res.parent]; ok { + if p, ok := pKind.objects[obj.parentRef.Value]; ok { + return &p, true + } + } + return nil, false } -type multiError []error - // NewEndpoint returns a new connection to a vCenter based on the URL and configuration passed // as parameters. func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, error) { e := Endpoint{ URL: url, Parent: parent, - lastColls: make(map[string]time.Time), hwMarks: NewTSCache(1 * time.Hour), - instanceInfo: make(map[string]resourceInfo), lun2ds: make(map[string]string), initialized: false, clientFactory: NewClientFactory(ctx, url, parent), @@ -116,8 +117,11 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, sampling: 300, objects: make(objectMap), filters: newFilterOrPanic(parent.DatacenterMetricInclude, parent.DatacenterMetricExclude), + simple: isSimple(parent.DatacenterMetricInclude, parent.DatacenterMetricExclude), + include: parent.DatacenterMetricInclude, collectInstances: parent.DatacenterInstances, getObjects: getDatacenters, + parent: "", }, "cluster": { name: "cluster", @@ -128,8 +132,11 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, sampling: 300, objects: make(objectMap), filters: newFilterOrPanic(parent.ClusterMetricInclude, parent.ClusterMetricExclude), + simple: isSimple(parent.ClusterMetricInclude, parent.ClusterMetricExclude), + include: parent.ClusterMetricInclude, collectInstances: parent.ClusterInstances, getObjects: getClusters, + parent: "datacenter", }, "host": { name: "host", @@ -140,8 +147,11 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, sampling: 20, objects: make(objectMap), filters: newFilterOrPanic(parent.HostMetricInclude, parent.HostMetricExclude), + simple: isSimple(parent.HostMetricInclude, parent.HostMetricExclude), + include: parent.HostMetricInclude, collectInstances: parent.HostInstances, getObjects: getHosts, + parent: "cluster", }, "vm": { name: "vm", @@ -152,8 +162,11 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, sampling: 20, objects: make(objectMap), filters: newFilterOrPanic(parent.VMMetricInclude, parent.VMMetricExclude), + simple: isSimple(parent.VMMetricInclude, parent.VMMetricExclude), + include: parent.VMMetricInclude, collectInstances: parent.VMInstances, getObjects: getVMs, + parent: "host", }, "datastore": { name: "datastore", @@ -163,8 +176,11 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, sampling: 300, objects: make(objectMap), filters: newFilterOrPanic(parent.DatastoreMetricInclude, parent.DatastoreMetricExclude), + simple: isSimple(parent.DatastoreMetricInclude, parent.DatastoreMetricExclude), + include: parent.DatastoreMetricInclude, collectInstances: parent.DatastoreInstances, getObjects: getDatastores, + parent: "", }, } @@ -174,24 +190,6 @@ func NewEndpoint(ctx context.Context, parent *VSphere, url *url.URL) (*Endpoint, return &e, err } -func (m multiError) Error() string { - switch len(m) { - case 0: - return "No error recorded. Something is wrong!" - case 1: - return m[0].Error() - default: - s := "Multiple errors detected concurrently: " - for i, e := range m { - if i != 0 { - s += ", " - } - s += e.Error() - } - return s - } -} - func anythingEnabled(ex []string) bool { for _, s := range ex { if s == "*" { @@ -209,6 +207,18 @@ func newFilterOrPanic(include []string, exclude []string) filter.Filter { return f } +func isSimple(include []string, exclude []string) bool { + if len(exclude) > 0 || len(include) == 0 { + return false + } + for _, s := range include { + if strings.Contains(s, "*") { + return false + } + } + return true +} + func (e *Endpoint) startDiscovery(ctx context.Context) { e.discoveryTicker = time.NewTicker(e.Parent.ObjectDiscoveryInterval.Duration) go func() { @@ -249,7 +259,9 @@ func (e *Endpoint) init(ctx context.Context) error { } else { // Otherwise, just run it in the background. We'll probably have an incomplete first metric // collection this way. - go e.initalDiscovery(ctx) + go func() { + e.initalDiscovery(ctx) + }() } } e.initialized = true @@ -262,10 +274,7 @@ func (e *Endpoint) getMetricNameMap(ctx context.Context) (map[int32]string, erro return nil, err } - ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) - defer cancel1() - mn, err := client.Perf.CounterInfoByName(ctx1) - + mn, err := client.CounterInfoByName(ctx) if err != nil { return nil, err } @@ -276,20 +285,19 @@ func (e *Endpoint) getMetricNameMap(ctx context.Context) (map[int32]string, erro return names, nil } -func (e *Endpoint) getMetadata(ctx context.Context, in interface{}) interface{} { +func (e *Endpoint) getMetadata(ctx context.Context, obj objectRef, sampling int32) (performance.MetricList, error) { client, err := e.clientFactory.GetClient(ctx) if err != nil { - return err + return nil, err } - rq := in.(*metricQRequest) ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) defer cancel1() - metrics, err := client.Perf.AvailableMetric(ctx1, rq.obj.ref.Reference(), rq.res.sampling) - if err != nil && err != context.Canceled { - log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err) + metrics, err := client.Perf.AvailableMetric(ctx1, obj.ref.Reference(), sampling) + if err != nil { + return nil, err } - return &metricQResponse{metrics: &metrics, obj: rq.obj} + return metrics, nil } func (e *Endpoint) getDatacenterName(ctx context.Context, client *Client, cache map[string]string, r types.ManagedObjectReference) string { @@ -349,17 +357,17 @@ func (e *Endpoint) discover(ctx context.Context) error { } log.Printf("D! [input.vsphere]: Discover new objects for %s", e.URL.Host) - - instInfo := make(map[string]resourceInfo) resourceKinds := make(map[string]resourceKind) dcNameCache := make(map[string]string) + numRes := int64(0) + // Populate resource objects, and endpoint instance info. for k, res := range e.resourceKinds { log.Printf("D! [input.vsphere] Discovering resources for %s", res.name) // Need to do this for all resource types even if they are not enabled if res.enabled || k != "vm" { - objects, err := res.getObjects(ctx, e, client.Root) + objects, err := res.getObjects(ctx, client, e, client.Root) if err != nil { return err } @@ -374,42 +382,19 @@ func (e *Endpoint) discover(ctx context.Context) error { } } - // Set up a worker pool for processing metadata queries concurrently - wp := NewWorkerPool(10) - wp.Run(ctx, e.getMetadata, e.Parent.DiscoverConcurrency) - - // Fill the input channels with resources that need to be queried - // for metadata. - wp.Fill(ctx, func(ctx context.Context, f PushFunc) { - for _, obj := range objects { - f(ctx, &metricQRequest{obj: obj, res: &res}) - } - }) - - // Drain the resulting metadata and build instance infos. - wp.Drain(ctx, func(ctx context.Context, in interface{}) bool { - switch resp := in.(type) { - case *metricQResponse: - mList := make(performance.MetricList, 0) - if res.enabled { - for _, m := range *resp.metrics { - if m.Instance != "" && !res.collectInstances { - continue - } - if res.filters.Match(metricNames[m.CounterId]) { - mList = append(mList, m) - } - } - } - instInfo[resp.obj.ref.Value] = resourceInfo{name: resp.obj.name, metrics: mList, parentRef: resp.obj.parentRef} - case error: - log.Printf("W! [input.vsphere]: Error while discovering resources: %s", resp) - return false + // No need to collect metric metadata if resource type is not enabled + if res.enabled { + if res.simple { + e.simpleMetadataSelect(ctx, client, &res) + } else { + e.complexMetadataSelect(ctx, &res, objects, metricNames) } - return true - }) + } res.objects = objects resourceKinds[k] = res + + SendInternalCounterWithTags("discovered_objects", e.URL.Host, map[string]string{"type": res.name}, int64(len(objects))) + numRes += int64(len(objects)) } } @@ -428,20 +413,100 @@ func (e *Endpoint) discover(ctx context.Context) error { e.collectMux.Lock() defer e.collectMux.Unlock() - e.instanceInfo = instInfo e.resourceKinds = resourceKinds e.lun2ds = l2d sw.Stop() - SendInternalCounter("discovered_objects", e.URL.Host, int64(len(instInfo))) + SendInternalCounterWithTags("discovered_objects", e.URL.Host, map[string]string{"type": "instance-total"}, numRes) return nil } -func getDatacenters(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) { +func (e *Endpoint) simpleMetadataSelect(ctx context.Context, client *Client, res *resourceKind) { + log.Printf("D! [input.vsphere] Using fast metric metadata selection for %s", res.name) + m, err := client.CounterInfoByName(ctx) + if err != nil { + log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err) + return + } + res.metrics = make(performance.MetricList, 0, len(res.include)) + for _, s := range res.include { + if pci, ok := m[s]; ok { + cnt := types.PerfMetricId{ + CounterId: pci.Key, + } + if res.collectInstances { + cnt.Instance = "*" + } else { + cnt.Instance = "" + } + res.metrics = append(res.metrics, cnt) + } else { + log.Printf("W! [input.vsphere] Metric name %s is unknown. Will not be collected", s) + } + } +} + +func (e *Endpoint) complexMetadataSelect(ctx context.Context, res *resourceKind, objects objectMap, metricNames map[int32]string) { + // We're only going to get metadata from maxMetadataSamples resources. If we have + // more resources than that, we pick maxMetadataSamples samples at random. + sampledObjects := make([]objectRef, len(objects)) + i := 0 + for _, obj := range objects { + sampledObjects[i] = obj + i++ + } + n := len(sampledObjects) + if n > maxMetadataSamples { + // Shuffle samples into the maxMetadatSamples positions + for i := 0; i < maxMetadataSamples; i++ { + j := int(rand.Int31n(int32(i + 1))) + t := sampledObjects[i] + sampledObjects[i] = sampledObjects[j] + sampledObjects[j] = t + } + sampledObjects = sampledObjects[0:maxMetadataSamples] + } + + instInfoMux := sync.Mutex{} + te := NewThrottledExecutor(e.Parent.DiscoverConcurrency) + for _, obj := range sampledObjects { + func(obj objectRef) { + te.Run(ctx, func() { + metrics, err := e.getMetadata(ctx, obj, res.sampling) + if err != nil { + log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err) + } + mMap := make(map[string]types.PerfMetricId) + for _, m := range metrics { + if m.Instance != "" && res.collectInstances { + m.Instance = "*" + } else { + m.Instance = "" + } + if res.filters.Match(metricNames[m.CounterId]) { + mMap[strconv.Itoa(int(m.CounterId))+"|"+m.Instance] = m + } + } + log.Printf("D! [input.vsphere] Found %d metrics for %s", len(mMap), obj.name) + instInfoMux.Lock() + defer instInfoMux.Unlock() + if len(mMap) > len(res.metrics) { + res.metrics = make(performance.MetricList, len(mMap)) + i := 0 + for _, m := range mMap { + res.metrics[i] = m + i++ + } + } + }) + }(obj) + } + te.Wait() +} + +func getDatacenters(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) { var resources []mo.Datacenter - ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) - defer cancel1() - err := root.Retrieve(ctx1, []string{"Datacenter"}, []string{"name", "parent"}, &resources) + err := client.ListResources(ctx, root, []string{"Datacenter"}, []string{"name", "parent"}, &resources) if err != nil { return nil, err } @@ -453,11 +518,9 @@ func getDatacenters(ctx context.Context, e *Endpoint, root *view.ContainerView) return m, nil } -func getClusters(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) { +func getClusters(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) { var resources []mo.ClusterComputeResource - ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) - defer cancel1() - err := root.Retrieve(ctx1, []string{"ClusterComputeResource"}, []string{"name", "parent"}, &resources) + err := client.ListResources(ctx, root, []string{"ClusterComputeResource"}, []string{"name", "parent"}, &resources) if err != nil { return nil, err } @@ -487,9 +550,9 @@ func getClusters(ctx context.Context, e *Endpoint, root *view.ContainerView) (ob return m, nil } -func getHosts(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) { +func getHosts(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) { var resources []mo.HostSystem - err := root.Retrieve(ctx, []string{"HostSystem"}, []string{"name", "parent"}, &resources) + err := client.ListResources(ctx, root, []string{"HostSystem"}, []string{"name", "parent"}, &resources) if err != nil { return nil, err } @@ -501,16 +564,17 @@ func getHosts(ctx context.Context, e *Endpoint, root *view.ContainerView) (objec return m, nil } -func getVMs(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) { +func getVMs(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) { var resources []mo.VirtualMachine - ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) - defer cancel1() - err := root.Retrieve(ctx1, []string{"VirtualMachine"}, []string{"name", "runtime.host", "config.guestId", "config.uuid"}, &resources) + err := client.ListResources(ctx, root, []string{"VirtualMachine"}, []string{"name", "runtime.host", "runtime.powerState", "config.guestId", "config.uuid"}, &resources) if err != nil { return nil, err } m := make(objectMap) for _, r := range resources { + if r.Runtime.PowerState != "poweredOn" { + continue + } guest := "unknown" uuid := "" // Sometimes Config is unknown and returns a nil pointer @@ -525,11 +589,9 @@ func getVMs(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectM return m, nil } -func getDatastores(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) { +func getDatastores(ctx context.Context, client *Client, e *Endpoint, root *view.ContainerView) (objectMap, error) { var resources []mo.Datastore - ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) - defer cancel1() - err := root.Retrieve(ctx1, []string{"Datastore"}, []string{"name", "parent", "info"}, &resources) + err := client.ListResources(ctx, root, []string{"Datastore"}, []string{"name", "parent", "info"}, &resources) if err != nil { return nil, err } @@ -555,10 +617,10 @@ func (e *Endpoint) Close() { // Collect runs a round of data collections as specified in the configuration. func (e *Endpoint) Collect(ctx context.Context, acc telegraf.Accumulator) error { + // If we never managed to do a discovery, collection will be a no-op. Therefore, // we need to check that a connection is available, or the collection will // silently fail. - // if _, err := e.clientFactory.GetClient(ctx); err != nil { return err } @@ -571,28 +633,41 @@ func (e *Endpoint) Collect(ctx context.Context, acc telegraf.Accumulator) error } // If discovery interval is disabled (0), discover on each collection cycle - // if e.Parent.ObjectDiscoveryInterval.Duration == 0 { err := e.discover(ctx) if err != nil { return err } } + var wg sync.WaitGroup for k, res := range e.resourceKinds { if res.enabled { - err := e.collectResource(ctx, k, acc) - if err != nil { - return err - } + wg.Add(1) + go func(k string) { + defer wg.Done() + err := e.collectResource(ctx, k, acc) + if err != nil { + acc.AddError(err) + } + }(k) } } + wg.Wait() // Purge old timestamps from the cache e.hwMarks.Purge() return nil } -func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, now time.Time, latest time.Time) { +// Workaround to make sure pqs is a copy of the loop variable and won't change. +func submitChunkJob(ctx context.Context, te *ThrottledExecutor, job func([]types.PerfQuerySpec), pqs []types.PerfQuerySpec) { + te.Run(ctx, func() { + job(pqs) + }) +} + +func (e *Endpoint) chunkify(ctx context.Context, res *resourceKind, now time.Time, latest time.Time, acc telegraf.Accumulator, job func([]types.PerfQuerySpec)) { + te := NewThrottledExecutor(e.Parent.CollectConcurrency) maxMetrics := e.Parent.MaxQueryMetrics if maxMetrics < 1 { maxMetrics = 1 @@ -609,38 +684,30 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n total := 0 nRes := 0 for _, object := range res.objects { - info, found := e.instanceInfo[object.ref.Value] - if !found { - log.Printf("E! [input.vsphere]: Internal error: Instance info not found for MOID %s", object.ref) - } - mr := len(info.metrics) + mr := len(res.metrics) for mr > 0 { mc := mr headroom := maxMetrics - metrics if !res.realTime && mc > headroom { // Metric query limit only applies to non-realtime metrics mc = headroom } - fm := len(info.metrics) - mr + fm := len(res.metrics) - mr pq := types.PerfQuerySpec{ Entity: object.ref, - MaxSample: 1, - MetricId: info.metrics[fm : fm+mc], + MaxSample: maxSampleConst, + MetricId: res.metrics[fm : fm+mc], IntervalId: res.sampling, Format: "normal", } - // For non-realtime metrics, we need to look back a few samples in case - // the vCenter is late reporting metrics. - if !res.realTime { - pq.MaxSample = metricLookback + start, ok := e.hwMarks.Get(object.ref.Value) + if !ok { + // Look back 3 sampling periods by default + start = latest.Add(time.Duration(-res.sampling) * time.Second * (metricLookback - 1)) } + pq.StartTime = &start + pq.EndTime = &now - // Look back 3 sampling periods - start := latest.Add(time.Duration(-res.sampling) * time.Second * (metricLookback - 1)) - if !res.realTime { - pq.StartTime = &start - pq.EndTime = &now - } pqs = append(pqs, pq) mr -= mc metrics += mc @@ -648,17 +715,18 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n // We need to dump the current chunk of metrics for one of two reasons: // 1) We filled up the metric quota while processing the current resource // 2) We are at the last resource and have no more data to process. - if mr > 0 || (!res.realTime && metrics >= maxMetrics) || nRes >= e.Parent.MaxQueryObjects { + // 3) The query contains more than 100,000 individual metrics + if mr > 0 || nRes >= e.Parent.MaxQueryObjects || len(pqs) > 100000 { log.Printf("D! [input.vsphere]: Queueing query: %d objects, %d metrics (%d remaining) of type %s for %s. Processed objects: %d. Total objects %d", len(pqs), metrics, mr, res.name, e.URL.Host, total+1, len(res.objects)) - // To prevent deadlocks, don't send work items if the context has been cancelled. + // Don't send work items if the context has been cancelled. if ctx.Err() == context.Canceled { return } - // Call push function - f(ctx, pqs) + // Run collection job + submitChunkJob(ctx, te, job, pqs) pqs = make([]types.PerfQuerySpec, 0, e.Parent.MaxQueryObjects) metrics = 0 nRes = 0 @@ -667,19 +735,19 @@ func (e *Endpoint) chunker(ctx context.Context, f PushFunc, res *resourceKind, n total++ nRes++ } - // There may be dangling stuff in the queue. Handle them - // + // Handle final partially filled chunk if len(pqs) > 0 { - // Call push function + // Run collection job log.Printf("D! [input.vsphere]: Queuing query: %d objects, %d metrics (0 remaining) of type %s for %s. Total objects %d (final chunk)", len(pqs), metrics, res.name, e.URL.Host, len(res.objects)) - f(ctx, pqs) + submitChunkJob(ctx, te, job, pqs) } + + // Wait for background collection to finish + te.Wait() } func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc telegraf.Accumulator) error { - - // Do we have new data yet? res := e.resourceKinds[resourceType] client, err := e.clientFactory.GetClient(ctx) if err != nil { @@ -689,13 +757,23 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc if err != nil { return err } - latest, hasLatest := e.lastColls[resourceType] - if hasLatest { + + // Estimate the interval at which we're invoked. Use local time (not server time) + // since this is about how we got invoked locally. + localNow := time.Now() + estInterval := time.Duration(time.Minute) + if !res.lastColl.IsZero() { + estInterval = localNow.Sub(res.lastColl).Truncate(time.Duration(res.sampling) * time.Second) + } + log.Printf("D! [inputs.vsphere] Interval estimated to %s", estInterval) + + latest := res.latestSample + if !latest.IsZero() { elapsed := now.Sub(latest).Seconds() + 5.0 // Allow 5 second jitter. - log.Printf("D! [input.vsphere]: Latest: %s, elapsed: %f, resource: %s", latest, elapsed, resourceType) + log.Printf("D! [inputs.vsphere]: Latest: %s, elapsed: %f, resource: %s", latest, elapsed, resourceType) if !res.realTime && elapsed < float64(res.sampling) { - // No new data would be available. We're outta herE! [input.vsphere]: - log.Printf("D! [input.vsphere]: Sampling period for %s of %d has not elapsed on %s", + // No new data would be available. We're outta here! + log.Printf("D! [inputs.vsphere]: Sampling period for %s of %d has not elapsed on %s", resourceType, res.sampling, e.URL.Host) return nil } @@ -706,91 +784,108 @@ func (e *Endpoint) collectResource(ctx context.Context, resourceType string, acc internalTags := map[string]string{"resourcetype": resourceType} sw := NewStopwatchWithTags("gather_duration", e.URL.Host, internalTags) - log.Printf("D! [input.vsphere]: Collecting metrics for %d objects of type %s for %s", + log.Printf("D! [inputs.vsphere]: Collecting metrics for %d objects of type %s for %s", len(res.objects), resourceType, e.URL.Host) count := int64(0) - // Set up a worker pool for collecting chunk metrics - wp := NewWorkerPool(10) - wp.Run(ctx, func(ctx context.Context, in interface{}) interface{} { - chunk := in.([]types.PerfQuerySpec) - n, err := e.collectChunk(ctx, chunk, resourceType, res, acc) - log.Printf("D! [input.vsphere] CollectChunk for %s returned %d metrics", resourceType, n) - if err != nil { - return err - } - atomic.AddInt64(&count, int64(n)) - return nil - - }, e.Parent.CollectConcurrency) + var tsMux sync.Mutex + latestSample := time.Time{} - // Fill the input channel of the worker queue by running the chunking - // logic implemented in chunker() - wp.Fill(ctx, func(ctx context.Context, f PushFunc) { - e.chunker(ctx, f, &res, now, latest) - }) - - // Drain the pool. We're getting errors back. They should all be nil - var mux sync.Mutex - merr := make(multiError, 0) - wp.Drain(ctx, func(ctx context.Context, in interface{}) bool { - if in != nil { - mux.Lock() - defer mux.Unlock() - merr = append(merr, in.(error)) - return false - } - return true - }) - e.lastColls[resourceType] = now // Use value captured at the beginning to avoid blind spots. + // Divide workload into chunks and process them concurrently + e.chunkify(ctx, &res, now, latest, acc, + func(chunk []types.PerfQuerySpec) { + n, localLatest, err := e.collectChunk(ctx, chunk, &res, acc, now, estInterval) + log.Printf("D! [inputs.vsphere] CollectChunk for %s returned %d metrics", resourceType, n) + if err != nil { + acc.AddError(errors.New("While collecting " + res.name + ": " + err.Error())) + } + atomic.AddInt64(&count, int64(n)) + tsMux.Lock() + defer tsMux.Unlock() + if localLatest.After(latestSample) && !localLatest.IsZero() { + latestSample = localLatest + } + }) + log.Printf("D! [inputs.vsphere] Latest sample for %s set to %s", resourceType, latestSample) + if !latestSample.IsZero() { + res.latestSample = latestSample + } sw.Stop() SendInternalCounterWithTags("gather_count", e.URL.Host, internalTags, count) - if len(merr) > 0 { - return merr - } return nil } -func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, resourceType string, - res resourceKind, acc telegraf.Accumulator) (int, error) { +func alignSamples(info []types.PerfSampleInfo, values []int64, interval time.Duration) ([]types.PerfSampleInfo, []float64) { + rInfo := make([]types.PerfSampleInfo, 0, len(info)) + rValues := make([]float64, 0, len(values)) + bi := 1.0 + var lastBucket time.Time + for idx := range info { + // According to the docs, SampleInfo and Value should have the same length, but we've seen corrupted + // data coming back with missing values. Take care of that gracefully! + if idx >= len(values) { + log.Printf("D! [inputs.vsphere] len(SampleInfo)>len(Value) %d > %d", len(info), len(values)) + break + } + v := float64(values[idx]) + if v < 0 { + continue + } + ts := info[idx].Timestamp + roundedTs := ts.Truncate(interval) + + // Are we still working on the same bucket? + if roundedTs == lastBucket { + bi++ + p := len(rValues) - 1 + rValues[p] = ((bi-1)/bi)*float64(rValues[p]) + v/bi + } else { + rValues = append(rValues, v) + roundedInfo := types.PerfSampleInfo{ + Timestamp: roundedTs, + Interval: info[idx].Interval, + } + rInfo = append(rInfo, roundedInfo) + bi = 1.0 + lastBucket = roundedTs + } + } + //log.Printf("D! [inputs.vsphere] Aligned samples: %d collapsed into %d", len(info), len(rInfo)) + return rInfo, rValues +} + +func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, res *resourceKind, acc telegraf.Accumulator, now time.Time, interval time.Duration) (int, time.Time, error) { + log.Printf("D! [inputs.vsphere] Query for %s has %d QuerySpecs", res.name, len(pqs)) + latestSample := time.Time{} count := 0 + resourceType := res.name prefix := "vsphere" + e.Parent.Separator + resourceType client, err := e.clientFactory.GetClient(ctx) if err != nil { - return 0, err + return count, latestSample, err } - ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) - defer cancel1() - metricInfo, err := client.Perf.CounterInfoByName(ctx1) + metricInfo, err := client.CounterInfoByName(ctx) if err != nil { - return count, err + return count, latestSample, err } - ctx2, cancel2 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) - defer cancel2() - metrics, err := client.Perf.Query(ctx2, pqs) + ems, err := client.QueryMetrics(ctx, pqs) if err != nil { - return count, err + return count, latestSample, err } - ctx3, cancel3 := context.WithTimeout(ctx, e.Parent.Timeout.Duration) - defer cancel3() - ems, err := client.Perf.ToMetricSeries(ctx3, metrics) - if err != nil { - return count, err - } - log.Printf("D! [input.vsphere] Query for %s returned metrics for %d objects", resourceType, len(ems)) + log.Printf("D! [inputs.vsphere] Query for %s returned metrics for %d objects", resourceType, len(ems)) // Iterate through results for _, em := range ems { moid := em.Entity.Reference().Value - instInfo, found := e.instanceInfo[moid] + instInfo, found := res.objects[moid] if !found { - log.Printf("E! [input.vsphere]: MOID %s not found in cache. Skipping! (This should not happen!)", moid) + log.Printf("E! [inputs.vsphere]: MOID %s not found in cache. Skipping! (This should not happen!)", moid) continue } buckets := make(map[string]metricEntry) @@ -805,26 +900,28 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, // Populate tags objectRef, ok := res.objects[moid] if !ok { - log.Printf("E! [input.vsphere]: MOID %s not found in cache. Skipping", moid) + log.Printf("E! [inputs.vsphere]: MOID %s not found in cache. Skipping", moid) continue } - e.populateTags(&objectRef, resourceType, &res, t, &v) + e.populateTags(&objectRef, resourceType, res, t, &v) - // Now deal with the values. Iterate backwards so we start with the latest value - tsKey := moid + "|" + name + "|" + v.Instance - for idx := len(v.Value) - 1; idx >= 0; idx-- { - ts := em.SampleInfo[idx].Timestamp + nValues := 0 + alignedInfo, alignedValues := alignSamples(em.SampleInfo, v.Value, interval) // TODO: Estimate interval - // Since non-realtime metrics are queries with a lookback, we need to check the high-water mark - // to determine if this should be included. Only samples not seen before should be included. - if !(res.realTime || e.hwMarks.IsNew(tsKey, ts)) { - continue + for idx, sample := range alignedInfo { + // According to the docs, SampleInfo and Value should have the same length, but we've seen corrupted + // data coming back with missing values. Take care of that gracefully! + if idx >= len(alignedValues) { + log.Printf("D! [inputs.vsphere] len(SampleInfo)>len(Value) %d > %d", len(alignedInfo), len(alignedValues)) + break } - value := v.Value[idx] + ts := sample.Timestamp + if ts.After(latestSample) { + latestSample = ts + } + nValues++ // Organize the metrics into a bucket per measurement. - // Data SHOULD be presented to us with the same timestamp for all samples, but in case - // they don't we use the measurement name + timestamp as the key for the bucket. mn, fn := e.makeMetricIdentifier(prefix, name) bKey := mn + " " + v.Instance + " " + strconv.FormatInt(ts.UnixNano(), 10) bucket, found := buckets[bKey] @@ -832,27 +929,26 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, bucket = metricEntry{name: mn, ts: ts, fields: make(map[string]interface{}), tags: t} buckets[bKey] = bucket } - if value < 0 { - log.Printf("D! [input.vsphere]: Negative value for %s on %s. Indicates missing samples", name, objectRef.name) - continue - } // Percentage values must be scaled down by 100. info, ok := metricInfo[name] if !ok { - log.Printf("E! [input.vsphere]: Could not determine unit for %s. Skipping", name) + log.Printf("E! [inputs.vsphere]: Could not determine unit for %s. Skipping", name) } + v := alignedValues[idx] if info.UnitInfo.GetElementDescription().Key == "percent" { - bucket.fields[fn] = float64(value) / 100.0 + bucket.fields[fn] = float64(v) / 100.0 } else { - bucket.fields[fn] = value + bucket.fields[fn] = v } count++ - // Update highwater marks for non-realtime metrics. - if !res.realTime { - e.hwMarks.Put(tsKey, ts) - } + // Update highwater marks + e.hwMarks.Put(moid, ts) + } + if nValues == 0 { + log.Printf("D! [inputs.vsphere]: Missing value for: %s, %s", name, objectRef.name) + continue } } // We've iterated through all the metrics and collected buckets for each @@ -861,17 +957,7 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec, acc.AddFields(bucket.name, bucket.fields, bucket.tags, bucket.ts) } } - return count, nil -} - -func (e *Endpoint) getParent(obj resourceInfo) (resourceInfo, bool) { - p := obj.parentRef - if p == nil { - log.Printf("D! [input.vsphere] No parent found for %s", obj.name) - return resourceInfo{}, false - } - r, ok := e.instanceInfo[p.Value] - return r, ok + return count, latestSample, nil } func (e *Endpoint) populateTags(objectRef *objectRef, resourceType string, resource *resourceKind, t map[string]string, v *performance.MetricSeries) { @@ -885,14 +971,14 @@ func (e *Endpoint) populateTags(objectRef *objectRef, resourceType string, resou } // Map parent reference - parent, found := e.instanceInfo[objectRef.parentRef.Value] + parent, found := e.getParent(objectRef, resource) if found { t[resource.parentTag] = parent.name if resourceType == "vm" { if objectRef.guest != "" { t["guest"] = objectRef.guest } - if c, ok := e.getParent(parent); ok { + if c, ok := e.resourceKinds["cluster"].objects[parent.parentRef.Value]; ok { t["clustername"] = c.name } } diff --git a/plugins/inputs/vsphere/throttled_exec.go b/plugins/inputs/vsphere/throttled_exec.go new file mode 100644 index 0000000000000..ac95b496c97fa --- /dev/null +++ b/plugins/inputs/vsphere/throttled_exec.go @@ -0,0 +1,45 @@ +package vsphere + +import ( + "context" + "sync" +) + +// ThrottledExecutor provides a simple mechanism for running jobs in separate +// goroutines while limit the number of concurrent jobs running at any given time. +type ThrottledExecutor struct { + limiter chan struct{} + wg sync.WaitGroup +} + +// NewThrottledExecutor creates a new ThrottlesExecutor with a specified maximum +// number of concurrent jobs +func NewThrottledExecutor(limit int) *ThrottledExecutor { + if limit == 0 { + panic("Limit must be > 0") + } + return &ThrottledExecutor{limiter: make(chan struct{}, limit)} +} + +// Run schedules a job for execution as soon as possible while respecting the +// maximum concurrency limit. +func (t *ThrottledExecutor) Run(ctx context.Context, job func()) { + t.wg.Add(1) + go func() { + defer t.wg.Done() + select { + case t.limiter <- struct{}{}: + defer func() { + <-t.limiter + }() + job() + case <-ctx.Done(): + return + } + }() +} + +// Wait blocks until all scheduled jobs have finished +func (t *ThrottledExecutor) Wait() { + t.wg.Wait() +} diff --git a/plugins/inputs/vsphere/tscache.go b/plugins/inputs/vsphere/tscache.go index 9abe24ea725c5..1d1f00ebea3cc 100644 --- a/plugins/inputs/vsphere/tscache.go +++ b/plugins/inputs/vsphere/tscache.go @@ -49,6 +49,14 @@ func (t *TSCache) IsNew(key string, tm time.Time) bool { return !tm.Before(v) } +// Get returns a timestamp (if present) +func (t *TSCache) Get(key string) (time.Time, bool) { + t.mux.RLock() + defer t.mux.RUnlock() + ts, ok := t.table[key] + return ts, ok +} + // Put updates the latest timestamp for the supplied key. func (t *TSCache) Put(key string, time time.Time) { t.mux.Lock() diff --git a/plugins/inputs/vsphere/vsphere.go b/plugins/inputs/vsphere/vsphere.go index f0bb5dca99c38..13186634fb51d 100644 --- a/plugins/inputs/vsphere/vsphere.go +++ b/plugins/inputs/vsphere/vsphere.go @@ -155,7 +155,7 @@ var sampleConfig = ` ## Clusters # cluster_metric_include = [] ## if omitted or empty, all metrics are collected # cluster_metric_exclude = [] ## Nothing excluded by default - # cluster_instances = true ## true by default + # cluster_instances = false ## false by default ## Datastores # datastore_metric_include = [] ## if omitted or empty, all metrics are collected @@ -260,7 +260,6 @@ func (v *VSphere) Stop() { // Gather is the main data collection function called by the Telegraf core. It performs all // the data collection and writes all metrics into the Accumulator passed as an argument. func (v *VSphere) Gather(acc telegraf.Accumulator) error { - merr := make(multiError, 0) var wg sync.WaitGroup for _, ep := range v.endpoints { wg.Add(1) @@ -274,15 +273,11 @@ func (v *VSphere) Gather(acc telegraf.Accumulator) error { } if err != nil { acc.AddError(err) - merr = append(merr, err) } }(ep) } wg.Wait() - if len(merr) > 0 { - return merr - } return nil } @@ -291,7 +286,7 @@ func init() { return &VSphere{ Vcenters: []string{}, - ClusterInstances: true, + ClusterInstances: false, ClusterMetricInclude: nil, ClusterMetricExclude: nil, HostInstances: true, diff --git a/plugins/inputs/vsphere/vsphere_test.go b/plugins/inputs/vsphere/vsphere_test.go index 4eb3d28f810e6..a4b931bd96568 100644 --- a/plugins/inputs/vsphere/vsphere_test.go +++ b/plugins/inputs/vsphere/vsphere_test.go @@ -7,8 +7,11 @@ import ( "regexp" "sort" "strings" + "sync" + "sync/atomic" "testing" "time" + "unsafe" "github.com/influxdata/telegraf/internal" itls "github.com/influxdata/telegraf/internal/tls" @@ -175,6 +178,8 @@ func defaultVSphere() *VSphere { ObjectDiscoveryInterval: internal.Duration{Duration: time.Second * 300}, Timeout: internal.Duration{Duration: time.Second * 20}, ForceDiscoverOnInit: true, + DiscoverConcurrency: 1, + CollectConcurrency: 1, } } @@ -205,32 +210,43 @@ func TestParseConfig(t *testing.T) { } func TestWorkerPool(t *testing.T) { - wp := NewWorkerPool(100) - ctx := context.Background() - wp.Run(ctx, func(ctx context.Context, p interface{}) interface{} { - return p.(int) * 2 - }, 10) - - n := 100000 - wp.Fill(ctx, func(ctx context.Context, f PushFunc) { - for i := 0; i < n; i++ { - f(ctx, i) - } - }) - results := make([]int, n) - i := 0 - wp.Drain(ctx, func(ctx context.Context, p interface{}) bool { - results[i] = p.(int) - i++ - return true - }) + max := int64(0) + ngr := int64(0) + n := 10000 + var mux sync.Mutex + results := make([]int, 0, n) + te := NewThrottledExecutor(5) + for i := 0; i < n; i++ { + func(i int) { + te.Run(context.Background(), func() { + atomic.AddInt64(&ngr, 1) + mux.Lock() + defer mux.Unlock() + results = append(results, i*2) + if ngr > max { + max = ngr + } + time.Sleep(100 * time.Microsecond) + atomic.AddInt64(&ngr, -1) + }) + }(i) + } + te.Wait() sort.Ints(results) for i := 0; i < n; i++ { - require.Equal(t, results[i], i*2) + require.Equal(t, results[i], i*2, "Some jobs didn't run") } + require.Equal(t, int64(5), max, "Wrong number of goroutines spawned") } func TestTimeout(t *testing.T) { + // Don't run test on 32-bit machines due to bug in simulator. + // https://github.com/vmware/govmomi/issues/1330 + var i int + if unsafe.Sizeof(i) < 8 { + return + } + m, s, err := createSim() if err != nil { t.Fatal(err) @@ -245,7 +261,7 @@ func TestTimeout(t *testing.T) { require.NoError(t, v.Start(nil)) // We're not using the Accumulator, so it can be nil. defer v.Stop() err = v.Gather(&acc) - require.NotNil(t, err, "Error should not be nil here") + require.True(t, len(acc.Errors) > 0, "Errors should not be empty here") // The accumulator must contain exactly one error and it must be a deadline exceeded. require.Equal(t, 1, len(acc.Errors)) @@ -253,6 +269,12 @@ func TestTimeout(t *testing.T) { } func TestMaxQuery(t *testing.T) { + // Don't run test on 32-bit machines due to bug in simulator. + // https://github.com/vmware/govmomi/issues/1330 + var i int + if unsafe.Sizeof(i) < 8 { + return + } m, s, err := createSim() if err != nil { t.Fatal(err) @@ -290,6 +312,13 @@ func TestMaxQuery(t *testing.T) { } func TestAll(t *testing.T) { + // Don't run test on 32-bit machines due to bug in simulator. + // https://github.com/vmware/govmomi/issues/1330 + var i int + if unsafe.Sizeof(i) < 8 { + return + } + m, s, err := createSim() if err != nil { t.Fatal(err) @@ -300,7 +329,8 @@ func TestAll(t *testing.T) { var acc testutil.Accumulator v := defaultVSphere() v.Vcenters = []string{s.URL.String()} - v.Start(nil) // We're not using the Accumulator, so it can be nil. + v.Start(&acc) defer v.Stop() require.NoError(t, v.Gather(&acc)) + require.Equal(t, 0, len(acc.Errors), fmt.Sprintf("Errors found: %s", acc.Errors)) } diff --git a/plugins/inputs/vsphere/workerpool.go b/plugins/inputs/vsphere/workerpool.go deleted file mode 100644 index 6695735ce3a22..0000000000000 --- a/plugins/inputs/vsphere/workerpool.go +++ /dev/null @@ -1,119 +0,0 @@ -package vsphere - -import ( - "context" - "log" - "sync" -) - -// WorkerFunc is a function that is supposed to do the actual work -// of the WorkerPool. It is similar to the "map" portion of the -// map/reduce semantics, in that it takes a single value as an input, -// does some processing and returns a single result. -type WorkerFunc func(context.Context, interface{}) interface{} - -// PushFunc is called from a FillerFunc to push a workitem onto -// the input channel. Wraps some logic for gracefulk shutdowns. -type PushFunc func(context.Context, interface{}) bool - -// DrainerFunc represents a function used to "drain" the WorkerPool, -// i.e. pull out all the results generated by the workers and processing -// them. The DrainerFunc is called once per result produced. -// If the function returns false, the draining of the pool is aborted. -type DrainerFunc func(context.Context, interface{}) bool - -// FillerFunc represents a function for filling the WorkerPool with jobs. -// It is called once and is responsible for pushing jobs onto the supplied channel. -type FillerFunc func(context.Context, PushFunc) - -// WorkerPool implements a simple work pooling mechanism. It runs a predefined -// number of goroutines to process jobs. Jobs are inserted using the Fill call -// and results are retrieved through the Drain function. -type WorkerPool struct { - wg sync.WaitGroup - In chan interface{} - Out chan interface{} -} - -// NewWorkerPool creates a worker pool -func NewWorkerPool(bufsize int) *WorkerPool { - return &WorkerPool{ - In: make(chan interface{}, bufsize), - Out: make(chan interface{}, bufsize), - } -} - -func (w *WorkerPool) push(ctx context.Context, job interface{}) bool { - select { - case w.In <- job: - return true - case <-ctx.Done(): - return false - } -} - -func (w *WorkerPool) pushOut(ctx context.Context, result interface{}) bool { - select { - case w.Out <- result: - return true - case <-ctx.Done(): - return false - } -} - -// Run takes a WorkerFunc and runs it in 'n' goroutines. -func (w *WorkerPool) Run(ctx context.Context, f WorkerFunc, n int) bool { - w.wg.Add(1) - go func() { - defer w.wg.Done() - var localWg sync.WaitGroup - localWg.Add(n) - for i := 0; i < n; i++ { - go func() { - defer localWg.Done() - for { - select { - case job, ok := <-w.In: - if !ok { - return - } - w.pushOut(ctx, f(ctx, job)) - case <-ctx.Done(): - log.Printf("D! [input.vsphere]: Stop requested for worker pool. Exiting.") - return - } - } - }() - } - localWg.Wait() - close(w.Out) - }() - return ctx.Err() == nil -} - -// Fill runs a FillerFunc responsible for supplying work to the pool. You may only -// call Fill once. Calling it twice will panic. -func (w *WorkerPool) Fill(ctx context.Context, f FillerFunc) bool { - w.wg.Add(1) - go func() { - defer w.wg.Done() - f(ctx, w.push) - close(w.In) - }() - return true -} - -// Drain runs a DrainerFunc for each result generated by the workers. -func (w *WorkerPool) Drain(ctx context.Context, f DrainerFunc) bool { - w.wg.Add(1) - go func() { - defer w.wg.Done() - for result := range w.Out { - if !f(ctx, result) { - break - } - } - }() - w.wg.Wait() - return ctx.Err() != nil -}