Skip to content

Commit

Permalink
wait by err group
Browse files Browse the repository at this point in the history
  • Loading branch information
mtanda committed Feb 10, 2018
1 parent ff7cf3e commit 049a786
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 10 deletions.
11 changes: 7 additions & 4 deletions archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/labels"
"golang.org/x/sync/errgroup"
)

type Archiver struct {
Expand Down Expand Up @@ -91,7 +92,7 @@ func NewArchiver(ctx context.Context, cfg ArchiveConfig, storagePath string, ind
}, nil
}

func (archiver *Archiver) start() {
func (archiver *Archiver) start(eg errgroup.Group) {
if len(archiver.namespace) == 0 {
return
}
Expand All @@ -105,10 +106,12 @@ func (archiver *Archiver) start() {
archiver.currentLabelIndex = state.Index
}

go archiver.archive()
eg.Go(func() error {
return archiver.archive()
})
}

func (archiver *Archiver) archive() {
func (archiver *Archiver) archive() error {
timeMargin := 15 * time.Minute // wait until CloudWatch record metrics
//archiveTime := archiver.interval / 4
apiCallRate := 0.5
Expand Down Expand Up @@ -240,7 +243,7 @@ func (archiver *Archiver) archive() {
case <-archiver.ctx.Done():
level.Info(archiver.logger).Log("msg", "archiving stopped")
archiver.db.Close()
return
return nil
}
}
}
Expand Down
12 changes: 8 additions & 4 deletions index.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/labels"
"golang.org/x/sync/errgroup"
)

type Indexer struct {
Expand Down Expand Up @@ -82,18 +83,21 @@ func NewIndexer(ctx context.Context, cfg IndexConfig, storagePath string, logger
}, nil
}

func (indexer *Indexer) start() {
func (indexer *Indexer) start(eg errgroup.Group) {
level.Info(indexer.logger).Log("msg", fmt.Sprintf("index region = %s", *indexer.region))
level.Info(indexer.logger).Log("msg", fmt.Sprintf("index namespace = %+v", indexer.namespace))
indexer.indexedTimestampFrom = time.Now().UTC()
state, err := indexer.loadState()
if err == nil {
indexer.indexedTimestampTo = time.Unix(state.Timestamp, 0)
}
go indexer.index()

eg.Go(func() error {
return indexer.index()
})
}

func (indexer *Indexer) index() {
func (indexer *Indexer) index() error {
t := time.NewTimer(1 * time.Minute)
defer t.Stop()
for {
Expand Down Expand Up @@ -163,7 +167,7 @@ func (indexer *Indexer) index() {
case <-indexer.ctx.Done():
level.Info(indexer.logger).Log("msg", "indexing stopped")
indexer.db.Close()
return
return nil
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/promlog"
"github.com/prometheus/prometheus/prompb"
"golang.org/x/sync/errgroup"
)

var (
Expand Down Expand Up @@ -412,6 +413,7 @@ func main() {
}

// graceful shutdown
eg := errgroup.Group{}
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
term := make(chan os.Signal, 1)
Expand All @@ -425,6 +427,9 @@ func main() {
case <-term:
level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...")
cancel()
if err := eg.Wait(); err != nil {
level.Error(logger).Log("err", err)
}
os.Exit(0)
case <-ctx.Done():
}
Expand All @@ -448,13 +453,13 @@ func main() {
level.Error(logger).Log("err", err)
panic(err)
}
indexer.start()
indexer.start(eg)
archiver, err := NewArchiver(ctx, readCfg.Targets[0].Archive, cfg.storagePath, indexer, log.With(logger, "component", "archiver"))
if err != nil {
level.Error(logger).Log("err", err)
panic(err)
}
archiver.start()
archiver.start(eg)

http.Handle("/metrics", prometheus.Handler())
http.HandleFunc("/read", func(w http.ResponseWriter, r *http.Request) {
Expand Down

0 comments on commit 049a786

Please sign in to comment.