diff --git a/pkg/config/config.go b/pkg/config/config.go index 8a20e2e64..d2b140af6 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -69,12 +69,12 @@ type cluster struct { } type shard struct { + // control gorutine number for read + GorutineNum int `env:"ZINC_SHARD_GORUTINE_NUM,default=3"` // DefaultNum is the default number of shards. Num int64 `env:"ZINC_SHARD_NUM,default=3"` // MaxSize is the maximum size limit for one shard, or will create a new shard. MaxSize uint64 `env:"ZINC_SHARD_MAX_SIZE,default=1073741824"` - // control gorutine number for read - GorutineNum int `env:"ZINC_SHARD_GORUTINE_NUM,default=10"` } type etcd struct { diff --git a/pkg/core/index.go b/pkg/core/index.go index e873f3581..cf54d00fa 100644 --- a/pkg/core/index.go +++ b/pkg/core/index.go @@ -90,6 +90,12 @@ func (index *Index) GetStats() meta.IndexStat { return s } +func (index *Index) UpdateWALSize(size uint64) { + index.lock.Lock() + index.ref.Stats.WALSize = size + index.lock.Unlock() +} + func (index *Index) GetAnalyzers() map[string]*analysis.Analyzer { index.lock.RLock() a := index.analyzers diff --git a/pkg/core/index_shards_wal.go b/pkg/core/index_shards_wal.go index e2c83632a..ea650190d 100644 --- a/pkg/core/index_shards_wal.go +++ b/pkg/core/index_shards_wal.go @@ -159,7 +159,7 @@ func (s *IndexShard) ConsumeWAL() bool { if minID == maxID { return false // no new entries } - // log.Debug().Str("index", s.GetIndexName()).Int64("shard", s.GetID()).Uint64("minID", minID).Uint64("maxID", maxID).Msg("consume wal begin") + log.Debug().Str("index", s.GetIndexName()).Str("shard", s.GetID()).Uint64("minID", minID).Uint64("maxID", maxID).Msg("consume wal begin") // limit max batch size if maxID-minID > MaxBatchSize { @@ -218,7 +218,7 @@ func (s *IndexShard) ConsumeWAL() bool { return false } } - // log.Debug().Str("index", s.GetIndexName()).Int64("shard", s.GetID()).Uint64("minID", minID).Uint64("maxID", maxID).Msg("consume wal end") + log.Debug().Str("index", s.GetIndexName()).Str("shard", s.GetID()).Uint64("minID", minID).Uint64("maxID", maxID).Msg("consume wal end") // Truncate log if err = s.wal.TruncateFront(minID); err != nil { diff --git a/pkg/core/index_shards_wallist.go b/pkg/core/index_shards_wallist.go index 426ccf135..ded43ea1c 100644 --- a/pkg/core/index_shards_wallist.go +++ b/pkg/core/index_shards_wallist.go @@ -116,8 +116,9 @@ func (t *IndexShardWALList) ConsumeWAL() { if size == uint64(index.GetShardNum()) { size = 0 } + index.UpdateWALSize(size) + stats := index.GetStats() - atomic.StoreUint64(&stats.WALSize, size) SetMetricStatsByIndex(name, "doc_num", float64(atomic.LoadUint64(&stats.DocNum))) SetMetricStatsByIndex(name, "storage_size", float64(atomic.LoadUint64(&stats.StorageSize)/1024/1024)) // convert to MB