Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate diskFrontier domain for series candidate. #115

Merged
merged 1 commit into from
Apr 9, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 13 additions & 3 deletions storage/metric/frontier.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ type diskFrontier struct {
lastSupertime time.Time
}

func (f *diskFrontier) String() string {
func (f diskFrontier) String() string {
return fmt.Sprintf("diskFrontier from %s at %s to %s at %s", f.firstFingerprint.ToRowKey(), f.firstSupertime, f.lastFingerprint.ToRowKey(), f.lastSupertime)
}

func (f *diskFrontier) ContainsFingerprint(fingerprint model.Fingerprint) bool {
func (f diskFrontier) ContainsFingerprint(fingerprint model.Fingerprint) bool {
return !(fingerprint.Less(f.firstFingerprint) || f.lastFingerprint.Less(fingerprint))
}

Expand All @@ -48,12 +48,15 @@ func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, err error) {
if !i.SeekToLast() || i.Key() == nil {
return
}

lastKey, err := extractSampleKey(i)
if err != nil {
panic(err)
}

i.SeekToFirst()
if !i.SeekToFirst() || i.Key() == nil {
return
}
firstKey, err := extractSampleKey(i)
if i.Key() == nil {
return
Expand Down Expand Up @@ -92,6 +95,13 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator)
upperSeek = lastSupertime
)

// If the diskFrontier for this iterator says that the candidate fingerprint
// is outside of its seeking domain, there is no way that a seriesFrontier
// could be materialized. Simply bail.
if !d.ContainsFingerprint(f) {
return
}

// If we are either the first or the last key in the database, we need to use
// pessimistic boundary frontiers.
if f.Equal(d.firstFingerprint) {
Expand Down
2 changes: 1 addition & 1 deletion storage/metric/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
type MetricPersistence interface {
// A storage system may rely on external resources and thusly should be
// closed when finished.
Close() error
Close()

// Commit all pending operations, if any, since some of the storage components
// queue work on channels and operate on it in bulk.
Expand Down
76 changes: 19 additions & 57 deletions storage/metric/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
index "github.com/prometheus/prometheus/storage/raw/index/leveldb"
leveldb "github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
"io"
"log"
"sort"
"sync"
Expand Down Expand Up @@ -58,68 +57,31 @@ var (
)

type leveldbOpener func()
type leveldbCloser interface {
Close()
}

func (l *LevelDBMetricPersistence) Close() error {
var persistences = []struct {
name string
closer io.Closer
}{
{
"Fingerprint to Label Name and Value Pairs",
l.fingerprintToMetrics,
},
{
"Fingerprint High Watermarks",
l.metricHighWatermarks,
},
{
"Fingerprint Samples",
l.metricSamples,
},
{
"Label Name to Fingerprints",
l.labelNameToFingerprints,
},
{
"Label Name and Value Pairs to Fingerprints",
l.labelSetToFingerprints,
},
{
"Metric Membership Index",
l.metricMembershipIndex,
},
}

errorChannel := make(chan error, len(persistences))

for _, persistence := range persistences {
name := persistence.name
closer := persistence.closer

go func(name string, closer io.Closer) {
if closer != nil {
closingError := closer.Close()

if closingError != nil {
log.Printf("Could not close a LevelDBPersistence storage container; inconsistencies are possible: %q\n", closingError)
}

errorChannel <- closingError
} else {
errorChannel <- nil
}
}(name, closer)
func (l *LevelDBMetricPersistence) Close() {
var persistences = []leveldbCloser{
l.fingerprintToMetrics,
l.metricHighWatermarks,
l.metricSamples,
l.labelNameToFingerprints,
l.labelSetToFingerprints,
l.metricMembershipIndex,
}

for i := 0; i < cap(errorChannel); i++ {
closingError := <-errorChannel
closerGroup := sync.WaitGroup{}

if closingError != nil {
return closingError
}
for _, closer := range persistences {
closerGroup.Add(1)
go func(closer leveldbCloser) {
closer.Close()
closerGroup.Done()
}(closer)
}

return nil
closerGroup.Wait()
}

func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetricPersistence, err error) {
Expand Down
4 changes: 1 addition & 3 deletions storage/metric/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (s memorySeriesStorage) GetRangeValues(fp model.Fingerprint, i model.Interv
return
}

func (s memorySeriesStorage) Close() (err error) {
func (s memorySeriesStorage) Close() {
// This can probably be simplified:
//
// s.fingerPrintToSeries = map[model.Fingerprint]*stream{}
Expand All @@ -344,8 +344,6 @@ func (s memorySeriesStorage) Close() (err error) {
for labelName := range s.labelNameToFingerprints {
delete(s.labelNameToFingerprints, labelName)
}

return
}

func (s memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) {
Expand Down
27 changes: 6 additions & 21 deletions storage/metric/rule_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,13 +552,8 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer
func() {
p, closer := persistenceMaker()

defer func() {
defer closer.Close()
err := p.Close()
if err != nil {
t.Fatalf("Encountered anomaly closing persistence: %q\n", err)
}
}()
defer closer.Close()
defer p.Close()

m := model.Metric{
model.MetricNameLabel: "age_in_years",
Expand Down Expand Up @@ -994,13 +989,8 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo
func() {
p, closer := persistenceMaker()

defer func() {
defer closer.Close()
err := p.Close()
if err != nil {
t.Fatalf("Encountered anomaly closing persistence: %q\n", err)
}
}()
defer closer.Close()
defer p.Close()

m := model.Metric{
model.MetricNameLabel: "age_in_years",
Expand Down Expand Up @@ -1348,13 +1338,8 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer
func() {
p, closer := persistenceMaker()

defer func() {
defer closer.Close()
err := p.Close()
if err != nil {
t.Fatalf("Encountered anomaly closing persistence: %q\n", err)
}
}()
defer closer.Close()
defer p.Close()

m := model.Metric{
model.MetricNameLabel: "age_in_years",
Expand Down
7 changes: 1 addition & 6 deletions storage/metric/stochastic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,7 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
stochastic := func(x int) (success bool) {
p, closer := persistenceMaker()
defer closer.Close()
defer func() {
err := p.Close()
if err != nil {
t.Error(err)
}
}()
defer p.Close()

seed := rand.NewSource(int64(x))
random := rand.New(seed)
Expand Down
14 changes: 2 additions & 12 deletions storage/metric/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,7 @@ func buildLevelDBTestPersistence(name string, f func(p MetricPersistence, t test
t.Errorf("Could not create LevelDB Metric Persistence: %q\n", err)
}

defer func() {
err := p.Close()
if err != nil {
t.Errorf("Anomaly while closing database: %q\n", err)
}
}()
defer p.Close()

f(p, t)
}
Expand All @@ -72,12 +67,7 @@ func buildMemoryTestPersistence(f func(p MetricPersistence, t test.Tester)) func

p := NewMemorySeriesStorage()

defer func() {
err := p.Close()
if err != nil {
t.Errorf("Anomaly while closing database: %q\n", err)
}
}()
defer p.Close()

f(p, t)
}
Expand Down
20 changes: 5 additions & 15 deletions storage/metric/tiered.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,17 +145,14 @@ func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
return
}

func (t *tieredStorage) rebuildDiskFrontier() (err error) {
func (t *tieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) {
begin := time.Now()
defer func() {
duration := time.Since(begin)

recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: rebuildDiskFrontier, result: failure})
}()

i := t.diskStorage.metricSamples.NewIterator(true)
defer i.Close()

t.diskFrontier, err = newDiskFrontier(i)
if err != nil {
panic(err)
Expand Down Expand Up @@ -298,8 +295,6 @@ func (f *memoryToDiskFlusher) ForStream(stream stream) (decoder storage.RecordDe
flusher: f,
}

// fmt.Printf("fingerprint -> %s\n", model.NewFingerprintFromMetric(stream.metric).ToRowKey())

return visitor, visitor, visitor
}

Expand All @@ -309,11 +304,7 @@ func (f *memoryToDiskFlusher) Flush() {
for i := 0; i < length; i++ {
samples = append(samples, <-f.toDiskQueue)
}
start := time.Now()
f.disk.AppendSamples(samples)
if false {
fmt.Printf("Took %s to append...\n", time.Since(start))
}
}

func (f memoryToDiskFlusher) Close() {
Expand Down Expand Up @@ -360,11 +351,14 @@ func (t *tieredStorage) renderView(viewJob viewJob) {
var (
scans = viewJob.builder.ScanJobs()
view = newView()
// Get a single iterator that will be used for all data extraction below.
iterator = t.diskStorage.metricSamples.NewIterator(true)
)
defer iterator.Close()

// Rebuilding of the frontier should happen on a conditional basis if a
// (fingerprint, timestamp) tuple is outside of the current frontier.
err = t.rebuildDiskFrontier()
err = t.rebuildDiskFrontier(iterator)
if err != nil {
panic(err)
}
Expand All @@ -374,10 +368,6 @@ func (t *tieredStorage) renderView(viewJob viewJob) {
return
}

// Get a single iterator that will be used for all data extraction below.
iterator := t.diskStorage.metricSamples.NewIterator(true)
defer iterator.Close()

for _, scanJob := range scans {
seriesFrontier, err := newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion storage/raw/index/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ type MembershipIndex interface {
Has(key coding.Encoder) (bool, error)
Put(key coding.Encoder) error
Drop(key coding.Encoder) error
Close() error
Close()
}
4 changes: 2 additions & 2 deletions storage/raw/index/leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ type LevelDBMembershipIndex struct {
persistence *leveldb.LevelDBPersistence
}

func (l *LevelDBMembershipIndex) Close() error {
return l.persistence.Close()
func (l *LevelDBMembershipIndex) Close() {
l.persistence.Close()
}

func (l *LevelDBMembershipIndex) Has(key coding.Encoder) (bool, error) {
Expand Down
11 changes: 6 additions & 5 deletions storage/raw/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ package raw
import (
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/storage"
"io"
)

// Persistence models a key-value store for bytes that supports various
// additional operations.
type Persistence interface {
io.Closer

// Close reaps all of the underlying system resources associated with this
// persistence.
Close()
// Has informs the user whether a given key exists in the database.
Has(key coding.Encoder) (bool, error)
// Get retrieves the key from the database if it exists or returns nil if
Expand All @@ -50,8 +50,9 @@ type Persistence interface {
// en masse. The interface implies no protocol around the atomicity of
// effectuation.
type Batch interface {
io.Closer

// Close reaps all of the underlying system resources associated with this
// batch mutation.
Close()
// Put follows the same protocol as Persistence.Put.
Put(key, value coding.Encoder)
// Drop follows the same protocol as Persistence.Drop.
Expand Down
4 changes: 1 addition & 3 deletions storage/raw/leveldb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ func (b batch) Put(key, value coding.Encoder) {
b.batch.Put(keyEncoded, valueEncoded)
}

func (b batch) Close() (err error) {
func (b batch) Close() {
b.batch.Close()

return
}
1 change: 1 addition & 0 deletions storage/raw/leveldb/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ type Iterator interface {
SeekToFirst() (ok bool)
SeekToLast() (ok bool)
Value() []byte
Close()
}