Skip to content

Commit

Permalink
create appender for each namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
mtanda committed Apr 12, 2018
1 parent 55a469e commit e66ec78
Showing 1 changed file with 18 additions and 14 deletions.
32 changes: 18 additions & 14 deletions archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,27 @@ func (archiver *Archiver) archive(ctx context.Context) error {

archiver.db.DisableCompactions()
app := archiver.db.Appender()
l := make(labels.Labels, 0)
l = append(l, labels.Label{Name: "__name__", Value: "dummy"})
if _, err = app.Add(l, startTime.Unix()*1000, 0); err != nil {
return err
}
if err := app.Commit(); err != nil {
return err
}
appenders := make(map[int]*tsdb.Appender)
for i := range archiver.namespace {
app := archiver.db.Appender()
appenders[i] = &app
}
for {
select {
case <-ft.C:
ft.Reset(1 * time.Second / time.Duration(cps))

if len(matchedLabelsList) > 0 {
matchedLabels := matchedLabelsList[archiver.s.Index]
err = archiver.process(app, matchedLabels, startTime, endTime)
err = archiver.process(*appenders[archiver.s.Namespace], matchedLabels, startTime, endTime)
if err != nil {
return err
}
Expand All @@ -212,9 +225,10 @@ func (archiver *Archiver) archive(ctx context.Context) error {
}

archiver.db.EnableCompactions()
if err := app.Commit(); err != nil {
if err := (*appenders[lastNamespace]).Commit(); err != nil {
return err
}
appenders[lastNamespace] = nil // release appender
archiver.s.Timestamp[archiver.namespace[lastNamespace]] = endTime.Add(-1 * time.Second).Unix()

level.Info(archiver.logger).Log("namespace", archiver.namespace[lastNamespace], "index", archiver.s.Index, "len", len(matchedLabelsList))
Expand All @@ -231,9 +245,10 @@ func (archiver *Archiver) archive(ctx context.Context) error {

return nil
} else {
if err := app.Commit(); err != nil {
if err := (*appenders[lastNamespace]).Commit(); err != nil {
return err
}
appenders[lastNamespace] = nil // release appender
archiver.s.Timestamp[archiver.namespace[lastNamespace]] = endTime.Add(-1 * time.Second).Unix()
if err := archiver.saveState(); err != nil {
return err
Expand All @@ -253,17 +268,6 @@ func (archiver *Archiver) archive(ctx context.Context) error {
archiverTargetsTotal.WithLabelValues(archiver.namespace[archiver.s.Namespace]).Set(float64(len(matchedLabelsList)))
}
}
case <-wt.C:
wt.Reset(1 * time.Minute)

if err := app.Commit(); err != nil {
return err
}
if err := archiver.saveState(); err != nil {
return err
}
level.Info(archiver.logger).Log("namespace", archiver.namespace[archiver.s.Namespace], "index", archiver.s.Index, "len", len(matchedLabelsList))
archiverTargetsProgress.WithLabelValues(archiver.namespace[archiver.s.Namespace]).Set(float64(archiver.s.Index))
case <-actx.Done():
if !ft.Stop() {
<-ft.C
Expand Down

0 comments on commit e66ec78

Please sign in to comment.