Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: add lock for runtime stats to fix panic caused by concurrent execution #18983

Merged
merged 7 commits into from
Aug 5, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ func (s *testSuiteJoin1) TestJoinPanic(c *C) {
tk.MustQuery("SELECT * FROM events e JOIN (SELECT MAX(clock) AS clock FROM events e2 GROUP BY e2.source) e3 ON e3.clock=e.clock")
err := tk.ExecToErr("SELECT * FROM events e JOIN (SELECT clock FROM events e2 GROUP BY e2.source) e3 ON e3.clock=e.clock")
c.Check(err, NotNil)

// Test for PR 18983, use to detect race.
tk.MustExec("use test")
tk.MustExec("drop table if exists tpj1,tpj2;")
tk.MustExec("create table tpj1 (id int, b int, unique index (id));")
tk.MustExec("create table tpj2 (id int, b int, unique index (id));")
tk.MustExec("insert into tpj1 values (1,1);")
tk.MustExec("insert into tpj2 values (1,1);")
tk.MustQuery("select tpj1.b,tpj2.b from tpj1 left join tpj2 on tpj1.id=tpj2.id where tpj1.id=1;").Check(testkit.Rows("1 1"))
}

func (s *testSuite) TestJoinInDisk(c *C) {
Expand Down
40 changes: 25 additions & 15 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ type tikvSnapshot struct {
sync.RWMutex
hitCnt int64
cached map[string][]byte
stats *SnapshotRuntimeStats
}
stats *SnapshotRuntimeStats
}

// newTiKVSnapshot creates a snapshot of an TiKV store.
Expand Down Expand Up @@ -238,7 +238,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
minCommitTSPushed: &s.minCommitTSPushed,
Client: s.store.client,
}
if s.stats != nil {
if s.mu.stats != nil {
cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats)
defer func() {
s.mergeRegionRequestStats(cli.stats)
Expand Down Expand Up @@ -367,7 +367,7 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
Client: s.store.client,
resolveLite: true,
}
if s.stats != nil {
if s.mu.stats != nil {
cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats)
defer func() {
s.mergeRegionRequestStats(cli.stats)
Expand Down Expand Up @@ -452,7 +452,9 @@ func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) {
case kv.TaskID:
s.taskID = val.(uint64)
case kv.CollectRuntimeStats:
s.stats = val.(*SnapshotRuntimeStats)
s.mu.Lock()
s.mu.stats = val.(*SnapshotRuntimeStats)
s.mu.Unlock()
}
}

Expand All @@ -462,7 +464,9 @@ func (s *tikvSnapshot) DelOption(opt kv.Option) {
case kv.ReplicaRead:
s.replicaRead = kv.ReplicaReadLeader
case kv.CollectRuntimeStats:
s.stats = nil
s.mu.Lock()
s.mu.stats = nil
s.mu.Unlock()
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -579,35 +583,41 @@ func prettyWriteKey(buf *bytes.Buffer, key []byte) {
}

func (s *tikvSnapshot) recordBackoffInfo(bo *Backoffer) {
if s.stats == nil || bo.totalSleep == 0 {
if s.mu.stats == nil || bo.totalSleep == 0 {
return
}
s.mu.Lock()
defer s.mu.Unlock()
if s.stats.backoffSleepMS == nil {
s.stats.backoffSleepMS = bo.backoffSleepMS
s.stats.backoffTimes = bo.backoffTimes
if s.mu.stats == nil {
return
}
if s.mu.stats.backoffSleepMS == nil {
s.mu.stats.backoffSleepMS = bo.backoffSleepMS
s.mu.stats.backoffTimes = bo.backoffTimes
return
}
for k, v := range bo.backoffSleepMS {
s.stats.backoffSleepMS[k] += v
s.mu.stats.backoffSleepMS[k] += v
}
for k, v := range bo.backoffTimes {
s.stats.backoffTimes[k] += v
s.mu.stats.backoffTimes[k] += v
}
}

func (s *tikvSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats) {
s.mu.Lock()
defer s.mu.Unlock()
if s.stats.rpcStats == nil {
s.stats.rpcStats = stats
if s.mu.stats == nil {
return
}
if s.mu.stats.rpcStats == nil {
s.mu.stats.rpcStats = stats
return
}
for k, v := range stats {
stat, ok := s.stats.rpcStats[k]
stat, ok := s.mu.stats.rpcStats[k]
if !ok {
s.stats.rpcStats[k] = v
s.mu.stats.rpcStats[k] = v
continue
}
stat.count += v.count
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,5 +316,5 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats(c *C) {
snapshot.recordBackoffInfo(bo)
snapshot.recordBackoffInfo(bo)
expect := "Get:{num_rpc:4, total_time:2.002s},txnLockFast_backoff:{num:2, total_time:60 ms}"
c.Assert(snapshot.stats.String(), Equals, expect)
c.Assert(snapshot.mu.stats.String(), Equals, expect)
}