Skip to content

Commit

Permalink
feat(tsm1): Add Read<Type>ArrayBlock APIs to TSMReader and mmapAccessor
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartcarnie committed Jul 16, 2018
1 parent b260a1d commit 790639d
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 154 deletions.
1 change: 1 addition & 0 deletions tsdb/engine/tsm1/engine.go
Expand Up @@ -43,6 +43,7 @@ import (
//go:generate tmpl -data=@file_store.gen.go.tmpldata file_store.gen.go.tmpl
//go:generate tmpl -data=@encoding.gen.go.tmpldata encoding.gen.go.tmpl
//go:generate tmpl -data=@compact.gen.go.tmpldata compact.gen.go.tmpl
//go:generate tmpl -data=@reader.gen.go.tmpldata reader.gen.go.tmpl

func init() {
tsdb.RegisterEngine("tsm1", NewEngine)
Expand Down
5 changes: 5 additions & 0 deletions tsdb/engine/tsm1/file_store.go
Expand Up @@ -46,10 +46,15 @@ type TSMFile interface {
// ReadAt returns all the values in the block identified by entry.
ReadAt(entry *IndexEntry, values []Value) ([]Value, error)
ReadFloatBlockAt(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error)
ReadFloatArrayBlockAt(entry *IndexEntry, values *tsdb.FloatArray) error
ReadIntegerBlockAt(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error)
ReadIntegerArrayBlockAt(entry *IndexEntry, values *tsdb.IntegerArray) error
ReadUnsignedBlockAt(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error)
ReadUnsignedArrayBlockAt(entry *IndexEntry, values *tsdb.UnsignedArray) error
ReadStringBlockAt(entry *IndexEntry, values *[]StringValue) ([]StringValue, error)
ReadStringArrayBlockAt(entry *IndexEntry, values *tsdb.StringArray) error
ReadBooleanBlockAt(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error)
ReadBooleanArrayBlockAt(entry *IndexEntry, values *tsdb.BooleanArray) error

// Entries returns the index entries for all blocks for the given key.
Entries(key []byte) []IndexEntry
Expand Down
21 changes: 21 additions & 0 deletions tsdb/engine/tsm1/file_store_key_iterator_test.go
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/tsdb"
)

func TestNewMergeKeyIterator(t *testing.T) {
Expand Down Expand Up @@ -196,3 +197,23 @@ func (*mockTSMFile) ReadStringBlockAt(*IndexEntry, *[]StringValue) ([]StringValu
func (*mockTSMFile) ReadBooleanBlockAt(*IndexEntry, *[]BooleanValue) ([]BooleanValue, error) {
panic("implement me")
}

func (*mockTSMFile) ReadFloatArrayBlockAt(*IndexEntry, *tsdb.FloatArray) error {
panic("implement me")
}

func (*mockTSMFile) ReadIntegerArrayBlockAt(*IndexEntry, *tsdb.IntegerArray) error {
panic("implement me")
}

func (*mockTSMFile) ReadUnsignedArrayBlockAt(*IndexEntry, *tsdb.UnsignedArray) error {
panic("implement me")
}

func (*mockTSMFile) ReadStringArrayBlockAt(*IndexEntry, *tsdb.StringArray) error {
panic("implement me")
}

func (*mockTSMFile) ReadBooleanArrayBlockAt(*IndexEntry, *tsdb.BooleanArray) error {
panic("implement me")
}
285 changes: 285 additions & 0 deletions tsdb/engine/tsm1/reader.gen.go
@@ -0,0 +1,285 @@
// Generated by tmpl
// https://github.com/benbjohnson/tmpl
//
// DO NOT EDIT!
// Source: reader.gen.go.tmpl

package tsm1

import (
"github.com/influxdata/influxdb/tsdb"
)

// ReadFloatBlockAt returns the float values corresponding to the given index entry.
func (t *TSMReader) ReadFloatBlockAt(entry *IndexEntry, vals *[]FloatValue) ([]FloatValue, error) {
t.mu.RLock()
v, err := t.accessor.readFloatBlock(entry, vals)
t.mu.RUnlock()
return v, err
}

// ReadFloatArrayBlockAt fills vals with the float values corresponding to the given index entry.
func (t *TSMReader) ReadFloatArrayBlockAt(entry *IndexEntry, vals *tsdb.FloatArray) error {
t.mu.RLock()
err := t.accessor.readFloatArrayBlock(entry, vals)
t.mu.RUnlock()
return err
}

// ReadIntegerBlockAt returns the integer values corresponding to the given index entry.
func (t *TSMReader) ReadIntegerBlockAt(entry *IndexEntry, vals *[]IntegerValue) ([]IntegerValue, error) {
t.mu.RLock()
v, err := t.accessor.readIntegerBlock(entry, vals)
t.mu.RUnlock()
return v, err
}

// ReadIntegerArrayBlockAt fills vals with the integer values corresponding to the given index entry.
func (t *TSMReader) ReadIntegerArrayBlockAt(entry *IndexEntry, vals *tsdb.IntegerArray) error {
t.mu.RLock()
err := t.accessor.readIntegerArrayBlock(entry, vals)
t.mu.RUnlock()
return err
}

// ReadUnsignedBlockAt returns the unsigned values corresponding to the given index entry.
func (t *TSMReader) ReadUnsignedBlockAt(entry *IndexEntry, vals *[]UnsignedValue) ([]UnsignedValue, error) {
t.mu.RLock()
v, err := t.accessor.readUnsignedBlock(entry, vals)
t.mu.RUnlock()
return v, err
}

// ReadUnsignedArrayBlockAt fills vals with the unsigned values corresponding to the given index entry.
func (t *TSMReader) ReadUnsignedArrayBlockAt(entry *IndexEntry, vals *tsdb.UnsignedArray) error {
t.mu.RLock()
err := t.accessor.readUnsignedArrayBlock(entry, vals)
t.mu.RUnlock()
return err
}

// ReadStringBlockAt returns the string values corresponding to the given index entry.
func (t *TSMReader) ReadStringBlockAt(entry *IndexEntry, vals *[]StringValue) ([]StringValue, error) {
t.mu.RLock()
v, err := t.accessor.readStringBlock(entry, vals)
t.mu.RUnlock()
return v, err
}

// ReadStringArrayBlockAt fills vals with the string values corresponding to the given index entry.
func (t *TSMReader) ReadStringArrayBlockAt(entry *IndexEntry, vals *tsdb.StringArray) error {
t.mu.RLock()
err := t.accessor.readStringArrayBlock(entry, vals)
t.mu.RUnlock()
return err
}

// ReadBooleanBlockAt returns the boolean values corresponding to the given index entry.
func (t *TSMReader) ReadBooleanBlockAt(entry *IndexEntry, vals *[]BooleanValue) ([]BooleanValue, error) {
t.mu.RLock()
v, err := t.accessor.readBooleanBlock(entry, vals)
t.mu.RUnlock()
return v, err
}

// ReadBooleanArrayBlockAt fills vals with the boolean values corresponding to the given index entry.
func (t *TSMReader) ReadBooleanArrayBlockAt(entry *IndexEntry, vals *tsdb.BooleanArray) error {
t.mu.RLock()
err := t.accessor.readBooleanArrayBlock(entry, vals)
t.mu.RUnlock()
return err
}

// blockAccessor abstracts a method of accessing blocks from a
// TSM file.
type blockAccessor interface {
init() (*indirectIndex, error)
read(key []byte, timestamp int64) ([]Value, error)
readAll(key []byte) ([]Value, error)
readBlock(entry *IndexEntry, values []Value) ([]Value, error)
readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error)
readFloatArrayBlock(entry *IndexEntry, values *tsdb.FloatArray) error
readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error)
readIntegerArrayBlock(entry *IndexEntry, values *tsdb.IntegerArray) error
readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error)
readUnsignedArrayBlock(entry *IndexEntry, values *tsdb.UnsignedArray) error
readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error)
readStringArrayBlock(entry *IndexEntry, values *tsdb.StringArray) error
readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error)
readBooleanArrayBlock(entry *IndexEntry, values *tsdb.BooleanArray) error
readBytes(entry *IndexEntry, buf []byte) (uint32, []byte, error)
rename(path string) error
path() string
close() error
free() error
}

