From 1091c3fd653631c5af34b49b530ea24f4188c8f7 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Thu, 20 Sep 2018 21:18:34 -0700 Subject: [PATCH] stats/view: implement Flush Flush is a global function that immediately reports all data collected by the worker, regardless of the reporting period or buffering. Added tests, and also ensured that calling (*worker).stop() and then Flush doesn't block and returns ASAP if the state is "quit". This required protecting w.quit from a double close (via sync.Once) and using a select on the closed `w.quit` channel to detect the stopped state instead of waiting forever. Fixes #862 --- stats/view/view_measure_test.go | 2 ++ stats/view/worker.go | 27 ++++++++++++-- stats/view/worker_commands.go | 11 ++++++ stats/view/worker_test.go | 62 +++++++++++++++++++++++++++++++++ 4 files changed, 100 insertions(+), 2 deletions(-) diff --git a/stats/view/view_measure_test.go b/stats/view/view_measure_test.go index 6ee8413d4..8675e0354 100644 --- a/stats/view/view_measure_test.go +++ b/stats/view/view_measure_test.go @@ -8,6 +8,8 @@ import ( ) func TestMeasureFloat64AndInt64(t *testing.T) { + restart() + // Recording through both a Float64Measure and Int64Measure with the // same name should work. diff --git a/stats/view/worker.go b/stats/view/worker.go index 9255d27d2..11bd18645 100644 --- a/stats/view/worker.go +++ b/stats/view/worker.go @@ -17,6 +17,7 @@ package view import ( "fmt" + "sync" "time" "go.opencensus.io/stats" @@ -43,6 +44,8 @@ type worker struct { timer *time.Ticker c chan command quit, done chan bool + flushCh chan bool + quitOnce sync.Once } var defaultWorker *worker @@ -142,6 +145,21 @@ func newWorker() *worker { c: make(chan command, 1024), quit: make(chan bool), done: make(chan bool), + flushCh: make(chan bool), + } +} + +// Flush reports all collected points regardless +// of the time reporting period or buffering. +func Flush() { + select { + case <-defaultWorker.quit: + // If this channel is closed i.e. we quit, do nothing. 
+ return + default: // Otherwise we can proceed with flushing. + req := &flushReq{c: make(chan bool)} + defaultWorker.c <- req + <-req.c // don't return until the flush is complete. } } @@ -162,8 +180,13 @@ func (w *worker) start() { } func (w *worker) stop() { - w.quit <- true - <-w.done + w.quitOnce.Do(func() { + // Close w.quit so that any operations that need + // to check if we've stopped/quit will immediately + // select on w.quit. + close(w.quit) + <-w.done + }) } func (w *worker) getMeasureRef(name string) *measureRef { diff --git a/stats/view/worker_commands.go b/stats/view/worker_commands.go index 06c3c5464..3ed258835 100644 --- a/stats/view/worker_commands.go +++ b/stats/view/worker_commands.go @@ -73,6 +73,17 @@ func (cmd *registerViewReq) handleCommand(w *worker) { } } +// flushReq is the command to flush all recorded +// data regardless of time period and buffering. +type flushReq struct { + c chan bool +} + +func (fr *flushReq) handleCommand(w *worker) { + w.reportUsage(time.Now()) + fr.c <- true +} + // unregisterFromViewReq is the command to unregister to a view. Has no // impact on the data collection for client that are pulling data from the // library. 
diff --git a/stats/view/worker_test.go b/stats/view/worker_test.go index d43014648..0d09a6a7a 100644 --- a/stats/view/worker_test.go +++ b/stats/view/worker_test.go @@ -397,6 +397,68 @@ func TestUnregisterReportsUsage(t *testing.T) { } } +func TestFlush(t *testing.T) { + restart() + ctx := context.Background() + + SetReportingPeriod(time.Hour) + + m1 := stats.Int64("measure", "desc", "unit") + view1 := &View{Name: "count", Measure: m1, Aggregation: Count()} + m2 := stats.Int64("measure2", "desc", "unit") + view2 := &View{Name: "count2", Measure: m2, Aggregation: Count()} + + if err := Register(view1, view2); err != nil { + t.Fatalf("cannot register: %v", err) + } + + e := &countExporter{} + RegisterExporter(e) + + // Irrespective of the reporting period, with Flush + // all the recorded points should be reported. Hence we'll + // set an arbitrarily large period of 1 hr. + SetReportingPeriod(time.Hour) + + stats.Record(ctx, m1.M(1)) + stats.Record(ctx, m2.M(3)) + stats.Record(ctx, m2.M(1)) + + <-time.After(40 * time.Millisecond) + Flush() + <-time.After(40 * time.Millisecond) + + e.Lock() + got := e.totalCount + e.Unlock() + want := int64(3) // Number of wanted data points + if got != want { + t.Errorf("Count data\nGot: %d\nWant: %v", got, want) + } +} + +func TestFlush_afterStopDoesnotBlock(t *testing.T) { + restart() + + doneCh := make(chan bool) + go func() { + defer close(doneCh) + + for i := 0; i < 10; i++ { + Flush() + defaultWorker.stop() + Flush() + } + }() + + select { + case <-time.After(300 * time.Microsecond): // Arbitrary duration that's considered "long" + t.Fatal("Flush + stop goroutine did not return on time") + case <-doneCh: + // returned ASAP so okay + } +} + type countExporter struct { sync.Mutex count int64