Skip to content

Commit

Permalink
[enhance:#1021]: monthly partitioning of indices (#1028)
Browse files Browse the repository at this point in the history
* [enhance:#1021]: monthly partitioning of indices

* [enhance:#1021]: regen mock files

* [enhance:#1021]: fix GetMonthTimestamp func
  • Loading branch information
joyant committed May 18, 2024
1 parent f740235 commit de43215
Show file tree
Hide file tree
Showing 23 changed files with 2,132 additions and 940 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ web/package-lock.json
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
*.cov
coverage.html

/data

Expand Down
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ header: ## check and add license header.
import: ## opt go imports format.
sh scripts/imports.sh

format: ## go format
format: ## go format
go fmt ./...

lint: ## run lint
Expand All @@ -79,6 +79,7 @@ test-without-lint: ## Run test without lint
LOG_LEVEL=fatal ## disable log for test
gotest -v -race -coverprofile=coverage_tmp.out -covermode=atomic ./...
cat coverage_tmp.out |grep -v "_mock.go" > coverage.out
go tool cover -html=coverage.out -o coverage.html

test: header lint test-without-lint ## Run test cases.

Expand All @@ -104,7 +105,7 @@ gen-sql-grammar: ## generate lin query language gen-sql-grammar
antlr4 -Dlanguage=Go -listener -visitor -package grammar ./sql/grammar/SQL.g4

key-words: ## print all key words for lin query language
go run github.com/lindb/lindb/cmd/tools keywords
go run github.com/lindb/lindb/cmd/tools keywords

clean-mock: ## remove all mock files
find ./ -name "*_mock.go" | xargs rm
Expand Down
2 changes: 1 addition & 1 deletion e2e/tsdb/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestDatabase_Write_And_Rollup(t *testing.T) {

now, _ := commontimeutil.ParseTimestamp("20190702 19:10:00", "20060102 15:04:05")
familyTime := interval.Calculator().CalcFamilyTime(now)
f, err := shard.GetOrCrateDataFamily(familyTime)
f, err := shard.GetOrCreateDataFamily(familyTime)
assert.NoError(t, err)
assert.NotNil(t, f)

Expand Down
16 changes: 16 additions & 0 deletions metrics/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ type ShardStatistics struct {
IndexDBFlushFailures *linmetric.BoundCounter // flush index database failure
}

// SegmentStatistics represents segment statistics.
type SegmentStatistics struct {
IndexDBFlushDuration *linmetric.BoundHistogram // flush index database duration(include count)
IndexDBFlushFailures *linmetric.BoundCounter // flush index database failure
}

// FamilyStatistics represents family statistics.
type FamilyStatistics struct {
ActiveFamilies *linmetric.BoundGauge // number of current active families
Expand Down Expand Up @@ -122,6 +128,16 @@ func NewShardStatistics(database, shard string) *ShardStatistics {
}
}

// NewSegmentStatistics creates a segment statistics.
func NewSegmentStatistics(database, shard, segmentName string) *SegmentStatistics {
return &SegmentStatistics{
IndexDBFlushFailures: shardScope.NewCounterVec("indexdb_segment_flush_failures", "db", "shard", "segment").
WithTagValues(database, shard, segmentName),
IndexDBFlushDuration: shardScope.Scope("indexdb_segment_flush_duration").NewHistogramVec("db", "shard", "segment").
WithTagValues(database, shard, segmentName),
}
}

// NewMetaDBStatistics create a metadata database statistics.
func NewMetaDBStatistics(database string) *MetaDBStatistics {
return &MetaDBStatistics{
Expand Down
11 changes: 11 additions & 0 deletions pkg/timeutil/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package timeutil

import (
"time"
)

// Truncate truncates timestamp based on interval
func Truncate(timestamp, interval int64) int64 {
return timestamp / interval * interval
Expand All @@ -43,3 +47,10 @@ func CalIntervalRatio(queryInterval, storageInterval int64) int {
}
return int(queryInterval / storageInterval)
}

// GetMonthTimestamp returns the start of the month for a given timestamp.
func GetMonthTimestamp(familyTime int64) int64 {
tm := time.Unix(familyTime/1000, 0)
ts := time.Date(tm.Year(), tm.Month(), 1, 0, 0, 0, 0, time.Local)
return ts.UnixNano() / 1000000
}
11 changes: 9 additions & 2 deletions pkg/timeutil/time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
package timeutil

import (
"testing"

"github.com/lindb/common/pkg/timeutil"

"github.com/stretchr/testify/assert"

"testing"
)

const date = "20191212 10:11:10"
Expand All @@ -47,3 +48,9 @@ func TestTruncate(t *testing.T) {
t1, _ = timeutil.ParseTimestamp("20190702 19:10:00", "20060102 15:04:05")
assert.Equal(t, t1, Truncate(now, 10*timeutil.OneMinute))
}

func TestGetMonthTimestamp(t *testing.T) {
now, _ := timeutil.ParseTimestamp("20190711 12:30:30", "20060102 15:04:05")
t1, _ := timeutil.ParseTimestamp("20190701 00:00:00", "20060102 15:04:05")
assert.Equal(t, t1, GetMonthTimestamp(now))
}
2 changes: 1 addition & 1 deletion replica/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (w *writeAheadLog) GetOrCreatePartition(
if !ok {
return nil, fmt.Errorf("shard: %d not exist", shardID.Int())
}
family, err := shard.GetOrCrateDataFamily(familyTime)
family, err := shard.GetOrCreateDataFamily(familyTime)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions replica/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ func TestWriteAheadLog_GetOrCreatePartition(t *testing.T) {
name: "get data family failure",
prepare: func(_ *writeAheadLog) {
engine.EXPECT().GetShard(gomock.Any(), gomock.Any()).Return(shard, true)
shard.EXPECT().GetOrCrateDataFamily(gomock.Any()).Return(nil, fmt.Errorf("err"))
shard.EXPECT().GetOrCreateDataFamily(gomock.Any()).Return(nil, fmt.Errorf("err"))
},
wantErr: true,
},
{
name: "new log queue failure",
prepare: func(_ *writeAheadLog) {
engine.EXPECT().GetShard(gomock.Any(), gomock.Any()).Return(shard, true)
shard.EXPECT().GetOrCrateDataFamily(gomock.Any()).Return(nil, nil)
shard.EXPECT().GetOrCreateDataFamily(gomock.Any()).Return(nil, nil)
newFanOutQueue = func(dirPath string, dataSizeLimit int64) (q queue.FanOutQueue, err error) {
return nil, fmt.Errorf("err")
}
Expand All @@ -90,7 +90,7 @@ func TestWriteAheadLog_GetOrCreatePartition(t *testing.T) {
name: "create partition successfully",
prepare: func(_ *writeAheadLog) {
engine.EXPECT().GetShard(gomock.Any(), gomock.Any()).Return(shard, true)
shard.EXPECT().GetOrCrateDataFamily(gomock.Any()).Return(nil, nil)
shard.EXPECT().GetOrCreateDataFamily(gomock.Any()).Return(nil, nil)
newFanOutQueue = func(dirPath string, dataSizeLimit int64) (q queue.FanOutQueue, err error) {
return nil, nil
}
Expand Down
6 changes: 3 additions & 3 deletions tsdb/data_family.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type DataFamily interface {
type dataFamily struct {
indicator string // database + shard + family time
shard Shard
segment Segment
segment DataSegment
interval timeutil.Interval
intervalCalc timeutil.IntervalCalculator
familyTime int64
Expand Down Expand Up @@ -129,7 +129,7 @@ type dataFamily struct {
// newDataFamily creates a data family storage unit
func newDataFamily(
shard Shard,
segment Segment,
segment DataSegment,
interval timeutil.Interval,
timeRange timeutil.TimeRange,
familyTime int64,
Expand Down Expand Up @@ -611,7 +611,7 @@ func (f *dataFamily) GetOrCreateMemoryDatabase(familyTime int64) (memdb.MemoryDa
Name: f.shard.Database().Name(),
BufferMgr: f.shard.BufferManager(),
MetaNotifier: f.shard.Database().MetaDB().Notify,
IndexNotifier: f.shard.IndexDB().Notify,
IndexNotifier: f.shard.GetIndexDB(familyTime).Notify,
})
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions tsdb/data_family_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ func TestDataFamily_GetOrCreateMemoryDatabase(t *testing.T) {
shard := NewMockShard(ctrl)
db := NewMockDatabase(ctrl)
indexDB := index.NewMockMetricIndexDatabase(ctrl)
shard.EXPECT().IndexDB().Return(indexDB).AnyTimes()
shard.EXPECT().GetIndexDB(gomock.Any()).Return(indexDB).AnyTimes()
shard.EXPECT().Database().Return(db).AnyTimes()
db.EXPECT().Name().Return("db").AnyTimes()
metaDB := index.NewMockMetricMetaDatabase(ctrl)
Expand Down Expand Up @@ -620,7 +620,7 @@ func TestDataFamily_WriteRows(t *testing.T) {
db := NewMockDatabase(ctrl)
shard.EXPECT().Database().Return(db).AnyTimes()
indexDB := index.NewMockMetricIndexDatabase(ctrl)
shard.EXPECT().IndexDB().Return(indexDB).AnyTimes()
shard.EXPECT().GetIndexDB(gomock.Any()).Return(indexDB).AnyTimes()
metaDB := index.NewMockMetricMetaDatabase(ctrl)
db.EXPECT().MetaDB().Return(metaDB).AnyTimes()
db.EXPECT().Name().Return("db").AnyTimes()
Expand Down Expand Up @@ -787,7 +787,7 @@ func TestDataFamily_Evict(t *testing.T) {
shard.EXPECT().Database().Return(db).AnyTimes()
opt := &option.DatabaseOption{Ahead: "1h", Behind: "1h"}
db.EXPECT().GetOption().Return(opt).AnyTimes()
segment := NewMockSegment(ctrl)
segment := NewMockDataSegment(ctrl)
segment.EXPECT().EvictFamily(gomock.Any()).AnyTimes()

cases := []struct {
Expand Down
Loading

0 comments on commit de43215

Please sign in to comment.