Skip to content

Commit

Permalink
stmtsummary: fix issue of concurrent map read and write (#35367)
Browse files Browse the repository at this point in the history
close #35340
  • Loading branch information
crazycs520 committed Jun 14, 2022
1 parent 2487dc7 commit f8a00f3
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 12 deletions.
34 changes: 34 additions & 0 deletions infoschema/cluster_tables_test.go
Expand Up @@ -23,6 +23,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -398,6 +399,39 @@ func TestStmtSummaryEvictedCountTable(t *testing.T) {
require.NoError(t, tk.QueryToErr("select * from information_schema.CLUSTER_STATEMENTS_SUMMARY_EVICTED"))
}

func TestStmtSummaryIssue35340(t *testing.T) {
var clean func()
s := new(clusterTablesSuite)
s.store, s.dom, clean = testkit.CreateMockStoreAndDomain(t)
defer clean()

tk := s.newTestKitWithRoot(t)
tk.MustExec("set global tidb_stmt_summary_refresh_interval=1800")
tk.MustExec("set global tidb_stmt_summary_max_stmt_count = 3000")
for i := 0; i < 100; i++ {
user := "user" + strconv.Itoa(i)
tk.MustExec(fmt.Sprintf("create user '%v'@'localhost'", user))
}
tk.MustExec("flush privileges")
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
tk := s.newTestKitWithRoot(t)
for j := 0; j < 100; j++ {
user := "user" + strconv.Itoa(j)
require.True(t, tk.Session().Auth(&auth.UserIdentity{
Username: user,
Hostname: "localhost",
}, nil, nil))
tk.MustQuery("select count(*) from information_schema.statements_summary;")
}
}()
}
wg.Wait()
}

func TestStmtSummaryHistoryTableWithUserTimezone(t *testing.T) {
// setup suite
var clean func()
Expand Down
28 changes: 16 additions & 12 deletions util/stmtsummary/reader.go
Expand Up @@ -129,11 +129,7 @@ func (ssr *stmtSummaryReader) getStmtByDigestRow(ssbd *stmtSummaryByDigest, begi

// `ssElement` is lazy expired, so expired elements could also be read.
// `beginTime` won't change since `ssElement` is created, so locking is not needed here.
isAuthed := true
if ssr.user != nil && !ssr.hasProcessPriv && ssElement != nil {
_, isAuthed = ssElement.authUsers[ssr.user.Username]
}
if ssElement == nil || ssElement.beginTime < beginTimeForCurInterval || !isAuthed {
if ssElement == nil || ssElement.beginTime < beginTimeForCurInterval {
return nil
}
return ssr.getStmtByDigestElementRow(ssElement, ssbd)
Expand All @@ -142,6 +138,14 @@ func (ssr *stmtSummaryReader) getStmtByDigestRow(ssbd *stmtSummaryByDigest, begi
func (ssr *stmtSummaryReader) getStmtByDigestElementRow(ssElement *stmtSummaryByDigestElement, ssbd *stmtSummaryByDigest) []types.Datum {
ssElement.Lock()
defer ssElement.Unlock()
isAuthed := true
if ssr.user != nil && !ssr.hasProcessPriv {
_, isAuthed = ssElement.authUsers[ssr.user.Username]
}
if !isAuthed {
return nil
}

datums := make([]types.Datum, len(ssr.columnValueFactories))
for i, factory := range ssr.columnValueFactories {
datums[i] = types.NewDatum(factory(ssr, ssElement, ssbd))
Expand All @@ -155,12 +159,9 @@ func (ssr *stmtSummaryReader) getStmtByDigestHistoryRow(ssbd *stmtSummaryByDiges

rows := make([][]types.Datum, 0, len(ssElements))
for _, ssElement := range ssElements {
isAuthed := true
if ssr.user != nil && !ssr.hasProcessPriv {
_, isAuthed = ssElement.authUsers[ssr.user.Username]
}
if isAuthed {
rows = append(rows, ssr.getStmtByDigestElementRow(ssElement, ssbd))
record := ssr.getStmtByDigestElementRow(ssElement, ssbd)
if record != nil {
rows = append(rows, record)
}
}
return rows
Expand Down Expand Up @@ -191,7 +192,10 @@ func (ssr *stmtSummaryReader) getStmtEvictedOtherHistoryRow(ssbde *stmtSummaryBy

ssbd := new(stmtSummaryByDigest)
for _, seElement := range seElements {
rows = append(rows, ssr.getStmtByDigestElementRow(seElement.otherSummary, ssbd))
record := ssr.getStmtByDigestElementRow(seElement.otherSummary, ssbd)
if record != nil {
rows = append(rows, record)
}
}
return rows
}
Expand Down

0 comments on commit f8a00f3

Please sign in to comment.