[refactor][memdb]: scan memory database data by condition
stone1100 committed Feb 16, 2020
1 parent d57f424 commit ec21191
Showing 23 changed files with 514 additions and 284 deletions.
1 change: 0 additions & 1 deletion aggregation/primitive_agg.go
@@ -54,7 +54,6 @@ func (agg *primitiveAggregator) Iterator() series.PrimitiveIterator {
func (agg *primitiveAggregator) reset() {
if agg.values != nil {
agg.values.Reset()
- //agg.values = collections.NewFloatArray(agg.pointCount)
}
}

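The deleted line above was a leftover reallocation; reset now only clears the existing float array so the aggregator can be pooled and reused. A minimal sketch of that reuse-over-reallocate idea, using a hypothetical FloatArray stand-in for lindb's collections.FloatArray:

package main

import "fmt"

// FloatArray is a hypothetical stand-in for lindb's collections.FloatArray:
// a fixed-capacity buffer that records which slots currently hold a value.
type FloatArray struct {
	values []float64
	marks  []bool
}

func NewFloatArray(capacity int) *FloatArray {
	return &FloatArray{values: make([]float64, capacity), marks: make([]bool, capacity)}
}

func (fa *FloatArray) SetValue(idx int, v float64) {
	fa.values[idx] = v
	fa.marks[idx] = true
}

func (fa *FloatArray) HasValue(idx int) bool { return fa.marks[idx] }

// Reset clears the marks but keeps the backing slices, so a pooled
// aggregator can be reused without reallocating.
func (fa *FloatArray) Reset() {
	for i := range fa.marks {
		fa.marks[i] = false
	}
}

func main() {
	fa := NewFloatArray(4)
	fa.SetValue(1, 3.14)
	fa.Reset()
	fmt.Println(fa.HasValue(1)) // false: buffer is clean for the next round
}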
2 changes: 1 addition & 1 deletion aggregation/series_agg.go
@@ -11,7 +11,7 @@ import (

//go:generate mockgen -source=./series_agg.go -destination=./series_agg_mock.go -package=aggregation

- // defines series aggregates which aggregates fields of a time series
+ // FieldAggregates represents aggregator which aggregates fields of a time series
type FieldAggregates []SeriesAggregator

// ResultSet returns the result set of aggregator
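Because FieldAggregates is a named slice type, result-set assembly can hang off the collection itself. A rough, self-contained sketch of the pattern; the SeriesAggregator interface and Result method below are simplified assumptions, not lindb's actual API:

package main

import "fmt"

// SeriesAggregator is an assumed, simplified version of the real interface.
type SeriesAggregator interface {
	FieldName() string
	Result() float64
}

// FieldAggregates aggregates the fields of one time series.
type FieldAggregates []SeriesAggregator

// ResultSet collects every field's aggregation result by field name.
func (fa FieldAggregates) ResultSet() map[string]float64 {
	rs := make(map[string]float64, len(fa))
	for _, agg := range fa {
		rs[agg.FieldName()] = agg.Result()
	}
	return rs
}

type sumAgg struct {
	name  string
	total float64
}

func (s *sumAgg) FieldName() string { return s.name }
func (s *sumAgg) Result() float64   { return s.total }

func main() {
	aggs := FieldAggregates{&sumAgg{name: "cpu.usage", total: 42}}
	fmt.Println(aggs.ResultSet()) // map[cpu.usage:42]
}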
11 changes: 7 additions & 4 deletions flow/filtering.go
@@ -3,20 +3,23 @@ package flow
import (
"github.com/lindb/roaring"

"github.com/lindb/lindb/series"
"github.com/lindb/lindb/pkg/timeutil"
"github.com/lindb/lindb/series/field"
)

//go:generate mockgen -source=./filtering.go -destination=./filtering_mock.go -package=flow

// DataFilter represents the filter ability over memory database and files under data family.
type DataFilter interface {
- // Filter filters the data based on metricIDs/fieldIDs/version/seriesIDs,
+ // Filter filters the data based on metricIDs/fieldIDs/seriesIDs/timeRange,
// if finds data then returns filter result set, else returns nil.
- Filter(metricID uint32, fieldIDs []uint16, version series.Version, seriesIDs *roaring.Bitmap) ([]FilterResultSet, error)
+ Filter(metricID uint32, fieldIDs []field.ID,
+ seriesIDs *roaring.Bitmap, timeRange timeutil.TimeRange,
+ ) ([]FilterResultSet, error)
}

// FilterResultSet represents the filter result set, loads data and does down sampling need based on this interface.
type FilterResultSet interface {
// Load loads the data from storage, then does down sampling, finally reduces the down sampling results.
- Load(flow StorageQueryFlow, fieldIDs []uint16, highKey uint16, groupedSeries map[string][]uint16)
+ Load(flow StorageQueryFlow, fieldIDs []field.ID, highKey uint16, groupedSeries map[string][]uint16)
}
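The interface change above is the heart of this commit: the version argument disappears and candidate data is pruned by time range instead, which lets one DataFilter interface cover both the memory database and on-disk data families. A toy sketch of the new shape; every type below (FieldID, TimeRange, FilterResultSet, the chunk store) is an invented stand-in, and seriesIDs is omitted to avoid the roaring dependency:

package main

import "fmt"

// FieldID mirrors lindb's field.ID, assumed here to be a small integer type.
type FieldID uint16

type TimeRange struct{ Start, End int64 }

// Overlap reports whether two ranges intersect.
func (tr TimeRange) Overlap(o TimeRange) bool {
	return tr.Start <= o.End && o.Start <= tr.End
}

type FilterResultSet struct{ TimeRange TimeRange }

type chunk struct {
	timeRange TimeRange
	// series/field data would live here
}

// memFilter mimics the new DataFilter shape: no version argument,
// candidate chunks are pruned by time range instead.
type memFilter struct {
	data map[uint32][]chunk // metricID -> chunks
}

func (m *memFilter) Filter(metricID uint32, fieldIDs []FieldID, timeRange TimeRange) []FilterResultSet {
	var rs []FilterResultSet
	for _, c := range m.data[metricID] {
		if c.timeRange.Overlap(timeRange) { // keep only overlapping chunks
			rs = append(rs, FilterResultSet{TimeRange: c.timeRange})
		}
	}
	return rs // nil when nothing matched
}

func main() {
	f := &memFilter{data: map[uint32][]chunk{
		10: {{timeRange: TimeRange{Start: 0, End: 100}}, {timeRange: TimeRange{Start: 200, End: 300}}},
	}}
	fmt.Println(f.Filter(10, nil, TimeRange{Start: 50, End: 150})) // only the first chunk overlaps
}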
2 changes: 1 addition & 1 deletion parallel/storage_query_flow.go
@@ -76,7 +76,7 @@ func (qf *storageQueryFlow) Prepare(downSamplingSpecs aggregation.AggregatorSpec
func (qf *storageQueryFlow) GetAggregator() (agg aggregation.FieldAggregates) {
select {
case agg = <-qf.aggPool:
- // reuse existing aggregator
+ // reuse existing aggregator
default:
// create new field aggregator
agg = qf.allocAgg(qf.downSamplingSpecs)
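GetAggregator performs a non-blocking receive on a channel-backed pool and falls back to allocation; the whitespace-only comment fix above sits inside that select. A compact sketch of the idiom with a simplified aggregator type:

package main

import "fmt"

type aggregator struct{ values []float64 }

type queryFlow struct {
	aggPool chan *aggregator
}

// getAggregator reuses a pooled aggregator when one is available;
// the default branch keeps the call non-blocking.
func (qf *queryFlow) getAggregator() (agg *aggregator) {
	select {
	case agg = <-qf.aggPool:
		// reuse existing aggregator
	default:
		// pool empty: create a new one
		agg = &aggregator{values: make([]float64, 1024)}
	}
	return agg
}

// releaseAggregator returns an aggregator to the pool, dropping it
// if the pool is already full.
func (qf *queryFlow) releaseAggregator(agg *aggregator) {
	select {
	case qf.aggPool <- agg:
	default:
	}
}

func main() {
	qf := &queryFlow{aggPool: make(chan *aggregator, 8)}
	a := qf.getAggregator()
	qf.releaseAggregator(a)
	b := qf.getAggregator()
	fmt.Println(a == b) // true: the pooled instance was reused
}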
5 changes: 3 additions & 2 deletions query/storage_executor.go
@@ -8,6 +8,7 @@ import (
"github.com/lindb/lindb/parallel"
"github.com/lindb/lindb/pkg/timeutil"
"github.com/lindb/lindb/series"
"github.com/lindb/lindb/series/field"
"github.com/lindb/lindb/sql/stmt"
"github.com/lindb/lindb/tsdb"
"github.com/lindb/lindb/tsdb/indexdb"
@@ -36,7 +37,7 @@ type storageExecutor struct {
shards []tsdb.Shard

metricID uint32
- fieldIDs []uint16
+ fieldIDs []field.ID
storageExecutePlan *storageExecutePlan
intervalType timeutil.IntervalType

@@ -178,7 +179,7 @@ func (e *storageExecutor) executeQueryFlow(indexDB indexdb.IndexDatabase, filter

// 1. filtering, check series ids if exist in storage
e.queryFlow.Filtering(func() {
- resultSet, err := filter.Filter(e.metricID, e.fieldIDs, version, seriesIDs)
+ resultSet, err := filter.Filter(e.metricID, e.fieldIDs, seriesIDs, e.query.TimeRange)
if err != nil {
e.queryFlow.Complete(err)
return
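The filtering stage is submitted to the query flow as a callback, and any error is routed through Complete so the rest of the flow short-circuits. A skeletal sketch of that control flow; queryFlow here is a much-reduced stand-in for the real StorageQueryFlow:

package main

import (
	"errors"
	"fmt"
)

// queryFlow is a simplified stand-in for lindb's StorageQueryFlow.
type queryFlow struct{ err error }

// Filtering runs the stage; the real implementation dispatches to a worker pool.
func (qf *queryFlow) Filtering(task func()) { task() }

// Complete records the first terminal error and aborts the flow.
func (qf *queryFlow) Complete(err error) {
	if qf.err == nil {
		qf.err = err
	}
}

func filter(metricID uint32) ([]string, error) {
	return nil, errors.New("no data")
}

func main() {
	qf := &queryFlow{}
	qf.Filtering(func() {
		rs, err := filter(10)
		if err != nil {
			qf.Complete(err) // short-circuit: later stages never run
			return
		}
		fmt.Println(rs)
	})
	fmt.Println(qf.err) // no data
}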
11 changes: 6 additions & 5 deletions query/storage_plan.go
@@ -7,6 +7,7 @@ import (

"github.com/lindb/lindb/aggregation"
"github.com/lindb/lindb/aggregation/function"
"github.com/lindb/lindb/series/field"
"github.com/lindb/lindb/sql/stmt"
"github.com/lindb/lindb/tsdb/metadb"
)
@@ -21,10 +22,10 @@ type storageExecutePlan struct {
query *stmt.Query
idGetter metadb.IDGetter

- fieldIDs []uint16
+ fieldIDs []field.ID

metricID uint32
- fields map[uint16]aggregation.AggregatorSpec
+ fields map[field.ID]aggregation.AggregatorSpec
groupByTagKeys map[string]uint32

err error
@@ -35,7 +36,7 @@ func newStorageExecutePlan(index metadb.IDGetter, query *stmt.Query) Plan {
return &storageExecutePlan{
idGetter: index,
query: query,
- fields: make(map[uint16]aggregation.AggregatorSpec),
+ fields: make(map[field.ID]aggregation.AggregatorSpec),
groupByTagKeys: make(map[string]uint32),
}
}
@@ -57,7 +58,7 @@ func (p *storageExecutePlan) Plan() error {
if p.err != nil {
return p.err
}
- p.fieldIDs = make([]uint16, len(p.fields))
+ p.fieldIDs = make([]field.ID, len(p.fields))
idx := 0
for fieldID := range p.fields {
p.fieldIDs[idx] = fieldID
@@ -103,7 +104,7 @@ func (p *storageExecutePlan) getDownSamplingAggSpecs() aggregation.AggregatorSpe
}

// getFieldIDs returns sorted slice of field ids
- func (p *storageExecutePlan) getFieldIDs() []uint16 {
+ func (p *storageExecutePlan) getFieldIDs() []field.ID {
return p.fieldIDs
}

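Plan() copies the keys of the fields map into fieldIDs, and getFieldIDs is documented to return them sorted; Go map iteration order is random, so the sort must be explicit. A sketch assuming field.ID is a small integer type:

package main

import (
	"fmt"
	"sort"
)

// ID mirrors lindb's field.ID, assumed here to be a small integer type.
type ID uint16

func sortedFieldIDs(fields map[ID]struct{}) []ID {
	ids := make([]ID, 0, len(fields))
	for id := range fields {
		ids = append(ids, id) // map order is random...
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) // ...so sort explicitly
	return ids
}

func main() {
	fields := map[ID]struct{}{13: {}, 11: {}, 12: {}}
	fmt.Println(sortedFieldIDs(fields)) // [11 12 13]
}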
60 changes: 29 additions & 31 deletions query/storage_plan_test.go
@@ -23,7 +23,7 @@ func TestStoragePlan_Metric(t *testing.T) {
metadataIndex := metadb.NewMockIDGetter(ctrl)
metadataIndex.EXPECT().GetMetricID(gomock.Any()).Return(uint32(10), nil)
metadataIndex.EXPECT().GetFieldID(gomock.Any(), gomock.Any()).
- Return(uint16(10), field.SumField, nil).AnyTimes()
+ Return(field.ID(10), field.SumField, nil).AnyTimes()

query, _ := sql.Parse("select f from cpu")
plan := newStorageExecutePlan(metadataIndex, query)
@@ -43,18 +43,18 @@ func TestStoragePlan_SelectList(t *testing.T) {
metadataIndex := metadb.NewMockIDGetter(ctrl)
metadataIndex.EXPECT().GetMetricID(gomock.Any()).Return(uint32(10), nil).AnyTimes()
metadataIndex.EXPECT().GetFieldID(gomock.Any(), "f").
- Return(uint16(10), field.SumField, nil).AnyTimes()
+ Return(field.ID(10), field.SumField, nil).AnyTimes()
metadataIndex.EXPECT().GetFieldID(gomock.Any(), "a").
- Return(uint16(11), field.MinField, nil).AnyTimes()
+ Return(field.ID(11), field.MinField, nil).AnyTimes()
metadataIndex.EXPECT().GetFieldID(gomock.Any(), "b").
- Return(uint16(12), field.MaxField, nil).AnyTimes()
+ Return(field.ID(12), field.MaxField, nil).AnyTimes()
metadataIndex.EXPECT().GetFieldID(gomock.Any(), "c").
- Return(uint16(13), field.HistogramField, nil).AnyTimes()
+ Return(field.ID(13), field.HistogramField, nil).AnyTimes()
metadataIndex.EXPECT().GetFieldID(gomock.Any(), "e").
- Return(uint16(14), field.HistogramField, nil).AnyTimes()
+ Return(field.ID(14), field.HistogramField, nil).AnyTimes()

metadataIndex.EXPECT().GetFieldID(gomock.Any(), "no_f").
- Return(uint16(99), field.HistogramField, constants.ErrNotFound).AnyTimes()
+ Return(field.ID(99), field.HistogramField, constants.ErrNotFound).AnyTimes()

// error
query := &stmt.Query{MetricName: "cpu"}
@@ -75,8 +75,8 @@
storagePlan := plan.(*storageExecutePlan)
downSampling := aggregation.NewDownSamplingSpec("f", field.SumField)
downSampling.AddFunctionType(function.Sum)
- assert.Equal(t, map[uint16]aggregation.AggregatorSpec{uint16(10): downSampling}, storagePlan.fields)
- assert.Equal(t, []uint16{uint16(10)}, storagePlan.getFieldIDs())
+ assert.Equal(t, map[field.ID]aggregation.AggregatorSpec{field.ID(10): downSampling}, storagePlan.fields)
+ assert.Equal(t, []field.ID{10}, storagePlan.getFieldIDs())

query, _ = sql.Parse("select a,b,c as d from cpu")
plan = newStorageExecutePlan(metadataIndex, query)
@@ -90,20 +90,18 @@
downSampling2.AddFunctionType(function.Max)
downSampling3 := aggregation.NewDownSamplingSpec("c", field.HistogramField)
downSampling3.AddFunctionType(function.Histogram)
- expect := map[uint16]aggregation.AggregatorSpec{
- uint16(11): downSampling1,
- uint16(12): downSampling2,
- uint16(13): downSampling3,
+ expect := map[field.ID]aggregation.AggregatorSpec{
+ field.ID(11): downSampling1,
+ field.ID(12): downSampling2,
+ field.ID(13): downSampling3,
}
assert.Equal(t, expect, storagePlan.fields)
- assert.Equal(t, []uint16{uint16(11), uint16(12), uint16(13)}, storagePlan.getFieldIDs())
+ assert.Equal(t, []field.ID{11, 12, 13}, storagePlan.getFieldIDs())

query, _ = sql.Parse("select min(a),max(sum(c)+avg(c)+e) as d from cpu")
plan = newStorageExecutePlan(metadataIndex, query)
err = plan.Plan()
- if err != nil {
- t.Fatal(err)
- }
+ assert.NoError(t, err)
storagePlan = plan.(*storageExecutePlan)

downSampling1 = aggregation.NewDownSamplingSpec("a", field.MinField)
Expand All @@ -113,13 +111,13 @@ func TestStoragePlan_SelectList(t *testing.T) {
downSampling3.AddFunctionType(function.Avg)
downSampling4 := aggregation.NewDownSamplingSpec("e", field.HistogramField)
downSampling4.AddFunctionType(function.Histogram)
- expect = map[uint16]aggregation.AggregatorSpec{
- uint16(11): downSampling1,
- uint16(13): downSampling3,
- uint16(14): downSampling4,
+ expect = map[field.ID]aggregation.AggregatorSpec{
+ field.ID(11): downSampling1,
+ field.ID(13): downSampling3,
+ field.ID(14): downSampling4,
}
assert.Equal(t, expect, storagePlan.fields)
- assert.Equal(t, []uint16{uint16(11), uint16(13), uint16(14)}, storagePlan.getFieldIDs())
+ assert.Equal(t, []field.ID{11, 13, 14}, storagePlan.getFieldIDs())
}

func TestStorageExecutePlan_groupBy(t *testing.T) {
@@ -130,8 +128,8 @@
idGetter.EXPECT().GetMetricID("disk").Return(uint32(10), nil),
idGetter.EXPECT().GetTagKeyID(uint32(10), "host").Return(uint32(10), nil),
idGetter.EXPECT().GetTagKeyID(uint32(10), "path").Return(uint32(11), nil),
idGetter.EXPECT().GetFieldID(uint32(10), "f").Return(uint16(12), field.SumField, nil),
idGetter.EXPECT().GetFieldID(uint32(10), "d").Return(uint16(10), field.SumField, nil),
idGetter.EXPECT().GetFieldID(uint32(10), "f").Return(field.ID(12), field.SumField, nil),
idGetter.EXPECT().GetFieldID(uint32(10), "d").Return(field.ID(10), field.SumField, nil),
)

// normal
@@ -145,7 +143,7 @@
assert.Equal(t, "d", aggSpecs[0].FieldName())
assert.Equal(t, "f", aggSpecs[1].FieldName())

- assert.Equal(t, []uint16{10, 12}, storagePlan.getFieldIDs())
+ assert.Equal(t, []field.ID{10, 12}, storagePlan.getFieldIDs())
assert.Equal(t, 2, len(storagePlan.groupByTagKeys))
assert.Equal(t, uint32(10), storagePlan.groupByTagKeys["host"])
assert.Equal(t, uint32(11), storagePlan.groupByTagKeys["path"])
@@ -180,7 +178,7 @@ func TestStorageExecutePlan_field_expr_fail(t *testing.T) {
idGetter := metadb.NewMockIDGetter(ctrl)
gomock.InOrder(
idGetter.EXPECT().GetMetricID("disk").Return(uint32(10), nil),
idGetter.EXPECT().GetFieldID(uint32(10), "f").Return(uint16(10), field.Unknown, nil),
idGetter.EXPECT().GetFieldID(uint32(10), "f").Return(field.ID(10), field.Unknown, nil),
)
query, _ := sql.Parse("select f from disk")
plan := newStorageExecutePlan(idGetter, query)
@@ -189,7 +187,7 @@

gomock.InOrder(
idGetter.EXPECT().GetMetricID("disk").Return(uint32(10), nil),
idGetter.EXPECT().GetFieldID(uint32(10), "f").Return(uint16(10), field.SumField, nil),
idGetter.EXPECT().GetFieldID(uint32(10), "f").Return(field.ID(10), field.SumField, nil),
)
query, _ = sql.Parse("select histogram(f) from disk")
plan = newStorageExecutePlan(idGetter, query)
@@ -198,8 +196,8 @@

gomock.InOrder(
idGetter.EXPECT().GetMetricID("disk").Return(uint32(10), nil),
idGetter.EXPECT().GetFieldID(uint32(10), "d").Return(uint16(10), field.SumField, nil),
idGetter.EXPECT().GetFieldID(uint32(10), "f").Return(uint16(10), field.SumField, nil),
idGetter.EXPECT().GetFieldID(uint32(10), "d").Return(field.ID(10), field.SumField, nil),
idGetter.EXPECT().GetFieldID(uint32(10), "f").Return(field.ID(10), field.SumField, nil),
)
query, _ = sql.Parse("select (d+histogram(f)+b) from disk")
plan = newStorageExecutePlan(idGetter, query)
@@ -208,8 +206,8 @@

gomock.InOrder(
idGetter.EXPECT().GetMetricID("disk").Return(uint32(10), nil),
idGetter.EXPECT().GetFieldID(uint32(10), "d").Return(uint16(12), field.SumField, nil),
idGetter.EXPECT().GetFieldID(uint32(10), "f").Return(uint16(11), field.SumField, nil),
idGetter.EXPECT().GetFieldID(uint32(10), "d").Return(field.ID(12), field.SumField, nil),
idGetter.EXPECT().GetFieldID(uint32(10), "f").Return(field.ID(11), field.SumField, nil),
)
query, _ = sql.Parse("select (d+histogram(f)+b),e from disk")
plan = newStorageExecutePlan(idGetter, query)
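Throughout these tests, raw uint16 literals become field.ID conversions. The payoff of the named type is compile-time separation between plain integers and field identifiers, as this small illustration shows:

package main

import "fmt"

// ID is a named type, as in lindb's series/field package.
type ID uint16

func lookup(fieldID ID) string { return fmt.Sprintf("field-%d", fieldID) }

func main() {
	var raw uint16 = 10
	// lookup(raw)            // compile error: cannot use raw (uint16) as ID
	fmt.Println(lookup(ID(raw))) // explicit conversion documents intent
}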
7 changes: 3 additions & 4 deletions tsdb/family.go
@@ -7,7 +7,7 @@ import (
"github.com/lindb/lindb/kv"
"github.com/lindb/lindb/pkg/logger"
"github.com/lindb/lindb/pkg/timeutil"
"github.com/lindb/lindb/series"
"github.com/lindb/lindb/series/field"
"github.com/lindb/lindb/tsdb/tblstore"
)

@@ -68,8 +68,8 @@ func (f *dataFamily) Family() kv.Family {

// Filter filters the data based on metric/version/seriesIDs,
// if finds data then returns the FilterResultSet, else returns nil
- func (f *dataFamily) Filter(metricID uint32, fieldIDs []uint16,
- version series.Version, seriesIDs *roaring.Bitmap,
+ func (f *dataFamily) Filter(metricID uint32, fieldIDs []field.ID,
+ seriesIDs *roaring.Bitmap, timeRange timeutil.TimeRange,
) (resultSet []flow.FilterResultSet, err error) {
snapShot := f.family.GetSnapshot()
defer func() {
@@ -78,7 +78,6 @@ func (f *dataFamily) Filter(metricID uint32, fieldIDs []uint16,
snapShot.Close()
}
}()

readers, err := snapShot.FindReaders(metricID)
if len(readers) == 0 {
if err != nil {
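Filter grabs a snapshot and defers a close that fires only on the error and empty paths, leaving the snapshot open when a result set still needs to read from it. A sketch of that conditional-cleanup idiom using named return values (all types simplified):

package main

import (
	"errors"
	"fmt"
)

type snapshot struct{ closed bool }

func (s *snapshot) Close() { s.closed = true }

func (s *snapshot) findReaders(metricID uint32) ([]string, error) {
	return nil, errors.New("read failure")
}

// filter closes the snapshot on the failure/empty paths only; on success,
// ownership passes to the returned result set.
func filter(s *snapshot, metricID uint32) (resultSet []string, err error) {
	defer func() {
		// named returns let the deferred closure see the final values
		if err != nil || len(resultSet) == 0 {
			s.Close()
		}
	}()
	readers, err := s.findReaders(metricID)
	if err != nil || len(readers) == 0 {
		return nil, err
	}
	resultSet = readers
	return resultSet, nil
}

func main() {
	s := &snapshot{}
	_, err := filter(s, 10)
	fmt.Println(err, s.closed) // read failure true
}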
11 changes: 5 additions & 6 deletions tsdb/family_test.go
@@ -11,7 +11,6 @@ import (
"github.com/lindb/lindb/kv/table"
"github.com/lindb/lindb/kv/version"
"github.com/lindb/lindb/pkg/timeutil"
"github.com/lindb/lindb/series"
"github.com/lindb/lindb/tsdb/tblstore"
)

@@ -49,21 +48,21 @@ func TestDataFamily_Filter(t *testing.T) {

// test find kv readers err
snapshot.EXPECT().FindReaders(gomock.Any()).Return(nil, fmt.Errorf("err"))
- rs, err := dataFamily.Filter(uint32(10), nil, series.NewVersion(), nil)
+ rs, err := dataFamily.Filter(uint32(10), nil, nil, timeutil.TimeRange{})
assert.Error(t, err)
assert.Nil(t, rs)

// test find kv readers nil
snapshot.EXPECT().FindReaders(gomock.Any()).Return(nil, nil)
- rs, err = dataFamily.Filter(uint32(10), nil, series.NewVersion(), nil)
+ rs, err = dataFamily.Filter(uint32(10), nil, nil, timeutil.TimeRange{})
assert.NoError(t, err)
assert.Nil(t, rs)

// test not find in reader
reader := table.NewMockReader(ctrl)
snapshot.EXPECT().FindReaders(gomock.Any()).Return([]table.Reader{reader}, nil)
reader.EXPECT().Get(gomock.Any()).Return(nil, false)
- rs, err = dataFamily.Filter(uint32(10), nil, series.NewVersion(), nil)
+ rs, err = dataFamily.Filter(uint32(10), nil, nil, timeutil.TimeRange{})
assert.NoError(t, err)
assert.Nil(t, rs)

@@ -74,7 +73,7 @@
newVersionBlockIterator = func(block []byte) (iterator tblstore.VersionBlockIterator, e error) {
return nil, fmt.Errorf("err")
}
- rs, err = dataFamily.Filter(uint32(10), nil, series.NewVersion(), nil)
+ rs, err = dataFamily.Filter(uint32(10), nil, nil, timeutil.TimeRange{})
assert.Error(t, err)
assert.Nil(t, rs)

@@ -85,6 +84,6 @@
return blockIt, nil
}
reader.EXPECT().Get(gomock.Any()).Return([]byte{1, 2, 3}, true)
- _, err = dataFamily.Filter(uint32(10), nil, series.NewVersion(), nil)
+ _, err = dataFamily.Filter(uint32(10), nil, nil, timeutil.TimeRange{})
assert.NoError(t, err)
}
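The error-injection trick in this test works because newVersionBlockIterator is a package-level variable rather than a function declaration, so the test can swap it out. A minimal sketch of that test seam:

package main

import (
	"errors"
	"fmt"
)

type iterator interface{ HasNext() bool }

type blockIterator struct{}

func (b *blockIterator) HasNext() bool { return false }

// newBlockIterator is a variable, not a func declaration, so tests can
// swap it out to inject failures.
var newBlockIterator = func(block []byte) (iterator, error) {
	return &blockIterator{}, nil
}

func readBlock(block []byte) error {
	it, err := newBlockIterator(block)
	if err != nil {
		return err
	}
	_ = it // real code would iterate here
	return nil
}

func main() {
	// what a test would do:
	original := newBlockIterator
	newBlockIterator = func(block []byte) (iterator, error) { return nil, errors.New("err") }
	fmt.Println(readBlock([]byte{1, 2, 3})) // err
	newBlockIterator = original // restore the real constructor
}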