diff --git a/server/pse/pse_windows.go b/server/pse/pse_windows.go index b54c76aff6..40cf1293af 100644 --- a/server/pse/pse_windows.go +++ b/server/pse/pse_windows.go @@ -1,169 +1,268 @@ // Copyright 2015-2016 Apcera Inc. All rights reserved. +// +build windows package pse import ( - "errors" "fmt" "os" - "os/exec" "path/filepath" - "strconv" "strings" "sync" + "syscall" + "time" + "unsafe" ) -// cache the image name to optimize repeated calls -var imageName string -var imageLock sync.Mutex +var ( + pdh = syscall.NewLazyDLL("pdh.dll") + winPdhOpenQuery = pdh.NewProc("PdhOpenQuery") + winPdhAddCounter = pdh.NewProc("PdhAddCounterW") + winPdhCollectQueryData = pdh.NewProc("PdhCollectQueryData") + winPdhGetFormattedCounterValue = pdh.NewProc("PdhGetFormattedCounterValue") + winPdhGetFormattedCounterArray = pdh.NewProc("PdhGetFormattedCounterArrayW") +) -// parseValues parses the results of data returned by typeperf.exe. This -// is a series of comma delimited quoted strings, containing date time, -// pid, pcpu, rss, and vss. All numeric values are floating point. -// eg: "04/17/2016 15.38.00.016", "5123.00000", "1.23400", "123.00000", "123.00000" -func parseValues(line string, pid *int, pcpu *float64, rss, vss *int64) (err error) { - values := strings.Split(line, ",") - if len(values) < 4 { - return errors.New("Invalid result.") - } - // values[0] will be date, time, ignore them - // parse the pid - fVal, err := strconv.ParseFloat(strings.Trim(values[1], "\""), 64) - if err != nil { - return errors.New(fmt.Sprintf("Unable to parse pid: %s", values[1])) - } - *pid = int(fVal) +// global performance counter query handle and counters +var ( + pcHandle PDH_HQUERY + pidCounter, cpuCounter, rssCounter, vssCounter PDH_HCOUNTER + prevCPU float64 + prevRss int64 + prevVss int64 + lastSampleTime time.Time + processPid int + pcQueryLock sync.Mutex + initialSample = true +) - // parse pcpu - *pcpu, err = strconv.ParseFloat(strings.Trim(values[2], "\""), 64) - if err != nil { - return errors.New(fmt.Sprintf("Unable to parse percent cpu: %s", values[2])) - } +// maxQuerySize is the number of values to return from a query. +// It represents the maximum # of servers that can be queried +// simultaneously running on a machine. +const maxQuerySize = 512 - // parse private working set (rss) - fVal, err = strconv.ParseFloat(strings.Trim(values[3], "\""), 64) - if err != nil { - return errors.New(fmt.Sprintf("Unable to parse working set: %s", values[3])) - } - *rss = int64(fVal) +// Keep static memory around to reuse; this works best for passing +// into the pdh API. +var counterResults [maxQuerySize]PDH_FMT_COUNTERVALUE_ITEM_DOUBLE + +// PDH Types +type ( + PDH_HQUERY syscall.Handle + PDH_HCOUNTER syscall.Handle +) + +// PDH constants used here +const ( + PDH_FMT_DOUBLE = 0x00000200 + PDH_INVALID_DATA = 0xC0000BC6 + PDH_MORE_DATA = 0x800007D2 +) + +// PDH_FMT_COUNTERVALUE_DOUBLE - double value +type PDH_FMT_COUNTERVALUE_DOUBLE struct { + CStatus uint32 + DoubleValue float64 +} + +// PDH_FMT_COUNTERVALUE_ITEM_DOUBLE is an array +// element of a double value +type PDH_FMT_COUNTERVALUE_ITEM_DOUBLE struct { + SzName *uint16 // pointer to a string + FmtValue PDH_FMT_COUNTERVALUE_DOUBLE +} - // parse virtual bytes (vsz) - fVal, err = strconv.ParseFloat(strings.Trim(values[4], "\""), 64) - if err != nil { - return errors.New(fmt.Sprintf("Unable to parse virtual bytes: %s", values[4])) +func pdhAddCounter(hQuery PDH_HQUERY, szFullCounterPath string, dwUserData uintptr, phCounter *PDH_HCOUNTER) error { + ptxt, _ := syscall.UTF16PtrFromString(szFullCounterPath) + r0, _, _ := winPdhAddCounter.Call( + uintptr(hQuery), + uintptr(unsafe.Pointer(ptxt)), + dwUserData, + uintptr(unsafe.Pointer(phCounter))) + + if r0 != 0 { + return fmt.Errorf("pdhAddCounter failed. %d", r0) } - *vss = int64(fVal) + return nil +} +func pdhOpenQuery(datasrc *uint16, userdata uint32, query *PDH_HQUERY) error { + r0, _, _ := syscall.Syscall(winPdhOpenQuery.Addr(), 3, 0, uintptr(userdata), uintptr(unsafe.Pointer(query))) + if r0 != 0 { + return fmt.Errorf("pdhOpenQuery failed - %d", r0) + } return nil } -// getStatsForProcess retrieves process information for a given instance name. -// typeperf.exe is the windows native command line utility to get pcpu, rss, -// and vsz equivalents through queries of performance counters. -// An alternative is to map the Pdh* native windows API from pdh.dll, -// and call those APIs directly - this is a simpler and cleaner approach. -func getStatsForProcess(name string, pcpu *float64, rss, vss *int64, pid *int) (err error) { - // query the counters using typeperf. "-sc","1" requests one - // set of data (versus continuous monitoring) - out, err := exec.Command("typeperf.exe", - fmt.Sprintf("\\Process(%s)\\ID Process", name), - fmt.Sprintf("\\Process(%s)\\%% Processor Time", name), - fmt.Sprintf("\\Process(%s)\\Working Set - Private", name), - fmt.Sprintf("\\Process(%s)\\Virtual Bytes", name), - "-sc", "1").Output() - if err != nil { - // Signal that the command ran, but the image instance was not found - // through a PID of -1. - if strings.Contains(string(out), "The data is not valid") { - *pid = -1 - return nil - } else { - // something went wrong executing the command - return errors.New(fmt.Sprintf("typeperf failed: %v", err)) - } +func pdhCollectQueryData(hQuery PDH_HQUERY) error { + r0, _, _ := winPdhCollectQueryData.Call(uintptr(hQuery)) + if r0 != 0 { + return fmt.Errorf("pdhCollectQueryData failed - %d", r0) } + return nil +} - results := strings.Split(string(out), "\r\n") - // results[0] = newline - // results[1] = headers - // results[2] = values - // ignore the rest... - if len(results) < 3 { - return errors.New(fmt.Sprintf("unexpected results from typeperf")) +// pdhGetFormattedCounterArrayDouble returns the value of return code +// rather than error, to easily check return codes +func pdhGetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER, lpdwBufferSize *uint32, lpdwBufferCount *uint32, itemBuffer *PDH_FMT_COUNTERVALUE_ITEM_DOUBLE) uint32 { + ret, _, _ := winPdhGetFormattedCounterArray.Call( + uintptr(hCounter), + uintptr(PDH_FMT_DOUBLE), + uintptr(unsafe.Pointer(lpdwBufferSize)), + uintptr(unsafe.Pointer(lpdwBufferCount)), + uintptr(unsafe.Pointer(itemBuffer))) + + return uint32(ret) +} + +func getCounterArrayData(counter PDH_HCOUNTER) ([]float64, error) { + var bufSize uint32 + var bufCount uint32 + + // Retrieving array data requires two calls, the first which + // requires an adressable empty buffer, and sets size fields. + // The second call returns the data. + initialBuf := make([]PDH_FMT_COUNTERVALUE_ITEM_DOUBLE, 1) + ret := pdhGetFormattedCounterArrayDouble(counter, &bufSize, &bufCount, &initialBuf[0]) + if ret == PDH_MORE_DATA { + // we'll likely never get here, but be safe. + if bufCount > maxQuerySize { + bufCount = maxQuerySize + } + ret = pdhGetFormattedCounterArrayDouble(counter, &bufSize, &bufCount, &counterResults[0]) + if ret == 0 { + rv := make([]float64, bufCount) + for i := 0; i < int(bufCount); i++ { + rv[i] = counterResults[i].FmtValue.DoubleValue + } + return rv, nil + } } - if err = parseValues(results[2], pid, pcpu, rss, vss); err != nil { - return err + if ret != 0 { + return nil, fmt.Errorf("getCounterArrayData failed - %d", ret) } - return nil + + return nil, nil } // getProcessImageName returns the name of the process image, as expected by -// typeperf. +// the performance counter API. func getProcessImageName() (name string) { name = filepath.Base(os.Args[0]) name = strings.TrimRight(name, ".exe") return } -// procUsage retrieves process cpu and memory information. -// Under the hood, typeperf is called. Notably, typeperf cannot search -// using a pid, but instead uses a somewhat volatile process image name. -// If there is more than one instance, "#" is appended to -// the image name. Wildcard filters are supported, but result in a very -// complex data set to parse. -func ProcUsage(pcpu *float64, rss, vss *int64) error { - var ppid int = -1 +// initialize our counters +func initCounters() (err error) { - imageLock.Lock() - name := imageName - imageLock.Unlock() + processPid = os.Getpid() + // require an addressible nil pointer + var source uint16 + if err := pdhOpenQuery(&source, 0, &pcHandle); err != nil { + return err + } - // Get the pid to retrieve the right set of information for this process. - procPid := os.Getpid() + // setup the performance counters, search for all server instances + name := fmt.Sprintf("%s*", getProcessImageName()) + pidQuery := fmt.Sprintf("\\Process(%s)\\ID Process", name) + cpuQuery := fmt.Sprintf("\\Process(%s)\\%% Processor Time", name) + rssQuery := fmt.Sprintf("\\Process(%s)\\Working Set - Private", name) + vssQuery := fmt.Sprintf("\\Process(%s)\\Virtual Bytes", name) - // if we have cached the image name, try that first - if name != "" { - err := getStatsForProcess(name, pcpu, rss, vss, &ppid) - if err != nil { - return err - } - // If the instance name's pid matches ours, we're done. - // Otherwise, this instance has been renamed, which is possible - // as other process instances start and stop on the system. - if ppid == procPid { - return nil - } + if err = pdhAddCounter(pcHandle, pidQuery, 0, &pidCounter); err != nil { + return err } - // If we get here, the instance name is invalid (nil, or out of sync) - // Query pid and counters until the correct image name is found and - // cache it. This is optimized for one or two instances on a windows - // node. An alternative is using a wildcard to first lookup up pids, - // and parse those to find instance name, then lookup the - // performance counters. - prefix := getProcessImageName() - for i := 0; ppid != procPid; i++ { - name = fmt.Sprintf("%s#%d", prefix, i) - err := getStatsForProcess(name, pcpu, rss, vss, &ppid) - if err != nil { + if err = pdhAddCounter(pcHandle, cpuQuery, 0, &cpuCounter); err != nil { + return err + } + if err = pdhAddCounter(pcHandle, rssQuery, 0, &rssCounter); err != nil { + return err + } + if err = pdhAddCounter(pcHandle, vssQuery, 0, &vssCounter); err != nil { + return err + } + + // prime the counters by collecting once, and sleep to get somewhat + // useful information the first request. Counters for the CPU require + // at least two collect calls. + if err = pdhCollectQueryData(pcHandle); err != nil { + return err + } + time.Sleep(50) + + return nil +} + +// ProcUsage returns process CPU and memory statistics +func ProcUsage(pcpu *float64, rss, vss *int64) error { + var err error + + // For simplicity, protect the entire call. + // Most simultaneous requests will immediately return + // with cached values. + pcQueryLock.Lock() + defer pcQueryLock.Unlock() + + // First time through, initialize counters. + if initialSample { + if err = initCounters(); err != nil { return err } + initialSample = false + } else if time.Since(lastSampleTime) < (2 * time.Second) { + // only refresh every two seconds as to minimize impact + // on the server. + *pcpu = prevCPU + *rss = prevRss + *vss = prevVss + return nil + } - // Bail out if an image name is not found. - if ppid < 0 { - break - } + // always save the sample time, even on errors. + defer func() { + lastSampleTime = time.Now() + }() - // if the pids equal, this is the right process and cache our - // image name - if ppid == procPid { - imageLock.Lock() - imageName = name - imageLock.Unlock() + // refresh the performance counter data + if err = pdhCollectQueryData(pcHandle); err != nil { + return err + } + + // retrieve the data + var pidAry, cpuAry, rssAry, vssAry []float64 + if pidAry, err = getCounterArrayData(pidCounter); err != nil { + return err + } + if cpuAry, err = getCounterArrayData(cpuCounter); err != nil { + return err + } + if rssAry, err = getCounterArrayData(rssCounter); err != nil { + return err + } + if vssAry, err = getCounterArrayData(vssCounter); err != nil { + return err + } + // find the index of the entry for this process + idx := int(-1) + for i := range pidAry { + if int(pidAry[i]) == processPid { + idx = i break } } - if ppid < 0 { - return errors.New("unable to retrieve process counters") + // no pid found... + if idx < 0 { + return fmt.Errorf("could not find pid in performance counter results") } + // assign values from the performance counters + *pcpu = cpuAry[idx] + *rss = int64(rssAry[idx]) + *vss = int64(vssAry[idx]) + + // save off cache values + prevCPU = *pcpu + prevRss = *rss + prevVss = *vss + return nil } diff --git a/server/pse/pse_windows_test.go b/server/pse/pse_windows_test.go index a868c3ee00..978ce833ff 100644 --- a/server/pse/pse_windows_test.go +++ b/server/pse/pse_windows_test.go @@ -26,45 +26,20 @@ func checkValues(t *testing.T, pcpu, tPcpu float64, rss, tRss int64) { if delta < 0 { delta = -delta } - if delta > 200*1024 { // 200k + if delta > 200*1024 { // 200k - basically sanity check t.Fatalf("RSSs did not match close enough: %d vs %d", rss, tRss) } } } -func testParseValues(t *testing.T) { - var pid int - var pcpu float64 - var rss, vss int64 - - err := parseValues("invalid", &pid, &pcpu, &rss, &vss) - if err == nil { - t.Fatal("Did not receive expected error.") - } - err = parseValues( - "\"date time\",\"invalid float\",\"invalid float\",\"invalid float\"", - &pid, &pcpu, &rss, &vss) - if err == nil { - t.Fatal("Did not receive expected error.") - } - err = parseValues( - "\"date time\",\"1234.00000\",\"invalid float\",\"invalid float\"", - &pid, &pcpu, &rss, &vss) - if err == nil { - t.Fatal("Did not receive expected error.") - } - err = parseValues( - "\"date time\",\"1234.00000\",\"1234.00000\",\"invalid float\"", - &pid, &pcpu, &rss, &vss) - if err == nil { - t.Fatal("Did not receive expected error.") - } -} - func TestPSEmulationWin(t *testing.T) { var pcpu, tPcpu float64 var rss, vss, tRss int64 + if err := ProcUsage(&pcpu, &rss, &vss); err != nil { + t.Fatalf("Error: %v", err) + } + imageName := getProcessImageName() // query the counters using typeperf out, err := exec.Command("typeperf.exe", @@ -83,31 +58,21 @@ func TestPSEmulationWin(t *testing.T) { // parse pcpu tPcpu, err = strconv.ParseFloat(strings.Trim(values[1], "\""), 64) if err != nil { - t.Fatal("Unable to parse percent cpu: %s", values[1]) + t.Fatalf("Unable to parse percent cpu: %s", values[1]) } // parse private bytes (rss) fval, err := strconv.ParseFloat(strings.Trim(values[2], "\""), 64) if err != nil { - t.Fatal("Unable to parse private bytes: %s", values[2]) + t.Fatalf("Unable to parse private bytes: %s", values[2]) } tRss = int64(fval) - if err = ProcUsage(&pcpu, &rss, &vss); err != nil { - t.Fatal("Error: %v", err) - } checkValues(t, pcpu, tPcpu, rss, tRss) - // Again to test image name caching + // Again to test caching if err = ProcUsage(&pcpu, &rss, &vss); err != nil { - t.Fatal("Error: %v", err) + t.Fatalf("Error: %v", err) } checkValues(t, pcpu, tPcpu, rss, tRss) - - testParseValues(t) - - var ppid int - if err = getStatsForProcess("invalid", &pcpu, &rss, &vss, &ppid); err != nil { - t.Fatal("Did not receive expected error.") - } }