From ed1902f85134f2ef08e1dfcae99fa4d95f0c3a7a Mon Sep 17 00:00:00 2001 From: Peter Boothe Date: Fri, 21 Dec 2018 12:22:57 -0500 Subject: [PATCH 1/3] Adds memoryless package Mmeoryless allows the running of functions as Poisson processes. --- README.md | 4 ++ memoryless/README.md | 5 ++ memoryless/memoryless.go | 97 +++++++++++++++++++++++++++++++++++ memoryless/memoryless_test.go | 62 ++++++++++++++++++++++ 4 files changed, 168 insertions(+) create mode 100644 memoryless/README.md create mode 100644 memoryless/memoryless.go create mode 100644 memoryless/memoryless_test.go diff --git a/README.md b/README.md index 4b6a63a..49b206d 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,10 @@ Extensions for the flag package. ### httpx Extensions of the http package. +### memoryless +Tools to run a function as a memoryless poisson process. Helps prevent spurious +patterns. + ### osx Extensions of the os package. diff --git a/memoryless/README.md b/memoryless/README.md new file mode 100644 index 0000000..755499f --- /dev/null +++ b/memoryless/README.md @@ -0,0 +1,5 @@ +Functions which run a given function as a memoryless Poisson process. + +This is very useful if your function generates a gauge measurement or it exerts +load on the system in some way. By distributing the measurement or load across +time, we help ensure that our systems' data is minimally affected. diff --git a/memoryless/memoryless.go b/memoryless/memoryless.go new file mode 100644 index 0000000..f75db64 --- /dev/null +++ b/memoryless/memoryless.go @@ -0,0 +1,97 @@ +// Package memoryless helps repeated calls to a function be distributed across +// time in a memoryless fashion. +package memoryless + +import ( + "context" + "fmt" + "math/rand" + "time" +) + +// Config represents the time we should wait. "Once" is provided as a helper, +// because frequently for unit testing and integration testing, you only want +// the "Forever" loop to run once. +// +// The zero value of this struct has Once set to false, which means the value +// only needs to be set explicitly in codepaths where it might be true. +type Config struct { + Expected, Min, Max time.Duration + Once bool +} + +func (c Config) waittime() time.Duration { + wt := time.Duration(rand.ExpFloat64() * float64(c.Expected)) + if wt < c.Min { + wt = c.Min + } + if c.Max != 0 && wt > c.Max { + wt = c.Max + } + return wt +} + +// Run calls the given function repeatedly, waiting a c.Expected amount of time +// between calls on average. The wait time is actually random and will generate +// a memoryless (Poisson) distribution of f() calls in time, ensuring that f() +// has the PASTA property (Poisson Arrivals See Time Averages). This statistical +// guarantee is subject to two caveats. +// +// Caveat 1 is that, in a nod to the realities of systems needing to have +// guarantees, we allow the random wait time to be clamped both above and below. +// This means that calls to f() should be at least c.Min and at most c.Max apart +// in time. This clamping causes bias in the timing. For use of this function to +// be statistically sensible, the clamping should not be too extreme. The exact +// mathematical meaning of "too extreme" depends on your situation, but a nice +// rule of thumb is c.Min should be at most 10% of expected and c.Max should be +// at least 250% of expected. These values mean that less than 10% of time you +// will be waiting c.Min and less than 10% of the time you will be waiting +// c.Max. +// +// Caveat 2 is that this assumes that the function f() takes negligible time to +// run when compared to the expected wait time. Technically memoryless events +// have the property that the times between successive event starts has the +// exponential distribution, and this code will not start a new call to f() +// before the old one has completed, which provides a lower bound on wait times. +func Run(ctx context.Context, f func(), c Config) error { + if !(0 <= c.Min && c.Min <= c.Expected && (c.Max == 0 || c.Expected <= c.Max)) { + return fmt.Errorf( + "The arguments to Run make no sense. It should be true that Min <= Expected <= Max (or Min <= Expected and Max is 0), "+ + "but that is not true for Min(%v) Expected(%v) Max(%v).", + c.Min, c.Expected, c.Max) + } + if c.Once { + f() + return nil + } + // We use this flow control method because select {} doesn't promise that + // multiple channels will get selected with equal probability. By using a + // "done" variable, we ensure that no matter why the wait was ended (which is + // what the select in the loop does - it ends the wait for one reason or + // another), if the context was canceled then the "done" variable will be set + // to true as soon as the goroutine resumes. Goroutines get scheduled by a + // central scheduler that promises no starvation, so the "the timer is always + // finished by the time the function is done" condition will not have the + // potential to cause livelock for Run() + done := false + go func() { + <-ctx.Done() + done = true + }() + for !done { + // Start the timer before the function call because the time between function + // call *starts* should be exponentially distributed. + t := time.NewTimer(c.waittime()) + f() + // Wait until the timer is done or the context is canceled. If both conditions + // are true, which case gets called is unspecified. + select { + case <-ctx.Done(): + case <-t.C: + } + } + // The only way this function can return is if the goroutine set done to true, + // which means that the goroutine is no longer blocked on the channel read. + // Therefore, the goroutine does not leak. + return nil +} diff --git a/memoryless/memoryless_test.go b/memoryless/memoryless_test.go new file mode 100644 index 0000000..9c8d084 --- /dev/null +++ b/memoryless/memoryless_test.go @@ -0,0 +1,62 @@ +package memoryless_test + +import ( + "context" + "testing" + "time" + + "github.com/m-lab/go/memoryless" + "github.com/m-lab/go/rtx" +) + +func TestRunWithBadArgs(t *testing.T) { + f := func() { panic("should not be called") } + for _, c := range []memoryless.Config{ + {Expected: -1}, + {Min: -1}, + {Max: -1}, + {Min: -3, Expected: -2, Max: -1}, + {Min: 1}, + {Min: 2, Max: 1}, + {Expected: 2, Max: 1}, + {Min: 2, Expected: 1}, + } { + err := memoryless.Run(context.Background(), f, c) + if err == nil { + t.Errorf("Should have had an error running config %v", c) + } + } +} + +func TestRunOnce(t *testing.T) { + count := 0 + f := func() { count++ } + rtx.Must( + memoryless.Run(context.Background(), f, memoryless.Config{Once: true}), + "Bad time config") + if count != 1 { + t.Errorf("Once should mean once, not %d.", count) + } +} + +func TestRunForever(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + // We use count rather than a waitgroup because an extra call to f() shouldn't + // cause the test to fail - cancel() races with the timer, and that's both + // fundamental and okay. Contexts can be canceled() multiple times no problem, + // but if you ever call .Done() on a WaitGroup more times than you .Add(), you + // get a panic. + count := 1000 + f := func() { + if count < 0 { + cancel() + } else { + count-- + } + } + wt := time.Duration(1 * time.Microsecond) + go memoryless.Run(ctx, f, memoryless.Config{Expected: wt, Min: wt, Max: wt}) + <-ctx.Done() + // If this does not run forever, then f() was called at least 100 times and + // then the context was canceled. +} From ec1f51bfaaf952e565bce240839ecf82e69467f9 Mon Sep 17 00:00:00 2001 From: Peter Boothe Date: Fri, 21 Dec 2018 15:09:20 -0500 Subject: [PATCH 2/3] Cleaned up logic, added comments. --- memoryless/memoryless.go | 54 +++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/memoryless/memoryless.go b/memoryless/memoryless.go index f75db64..96caae4 100644 --- a/memoryless/memoryless.go +++ b/memoryless/memoryless.go @@ -9,15 +9,26 @@ import ( "time" ) -// Config represents the time we should wait. "Once" is provided as a helper, -// because frequently for unit testing and integration testing, you only want -// the "Forever" loop to run once. +// Config represents the time we should wait between runs of the function. // -// The zero value of this struct has Once set to false, which means the value -// only needs to be set explicitly in codepaths where it might be true. +// A valid config will have: +// 0 <= Min <= Expected <= Max (or 0 <= Min <= Expected and Max is 0) type Config struct { - Expected, Min, Max time.Duration - Once bool + // Expected records the expected/mean/average amount of time between runs. + Expected time.Duration + // Min provides clamping of the randomly produced value. All timers will wait + // at least Min time. + Min time.Duration + // Max provides clamping of the randomly produced value. All timers will take + // at least Max time. + Max time.Duration + + // Once is provided as a helper, because frequently for unit testing and + // integration testing, you only want the "Forever" loop to run once. + // + // The zero value of this struct has Once set to false, which means the value + // only needs to be set explicitly in codepaths where it might be true. + Once bool } func (c Config) waittime() time.Duration { @@ -64,21 +75,9 @@ func Run(ctx context.Context, f func(), c Config) error { f() return nil } - // We use this flow control method because select {} doesn't promise that - // multiple channels will get selected with equal probability. By using a - // "done" variable, we ensure that no matter why the wait was ended (which is - // what the select in the loop does - it ends the wait for one reason or - // another), if the context was canceled then the "done" variable will be set - // to true as soon as the goroutine resumes. Goroutines get scheduled by a - // central scheduler that promises no starvation, so the "the timer is always - // finished by the time the function is done" condition will not have the - // potential to cause livelock for Run() - done := false - go func() { - <-ctx.Done() - done = true - }() - for !done { + // When Done() is not closed and the Deadline has not been exceeded, the error + // is nil. + for ctx.Err() == nil { // Start the timer before the function call because the time between function // call *starts* should be exponentially distributed. t := time.NewTimer(c.waittime()) @@ -87,11 +86,16 @@ func Run(ctx context.Context, f func(), c Config) error { // are true, which case gets called is unspecified. select { case <-ctx.Done(): + // Clean up the timer. + t.Stop() + // Please don't put logic here that assumes that this code path will + // definitely execute if the context is done. select {} doesn't promise that + // multiple channels will get selected with equal probability, which means + // that if f() takes a while and c.Max is low, then it could be true that the + // timer is done AND the context is canceled, and we have no guarantee that + // in that case the canceled context case will be the one that is selected. case <-t.C: } } - // The only way this function can return is if the goroutine set done to true, - // which means that the goroutine is no longer blocked on the channel read. - // Therefore, the goroutine does not leak. return nil } From 9f2bb937c6cdcad6a10717087375820e5ef83cdc Mon Sep 17 00:00:00 2001 From: Peter Boothe Date: Fri, 21 Dec 2018 15:19:19 -0500 Subject: [PATCH 3/3] typo fix --- memoryless/memoryless.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/memoryless/memoryless.go b/memoryless/memoryless.go index 96caae4..3546fd6 100644 --- a/memoryless/memoryless.go +++ b/memoryless/memoryless.go @@ -20,7 +20,7 @@ type Config struct { // at least Min time. Min time.Duration // Max provides clamping of the randomly produced value. All timers will take - // at least Max time. + // at most Max time. Max time.Duration // Once is provided as a helper, because frequently for unit testing and