
Commit

lightning: fix data lost when partial write and server is busy (#43769) (#43833)

close #43768
ti-chi-bot committed May 15, 2023
1 parent 8612338 commit e39eb2c
Showing 2 changed files with 134 additions and 3 deletions.
6 changes: 5 additions & 1 deletion br/pkg/lightning/backend/local/local.go
@@ -1370,7 +1370,11 @@ func (local *Backend) executeJob(
job.lastRetryableErr = err
return nil
}
if job.stage == needRescan {
// If job.stage has successfully advanced to "ingested", the data written so
// far has been ingested into TiKV, so we can continue with the remaining
// data. For any other job.stage, the job should be sent back to the caller
// to retry later.
if job.stage != ingested {
return nil
}

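The change narrows the early-return condition in executeJob: previously only a needRescan job was sent back, so a job whose ingest failed with a retryable error (such as the server being busy) while only part of its range had been written would fall through and continue with the remaining data, silently dropping the not-yet-ingested part. Below is a minimal, self-contained sketch of the corrected decision; the stage names come from the diff, but the enum values and helper function are illustrative rather than the real Lightning types.

package main

import "fmt"

// jobStage mimics the stage names used in the diff; the concrete values are
// illustrative only.
type jobStage int

const (
	regionScanned jobStage = iota
	wrote
	ingested
	needRescan
)

// continueWithRemainingData reports whether executeJob may keep writing the
// rest of a partially written range. Only "ingested" proves the data written
// so far reached TiKV; every other stage must go back to the caller for a
// retry. The old check (stage == needRescan) wrongly let a "wrote" job fall
// through after a busy ingest, losing the not-yet-ingested data.
func continueWithRemainingData(stage jobStage) bool {
	return stage == ingested
}

func main() {
	for _, s := range []jobStage{regionScanned, wrote, ingested, needRescan} {
		fmt.Printf("stage %d -> continue with remaining data: %v\n",
			s, continueWithRemainingData(s))
	}
}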
131 changes: 129 additions & 2 deletions br/pkg/lightning/backend/local/local_test.go
@@ -682,6 +682,7 @@ type mockImportClient struct {
sst.ImportSSTClient
store *metapb.Store
resp *sst.IngestResponse
onceResp *atomic.Pointer[sst.IngestResponse]
err error
retry int
cnt int
@@ -709,8 +710,17 @@ func (c *mockImportClient) MultiIngest(_ context.Context, req *sst.MultiIngestRe
if c.apiInvokeRecorder != nil {
c.apiInvokeRecorder["MultiIngest"] = append(c.apiInvokeRecorder["MultiIngest"], c.store.GetId())
}
if c.cnt < c.retry && (c.err != nil || c.resp != nil) {
return c.resp, c.err
if c.cnt < c.retry {
if c.err != nil {
return c.resp, c.err
}
if c.onceResp != nil {
resp := c.onceResp.Swap(&sst.IngestResponse{})
return resp, nil
}
if c.resp != nil {
return c.resp, nil
}
}

if !c.multiIngestCheckFn(c.store) {
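The mock client's new onceResp field holds the busy response in a sync/atomic pointer and hands it out with Swap, so exactly one MultiIngest call observes ServerIsBusy and every subsequent call receives an empty, successful response. A self-contained sketch of that swap-once pattern, with a stand-in response type instead of sst.IngestResponse:

package main

import (
	"fmt"
	"sync/atomic"
)

// response is a stand-in for sst.IngestResponse; only the error text matters.
type response struct{ err string }

func main() {
	once := &atomic.Pointer[response]{}
	once.Store(&response{err: "server is busy"})

	for i := 1; i <= 3; i++ {
		// Swap atomically replaces the stored value with an empty success
		// response and returns the previous one, so the busy error is seen
		// by exactly one caller even under concurrent access.
		resp := once.Swap(&response{})
		fmt.Printf("call %d -> %q\n", i, resp.err)
	}
	// Prints: call 1 -> "server is busy", then calls 2 and 3 -> "".
}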
@@ -1452,6 +1462,123 @@ func TestPartialWriteIngestErrorWillPanic(t *testing.T) {
require.Equal(t, []uint64{1}, apiInvokeRecorder["MultiIngest"])
}

func TestPartialWriteIngestBusy(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

apiInvokeRecorder := map[string][]uint64{}
notLeaderResp := &sst.IngestResponse{
Error: &errorpb.Error{
ServerIsBusy: &errorpb.ServerIsBusy{},
}}
onceResp := &atomic.Pointer[sst.IngestResponse]{}
onceResp.Store(notLeaderResp)

local := &Backend{
splitCli: initTestSplitClient3Replica([][]byte{{}, {'c'}}, nil),
importClientFactory: &mockImportClientFactory{
stores: []*metapb.Store{
{Id: 1}, {Id: 2}, {Id: 3},
},
createClientFn: func(store *metapb.Store) sst.ImportSSTClient {
importCli := newMockImportClient()
importCli.store = store
importCli.apiInvokeRecorder = apiInvokeRecorder
if store.Id == 1 {
importCli.retry = 1
importCli.onceResp = onceResp
}
return importCli
},
},
logger: log.L(),
writeLimiter: noopStoreWriteLimiter{},
bufferPool: membuf.NewPool(),
supportMultiIngest: true,
tikvCodec: keyspace.CodecV1,
}

db, tmpPath := makePebbleDB(t, nil)
_, engineUUID := backend.MakeUUID("ww", 0)
engineCtx, cancel2 := context.WithCancel(context.Background())
f := &Engine{
db: db,
UUID: engineUUID,
sstDir: tmpPath,
ctx: engineCtx,
cancel: cancel2,
sstMetasChan: make(chan metaOrFlush, 64),
keyAdapter: noopKeyAdapter{},
logger: log.L(),
}
err := f.db.Set([]byte("a"), []byte("a"), nil)
require.NoError(t, err)
err = f.db.Set([]byte("a2"), []byte("a2"), nil)
require.NoError(t, err)

jobCh := make(chan *regionJob, 10)

partialWriteJob := &regionJob{
keyRange: Range{start: []byte("a"), end: []byte("c")},
region: &split.RegionInfo{
Region: &metapb.Region{
Id: 1,
Peers: []*metapb.Peer{
{Id: 1, StoreId: 1}, {Id: 2, StoreId: 2}, {Id: 3, StoreId: 3},
},
StartKey: []byte("a"),
EndKey: []byte("c"),
},
Leader: &metapb.Peer{Id: 1, StoreId: 1},
},
stage: regionScanned,
engine: f,
// use small regionSplitSize to trigger partial write
regionSplitSize: 1,
}
var jobWg sync.WaitGroup
jobWg.Add(1)
jobCh <- partialWriteJob

var wg sync.WaitGroup
wg.Add(1)
jobOutCh := make(chan *regionJob)
go func() {
defer wg.Done()
for {
job := <-jobOutCh
switch job.stage {
case wrote:
// mimic retry later
jobCh <- job
case ingested:
// a partial write advances the start key to the beginning of the remaining range
require.Equal(t, []byte("a2"), job.keyRange.start)
require.Equal(t, []byte("c"), job.keyRange.end)
jobWg.Done()
return
default:
require.Failf(t, "unexpected job stage", "job stage %s, job: %v", job.stage, job)
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
err := local.startWorker(ctx, jobCh, jobOutCh, &jobWg)
require.NoError(t, err)
}()

jobWg.Wait()
cancel()
wg.Wait()

require.Equal(t, int64(2), f.importedKVCount.Load())

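	// Each store receives two Write calls, one per chunk of the partial write,
	// while all MultiIngest calls go to the leader (store 1): the first is
	// rejected with ServerIsBusy and the next two succeed.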
require.Equal(t, []uint64{1, 2, 3, 1, 2, 3}, apiInvokeRecorder["Write"])
require.Equal(t, []uint64{1, 1, 1}, apiInvokeRecorder["MultiIngest"])
}

// mockGetSizeProperties mocks that 50MB * 20 SST file.
func mockGetSizeProperties(log.Logger, *pebble.DB, KeyAdapter) (*sizeProperties, error) {
props := newSizeProperties()
