From 049a786abcbf39af74fd44a02bf73a2acec1ecfd Mon Sep 17 00:00:00 2001 From: Mitsuhiro Tanda Date: Thu, 8 Feb 2018 20:14:28 +0900 Subject: [PATCH] wait by err group --- archive.go | 11 +++++++---- index.go | 12 ++++++++---- main.go | 9 +++++++-- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/archive.go b/archive.go index 6db51157..f06c12b5 100644 --- a/archive.go +++ b/archive.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/prometheus/prompb" "github.com/prometheus/tsdb" "github.com/prometheus/tsdb/labels" + "golang.org/x/sync/errgroup" ) type Archiver struct { @@ -91,7 +92,7 @@ func NewArchiver(ctx context.Context, cfg ArchiveConfig, storagePath string, ind }, nil } -func (archiver *Archiver) start() { +func (archiver *Archiver) start(eg errgroup.Group) { if len(archiver.namespace) == 0 { return } @@ -105,10 +106,12 @@ func (archiver *Archiver) start() { archiver.currentLabelIndex = state.Index } - go archiver.archive() + eg.Go(func() error { + return archiver.archive() + }) } -func (archiver *Archiver) archive() { +func (archiver *Archiver) archive() error { timeMargin := 15 * time.Minute // wait until CloudWatch record metrics //archiveTime := archiver.interval / 4 apiCallRate := 0.5 @@ -240,7 +243,7 @@ func (archiver *Archiver) archive() { case <-archiver.ctx.Done(): level.Info(archiver.logger).Log("msg", "archiving stopped") archiver.db.Close() - return + return nil } } } diff --git a/index.go b/index.go index 2f75090a..4ed4978d 100644 --- a/index.go +++ b/index.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/tsdb" "github.com/prometheus/tsdb/labels" + "golang.org/x/sync/errgroup" ) type Indexer struct { @@ -82,7 +83,7 @@ func NewIndexer(ctx context.Context, cfg IndexConfig, storagePath string, logger }, nil } -func (indexer *Indexer) start() { +func (indexer *Indexer) start(eg errgroup.Group) { level.Info(indexer.logger).Log("msg", fmt.Sprintf("index region = %s", *indexer.region)) level.Info(indexer.logger).Log("msg", fmt.Sprintf("index namespace = %+v", indexer.namespace)) indexer.indexedTimestampFrom = time.Now().UTC() @@ -90,10 +91,13 @@ func (indexer *Indexer) start() { if err == nil { indexer.indexedTimestampTo = time.Unix(state.Timestamp, 0) } - go indexer.index() + + eg.Go(func() error { + return indexer.index() + }) } -func (indexer *Indexer) index() { +func (indexer *Indexer) index() error { t := time.NewTimer(1 * time.Minute) defer t.Stop() for { @@ -163,7 +167,7 @@ func (indexer *Indexer) index() { case <-indexer.ctx.Done(): level.Info(indexer.logger).Log("msg", "indexing stopped") indexer.db.Close() - return + return nil } } } diff --git a/main.go b/main.go index 6aa4930f..71652270 100644 --- a/main.go +++ b/main.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/promlog" "github.com/prometheus/prometheus/prompb" + "golang.org/x/sync/errgroup" ) var ( @@ -412,6 +413,7 @@ func main() { } // graceful shutdown + eg := errgroup.Group{} ctx := context.Background() ctx, cancel := context.WithCancel(ctx) term := make(chan os.Signal, 1) @@ -425,6 +427,9 @@ func main() { case <-term: level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...") cancel() + if err := eg.Wait(); err != nil { + level.Error(logger).Log("err", err) + } os.Exit(0) case <-ctx.Done(): } @@ -448,13 +453,13 @@ func main() { level.Error(logger).Log("err", err) panic(err) } - indexer.start() + indexer.start(eg) archiver, err := NewArchiver(ctx, readCfg.Targets[0].Archive, cfg.storagePath, indexer, log.With(logger, "component", "archiver")) if err != nil { level.Error(logger).Log("err", err) panic(err) } - archiver.start() + archiver.start(eg) http.Handle("/metrics", prometheus.Handler()) http.HandleFunc("/read", func(w http.ResponseWriter, r *http.Request) {