Skip to content

Commit

Permalink
[enhance:lindb#1021]: monthly partitioning of indices
Browse files Browse the repository at this point in the history
  • Loading branch information
joyant committed May 17, 2024
1 parent f740235 commit 09c4211
Show file tree
Hide file tree
Showing 22 changed files with 2,125 additions and 935 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ web/package-lock.json
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
*.cov
coverage.html

/data

Expand Down
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ header: ## check and add license header.
import: ## opt go imports format.
sh scripts/imports.sh

format: ## go format
format: ## go format
go fmt ./...

lint: ## run lint
Expand All @@ -79,6 +79,7 @@ test-without-lint: ## Run test without lint
LOG_LEVEL=fatal ## disable log for test
gotest -v -race -coverprofile=coverage_tmp.out -covermode=atomic ./...
cat coverage_tmp.out |grep -v "_mock.go" > coverage.out
go tool cover -html=coverage.out -o coverage.html

test: header lint test-without-lint ## Run test cases.

Expand All @@ -104,7 +105,7 @@ gen-sql-grammar: ## generate lin query language gen-sql-grammar
antlr4 -Dlanguage=Go -listener -visitor -package grammar ./sql/grammar/SQL.g4

key-words: ## print all key words for lin query language
go run github.com/lindb/lindb/cmd/tools keywords
go run github.com/lindb/lindb/cmd/tools keywords

clean-mock: ## remove all mock files
find ./ -name "*_mock.go" | xargs rm
Expand Down
2 changes: 1 addition & 1 deletion e2e/tsdb/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestDatabase_Write_And_Rollup(t *testing.T) {

now, _ := commontimeutil.ParseTimestamp("20190702 19:10:00", "20060102 15:04:05")
familyTime := interval.Calculator().CalcFamilyTime(now)
f, err := shard.GetOrCrateDataFamily(familyTime)
f, err := shard.GetOrCreateDataFamily(familyTime)
assert.NoError(t, err)
assert.NotNil(t, f)

Expand Down
16 changes: 16 additions & 0 deletions metrics/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ type ShardStatistics struct {
IndexDBFlushFailures *linmetric.BoundCounter // flush index database failure
}

// SegmentStatistics holds the index database flush metrics of a segment.
type SegmentStatistics struct {
	// IndexDBFlushDuration tracks flush index database duration (include count).
	IndexDBFlushDuration *linmetric.BoundHistogram
	// IndexDBFlushFailures counts flush index database failures.
	IndexDBFlushFailures *linmetric.BoundCounter
}

// FamilyStatistics represents family statistics.
type FamilyStatistics struct {
ActiveFamilies *linmetric.BoundGauge // number of current active families
Expand Down Expand Up @@ -122,6 +128,16 @@ func NewShardStatistics(database, shard string) *ShardStatistics {
}
}

// NewSegmentStatistics creates the segment statistics whose metrics are bound
// to the given database, shard and segment tag values.
func NewSegmentStatistics(database, shard, segmentName string) *SegmentStatistics {
	// The failure counter is built before the duration histogram, matching the
	// original construction order.
	flushFailures := shardScope.
		NewCounterVec("indexdb_segment_flush_failures", "db", "shard", "segment").
		WithTagValues(database, shard, segmentName)
	flushDuration := shardScope.
		Scope("indexdb_segment_flush_duration").
		NewHistogramVec("db", "shard", "segment").
		WithTagValues(database, shard, segmentName)

	stats := new(SegmentStatistics)
	stats.IndexDBFlushFailures = flushFailures
	stats.IndexDBFlushDuration = flushDuration
	return stats
}

// NewMetaDBStatistics create a metadata database statistics.
func NewMetaDBStatistics(database string) *MetaDBStatistics {
return &MetaDBStatistics{
Expand Down
11 changes: 11 additions & 0 deletions pkg/timeutil/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package timeutil

import (
"time"
)

// Truncate truncates timestamp based on interval
func Truncate(timestamp, interval int64) int64 {
return timestamp / interval * interval
Expand All @@ -43,3 +47,10 @@ func CalIntervalRatio(queryInterval, storageInterval int64) int {
}
return int(queryInterval / storageInterval)
}

// GetMonthTimestamp returns the timestamp (in milliseconds) of the first day
// of the month, at 00:00:00 local time, that contains familyTime.
//
// familyTime is a millisecond timestamp; its sub-second part is dropped before
// the month is resolved in the local time zone.
func GetMonthTimestamp(familyTime int64) int64 {
	const millisPerSecond = 1000

	year, month, _ := time.Unix(familyTime/millisPerSecond, 0).Date()
	monthStart := time.Date(year, month, 1, 0, 0, 0, 0, time.Local)
	return monthStart.Unix() * millisPerSecond
}
5 changes: 5 additions & 0 deletions pkg/timeutil/time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,8 @@ func TestTruncate(t *testing.T) {
t1, _ = timeutil.ParseTimestamp("20190702 19:10:00", "20060102 15:04:05")
assert.Equal(t, t1, Truncate(now, 10*timeutil.OneMinute))
}

func TestGetMonthTimestamp(t *testing.T) {
got := GetMonthTimestamp(1715787380378)
assert.Equal(t, int64(1714492800000), got)
}
2 changes: 1 addition & 1 deletion replica/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (w *writeAheadLog) GetOrCreatePartition(
if !ok {
return nil, fmt.Errorf("shard: %d not exist", shardID.Int())
}
family, err := shard.GetOrCrateDataFamily(familyTime)
family, err := shard.GetOrCreateDataFamily(familyTime)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions tsdb/data_family.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type DataFamily interface {
type dataFamily struct {
indicator string // database + shard + family time
shard Shard
segment Segment
segment DataSegment
interval timeutil.Interval
intervalCalc timeutil.IntervalCalculator
familyTime int64
Expand Down Expand Up @@ -129,7 +129,7 @@ type dataFamily struct {
// newDataFamily creates a data family storage unit
func newDataFamily(
shard Shard,
segment Segment,
segment DataSegment,
interval timeutil.Interval,
timeRange timeutil.TimeRange,
familyTime int64,
Expand Down Expand Up @@ -611,7 +611,7 @@ func (f *dataFamily) GetOrCreateMemoryDatabase(familyTime int64) (memdb.MemoryDa
Name: f.shard.Database().Name(),
BufferMgr: f.shard.BufferManager(),
MetaNotifier: f.shard.Database().MetaDB().Notify,
IndexNotifier: f.shard.IndexDB().Notify,
IndexNotifier: f.shard.GetIndexDB(familyTime).Notify,
})
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions tsdb/data_family_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ func TestDataFamily_GetOrCreateMemoryDatabase(t *testing.T) {
shard := NewMockShard(ctrl)
db := NewMockDatabase(ctrl)
indexDB := index.NewMockMetricIndexDatabase(ctrl)
shard.EXPECT().IndexDB().Return(indexDB).AnyTimes()
shard.EXPECT().GetIndexDB(gomock.Any()).Return(indexDB).AnyTimes()
shard.EXPECT().Database().Return(db).AnyTimes()
db.EXPECT().Name().Return("db").AnyTimes()
metaDB := index.NewMockMetricMetaDatabase(ctrl)
Expand Down Expand Up @@ -620,7 +620,7 @@ func TestDataFamily_WriteRows(t *testing.T) {
db := NewMockDatabase(ctrl)
shard.EXPECT().Database().Return(db).AnyTimes()
indexDB := index.NewMockMetricIndexDatabase(ctrl)
shard.EXPECT().IndexDB().Return(indexDB).AnyTimes()
shard.EXPECT().GetIndexDB(gomock.Any()).Return(indexDB).AnyTimes()
metaDB := index.NewMockMetricMetaDatabase(ctrl)
db.EXPECT().MetaDB().Return(metaDB).AnyTimes()
db.EXPECT().Name().Return("db").AnyTimes()
Expand Down Expand Up @@ -787,7 +787,7 @@ func TestDataFamily_Evict(t *testing.T) {
shard.EXPECT().Database().Return(db).AnyTimes()
opt := &option.DatabaseOption{Ahead: "1h", Behind: "1h"}
db.EXPECT().GetOption().Return(opt).AnyTimes()
segment := NewMockSegment(ctrl)
segment := NewMockDataSegment(ctrl)
segment.EXPECT().EvictFamily(gomock.Any()).AnyTimes()

cases := []struct {
Expand Down
230 changes: 230 additions & 0 deletions tsdb/data_segment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
// Licensed to LinDB under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. LinDB licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package tsdb

import (
"fmt"
"sort"
"strconv"
"sync"

"github.com/lindb/common/pkg/logger"

"github.com/lindb/lindb/constants"
"github.com/lindb/lindb/kv"
"github.com/lindb/lindb/pkg/timeutil"
"github.com/lindb/lindb/tsdb/tblstore/metricsdata"
)

//go:generate mockgen -source=./data_segment.go -destination=./data_segment_mock.go -package=tsdb

// DataSegment represents a time based segment of an interval; one segment
// groups several data families.
// A segment uses a k/v store for storing time series data.
type DataSegment interface {
	// BaseTime returns the base time of the segment.
	BaseTime() int64
	// GetOrCreateDataFamily returns the data family which the timestamp belongs to,
	// creating it when it does not exist yet.
	GetOrCreateDataFamily(timestamp int64) (DataFamily, error)
	// GetDataFamilies returns the data families matching the time range,
	// returns nil if nothing matches.
	GetDataFamilies(timeRange timeutil.TimeRange) []DataFamily
	// NeedEvict checks if the segment can be evicted.
	NeedEvict() bool
	// EvictFamily evicts the data family of the given family time.
	EvictFamily(familyTime int64)
	// Close closes the segment, including its kv store.
	Close()
}

// dataSegment is the kv store backed implementation of DataSegment.
type dataSegment struct {
	// indicator is the shard segment path, also used as the kv store name on creation.
	indicator string
	shard     Shard
	// baseTime is parsed from the segment name.
	baseTime int64
	kvStore  kv.Store
	interval timeutil.Interval
	// families caches the loaded data families, keyed by the family value
	// computed by the interval calculator.
	families map[int]DataFamily

	// mutex guards families.
	mutex sync.RWMutex

	logger logger.Logger
}

// newDataSegment returns a dataSegment, which is a wrapper of a kv store.
//
// The base time is parsed from segmentName with the interval's calculator and
// the kv store is created under the shard segment path. If interval is the
// shard's current interval and the database option declares more than one
// interval, the store is configured with rollup targets.
//
// It returns an error, wrapping the underlying cause, if the segment name
// cannot be parsed or the kv store cannot be created.
func newDataSegment(shard Shard, segmentName string, interval timeutil.Interval) (DataSegment, error) {
	indicator := ShardSegmentPath(shard.Database().Name(), shard.ShardID(), interval, segmentName)
	// parse base time from dataSegment name
	baseTime, err := interval.Calculator().ParseSegmentTime(segmentName)
	if err != nil {
		// keep the cause in the chain instead of dropping it
		return nil, fmt.Errorf("parse dataSegment[%s] base time error: %w", indicator, err)
	}

	storeOption := kv.DefaultStoreOption()
	intervals := shard.Database().GetOption().Intervals
	if shard.CurrentInterval() == interval && len(intervals) > 1 {
		// if interval == writeable interval and database set auto rollup intervals
		sort.Sort(intervals) // need sort interval
		rollup := make([]timeutil.Interval, 0, len(intervals))
		for _, rollupInterval := range intervals {
			rollup = append(rollup, rollupInterval.Interval)
		}
		// The first sorted interval is not a rollup target.
		// NOTE(review): presumably it is the writable interval itself - confirm.
		storeOption.Rollup = rollup[1:]
		storeOption.Source = interval
	}
	kvStore, err := kv.GetStoreManager().CreateStore(indicator, storeOption)
	if err != nil {
		// %w renders the same text as %s but keeps the cause inspectable
		return nil, fmt.Errorf("create kv store for dataSegment error:%w", err)
	}
	return &dataSegment{
		shard:     shard,
		indicator: indicator,
		baseTime:  baseTime,
		kvStore:   kvStore,
		interval:  interval,
		families:  make(map[int]DataFamily),
		logger:    logger.GetLogger("TSDB", "DataSegment"),
	}, nil
}

// BaseTime returns the base time this segment was created with.
func (s *dataSegment) BaseTime() int64 {
	return s.baseTime
}

// GetDataFamilies returns the data families whose time range overlaps the
// given time range, returns nil if nothing matches.
func (s *dataSegment) GetDataFamilies(timeRange timeutil.TimeRange) []DataFamily {
	calc := s.interval.Calculator()

	// align both ends of the query range to the start time of their family
	queryStart := calc.CalcFamilyStartTime(s.baseTime, calc.CalcFamily(timeRange.Start, s.baseTime))
	queryEnd := calc.CalcFamilyStartTime(s.baseTime, calc.CalcFamily(timeRange.End, s.baseTime))
	queryRange := timeutil.TimeRange{Start: queryStart, End: queryEnd}

	var matched []DataFamily
	for _, name := range s.kvStore.ListFamilyNames() {
		familyTime, err := strconv.Atoi(name)
		if err != nil {
			// TODO: add metric
			continue
		}
		// every parsable family is loaded before its time range is checked
		if family := s.getOrLoadFamily(name, familyTime); queryRange.Overlap(family.TimeRange()) {
			matched = append(matched, family)
		}
	}
	return matched
}

// NeedEvict checks if the dataSegment can be evicted, which is the case when
// it holds no cached data family.
func (s *dataSegment) NeedEvict() bool {
	// read-only check: a shared lock is enough and does not block other readers
	s.mutex.RLock()
	defer s.mutex.RUnlock()

	return len(s.families) == 0
}

// EvictFamily removes the data family of the given family time from the
// segment's family cache.
func (s *dataSegment) EvictFamily(familyTime int64) {
	// resolve the cache key outside of the critical section
	familyKey := s.interval.Calculator().CalcFamily(familyTime, s.baseTime)

	s.mutex.Lock()
	delete(s.families, familyKey)
	s.mutex.Unlock()
}

// GetOrCreateDataFamily returns the data family based on timestamp.
//
// The cached family is returned if present; otherwise the kv family is looked
// up in the kv store (and created when missing) and a new data family is
// initialized and cached.
//
// It returns an error wrapping constants.ErrDataFamilyNotFound if the
// timestamp does not belong to this segment or the kv family cannot be created.
func (s *dataSegment) GetOrCreateDataFamily(timestamp int64) (DataFamily, error) {
	calc := s.interval.Calculator()

	segmentTime := calc.CalcSegmentTime(timestamp)
	if segmentTime != s.baseTime {
		// report the computed segment time, which is the value the message labels
		return nil, fmt.Errorf("%w, dataSegment base time not match, segmentTime: %d, baseTime: %d",
			constants.ErrDataFamilyNotFound, segmentTime, s.baseTime)
	}

	familyTime := calc.CalcFamily(timestamp, s.baseTime)

	s.mutex.Lock()
	defer s.mutex.Unlock()

	if family, ok := s.families[familyTime]; ok {
		return family, nil
	}
	familyName := strconv.Itoa(familyTime)
	family := s.kvStore.GetFamily(familyName)
	if family == nil {
		// create kv family, the option is only needed on this path
		familyOption := kv.FamilyOption{
			CompactThreshold: 0,
			Merger:           string(metricsdata.MetricDataMerger),
		}
		var err error
		family, err = s.kvStore.CreateFamily(familyName, familyOption)
		if err != nil {
			return nil, fmt.Errorf("%w ,failed to create data family: %s",
				constants.ErrDataFamilyNotFound, err)
		}
	}
	return s.initDataFamily(familyTime, family), nil
}

// Close closes the dataSegment: every cached data family is closed, then the
// kv store, and finally the family cache is reset. Failures are logged and do
// not stop the remaining steps.
func (s *dataSegment) Close() {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	for _, family := range s.families {
		if err := family.Close(); err != nil {
			// include the cause, otherwise the log only says which family failed
			s.logger.Error("close family err",
				logger.String("family", family.Indicator()), logger.Error(err))
		}
	}
	if err := kv.GetStoreManager().CloseStore(s.kvStore.Name()); err != nil {
		s.logger.Error("close kv store error", logger.Error(err))
	}
	// clear family cache
	s.families = make(map[int]DataFamily)
}

// getOrLoadFamily returns the cached data family for familyTime, or
// initializes it from the kv family named familyName when it is not cached.
func (s *dataSegment) getOrLoadFamily(familyName string, familyTime int) DataFamily {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	cached, found := s.families[familyTime]
	if !found {
		return s.initDataFamily(familyTime, s.kvStore.GetFamily(familyName))
	}
	return cached
}

// initDataFamily builds the data family on top of the given kv family and
// stores it in the family cache; it does not take the mutex itself.
func (s *dataSegment) initDataFamily(familyTime int, family kv.Family) DataFamily {
	calc := s.interval.Calculator()

	// the family covers [start, end] derived from the segment base time
	start := calc.CalcFamilyStartTime(s.baseTime, familyTime)
	familyRange := timeutil.TimeRange{
		Start: start,
		End:   calc.CalcFamilyEndTime(start),
	}

	created := newDataFamilyFunc(s.shard, s, s.interval, familyRange, start, family)
	s.families[familyTime] = created
	return created
}
Loading

0 comments on commit 09c4211

Please sign in to comment.