Skip to content

Commit

Permalink
use tmp session in gc_worker. (#4453)
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu authored and hanfei1991 committed Sep 7, 2017
1 parent 4e58024 commit e442273
Showing 1 changed file with 38 additions and 42 deletions.
80 changes: 38 additions & 42 deletions store/tikv/gc_worker.go
Expand Up @@ -18,7 +18,7 @@ import (
"fmt"
"os"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/juju/errors"
Expand All @@ -39,8 +39,7 @@ type GCWorker struct {
uuid string
desc string
store *tikvStore
session tidb.Session
gcIsRunning bool
gcIsRunning int32
lastFinish time.Time
cancel goctx.CancelFunc
done chan error
Expand All @@ -60,16 +59,13 @@ func NewGCWorker(store kv.Storage) (*GCWorker, error) {
uuid: strconv.FormatUint(ver.Ver, 16),
desc: fmt.Sprintf("host:%s, pid:%d, start at %s", hostName, os.Getpid(), time.Now()),
store: store.(*tikvStore),
gcIsRunning: false,
gcIsRunning: 0,
lastFinish: time.Now(),
done: make(chan error),
}
var ctx goctx.Context
ctx, worker.cancel = goctx.WithCancel(goctx.Background())
var wg sync.WaitGroup
wg.Add(1)
go worker.start(ctx, &wg)
wg.Wait() // Wait create session finish in worker, some test code depend on this to avoid race.
go worker.start(ctx)
return worker, nil
}

Expand Down Expand Up @@ -107,12 +103,9 @@ var gcVariableComments = map[string]string{
gcSafePointKey: "All versions after safe point can be accessed. (DO NOT EDIT)",
}

func (w *GCWorker) start(ctx goctx.Context, wg *sync.WaitGroup) {
func (w *GCWorker) start(ctx goctx.Context) {
log.Infof("[gc worker] %s start.", w.uuid)

w.createSession()
w.tick(ctx) // Immediately tick once to initialize configs.
wg.Done()

ticker := time.NewTicker(gcWorkerTickInterval)
defer ticker.Stop()
Expand All @@ -121,7 +114,7 @@ func (w *GCWorker) start(ctx goctx.Context, wg *sync.WaitGroup) {
case <-ticker.C:
w.tick(ctx)
case err := <-w.done:
w.gcIsRunning = false
atomic.StoreInt32(&w.gcIsRunning, 0)
w.lastFinish = time.Now()
if err != nil {
log.Errorf("[gc worker] runGCJob error: %v", err)
Expand All @@ -134,18 +127,17 @@ func (w *GCWorker) start(ctx goctx.Context, wg *sync.WaitGroup) {
}
}

func (w *GCWorker) createSession() {
func createSession(store kv.Storage) tidb.Session {
for {
var err error
w.session, err = tidb.CreateSession(w.store)
session, err := tidb.CreateSession(store)
if err != nil {
log.Warnf("[gc worker] create session err: %v", err)
continue
}
// Disable privilege check for gc worker session.
privilege.BindPrivilegeManager(w.session, nil)
w.session.GetSessionVars().InRestrictedSQL = true
return
privilege.BindPrivilegeManager(session, nil)
session.GetSessionVars().InRestrictedSQL = true
return session
}
}

Expand Down Expand Up @@ -186,7 +178,7 @@ func (w *GCWorker) storeIsBootstrapped() bool {

// Leader of GC worker checks if it should start a GC job every tick.
func (w *GCWorker) leaderTick(ctx goctx.Context) error {
if w.gcIsRunning {
if !atomic.CompareAndSwapInt32(&w.gcIsRunning, 0, 1) {
return nil
}

Expand All @@ -201,7 +193,6 @@ func (w *GCWorker) leaderTick(ctx goctx.Context) error {
return nil
}

w.gcIsRunning = true
log.Infof("[gc worker] %s starts GC job, safePoint: %v", w.uuid, safePoint)
go w.runGCJob(ctx, safePoint)
return nil
Expand Down Expand Up @@ -317,7 +308,9 @@ func (w *GCWorker) runGCJob(ctx goctx.Context, safePoint uint64) {
func (w *GCWorker) deleteRanges(ctx goctx.Context, safePoint uint64) error {
gcWorkerCounter.WithLabelValues("delete_range").Inc()

ranges, err := ddl.LoadDeleteRanges(w.session, safePoint)
session := createSession(w.store)
ranges, err := ddl.LoadDeleteRanges(session, safePoint)
session.Close()
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -381,7 +374,9 @@ func (w *GCWorker) deleteRanges(ctx goctx.Context, safePoint uint64) error {
}
startKey = endKey
}
err := ddl.CompleteDeleteRange(w.session, r)
session := createSession(w.store)
err := ddl.CompleteDeleteRange(session, r)
session.Close()
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -529,24 +524,26 @@ func doGC(ctx goctx.Context, store *tikvStore, safePoint uint64, identifier stri

func (w *GCWorker) checkLeader() (bool, error) {
gcWorkerCounter.WithLabelValues("check_leader").Inc()
session := createSession(w.store)
defer session.Close()

_, err := w.session.Execute("BEGIN")
_, err := session.Execute("BEGIN")
if err != nil {
return false, errors.Trace(err)
}
leader, err := w.loadValueFromSysTable(gcLeaderUUIDKey)
if err != nil {
w.session.Execute("ROLLBACK")
session.Execute("ROLLBACK")
return false, errors.Trace(err)
}
log.Debugf("[gc worker] got leader: %s", leader)
if leader == w.uuid {
err = w.saveTime(gcLeaderLeaseKey, time.Now().Add(gcWorkerLease))
if err != nil {
w.session.Execute("ROLLBACK")
session.Execute("ROLLBACK")
return false, errors.Trace(err)
}
_, err = w.session.Execute("COMMIT")
_, err = session.Execute("COMMIT")
if err != nil {
return false, errors.Trace(err)
}
Expand All @@ -562,26 +559,26 @@ func (w *GCWorker) checkLeader() (bool, error) {

err = w.saveValueToSysTable(gcLeaderUUIDKey, w.uuid)
if err != nil {
w.session.Execute("ROLLBACK")
session.Execute("ROLLBACK")
return false, errors.Trace(err)
}
err = w.saveValueToSysTable(gcLeaderDescKey, w.desc)
if err != nil {
w.session.Execute("ROLLBACK")
session.Execute("ROLLBACK")
return false, errors.Trace(err)
}
err = w.saveTime(gcLeaderLeaseKey, time.Now().Add(gcWorkerLease))
if err != nil {
w.session.Execute("ROLLBACK")
session.Execute("ROLLBACK")
return false, errors.Trace(err)
}
_, err = w.session.Execute("COMMIT")
_, err = session.Execute("COMMIT")
if err != nil {
return false, errors.Trace(err)
}
return true, nil
}
w.session.Execute("ROLLBACK")
session.Execute("ROLLBACK")
return false, nil
}

Expand Down Expand Up @@ -641,8 +638,11 @@ func (w *GCWorker) loadDurationWithDefault(key string, def time.Duration) (*time
}

func (w *GCWorker) loadValueFromSysTable(key string) (string, error) {
session := createSession(w.store)
defer session.Close()

stmt := fmt.Sprintf(`SELECT (variable_value) FROM mysql.tidb WHERE variable_name='%s' FOR UPDATE`, key)
rs, err := w.session.Execute(stmt)
rs, err := session.Execute(stmt)
if err != nil {
return "", errors.Trace(err)
}
Expand All @@ -660,11 +660,14 @@ func (w *GCWorker) loadValueFromSysTable(key string) (string, error) {
}

func (w *GCWorker) saveValueToSysTable(key, value string) error {
session := createSession(w.store)
defer session.Close()

stmt := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s')
ON DUPLICATE KEY
UPDATE variable_value = '%[2]s', comment = '%[3]s'`,
key, value, gcVariableComments[key])
_, err := w.session.Execute(stmt)
_, err := session.Execute(stmt)
log.Debugf("[gc worker] save kv, %s:%s %v", key, value, err)
return errors.Trace(err)
}
Expand All @@ -688,17 +691,10 @@ func NewMockGCWorker(store kv.Storage) (*MockGCWorker, error) {
uuid: strconv.FormatUint(ver.Ver, 16),
desc: fmt.Sprintf("host:%s, pid:%d, start at %s", hostName, os.Getpid(), time.Now()),
store: store.(*tikvStore),
gcIsRunning: false,
gcIsRunning: 0,
lastFinish: time.Now(),
done: make(chan error),
}
worker.session, err = tidb.CreateSession(worker.store)
if err != nil {
log.Errorf("initialize MockGCWorker session fail: %s", err)
return nil, errors.Trace(err)
}
privilege.BindPrivilegeManager(worker.session, nil)
worker.session.GetSessionVars().InRestrictedSQL = true
return &MockGCWorker{worker: worker}, nil
}

Expand Down

0 comments on commit e442273

Please sign in to comment.