fix #8599: move task schedule to a separate goroutine (#9775)
Moving the task schedule from the normal ticker routine to a separate goroutine keeps the HAKeeper's health check and tick update operations from being blocked by task scheduling; otherwise ticks can be skipped, making it impossible to correctly estimate elapsed time.

I have also added unit tests.

Approved by: @zhangxu19830126
gouhongshen committed Jun 9, 2023
1 parent de68009 commit e9baca1
Showing 4 changed files with 169 additions and 4 deletions.
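
To make the commit message's "tick will be skipped" point concrete: Go's time.Ticker drops ticks to make up for slow receivers, so a blocking taskSchedule call inside the tick loop silently loses ticks. A minimal, self-contained illustration (the intervals and the job are made up for the example, not taken from the codebase):

package main

import (
	"fmt"
	"time"
)

func main() {
	// A 10ms ticker whose loop body blocks for 50ms per tick. time.Ticker
	// drops ticks for slow receivers rather than queueing them, so most of
	// the ~50 ticks in the 500ms window are never observed.
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	deadline := time.After(500 * time.Millisecond)
	observed := 0
	for {
		select {
		case <-ticker.C:
			observed++
			time.Sleep(50 * time.Millisecond) // stands in for a blocking taskSchedule
		case <-deadline:
			fmt.Printf("observed %d of ~50 ticks\n", observed) // typically ~10
			return
		}
	}
}

This is why the commit moves the schedule off the loop that also drives tick updates.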
34 changes: 34 additions & 0 deletions pkg/logservice/store.go
@@ -667,6 +667,34 @@ func (l *store) queryLog(ctx context.Context, shardID uint64,
}
}

func (l *store) tickerForTaskSchedule(ctx context.Context, duration time.Duration) {
ticker := time.NewTicker(duration)
defer ticker.Stop()

for {
select {
case <-ticker.C:
state, _ := l.getCheckerStateFromLeader()
if state != nil && state.State == pb.HAKeeperRunning {
l.taskSchedule(state)
}

case <-ctx.Done():
return
}

// l.taskSchedule could block for a long time; this extra select
// gives an immediate chance to check the ctx status when it resumes.
select {
case <-ctx.Done():
return
default:
// nothing to do
}
}

}

func (l *store) ticker(ctx context.Context) {
if l.cfg.HAKeeperTickInterval.Duration == 0 {
panic("invalid HAKeeperTickInterval")
@@ -685,6 +713,12 @@ func (l *store) ticker(ctx context.Context) {
haTicker := time.NewTicker(l.cfg.HAKeeperCheckInterval.Duration)
defer haTicker.Stop()

// moving the task schedule from the normal ticker routine to a
// separate goroutine keeps the hakeeper's health check and tick update
// operations from being blocked by the task schedule; otherwise ticks
// get skipped and elapsed time cannot be estimated correctly.
go l.tickerForTaskSchedule(ctx, l.cfg.HAKeeperCheckInterval.Duration)

for {
select {
case <-ticker.C:
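Taken out of the diff, the new pattern is small enough to run on its own. A sketch under assumed names (runScheduler and the printed job are illustrative, not the project's API): a dedicated goroutine owns its own ticker, and a second, non-blocking select after the work lets the goroutine notice cancellation as soon as a long job returns, rather than waiting for the next tick.

package main

import (
	"context"
	"fmt"
	"time"
)

// runScheduler mirrors the shape of tickerForTaskSchedule: the potentially
// blocking job ticks here, so the caller's own ticker loop is never stalled.
func runScheduler(ctx context.Context, interval time.Duration, job func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			job() // may block well past ctx cancellation
		case <-ctx.Done():
			return
		}

		// Check ctx immediately after a long job returns.
		select {
		case <-ctx.Done():
			return
		default:
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	go runScheduler(ctx, 10*time.Millisecond, func() {
		fmt.Println("schedule pass")
		time.Sleep(30 * time.Millisecond) // a slow scheduling pass
	})

	time.Sleep(100 * time.Millisecond) // the main loop stays free to tick
	cancel()                           // the scheduler exits promptly
	time.Sleep(10 * time.Millisecond)
}

Note that the commit reuses HAKeeperCheckInterval as the scheduling interval, so the scheduling frequency is unchanged; only the blocking work moves off the health-check loop.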
16 changes: 12 additions & 4 deletions pkg/logservice/store_hakeeper_check.go
@@ -111,23 +111,32 @@ func (l *store) updateIDAlloc(count uint64) error {
return nil
}

func (l *store) hakeeperCheck() {
func (l *store) getCheckerStateFromLeader() (*pb.CheckerState, uint64) {
isLeader, term, err := l.isLeaderHAKeeper()
if err != nil {
l.runtime.Logger().Error("failed to get HAKeeper Leader ID", zap.Error(err))
return
return nil, term
}

if !isLeader {
l.taskScheduler.StopScheduleCronTask()
return
return nil, term
}
state, err := l.getCheckerState()
if err != nil {
// TODO: check whether this is temp error
l.runtime.Logger().Error("failed to get checker state", zap.Error(err))
return nil, term
}
return state, term
}

func (l *store) hakeeperCheck() {
state, term := l.getCheckerStateFromLeader()
if state == nil {
return
}

switch state.State {
case pb.HAKeeperCreated:
l.runtime.Logger().Warn("waiting for initial cluster info to be set, check skipped")
@@ -140,7 +149,6 @@ func (l *store) hakeeperCheck() {
l.handleBootstrapFailure()
case pb.HAKeeperRunning:
l.healthCheck(term, state)
l.taskSchedule(state)
default:
panic("unknown HAKeeper state")
}
31 changes: 31 additions & 0 deletions pkg/logservice/store_hakeeper_check_test.go
@@ -494,6 +494,37 @@ func TestHAKeeperCanBootstrapAndRepairShards(t *testing.T) {
runHAKeeperClusterTest(t, fn)
}

func TestGetCheckerStateFromLeader(t *testing.T) {
fn := func(t *testing.T, store *store) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
defer cancel()

for {
select {
case <-ctx.Done():
t.Error("test deadline reached")
return

default:
isLeader, termA, err := store.isLeaderHAKeeper()
state, termB := store.getCheckerStateFromLeader()
require.NoError(t, err)
assert.Equal(t, termB, termA)

if !isLeader {
assert.Equal(t, (*pb.CheckerState)(nil), state)
} else {
assert.NotEqual(t, (*pb.CheckerState)(nil), state)
return
}
time.Sleep(time.Second)
}
}
}

runHAKeeperStoreTest(t, false, fn)
}

func TestGetCheckerState(t *testing.T) {
fn := func(t *testing.T, store *store) {
state, err := store.getCheckerState()
92 changes: 92 additions & 0 deletions pkg/logservice/store_test.go
@@ -17,6 +17,7 @@ package logservice
import (
"context"
"fmt"
"github.com/matrixorigin/matrixone/pkg/pb/task"
"math"
"sync/atomic"
"testing"
@@ -298,6 +299,97 @@ func TestQueryLog(t *testing.T) {
runStoreTest(t, fn)
}

func proceedHAKeeperToRunning(t *testing.T, store *store) {
state, err := store.getCheckerState()
assert.NoError(t, err)
assert.Equal(t, pb.HAKeeperCreated, state.State)

err = store.setInitialClusterInfo(1, 1, 1)
assert.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

hb := store.getHeartbeatMessage()
_, err = store.addLogStoreHeartbeat(ctx, hb)
assert.NoError(t, err)

state, err = store.getCheckerState()
assert.NoError(t, err)
assert.Equal(t, pb.HAKeeperBootstrapping, state.State)

_, term, err := store.isLeaderHAKeeper()
assert.NoError(t, err)

store.bootstrap(term, state)
state, err = store.getCheckerState()

assert.NoError(t, err)
assert.Equal(t, pb.HAKeeperBootstrapCommandsReceived, state.State)

cmd, err := store.getCommandBatch(ctx, store.id())
require.NoError(t, err)
require.Equal(t, 1, len(cmd.Commands))
assert.True(t, cmd.Commands[0].Bootstrapping)

// handle startReplica to make sure the log heartbeat message contains
// shard info, which store.checkBootstrap uses to determine whether all
// log shards are ready
service := &Service{store: store}
service.handleStartReplica(cmd.Commands[0])

for state.State != pb.HAKeeperRunning && store.bootstrapCheckCycles > 0 {
func() {
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
defer cancel()

_, err = store.addLogStoreHeartbeat(ctx, store.getHeartbeatMessage())
assert.NoError(t, err)

store.checkBootstrap(state)
state, err = store.getCheckerState()
assert.NoError(t, err)

time.Sleep(time.Millisecond * 100)
}()
}

assert.Equal(t, pb.HAKeeperRunning, state.State)
}

// test that tickerForTaskSchedule pushes the task schedule routine forward
func TestTickerForTaskSchedule(t *testing.T) {
fn := func(t *testing.T, store *store, taskService taskservice.TaskService) {
// bring the HAKeeper state to Running before testing the task schedule
tickerCxt, tickerCancel := context.WithCancel(context.Background())
defer tickerCancel()

// run the task schedule in the background
go store.tickerForTaskSchedule(tickerCxt, time.Millisecond*10)

proceedHAKeeperToRunning(t, store)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

err := taskService.Create(ctx, task.TaskMetadata{ID: "1234"})
assert.NoError(t, err)

cnUUID := uuid.New().String()
cmd := pb.CNStoreHeartbeat{UUID: cnUUID}
_, err = store.addCNStoreHeartbeat(ctx, cmd)
assert.NoError(t, err)

// wait for the background taskSchedule operation to assign
// the task we created to the CN node
time.Sleep(time.Millisecond * 100)

tasks, err := taskService.QueryTask(ctx, taskservice.WithTaskRunnerCond(taskservice.EQ, cnUUID))
assert.NoError(t, err)
assert.Equal(t, 1, len(tasks))
}

runHakeeperTaskServiceTest(t, fn)
}

func TestHAKeeperTick(t *testing.T) {
fn := func(t *testing.T, store *store) {
peers := make(map[uint64]dragonboat.Target)