Skip to content

Commit

Permalink
Merge pull request #266 from nats-io/update-win-pse
Browse files Browse the repository at this point in the history
Update PSE for Windows
  • Loading branch information
derekcollison committed May 6, 2016
2 parents 099aaa3 + 26dab3b commit 0378f6d
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 165 deletions.
341 changes: 220 additions & 121 deletions server/pse/pse_windows.go
@@ -1,169 +1,268 @@
// Copyright 2015-2016 Apcera Inc. All rights reserved.
// +build windows

package pse

import (
"errors"
"fmt"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"
"unsafe"
)

// cache the image name to optimize repeated calls
var imageName string
var imageLock sync.Mutex
var (
pdh = syscall.NewLazyDLL("pdh.dll")
winPdhOpenQuery = pdh.NewProc("PdhOpenQuery")
winPdhAddCounter = pdh.NewProc("PdhAddCounterW")
winPdhCollectQueryData = pdh.NewProc("PdhCollectQueryData")
winPdhGetFormattedCounterValue = pdh.NewProc("PdhGetFormattedCounterValue")
winPdhGetFormattedCounterArray = pdh.NewProc("PdhGetFormattedCounterArrayW")
)

// parseValues parses the results of data returned by typeperf.exe. This
// is a series of comma delimited quoted strings, containing date time,
// pid, pcpu, rss, and vss. All numeric values are floating point.
// eg: "04/17/2016 15.38.00.016", "5123.00000", "1.23400", "123.00000", "123.00000"
func parseValues(line string, pid *int, pcpu *float64, rss, vss *int64) (err error) {
values := strings.Split(line, ",")
if len(values) < 4 {
return errors.New("Invalid result.")
}
// values[0] will be date, time, ignore them
// parse the pid
fVal, err := strconv.ParseFloat(strings.Trim(values[1], "\""), 64)
if err != nil {
return errors.New(fmt.Sprintf("Unable to parse pid: %s", values[1]))
}
*pid = int(fVal)
// global performance counter query handle and counters
var (
pcHandle PDH_HQUERY
pidCounter, cpuCounter, rssCounter, vssCounter PDH_HCOUNTER
prevCPU float64
prevRss int64
prevVss int64
lastSampleTime time.Time
processPid int
pcQueryLock sync.Mutex
initialSample = true
)

// parse pcpu
*pcpu, err = strconv.ParseFloat(strings.Trim(values[2], "\""), 64)
if err != nil {
return errors.New(fmt.Sprintf("Unable to parse percent cpu: %s", values[2]))
}
// maxQuerySize is the number of values to return from a query.
// It represents the maximum # of servers that can be queried
// simultaneously running on a machine.
const maxQuerySize = 512

// parse private working set (rss)
fVal, err = strconv.ParseFloat(strings.Trim(values[3], "\""), 64)
if err != nil {
return errors.New(fmt.Sprintf("Unable to parse working set: %s", values[3]))
}
*rss = int64(fVal)
// Keep static memory around to reuse; this works best for passing
// into the pdh API.
var counterResults [maxQuerySize]PDH_FMT_COUNTERVALUE_ITEM_DOUBLE

// PDH Types
type (
PDH_HQUERY syscall.Handle
PDH_HCOUNTER syscall.Handle
)

// PDH constants used here
const (
PDH_FMT_DOUBLE = 0x00000200
PDH_INVALID_DATA = 0xC0000BC6
PDH_MORE_DATA = 0x800007D2
)

// PDH_FMT_COUNTERVALUE_DOUBLE - double value
type PDH_FMT_COUNTERVALUE_DOUBLE struct {
CStatus uint32
DoubleValue float64
}

// PDH_FMT_COUNTERVALUE_ITEM_DOUBLE is an array
// element of a double value
type PDH_FMT_COUNTERVALUE_ITEM_DOUBLE struct {
SzName *uint16 // pointer to a string
FmtValue PDH_FMT_COUNTERVALUE_DOUBLE
}

// parse virtual bytes (vsz)
fVal, err = strconv.ParseFloat(strings.Trim(values[4], "\""), 64)
if err != nil {
return errors.New(fmt.Sprintf("Unable to parse virtual bytes: %s", values[4]))
func pdhAddCounter(hQuery PDH_HQUERY, szFullCounterPath string, dwUserData uintptr, phCounter *PDH_HCOUNTER) error {
ptxt, _ := syscall.UTF16PtrFromString(szFullCounterPath)
r0, _, _ := winPdhAddCounter.Call(
uintptr(hQuery),
uintptr(unsafe.Pointer(ptxt)),
dwUserData,
uintptr(unsafe.Pointer(phCounter)))

if r0 != 0 {
return fmt.Errorf("pdhAddCounter failed. %d", r0)
}
*vss = int64(fVal)
return nil
}

func pdhOpenQuery(datasrc *uint16, userdata uint32, query *PDH_HQUERY) error {
r0, _, _ := syscall.Syscall(winPdhOpenQuery.Addr(), 3, 0, uintptr(userdata), uintptr(unsafe.Pointer(query)))
if r0 != 0 {
return fmt.Errorf("pdhOpenQuery failed - %d", r0)
}
return nil
}

// getStatsForProcess retrieves process information for a given instance name.
// typeperf.exe is the windows native command line utility to get pcpu, rss,
// and vsz equivalents through queries of performance counters.
// An alternative is to map the Pdh* native windows API from pdh.dll,
// and call those APIs directly - this is a simpler and cleaner approach.
func getStatsForProcess(name string, pcpu *float64, rss, vss *int64, pid *int) (err error) {
// query the counters using typeperf. "-sc","1" requests one
// set of data (versus continuous monitoring)
out, err := exec.Command("typeperf.exe",
fmt.Sprintf("\\Process(%s)\\ID Process", name),
fmt.Sprintf("\\Process(%s)\\%% Processor Time", name),
fmt.Sprintf("\\Process(%s)\\Working Set - Private", name),
fmt.Sprintf("\\Process(%s)\\Virtual Bytes", name),
"-sc", "1").Output()
if err != nil {
// Signal that the command ran, but the image instance was not found
// through a PID of -1.
if strings.Contains(string(out), "The data is not valid") {
*pid = -1
return nil
} else {
// something went wrong executing the command
return errors.New(fmt.Sprintf("typeperf failed: %v", err))
}
func pdhCollectQueryData(hQuery PDH_HQUERY) error {
r0, _, _ := winPdhCollectQueryData.Call(uintptr(hQuery))
if r0 != 0 {
return fmt.Errorf("pdhCollectQueryData failed - %d", r0)
}
return nil
}

results := strings.Split(string(out), "\r\n")
// results[0] = newline
// results[1] = headers
// results[2] = values
// ignore the rest...
if len(results) < 3 {
return errors.New(fmt.Sprintf("unexpected results from typeperf"))
// pdhGetFormattedCounterArrayDouble returns the value of return code
// rather than error, to easily check return codes
func pdhGetFormattedCounterArrayDouble(hCounter PDH_HCOUNTER, lpdwBufferSize *uint32, lpdwBufferCount *uint32, itemBuffer *PDH_FMT_COUNTERVALUE_ITEM_DOUBLE) uint32 {
ret, _, _ := winPdhGetFormattedCounterArray.Call(
uintptr(hCounter),
uintptr(PDH_FMT_DOUBLE),
uintptr(unsafe.Pointer(lpdwBufferSize)),
uintptr(unsafe.Pointer(lpdwBufferCount)),
uintptr(unsafe.Pointer(itemBuffer)))

return uint32(ret)
}

func getCounterArrayData(counter PDH_HCOUNTER) ([]float64, error) {
var bufSize uint32
var bufCount uint32

// Retrieving array data requires two calls, the first which
// requires an adressable empty buffer, and sets size fields.
// The second call returns the data.
initialBuf := make([]PDH_FMT_COUNTERVALUE_ITEM_DOUBLE, 1)
ret := pdhGetFormattedCounterArrayDouble(counter, &bufSize, &bufCount, &initialBuf[0])
if ret == PDH_MORE_DATA {
// we'll likely never get here, but be safe.
if bufCount > maxQuerySize {
bufCount = maxQuerySize
}
ret = pdhGetFormattedCounterArrayDouble(counter, &bufSize, &bufCount, &counterResults[0])
if ret == 0 {
rv := make([]float64, bufCount)
for i := 0; i < int(bufCount); i++ {
rv[i] = counterResults[i].FmtValue.DoubleValue
}
return rv, nil
}
}
if err = parseValues(results[2], pid, pcpu, rss, vss); err != nil {
return err
if ret != 0 {
return nil, fmt.Errorf("getCounterArrayData failed - %d", ret)
}
return nil

return nil, nil
}

// getProcessImageName returns the name of the process image, as expected by
// typeperf.
// the performance counter API.
func getProcessImageName() (name string) {
name = filepath.Base(os.Args[0])
name = strings.TrimRight(name, ".exe")
return
}

// procUsage retrieves process cpu and memory information.
// Under the hood, typeperf is called. Notably, typeperf cannot search
// using a pid, but instead uses a somewhat volatile process image name.
// If there is more than one instance, "#<instancecount>" is appended to
// the image name. Wildcard filters are supported, but result in a very
// complex data set to parse.
func ProcUsage(pcpu *float64, rss, vss *int64) error {
var ppid int = -1
// initialize our counters
func initCounters() (err error) {

imageLock.Lock()
name := imageName
imageLock.Unlock()
processPid = os.Getpid()
// require an addressible nil pointer
var source uint16
if err := pdhOpenQuery(&source, 0, &pcHandle); err != nil {
return err
}

// Get the pid to retrieve the right set of information for this process.
procPid := os.Getpid()
// setup the performance counters, search for all server instances
name := fmt.Sprintf("%s*", getProcessImageName())
pidQuery := fmt.Sprintf("\\Process(%s)\\ID Process", name)
cpuQuery := fmt.Sprintf("\\Process(%s)\\%% Processor Time", name)
rssQuery := fmt.Sprintf("\\Process(%s)\\Working Set - Private", name)
vssQuery := fmt.Sprintf("\\Process(%s)\\Virtual Bytes", name)

// if we have cached the image name, try that first
if name != "" {
err := getStatsForProcess(name, pcpu, rss, vss, &ppid)
if err != nil {
return err
}
// If the instance name's pid matches ours, we're done.
// Otherwise, this instance has been renamed, which is possible
// as other process instances start and stop on the system.
if ppid == procPid {
return nil
}
if err = pdhAddCounter(pcHandle, pidQuery, 0, &pidCounter); err != nil {
return err
}
// If we get here, the instance name is invalid (nil, or out of sync)
// Query pid and counters until the correct image name is found and
// cache it. This is optimized for one or two instances on a windows
// node. An alternative is using a wildcard to first lookup up pids,
// and parse those to find instance name, then lookup the
// performance counters.
prefix := getProcessImageName()
for i := 0; ppid != procPid; i++ {
name = fmt.Sprintf("%s#%d", prefix, i)
err := getStatsForProcess(name, pcpu, rss, vss, &ppid)
if err != nil {
if err = pdhAddCounter(pcHandle, cpuQuery, 0, &cpuCounter); err != nil {
return err
}
if err = pdhAddCounter(pcHandle, rssQuery, 0, &rssCounter); err != nil {
return err
}
if err = pdhAddCounter(pcHandle, vssQuery, 0, &vssCounter); err != nil {
return err
}

// prime the counters by collecting once, and sleep to get somewhat
// useful information the first request. Counters for the CPU require
// at least two collect calls.
if err = pdhCollectQueryData(pcHandle); err != nil {
return err
}
time.Sleep(50)

return nil
}

// ProcUsage returns process CPU and memory statistics
func ProcUsage(pcpu *float64, rss, vss *int64) error {
var err error

// For simplicity, protect the entire call.
// Most simultaneous requests will immediately return
// with cached values.
pcQueryLock.Lock()
defer pcQueryLock.Unlock()

// First time through, initialize counters.
if initialSample {
if err = initCounters(); err != nil {
return err
}
initialSample = false
} else if time.Since(lastSampleTime) < (2 * time.Second) {
// only refresh every two seconds as to minimize impact
// on the server.
*pcpu = prevCPU
*rss = prevRss
*vss = prevVss
return nil
}

// Bail out if an image name is not found.
if ppid < 0 {
break
}
// always save the sample time, even on errors.
defer func() {
lastSampleTime = time.Now()
}()

// if the pids equal, this is the right process and cache our
// image name
if ppid == procPid {
imageLock.Lock()
imageName = name
imageLock.Unlock()
// refresh the performance counter data
if err = pdhCollectQueryData(pcHandle); err != nil {
return err
}

// retrieve the data
var pidAry, cpuAry, rssAry, vssAry []float64
if pidAry, err = getCounterArrayData(pidCounter); err != nil {
return err
}
if cpuAry, err = getCounterArrayData(cpuCounter); err != nil {
return err
}
if rssAry, err = getCounterArrayData(rssCounter); err != nil {
return err
}
if vssAry, err = getCounterArrayData(vssCounter); err != nil {
return err
}
// find the index of the entry for this process
idx := int(-1)
for i := range pidAry {
if int(pidAry[i]) == processPid {
idx = i
break
}
}
if ppid < 0 {
return errors.New("unable to retrieve process counters")
// no pid found...
if idx < 0 {
return fmt.Errorf("could not find pid in performance counter results")
}
// assign values from the performance counters
*pcpu = cpuAry[idx]
*rss = int64(rssAry[idx])
*vss = int64(vssAry[idx])

// save off cache values
prevCPU = *pcpu
prevRss = *rss
prevVss = *vss

return nil
}

0 comments on commit 0378f6d

Please sign in to comment.