-
Notifications
You must be signed in to change notification settings - Fork 0
/
manager.go
106 lines (90 loc) · 2.01 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/*
* NeoDB
*
* Copyright 2018 The Radon Authors.
* Copyright 2021-2030 The NeoDB Authors.
* Code is licensed under the GPLv3.
*
*/
package proxy
import (
"fmt"
"sync"
"time"
"github.com/sealdb/neodb/config"
"github.com/sealdb/mysqlstack/sqlparser"
"github.com/sealdb/mysqlstack/xlog"
)
// Manager runs periodic housekeeping over the proxy's sessions from a
// background goroutine. The only task visible in this file is killing
// sessions that sit idle inside a transaction for too long.
type Manager struct {
	// log receives the manager's informational and warning messages.
	log *xlog.Log
	// sessions is queried with SnapshotTxn and acted on with Kill.
	sessions *Sessions
	// conf supplies IdleTxnTimeout, which is re-read on every pass.
	conf *config.ProxyConfig
	// done is closed by Close to tell the manage loop to exit.
	done chan bool
	// ticker drives the manage loop; NewManager creates it with a
	// one-second period and manage stops it on exit.
	ticker *time.Ticker
	// wg tracks the goroutine started by Init so Close can wait for it.
	wg sync.WaitGroup
	// mu is write-locked by manageMain for the duration of each pass.
	mu sync.RWMutex
}
// killIdleTxn kills every session that is inside a transaction and has been
// idle for more than conf.IdleTxnTimeout seconds.
//
// Only sessions whose Info is sqlparser.BeginTxnStr or empty are killed; a
// session whose Info holds anything else is left alone even when its Time
// exceeds the timeout. A timeout of 0 disables the check, and in that case
// the sessions are not snapshotted at all. The returned error is always nil.
//
// See https://www.percona.com/doc/percona-server/8.0/management/kill_idle_trx.html
func (mgr *Manager) killIdleTxn() error {
	// Read the timeout once so that every session in this pass, and the
	// message sent to the killed session, use the same value.
	timeout := mgr.conf.IdleTxnTimeout
	if timeout == 0 {
		return nil
	}

	log := mgr.log
	ss := mgr.sessions
	for _, tsi := range ss.SnapshotTxn() {
		if tsi.Time <= timeout {
			continue
		}
		if tsi.Info != sqlparser.BeginTxnStr && tsi.Info != "" {
			continue
		}
		log.Warning("the session in transaction will be killed, the session info is: %v:", tsi)
		str := fmt.Sprintf("the session in transaction is idle for a long time: %d s > %d s.", tsi.Time, timeout)
		ss.Kill(tsi.ID, str)
	}
	return nil
}
// manageMain performs a single management pass while holding mgr.mu, and
// returns whatever killIdleTxn returns.
func (mgr *Manager) manageMain() error {
	mgr.mu.Lock()
	defer mgr.mu.Unlock()

	return mgr.killIdleTxn()
}
// manage is the body of the manager goroutine. It runs manageMain on every
// tick of mgr.ticker and returns once mgr.done is closed, stopping the
// ticker on the way out. An error from a pass is logged and does not end
// the loop.
func (mgr *Manager) manage() {
	defer mgr.ticker.Stop()

	for {
		select {
		case <-mgr.ticker.C:
			// Report the failure instead of dropping it; the next tick retries.
			if err := mgr.manageMain(); err != nil {
				mgr.log.Warning("manager.manage.main.error:%+v", err)
			}
		case <-mgr.done:
			return
		}
	}
}
// Init starts the manager goroutine and registers it with the wait group so
// that Close can wait for it to exit. It always returns nil.
func (mgr *Manager) Init() error {
	mgr.wg.Add(1)
	go func() {
		defer mgr.wg.Done()
		mgr.manage()
	}()

	mgr.log.Info("Manager.init.done")
	return nil
}
// Close stops the manager goroutine and blocks until it has exited.
func (mgr *Manager) Close() {
	// Closing done makes the select in manage take its exit branch.
	close(mgr.done)

	// Block until the goroutine started by Init has returned.
	mgr.wg.Wait()
}
// NewManager returns a Manager wired to the given logger, sessions and proxy
// configuration. The one-second ticker is created here, but no goroutine is
// started until Init is called.
func NewManager(log *xlog.Log, sessions *Sessions, conf *config.ProxyConfig) *Manager {
	mgr := &Manager{
		done:   make(chan bool),
		ticker: time.NewTicker(time.Second),
	}
	mgr.log = log
	mgr.sessions = sessions
	mgr.conf = conf
	return mgr
}