Skip to content

Commit

Permalink
Add a web server to report memory utilization
Browse files Browse the repository at this point in the history
New -p (port) flag and environment variable specifies listen port.
Coalesce fmt.Print some statements to reduce stdout race window.
And improve code comments in pingtimes and cw_put.
  • Loading branch information
JohnADilley committed May 22, 2019
1 parent e90a0f4 commit 610d090
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 33 deletions.
18 changes: 18 additions & 0 deletions README.md
Expand Up @@ -326,6 +326,23 @@ if the application encountered an issue. You can also get a shell to try
out the application locally. (The shell does not work in firefox, so use
chrome or safari for this feature.)

## Looking at Memory Utilization

I was curious how much memory I was using in my workload, and if I had a
memory leak. Debugging this in a docker is not so easy (especially a
minimal one), and doing so in multiple locations is even harder. So I
added a simple web server function to my application to return memory usage
information using the golang `runtime.MemStats` feature.

Docker version v5 includes this new flag. Here's an example, using a
not-so-random port, testing against google every 30 seconds:

``` shell
docker run perftest:v5 -d 30 -s 52378 https://www.google.com/
```

View memory stats by pointing your browser to `localhost:52378/memstats`.

## Viewing Workload Results

There are a couple ways to see what is happening.
Expand All @@ -345,3 +362,4 @@ You'll note that the Rafay workload picks up a location label automatically
from the environment: we put REP_LOCATION, and a few other items, into the
shell environment of every container. You can see them by running a
go-httpbin testapp we also provide (at the /env/ endpoint).

50 changes: 42 additions & 8 deletions perftest.go
Expand Up @@ -3,7 +3,6 @@
// application response time. It can publish data to Cloudwatch, publish the details to
// a webhook, such as a StreamSets endpoint, or trigger alerts via Twilio.
// From https://github.com/davecheney/httpstat, from https://github.com/reorx/httpstat.

package main

