Skip to content

Commit

Permalink
Merge pull request #656 from signal18/capture
Browse files Browse the repository at this point in the history
Monitoring Capture Fix
  • Loading branch information
svaroqui committed Jun 21, 2024
2 parents 5fd4a07 + a7ac766 commit d9b58b3
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 23 deletions.
2 changes: 2 additions & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,8 @@ func (cluster *Cluster) StateProcessing() {
mybcksrv := cluster.GetBackupServer()
master := cluster.GetMaster()
for _, s := range cstates {
//Remove from captured state if already resolved, so the next occurrence will be captured again
cluster.GetStateMachine().CapturedState.Delete(s.ErrKey)
servertoreseed := cluster.GetServerFromURL(s.ServerUrl)
if s.ErrKey == "WARN0073" {
for _, s := range cluster.Servers {
Expand Down
26 changes: 21 additions & 5 deletions cluster/cluster_chk.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,15 +460,31 @@ func (cluster *Cluster) IsCurrentGTIDSync(m *ServerMonitor, s *ServerMonitor) bo
}
}

func (cluster *Cluster) CheckCapture(state state.State) {
func (cluster *Cluster) CheckCapture(st state.State) {
if !cluster.Conf.MonitorCapture {
return
}
if strings.Contains(cluster.Conf.MonitorCaptureTrigger, state.ErrKey) {
if state.ServerUrl != "" {
srv := cluster.GetServerFromURL(state.ServerUrl)

if strings.Contains(cluster.Conf.MonitorCaptureTrigger, st.ErrKey) {

var cstate *state.CapturedState

// Check captured state
SM := cluster.GetStateMachine()
if cs, ok := SM.GetCapturedState(st.ErrKey); ok {
cstate = cs
} else {
cstate = new(state.CapturedState)
cstate.Parse(st)
SM.AddToCapturedState(st.ErrKey, cstate)
}

//Capture only if this server has not already been recorded for this state
if st.ServerUrl != "" && cstate.Contains(st.ServerUrl) == false {
srv := cluster.GetServerFromURL(st.ServerUrl)
if srv != nil {
srv.Capture()
//Add entry to captured state
srv.Capture(cstate)
}
}
}
Expand Down
29 changes: 22 additions & 7 deletions cluster/srv.go
Original file line number Diff line number Diff line change
Expand Up @@ -1370,11 +1370,15 @@ func (server *ServerMonitor) UnInstallPlugin(name string) error {
return nil
}

func (server *ServerMonitor) Capture() error {
func (server *ServerMonitor) Capture(cstate *state.CapturedState) error {
cluster := server.ClusterGroup
if server.InCaptureMode {
return nil
}
//Record the server URL in the captured state
cstate.ServerURLs = append(cstate.ServerURLs, server.URL)
// cluster.GetStateMachine().CapturedState.Store(cstate.ErrKey, cstate)
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Capture %s on server %s", cstate.ErrKey, server.URL)

go server.CaptureLoop(cluster.GetStateMachine().GetHeartbeats())
go server.JobCapturePurge(cluster.Conf.WorkingDir+"/"+cluster.Name, cluster.Conf.MonitorCaptureFileKeep)
Expand Down Expand Up @@ -1430,8 +1434,11 @@ func (server *ServerMonitor) ReloadSaveInfosVariables() error {
}

func (server *ServerMonitor) CaptureLoop(start int64) {
server.InCaptureMode = true
cluster := server.ClusterGroup

server.SetInCaptureMode(true)
defer server.SetInCaptureMode(false)

type Save struct {
ProcessList []dbhelper.Processlist `json:"processlist"`
InnoDBStatus string `json:"innodbstatus"`
Expand All @@ -1442,7 +1449,8 @@ func (server *ServerMonitor) CaptureLoop(start int64) {
t := time.Now()
logs := ""
var err error
for true {
var curHB int64 = start
for {

var clsave Save
clsave.ProcessList,
Expand All @@ -1462,16 +1470,23 @@ func (server *ServerMonitor) CaptureLoop(start int64) {
cluster.LogSQL(logs, err, server.URL, "CaptureLoop", config.LvlErr, "Failed Slave Status for server %s: %s ", server.URL, err)

saveJSON, _ := json.MarshalIndent(clsave, "", "\t")
err := ioutil.WriteFile(cluster.Conf.WorkingDir+"/"+cluster.Name+"/capture_"+server.Name+"_"+t.Format("20060102150405")+".json", saveJSON, 0644)
err = os.WriteFile(cluster.Conf.WorkingDir+"/"+cluster.Name+"/capture_"+server.Name+"_"+t.Format("20060102150405")+".json", saveJSON, 0644)
if err != nil {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Exit loop %s with error %v\n", server.URL, err)
return
}
if cluster.GetStateMachine().GetHeartbeats() < start+5 {

for curHB == cluster.GetStateMachine().GetHeartbeats() {
time.Sleep(10 * time.Millisecond)
}

curHB = cluster.GetStateMachine().GetHeartbeats()

if curHB >= start+5 {
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Exit loop %s. Start HB: %d, Stop HB: %d ", server.URL, start, curHB-1)
break
}
time.Sleep(40 * time.Millisecond)
}
server.InCaptureMode = false
}

func (server *ServerMonitor) RotateSystemLogs() {
Expand Down
4 changes: 4 additions & 0 deletions cluster/srv_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,7 @@ func (server *ServerMonitor) SetBackingUpBinaryLog(value bool) {
func (server *ServerMonitor) SetBinaryLogDir(value string) {
server.BinaryLogDir = value
}

// SetInCaptureMode sets the server's InCaptureMode flag to value.
// The assignment is a plain field write; this method takes no lock.
func (server *ServerMonitor) SetInCaptureMode(value bool) {
server.InCaptureMode = value
}
73 changes: 62 additions & 11 deletions utils/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package state

import (
"fmt"
"slices"
"sort"
"strconv"
"sync"
Expand All @@ -33,6 +34,26 @@ type StateHttp struct {

type Map map[string]State

// CapturedState holds the identifying fields of a state (key, type,
// description and origin) together with a list of server URLs associated
// with that state.
type CapturedState struct {
	// ErrKey, ErrType, ErrDesc and ErrFrom mirror the same-named fields of State.
	ErrKey  string
	ErrType string
	ErrDesc string
	ErrFrom string
	// ServerURLs is the list of server URLs searched by Contains.
	ServerURLs []string
}

// Contains reports whether url is present in cs.ServerURLs.
// An empty or nil list never contains anything.
func (cs *CapturedState) Contains(url string) bool {
	return slices.Index(cs.ServerURLs, url) >= 0
}

// Parse overwrites cs with the ErrKey, ErrType, ErrDesc and ErrFrom of s and
// resets ServerURLs to an empty, non-nil slice.
func (cs *CapturedState) Parse(s State) {
	*cs = CapturedState{
		ErrKey:     s.ErrKey,
		ErrType:    s.ErrType,
		ErrDesc:    s.ErrDesc,
		ErrFrom:    s.ErrFrom,
		ServerURLs: []string{},
	}
}

func NewMap() *Map {
m := make(Map)
return &m
Expand Down Expand Up @@ -62,16 +83,17 @@ func (m Map) Search(key string) bool {
}

type StateMachine struct {
CurState *Map `json:"-"`
OldState *Map `json:"-"`
Discovered bool `json:"discovered"`
sla Sla `json:"-"`
lastState int64 `json:"-"`
heartbeats int64 `json:"-"`
InFailover bool `json:"inFailover"`
InSchemaMonitor bool `json:"inSchemaMonitor"`
SchemaMonitorStartTime int64 `json:"-"`
SchemaMonitorEndTime int64 `json:"-"`
CurState *Map `json:"-"`
OldState *Map `json:"-"`
CapturedState *sync.Map `json:"-"`
Discovered bool `json:"discovered"`
sla Sla `json:"-"`
lastState int64 `json:"-"`
heartbeats int64 `json:"-"`
InFailover bool `json:"inFailover"`
InSchemaMonitor bool `json:"inSchemaMonitor"`
SchemaMonitorStartTime int64 `json:"-"`
SchemaMonitorEndTime int64 `json:"-"`
sync.Mutex
}

Expand Down Expand Up @@ -120,9 +142,9 @@ func (SM *StateMachine) SetSla(mySla Sla) {
}

func (SM *StateMachine) Init() {

SM.CurState = NewMap()
SM.OldState = NewMap()
SM.CapturedState = new(sync.Map)
SM.Discovered = false
SM.sla.Init()
SM.lastState = 0
Expand Down Expand Up @@ -407,3 +429,32 @@ func (SM *StateMachine) PreserveState(key string) {
SM.AddState(key, value)
}
}

// AddToCapturedState stores cstate under key in the CapturedState map unless
// an entry already exists for that key, in which case the existing entry is
// kept and cstate is ignored.
//
// LoadOrStore does the presence check and the insert as a single atomic
// operation, so two concurrent callers using the same key cannot overwrite
// each other's entry.
func (SM *StateMachine) AddToCapturedState(key string, cstate *CapturedState) {
	SM.CapturedState.LoadOrStore(key, cstate)
}

// DeleteCapturedState removes the entry stored under key from the
// CapturedState map.
func (SM *StateMachine) DeleteCapturedState(key string) {
SM.CapturedState.Delete(key)
}

// SearchCapturedState reports whether the CapturedState map holds an entry
// for key.
func (SM *StateMachine) SearchCapturedState(key string) bool {
	_, found := SM.CapturedState.Load(key)
	return found
}

// GetCapturedState returns the *CapturedState stored under key and true.
// It returns (nil, false) when no entry exists for key, or when the stored
// value is not a *CapturedState.
//
// The CapturedState map is an exported sync.Map that can hold values of any
// type, so the assertion uses the comma-ok form rather than panicking on an
// unexpected value.
func (SM *StateMachine) GetCapturedState(key string) (*CapturedState, bool) {
	v, found := SM.CapturedState.Load(key)
	if !found {
		return nil, false
	}
	cs, ok := v.(*CapturedState)
	return cs, ok
}

0 comments on commit d9b58b3

Please sign in to comment.