func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return nil, ErrTSMClosed
}

a, err := DecodeFloatBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
m.mu.RUnlock()

if err != nil {
return nil, err
}

return a, nil
}

func (m *mmapAccessor) readFloatArrayBlock(entry *IndexEntry, values *tsdb.FloatArray) error {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return ErrTSMClosed
}

err := DecodeFloatArrayBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
m.mu.RUnlock()

return err
}

func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return nil, ErrTSMClosed
}

a, err := DecodeIntegerBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
m.mu.RUnlock()

if err != nil {
return nil, err
}

return a, nil
}

func (m *mmapAccessor) readIntegerArrayBlock(entry *IndexEntry, values *tsdb.IntegerArray) error {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return ErrTSMClosed
}

err := DecodeIntegerArrayBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
m.mu.RUnlock()

return err
}

func (m *mmapAccessor) readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error) {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return nil, ErrTSMClosed
}

a, err := DecodeUnsignedBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
m.mu.RUnlock()

if err != nil {
return nil, err
}

return a, nil
}

func (m *mmapAccessor) readUnsignedArrayBlock(entry *IndexEntry, values *tsdb.UnsignedArray) error {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return ErrTSMClosed
}

err := DecodeUnsignedArrayBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
m.mu.RUnlock()

return err
}

func (m *mmapAccessor) readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return nil, ErrTSMClosed
}

a, err := DecodeStringBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
m.mu.RUnlock()

if err != nil {
return nil, err
}

return a, nil
}

func (m *mmapAccessor) readStringArrayBlock(entry *IndexEntry, values *tsdb.StringArray) error {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return ErrTSMClosed
}

err := DecodeStringArrayBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
m.mu.RUnlock()

return err
}

func (m *mmapAccessor) readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return nil, ErrTSMClosed
}

a, err := DecodeBooleanBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
m.mu.RUnlock()

if err != nil {
return nil, err
}

return a, nil
}

func (m *mmapAccessor) readBooleanArrayBlock(entry *IndexEntry, values *tsdb.BooleanArray) error {
m.incAccess()

m.mu.RLock()
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
m.mu.RUnlock()
return ErrTSMClosed
}

err := DecodeBooleanArrayBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
m.mu.RUnlock()

return err
}

0 comments on commit 790639d

Please sign in to comment.