import (
Expand Down Expand Up @@ -56,6 +55,7 @@ var (
alertInterval = flag.Int64("M", 300, "minimum time interval between generated alerts (seconds)")
cwFlag = flag.Bool("c", false, "Publish metrics to CloudWatch (requires AWS credentials in env)")
webhook = flag.String("W", "", "Webhook target URL to receive JSON log details via POST")
portFlag = flag.Int("p", 0, "run web server on this port (if non-zero) to report stats")
qf = flag.Bool("q", false, "be quiet, not verbose")
vf1 = flag.Bool("v", false, "be verbose")
vf2 = flag.Bool("V", false, "be more verbose")
Expand Down Expand Up @@ -217,8 +217,33 @@ func main() {
log.Println("testing ", urls, "from", util.LocationOrIp(&myLocation))
}

if !*jsonFlag {
util.TextHeader(os.Stdout)
serverPort := 0
if portEnv, found := os.LookupEnv("PERFTEST_LISTEN_PORT"); found {
val, err := strconv.Atoi(portEnv)
if err != nil {
log.Println("ERROR: cannot parse PERFTEST_LISTEN_PORT as an int:", err)
return
}
serverPort = val
}

if *portFlag > 0 {
if serverPort > 0 {
log.Println("NOTE: command line port", *portFlag, "overrides listen port from env", serverPort)
}
serverPort = *portFlag
}

if serverPort > 0 {
if serverPort > 65535 {
log.Println("bad port > 65535", serverPort)
return
}

if verbose > 0 {
log.Printf("Starting responder on http://localhost:%d\n", serverPort)
}
go util.StartServer(serverPort)
}

////
Expand All @@ -234,11 +259,14 @@ func main() {
signal.Notify(sigchan, syscall.SIGTERM)
go func() {
for sig := range sigchan {
fmt.Println("\nreceived", sig, "signal, terminating")
if verbose > 1 {
fmt.Println("\nreceived", sig, "signal, terminating")
}
if doneChan != nil {
close(doneChan)
doneChan = nil
}
close(sigchan)
}
}()

Expand All @@ -251,6 +279,12 @@ func main() {
if verbose > 1 {
log.Println("waiting for children to exit")
}

if !*jsonFlag {
// put header after any debug messages, but there's a race condition here :-)
util.TextHeader(os.Stdout)
}

wg.Wait()

if verbose > 2 {
Expand Down Expand Up @@ -307,11 +341,10 @@ func testHttp(uri string, numTries int, done <-chan int, wg *sync.WaitGroup) {
defer func() { // summary printer, runs upon return
elapsed := hhmmss(time.Now().Unix() - ptSummary.Start.Unix())

fmt.Printf("\nRecorded %d samples in %s, average values:\n",
count, elapsed)
fc := float64(count) // count will be 1 by time this runs
util.TextHeader(os.Stdout)
fmt.Printf("%d %-6s\t%.03f\t%.03f\t%.03f\t%.03f\t%.03f\t%.03f\t\t%d\t%s\t%s\n\n",
fmt.Printf("\nRecorded %d samples in %s, average values:\n"+"%s"+
"%d %-6s\t%.03f\t%.03f\t%.03f\t%.03f\t%.03f\t%.03f\t\t%d\t%s\t%s\n\n",
count, elapsed, util.PingTimesHeader(),
count, elapsed,
util.Msec(ptSummary.DnsLk)/fc,
util.Msec(ptSummary.TcpHs)/fc,
Expand All @@ -324,6 +357,7 @@ func testHttp(uri string, numTries int, done <-chan int, wg *sync.WaitGroup) {
"", // TODO: report summary of each from location?
*ptSummary.DestUrl)
}()
// TODO: report all summaries just before program exit (not thread exit)
} else {
ptSummary.DnsLk += pt.DnsLk
ptSummary.TcpHs += pt.TcpHs
Expand Down
13 changes: 2 additions & 11 deletions util/cw_put.go
Expand Up @@ -9,8 +9,6 @@ import (
"time"
)

// To publish AWS inventory (as a one time task) see awsutil/cw_inventory/put_inventory.go.

// Publish resonse time in milliseconds as metric "RespTime" in CloudWatch namespace "Perf Demo".
// Could be made more generic of course. This is for Rafay Feb Demo A.
//
Expand All @@ -19,13 +17,6 @@ import (
// AWS_ACCESS_KEY_ID
// AWS_SECRET_ACCESS_KEY
func PublishRespTime(location, url, respCode string, respTime float64) {

/* region := os.Getenv("AWS_CW_REGION")
// set AWS_REGION in environment instead -- used by default by AWS API
if len(region) == 0 {
region = "us-west-2"
}*/

// Load credentials from the shared credentials file ~/.aws/credentials
// and configuration from the shared configuration file ~/.aws/config.
sess := session.Must(session.NewSession())
Expand All @@ -44,7 +35,7 @@ func PublishRespTime(location, url, respCode string, respTime float64) {

MetricData: []*cloudwatch.MetricDatum{
&cloudwatch.MetricDatum{
Timestamp: &timestamp, // TODO: take from PingTime data?
Timestamp: &timestamp,
MetricName: aws.String(metric),
Value: aws.Float64(respTime),
Unit: aws.String(cloudwatch.StandardUnitMilliseconds),
Expand All @@ -59,7 +50,7 @@ func PublishRespTime(location, url, respCode string, respTime float64) {
},
&cloudwatch.Dimension{
Name: aws.String("FromLocation"),
Value: aws.String(location),
Value: aws.String(LocationOrIp(&location)),
},
},
},
Expand Down
46 changes: 32 additions & 14 deletions util/pingtimes.go
@@ -1,7 +1,5 @@
package util

// HTTP reseponse time data structure and methods

import (
"encoding/json"
"fmt"
Expand All @@ -10,7 +8,8 @@ import (
"time"
)

// Components of an HTTP ping request for reporting performance (to cloudwatch, or whatever)
// PingTimes holds the components of an HTTP ping request for reporting
// performance (to cloudwatch, or whatever). See the receiver methods below.
type PingTimes struct {
Start time.Time // time we started the ping
DnsLk time.Duration // DNS Lookup
Expand All @@ -26,18 +25,18 @@ type PingTimes struct {
Size int64 // total response bytes
}

// Response time is the total duration from the TCP open until the TCP close.
// DNS lookup time is not included in this measure.
// Will be zero iff the request failed.
// This method sets the value in the object to the sum. Call this before dumping as JSON!
// RespTime returns the total duration from the TCP open until the TCP close.
// DNS lookup time is NOT included in this measure.
// The time.Duration returned will be zero iff the request failed.
// This method sets or changes the pt.Total value.
func (pt *PingTimes) RespTime() time.Duration {
if pt.Total == 0 {
pt.Total = pt.DnsLk + pt.TcpHs + pt.TlsHs + pt.Reply + pt.Close
}
return pt.Total
}

// Msec returns the duration as a floating point number of seconds.
// Msec converts a time.Duration to a floating point number of seconds.
func Msec(d time.Duration) float64 {
sec := d / time.Second
nsec := d % time.Second
Expand All @@ -46,7 +45,9 @@ func Msec(d time.Duration) float64 {

var myIp *string

// Return my outbound IP address as a string
// GetMyIp returns your primary local IP address (as from `ifconfig`).
// Note that a docker container has a different primary local IP address
// (172.something) than your base host.
func GetMyIp() string {
if myIp == nil {
conn, err := net.Dial("udp", "8.8.8.8:53")
Expand All @@ -60,6 +61,8 @@ func GetMyIp() string {
return *myIp
}

// LocationOrIp returns the loc string, if it has a value, or else the
// IP address as determined by GetMyIp().
func LocationOrIp(loc *string) string {
if loc != nil && len(*loc) > 0 {
return *loc
Expand All @@ -68,6 +71,8 @@ func LocationOrIp(loc *string) string {
}
}

// String returns a canonical string representation of the PingTimes
// argument using golang native representatino of time.Duration.
func (pt *PingTimes) String() string {
return fmt.Sprintln(
"DnsLk:", pt.DnsLk, // DNS lookup
Expand All @@ -81,15 +86,18 @@ func (pt *PingTimes) String() string {
)
}

// I don't know why golang string doesn't already have this feature.
func SafeStrPtr(sp *string, ifnil string) string {
if sp == nil || *sp == "" {
return ifnil
}
return *sp
}

// Return tab separated values: Unix timestamp first then msec time values for
// each of the time component fields as msec.uuu (three digits of microseconds).
// MsecTsv returns a tab separated values string with the (Unix epoch)
// timestamp of the start of the test followed by the msec time deltas
// for each of the time component fields as msec.uuu (three digits of
// microseconds), and then the other values of PingTimes.
func (pt *PingTimes) MsecTsv() string {
return fmt.Sprintf("%d\t%.03f\t%.03f\t%.03f\t%.03f\t%.03f\t%.03f\t%03d\t%d\t%s\t%s\t%s",
pt.Start.Unix(),
Expand All @@ -106,8 +114,17 @@ func (pt *PingTimes) MsecTsv() string {
SafeStrPtr(pt.DestUrl, "noUrl"))
}

// TextHeader dumps a column header corresponding to the values onto the
// file handle provided. (Included for backwards compatibility.)
func TextHeader(file *os.File) {
fmt.Fprintf(file, "# %s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
fmt.Fprintf(file, "%s", PingTimesHeader())
}

// PingTimesHeader returns a column header string with field names
// corresponding to the values onto the PingTimes structure, in the
// same order as the string returned by MsecTsv().
func PingTimesHeader() string {
return fmt.Sprintf("# %s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
"timestamp",
"DNS",
"TCP",
Expand All @@ -122,12 +139,13 @@ func TextHeader(file *os.File) {
"proto://uri")
}

// Write ping times as tab-separated milliseconds into the given open file.
// DumpText writes ping times as tab-separated milliseconds into the file.
func (pt *PingTimes) DumpText(file *os.File) {
fmt.Fprintln(file, pt.MsecTsv())
}

// Write ping times as raw JSON (nanosecond values) into the given open file.
// DumpJson writes ping times as JSON using native (nanosecond)
// timestamp values into the given open file.
func (pt *PingTimes) DumpJson(file *os.File) error {
enc := json.NewEncoder(file)
enc.SetIndent("", " ")
Expand Down
71 changes: 71 additions & 0 deletions util/ponger.go
@@ -0,0 +1,71 @@
package util

import (
"fmt"
"log"
"net/http"
"runtime"
"time"
)

func bToMb(b uint64) uint64 {
return b / 1024 / 1024
}

// StartServer is a goroutine that runs up a web server to listen on the given
// port, and sets up handlers for two endpoints: "/ping" serves a simple HTTP
// reply that includes the request URI and current time; "/memstats" serves a
// simple HTTP page showing current golang memory utilization. The port
// argument should be a valid port number that you can listen on.
//
// StartServer never returns. Invoke it with go StartServer(yourPort).
func StartServer(port int) {
http.HandleFunc("/ping", pongReply)
http.HandleFunc("/memstats", memStatsReply)

max := 5 // 5 tries = 15 seconds (linear backoff -- 5th triangular number)

addr := fmt.Sprintf(":%d", port)
// The ListenAndServe call should not return. If it does the address may be in use so retry below.
err := http.ListenAndServe(addr, nil)

tries := 0
for tries < max {
tries++
// sleep a little while (longer each time through the loop)
log.Println(err, "sleep", tries)
time.Sleep(time.Duration(tries) * time.Second)
// now try again ... it may take a while for a previous instance to exit
err = http.ListenAndServe(addr, nil)
}

if err != nil {
// if error is anything else print the error and return
log.Println(err)
}
}

func pongReply(w http.ResponseWriter, r *http.Request) {
now := time.Now()
fmt.Fprintln(w, "<h1>pong</h1><p>reply to", r.URL.Path, "@", now)
log.Println("pongReply", r.RemoteAddr)
}

func memStatsReply(w http.ResponseWriter, r *http.Request) {
// with thanks from https://golangcode.com/print-the-current-memory-usage/
var m runtime.MemStats
runtime.ReadMemStats(&m)
active := m.Mallocs - m.Frees
alloc := bToMb(m.Alloc)
fmt.Fprintln(w, "<h1>MemStats</h1>\n<p>", time.Now())
// The number of live objects is Mallocs - Frees.
fmt.Fprintf(w, "<br>Active Objects = %v\n", active)
// HeapAlloc is bytes of allocated heap objects.
fmt.Fprintf(w, "<br>Active Bytes = %v MiB\n", alloc)
// Sys is the total bytes of memory obtained from the OS.
fmt.Fprintf(w, "<br>OS Bytes = %v MiB\n", bToMb(m.Sys))
// NumGC is the number of completed GC cycles.
fmt.Fprintf(w, "<br>Num GCs = %v\n</p>", m.NumGC)

log.Println("pongReply", r.RemoteAddr, active, alloc)
}

0 comments on commit 610d090

Please sign in to comment.