
Commit

[enhance:#1007]: reduce goroutine when write too many data families (#1016)

* [enhance:#1007]: reduce goroutine when write too many data families

* [enhance:#1007]: update comments for clarity

* [enhance:#1007]: update test cases
joyant committed Feb 26, 2024
1 parent 5fbd819 commit f740235
Showing 7 changed files with 496 additions and 470 deletions.
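The shape of the change, before the diffs: the removed ReplicatorPeer machinery gave every replicator its own running goroutine, so a node writing many data families accumulated goroutines per family; now each partition runs one CAS-guarded loop over all of its replicators. A minimal runnable sketch of that pattern (made-up types, not the LinDB API):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type fakeReplicator struct{ id int }

func (r *fakeReplicator) consume() { fmt.Printf("replicator %d consumed\n", r.id) }

type partition struct {
	running     atomic.Bool
	replicators []*fakeReplicator
}

// startReplica spawns the loop at most once, however many times it is called.
func (p *partition) startReplica() {
	if p.running.CompareAndSwap(false, true) {
		go p.replicaLoop()
	}
}

// replicaLoop services every replicator from one goroutine, instead of one
// goroutine per replicator.
func (p *partition) replicaLoop() {
	for p.running.Load() {
		for _, r := range p.replicators {
			r.consume()
		}
		time.Sleep(10 * time.Millisecond) // demo pacing only
	}
}

func main() {
	p := &partition{replicators: []*fakeReplicator{{id: 1}, {id: 2}}}
	p.startReplica()
	p.startReplica() // no-op: CompareAndSwap already flipped running
	time.Sleep(25 * time.Millisecond)
	p.running.Store(false) // what partition.Stop does in the commit
}
```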
index/model/trie_bucket_test.go (3 changes: 1 addition & 2 deletions)
@@ -329,8 +329,7 @@ func createTriesDataBulk(t *testing.T, blockSize int) (keys [][]byte, values []u
w := bytes.NewBuffer([]byte{})
b := NewTrieBucketBuilder(blockSize, w)
assert.NoError(t, b.Write(keys, values))
data = w.Bytes()
return keys, values, data, keysString
return keys, values, w.Bytes(), keysString
}

func TestTrieBucket_Unmarlshal(t *testing.T) {
replica/partition.go (197 changes: 154 additions & 43 deletions)
@@ -22,6 +22,7 @@ import (
"fmt"
"io"
"sync"
"sync/atomic"

"github.com/lindb/common/pkg/logger"
"github.com/lindb/common/pkg/timeutil"
@@ -40,7 +41,11 @@ var (
// for testing
newLocalReplicatorFn = NewLocalReplicator
newRemoteReplicatorFn = NewRemoteReplicator
newReplicatorPeerFn = NewReplicatorPeer
)

const (
replicatorTypeLocal = "local"
replicatorTypeRemote = "remote"
)

// Partition represents a partition of the write ahead log.
@@ -67,6 +72,12 @@ type Partition interface {
Stop()
// getReplicaState returns each family's log replica state.
getReplicaState() models.FamilyLogReplicaState
// StartReplica starts the replica loop, which iterates over all replicators and copies data.
StartReplica()
// replicaLoop runs the replica loop until the partition stops.
replicaLoop()
// replica tries to consume a message for the given replicator.
replica(nodeID models.NodeID, replicator Replicator)
// recovery rebuilds replication relation based on local partition.
recovery(leader models.NodeID) error
}
@@ -81,14 +92,16 @@ type partition struct {
shardID models.ShardID
shard tsdb.Shard
family tsdb.DataFamily
running *atomic.Bool

peers map[models.NodeID]ReplicatorPeer
cliFct rpc.ClientStreamFactory
stateMgr storage.StateManager
replicators map[models.NodeID]Replicator
cliFct rpc.ClientStreamFactory
stateMgr storage.StateManager

mutex sync.Mutex

statistics *metrics.StorageWriteAheadLogStatistics
statistics *metrics.StorageWriteAheadLogStatistics
replicatorStatistics map[models.NodeID]*metrics.StorageReplicatorRunnerStatistics

logger logger.Logger
}
@@ -104,21 +117,24 @@ func NewPartition(
stateMgr storage.StateManager,
) Partition {
c, cancel := context.WithCancel(ctx)
return &partition{
ctx: c,
cancel: cancel,
log: log,
db: shard.Database().Name(),
shardID: shard.ShardID(),
shard: shard,
family: family,
currentNodeID: currentNodeID,
cliFct: cliFct,
stateMgr: stateMgr,
peers: make(map[models.NodeID]ReplicatorPeer),
statistics: metrics.NewStorageWriteAheadLogStatistics(shard.Database().Name(), shard.ShardID().String()),
logger: logger.GetLogger("Replica", "Partition"),
p := &partition{
ctx: c,
cancel: cancel,
log: log,
db: shard.Database().Name(),
shardID: shard.ShardID(),
shard: shard,
family: family,
running: &atomic.Bool{},
currentNodeID: currentNodeID,
cliFct: cliFct,
stateMgr: stateMgr,
replicators: make(map[models.NodeID]Replicator),
statistics: metrics.NewStorageWriteAheadLogStatistics(shard.Database().Name(), shard.ShardID().String()),
replicatorStatistics: make(map[models.NodeID]*metrics.StorageReplicatorRunnerStatistics),
logger: logger.GetLogger("Replica", "Partition"),
}
return p
}

// ReplicaLog writes the replica msg that the leader sends.
@@ -190,10 +206,17 @@ func (p *partition) stopReplicator(node string) {

nodeID := models.ParseNodeID(node)
// shutdown replicator if it exists
peer, ok := p.peers[nodeID]
replicator, ok := p.replicators[nodeID]
if ok {
peer.Shutdown()
delete(p.peers, nodeID)
replicator.Close()
// copy on write
replicators := make(map[models.NodeID]Replicator, len(p.replicators)-1)
for id := range p.replicators {
if id != nodeID {
replicators[id] = p.replicators[id]
}
}
p.replicators = replicators
}
}

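The copy-on-write above lets replicaLoop range over p.replicators without taking the mutex: writers, serialized by the partition's mutex, build a fresh map and publish it, so a reader iterating the old map is never disturbed. A generic sketch of the same removal (hypothetical helper, not in the repo):

```go
package cow

// withoutKey returns a copy of src with drop removed; src is never mutated,
// so concurrent readers of the old map stay safe.
func withoutKey[K comparable, V any](src map[K]V, drop K) map[K]V {
	dst := make(map[K]V, len(src)) // len(src)-1 would be negative for an empty map
	for k, v := range src {
		if k != drop {
			dst[k] = v
		}
	}
	return dst
}
```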
@@ -250,6 +273,68 @@ func (p *partition) BuildReplicaForFollower(leader, replica models.NodeID) error
return err
}

// StartReplica starts the replica loop, which iterates over all replicators and copies data.
func (p *partition) StartReplica() {
if p.running.CompareAndSwap(false, true) {
go p.replicaLoop()
}
}

// replicaLoop runs the replica loop until the partition stops.
func (p *partition) replicaLoop() {
for p.running.Load() {
for nodeID, replicator := range p.replicators {
p.replica(nodeID, replicator)
}
}
}

// replica tries to consume a message for the given replicator.
func (p *partition) replica(nodeID models.NodeID, replicator Replicator) {
var replicatorStatistics = p.replicatorStatistics[nodeID]

defer func() {
if recovered := recover(); recovered != nil {
replicatorStatistics.ReplicaPanics.Incr()
p.logger.Error("panic when replica data",
logger.Any("err", recovered),
logger.Stack(),
)
}
}()

if replicator.IsReady() && replicator.Connect() {
seq := replicator.Consume()
if seq >= 0 {
var replicatorType string
switch replicator.(type) {
case *localReplicator:
replicatorType = replicatorTypeLocal
case *remoteReplicator:
replicatorType = replicatorTypeRemote
}
p.logger.Debug("replica write ahead log",
logger.String("type", replicatorType),
logger.String("replicator", replicator.String()),
logger.Int64("index", seq))
data, err := replicator.GetMessage(seq)
if err != nil {
replicator.IgnoreMessage(seq)
replicatorStatistics.ConsumeMessageFailures.Incr()
p.logger.Warn("cannot get replica message data, ignore replica message",
logger.String("replicator", replicator.String()),
logger.Int64("index", seq), logger.Error(err))
} else {
replicatorStatistics.ConsumeMessage.Incr()
replicator.Replica(seq, data)
replicatorStatistics.ReplicaBytes.Add(float64(len(data)))
}
}
} else {
p.logger.Warn("replica is not ready", logger.String("replicator", replicator.String()))
}
}

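The type switch in replica above reappears verbatim in getReplicaState below; a small helper could factor it out. A sketch against the commit's own types (not part of the commit):

```go
// replicatorTypeOf maps a Replicator's concrete type to its label.
func replicatorTypeOf(r Replicator) string {
	switch r.(type) {
	case *localReplicator:
		return replicatorTypeLocal
	case *remoteReplicator:
		return replicatorTypeRemote
	default:
		return "unknown"
	}
}
```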
// Close shuts down all replica workers.
func (p *partition) Close() error {
// close log
@@ -259,16 +344,22 @@ func (p *partition) Close() error {

// Stop stops replicator channel.
func (p *partition) Stop() {
p.running.Store(false)
p.stop()
}

// stop stops replicator channel.
func (p *partition) stop() {
// 1. cancel context of partition (will stop replicator)
p.cancel()

// 2. stop all replicators
var waiter sync.WaitGroup
waiter.Add(len(p.peers))
for k := range p.peers {
r := p.peers[k]
waiter.Add(len(p.replicators))
for k := range p.replicators {
r := p.replicators[k]
go func() {
r.Shutdown()
r.Close()
waiter.Done()
}()
}
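stop fans out one goroutine per replicator and joins on the WaitGroup (the waiter.Wait() call sits below this hunk). The same fan-out/join shutdown pattern as a self-contained sketch (hypothetical types):

```go
package main

import (
	"fmt"
	"sync"
)

type closer interface{ Close() error }

type noopCloser struct{ name string }

func (c noopCloser) Close() error { fmt.Println("closed", c.name); return nil }

// closeAll closes every closer concurrently and returns once all have finished.
func closeAll(cs []closer) {
	var wg sync.WaitGroup
	wg.Add(len(cs))
	for _, c := range cs {
		c := c // pin the loop variable (pre-Go 1.22 semantics)
		go func() {
			defer wg.Done()
			_ = c.Close()
		}()
	}
	wg.Wait()
}

func main() {
	closeAll([]closer{noopCloser{name: "local"}, noopCloser{name: "remote"}})
}
```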
@@ -292,11 +383,17 @@ func (p *partition) getReplicaState() models.FamilyLogReplicaState {
Pending: fanout.Pending(),
}
nodeID := models.ParseNodeID(name)
if peer, ok := p.getReplicatorRunner(nodeID); ok {
replicatorType, replicatorState := peer.ReplicatorState()
if replicator, ok := p.replicators[nodeID]; ok {
var replicatorType string
switch replicator.(type) {
case *localReplicator:
replicatorType = replicatorTypeLocal
case *remoteReplicator:
replicatorType = replicatorTypeRemote
}
peerState.ReplicatorType = replicatorType
peerState.State = replicatorState.state
peerState.StateErrMsg = replicatorState.errMsg
peerState.State = replicator.State().state
peerState.StateErrMsg = replicator.State().errMsg
}

stateOfReplicators = append(stateOfReplicators, peerState)
@@ -314,15 +411,18 @@ func (p *partition) buildReplica(leader, replica models.NodeID) error {
p.mutex.Lock()
defer p.mutex.Unlock()

if _, ok := p.peers[replica]; ok {
if _, ok := p.replicators[replica]; ok {
// exist
return nil
}
walConsumer, err := p.log.GetOrCreateConsumerGroup(fmt.Sprintf("%d", replica))
if err != nil {
return err
}
var replicator Replicator
var (
replicator Replicator
replicaType string
)
channel := ReplicatorChannel{
State: &models.ReplicaState{
Database: p.shard.Database().Name(),
@@ -336,25 +436,36 @@ func (p *partition) buildReplica(leader, replica models.NodeID) error {
if replica == p.currentNodeID {
// local replicator
replicator = newLocalReplicatorFn(&channel, p.shard, p.family)
replicaType = replicatorTypeLocal
} else {
// build remote replicator
replicator = newRemoteReplicatorFn(p.ctx, &channel, p.stateMgr, p.cliFct)
replicaType = replicatorTypeRemote
}

// startup replicator peer
peer := newReplicatorPeerFn(replicator)
p.peers[replica] = peer
peer.Startup()
var (
state = replicator.ReplicaState()
replicators = make(map[models.NodeID]Replicator, len(p.replicators))
replicatorStatistics = make(map[models.NodeID]*metrics.StorageReplicatorRunnerStatistics, len(p.replicatorStatistics))
)

return nil
}
for nodeID, replicator0 := range p.replicators {
replicators[nodeID] = replicator0
}
for nodeID, statistics := range p.replicatorStatistics {
replicatorStatistics[nodeID] = statistics
}

func (p *partition) getReplicatorRunner(nodeID models.NodeID) (ReplicatorPeer, bool) {
p.mutex.Lock()
defer p.mutex.Unlock()
// copy on write
replicatorStatistics[replica] = metrics.NewStorageReplicatorRunnerStatistics(replicaType, state.Database, state.ShardID.String())
// readers use replicators first and then replicatorStatistics,
// so replicatorStatistics must be assigned first to stay safe in concurrent scenarios;
// in the future this could be refactored to encapsulate the two maps in a single structure.
p.replicatorStatistics = replicatorStatistics
replicators[replica] = replicator
p.replicators = replicators

peer, ok := p.peers[nodeID]
return peer, ok
return nil
}
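The ordering comment above hints at the suggested refactor: publish both maps behind one pointer so readers can never observe them out of sync. A sketch with atomic.Pointer and stand-in types (writers would still serialize on the partition mutex):

```go
package replicaview

import "sync/atomic"

type nodeID int

// view bundles the two maps that buildReplica currently publishes separately.
type view struct {
	replicators map[nodeID]string // stand-in for map[models.NodeID]Replicator
	statistics  map[nodeID]int    // stand-in for the statistics map
}

type partition struct {
	current atomic.Pointer[view]
}

// addReplica copies the old view, adds one entry to each map, and publishes
// the pair in a single atomic store.
func (p *partition) addReplica(id nodeID, r string, s int) {
	next := &view{
		replicators: make(map[nodeID]string),
		statistics:  make(map[nodeID]int),
	}
	if old := p.current.Load(); old != nil {
		for k, v := range old.replicators {
			next.replicators[k] = v
		}
		for k, v := range old.statistics {
			next.statistics[k] = v
		}
	}
	next.statistics[id] = s
	next.replicators[id] = r
	p.current.Store(next)
}
```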

// recovery rebuilds replication relation based on local partition.
(diff truncated; the remaining 5 changed files are not shown)
