Skip to content

Commit

Permalink
Fix duplicate read when point is in WAL and disk
Browse files Browse the repository at this point in the history
This commit fixes an issue where a point can be returned twice when two
points in a series share the same timestamp and one is in the WAL while
the other is on disk. The write to the WAL should overwrite the write to
the disk, and the client should only see the WAL version.

Fixes #3315.
  • Loading branch information
benbjohnson committed Jul 28, 2015
1 parent f994a97 commit 097b025
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 10 deletions.
33 changes: 28 additions & 5 deletions tsdb/engine/v1/engine.go
Expand Up @@ -86,6 +86,9 @@ func NewEngine(path string, opt tsdb.EngineOptions) tsdb.Engine {
return e
}

// Path reports the path that the engine was initialized with.
func (e *Engine) Path() string {
	return e.path
}

// Open opens and initializes the engine.
func (e *Engine) Open() error {
if err := func() error {
Expand Down Expand Up @@ -594,11 +597,24 @@ func (c *Cursor) read() (key, value []byte) {
return nil, nil
}

// Use the buffer if it exists and there's no cache or if it is lower than the cache.
if c.buf.key != nil && (c.index >= len(c.cache) || bytes.Compare(c.buf.key, c.cache[c.index][0:8]) == -1) {
key, value = c.buf.key, c.buf.value
c.buf.key, c.buf.value = nil, nil
return
// Check if the buffer exists.
if c.buf.key != nil {
// Return the buffer if the cache is drained.
if c.index >= len(c.cache) {
return c.readBuf()
}

// Return the buffer if less than the cache.
cmp := bytes.Compare(c.buf.key, c.cache[c.index][0:8])
if cmp == -1 {
return c.readBuf()
}

// If the buffer and cache have equal keys then remove the buffer.
// The cache will be read at the end of the function.
if cmp == 0 {
c.buf.key, c.buf.value = nil, nil
}
}

// Otherwise read from the cache.
Expand All @@ -617,6 +633,13 @@ func (c *Cursor) read() (key, value []byte) {
return
}

// readBuf hands back the key/value pair currently held in the cursor's
// buffer and then empties the buffer so the pair is not returned again.
func (c *Cursor) readBuf() (key, value []byte) {
	// Capture the buffered pair before resetting the buffer fields.
	key = c.buf.key
	value = c.buf.value

	c.buf.key = nil
	c.buf.value = nil
	return key, value
}

// WALPartitionN is the number of partitions in the write-ahead log (WAL).
// It is a compile-time constant fixed at 8.
const WALPartitionN = 8

Expand Down
113 changes: 113 additions & 0 deletions tsdb/engine/v1/engine_test.go
@@ -0,0 +1,113 @@
package v1_test

import (
"io/ioutil"
"os"
"reflect"
"testing"
"time"

"github.com/influxdb/influxdb/tsdb"
"github.com/influxdb/influxdb/tsdb/engine/v1"
)

// Ensure that writing the same point (same series and timestamp) once before
// and once after a flush to disk yields a single key when the series is read.
func TestEngine_Cursor_Duplicate(t *testing.T) {
	e := OpenDefaultEngine()
	defer e.Close()

	// writeCPU writes one "cpu" point with value 100 at Unix(0, 1) and
	// fails the test immediately if the write returns an error.
	writeCPU := func() {
		pt := tsdb.NewPoint("cpu", tsdb.Tags{}, tsdb.Fields{"value": 100}, time.Unix(0, 1))
		if err := e.WritePoints([]tsdb.Point{pt}, nil, nil); err != nil {
			t.Fatal(err)
		}
	}

	// First write, followed by a flush to disk.
	writeCPU()
	if err := e.Flush(0); err != nil {
		t.Fatal(err)
	}

	// Second write of the identical point; it is not flushed.
	writeCPU()

	// Reading the "cpu" series must produce exactly one key.
	want := [][]byte{
		{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, // Unix(0, 1)
	}
	got, err := e.ReadSeriesPointKeys(`cpu`, tsdb.Tags{})
	if err != nil {
		t.Fatal(err)
	}
	if !reflect.DeepEqual(got, want) {
		t.Fatalf("unexpected series point keys: %v", got)
	}
}

// Engine represents a test wrapper for v1.Engine.
// It embeds *v1.Engine, so the wrapped engine's methods are available
// directly on the wrapper unless the wrapper declares a method of the
// same name.
type Engine struct {
*v1.Engine
}

// NewEngine returns a new, unopened instance of Engine whose path is a
// freshly generated temporary file name.
//
// A temporary file is created only to reserve a unique name; it is closed and
// removed again before the engine is constructed, so the engine starts from a
// path that does not exist. Panics if the temporary file cannot be created or
// if v1.NewEngine does not return a *v1.Engine.
func NewEngine(opt tsdb.EngineOptions) *Engine {
	// Generate temporary file. Without this check a failure would leave f
	// nil and surface later as an opaque nil-pointer panic in f.Name().
	f, err := ioutil.TempFile("", "v1-")
	if err != nil {
		panic(err)
	}
	path := f.Name()

	// Only the name is needed; closing and removing are best-effort.
	f.Close()
	os.Remove(path)

	// Return test wrapper.
	return &Engine{Engine: v1.NewEngine(path, opt).(*v1.Engine)}
}

// OpenEngine builds a new Engine with the given options and opens it.
// It panics if opening the engine fails.
func OpenEngine(opt tsdb.EngineOptions) *Engine {
	engine := NewEngine(opt)

	err := engine.Open()
	if err != nil {
		panic(err)
	}

	return engine
}

// OpenDefaultEngine returns an open Engine configured with the options
// produced by tsdb.NewEngineOptions. It panics if the engine cannot be opened.
func OpenDefaultEngine() *Engine {
	opt := tsdb.NewEngineOptions()
	return OpenEngine(opt)
}

// Close closes the engine and removes all data stored at its path.
//
// It returns the error from removing the data, if any, instead of silently
// discarding it, so leftover test data does not go unnoticed.
func (e *Engine) Close() error {
	// Close the wrapped engine first so nothing is held open while the
	// data is removed.
	// NOTE(review): any result of the embedded Close is still discarded,
	// as before — confirm whether it returns an error worth surfacing.
	e.Engine.Close()

	return os.RemoveAll(e.Path())
}

// ReadSeriesPointKeys returns the raw keys for all points stored for a series.
//
// The series is identified by its measurement name and tags. Each returned
// key is a copy, so it does not alias the cursor's memory. An error is
// returned only if the transaction cannot be started.
func (e *Engine) ReadSeriesPointKeys(name string, tags tsdb.Tags) ([][]byte, error) {
	// Begin a transaction (presumably read-only given the false argument)
	// and make sure it is rolled back on return.
	tx, err := e.Begin(false)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	// Position a cursor on the series identified by name and tags.
	seriesKey := string(tsdb.MakeKey([]byte(name), tags))
	cur := tx.Cursor(seriesKey)

	// Walk the cursor from the all-zero 8-byte key until it is exhausted,
	// collecting a copy of every key along the way.
	var keys [][]byte
	k, _ := cur.Seek(make([]byte, 8))
	for k != nil {
		keys = append(keys, copyBytes(k))
		k, _ = cur.Next()
	}

	return keys, nil
}

// copyBytes returns an independent copy of b.
// A nil input yields nil; an empty non-nil input yields an empty non-nil slice.
func copyBytes(b []byte) []byte {
	if b == nil {
		return nil
	}

	// Appending onto a pre-sized, non-nil slice keeps empty inputs non-nil
	// and gives the copy exactly len(b) capacity.
	return append(make([]byte, 0, len(b)), b...)
}
10 changes: 5 additions & 5 deletions tsdb/points.go
Expand Up @@ -792,7 +792,7 @@ func unescapeQuoteString(in string) string {
// NewPoint returns a new point with the given measurement name, tags, fields and timestamp
func NewPoint(name string, tags Tags, fields Fields, time time.Time) Point {
return &point{
key: makeKey([]byte(name), tags),
key: MakeKey([]byte(name), tags),
time: time,
fields: fields.MarshalBinary(),
}
Expand Down Expand Up @@ -822,7 +822,7 @@ func (p *point) Name() string {

// SetName updates the measurement name for the point
func (p *point) SetName(name string) {
p.key = makeKey([]byte(name), p.Tags())
p.key = MakeKey([]byte(name), p.Tags())
}

// Time return the timestamp for the point
Expand Down Expand Up @@ -864,20 +864,20 @@ func (p *point) Tags() Tags {
return tags
}

func makeKey(name []byte, tags Tags) []byte {
func MakeKey(name []byte, tags Tags) []byte {
return append(escape(name), tags.HashKey()...)
}

// SetTags replaces the tags for the point
func (p *point) SetTags(tags Tags) {
p.key = makeKey(p.name(), tags)
p.key = MakeKey(p.name(), tags)
}

// AddTag adds or replaces a tag value for a point
func (p *point) AddTag(key, value string) {
tags := p.Tags()
tags[key] = value
p.key = makeKey(p.name(), tags)
p.key = MakeKey(p.name(), tags)
}

// Fields returns the fields for the point
Expand Down

0 comments on commit 097b025

Please sign in to comment.