-
Notifications
You must be signed in to change notification settings - Fork 316
/
recovery.go
156 lines (134 loc) · 4.01 KB
/
recovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package db
import (
"encoding/json"
"fmt"
"os"
"sort"
"time"
"github.com/rudderlabs/rudder-server/services/alert"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/utils/logger"
)
// Recovery modes. These are the values compared against RecoveryDataT.Mode
// and CurrentMode elsewhere in this file.
const (
// normalMode is the default mode (see CurrentMode).
normalMode = "normal"
// degradedMode is also the fallback NewRecoveryHandler uses for unknown modes.
degradedMode = "degraded"
)
// RecoveryHandler is the interface returned by NewRecoveryHandler; one
// implementation is chosen per recovery mode.
type RecoveryHandler interface {
// RecordAppStart records an application start.
// NOTE(review): the int64 is presumably the start time as a Unix timestamp — confirm against the implementations.
RecordAppStart(int64)
// HasThresholdReached reports whether the handler's threshold has been hit.
// NOTE(review): what the threshold measures is defined by the implementations, not visible here.
HasThresholdReached() bool
// Handle performs the mode-specific handling; the details live in the implementations.
Handle()
}
var (
// CurrentMode is the mode the process is currently in. It is read by
// IsNormalMode and sendRecoveryModeStat.
CurrentMode = normalMode // default mode
// storagePath is the file that recovery data is read from and written to.
// Init registers it under the "recovery.storagePath" config key with the
// default "/tmp/recovery_data.json".
storagePath string
)
// RecoveryDataT is the recovery state that getRecoveryData reads from, and
// saveRecoveryData writes to, storagePath as JSON.
type RecoveryDataT struct {
// NOTE(review): the int64 time slices look like Unix timestamps in seconds
// (CheckOccurrences compares such values with time.Now().Unix()) — confirm
// against the code that appends to them.
StartTimes []int64
// NOTE(review): presumably human-readable renderings of StartTimes — confirm.
ReadableStartTimes []string
DegradedModeStartTimes []int64
// NOTE(review): presumably human-readable renderings of DegradedModeStartTimes — confirm.
ReadableDegradedModeStartTimes []string
// Mode is the persisted recovery mode; NewRecoveryHandler picks the handler from it.
Mode string
}
// pkgLogger is the logger used throughout this file. It is assigned in Init,
// so Init must run before any function that logs.
var pkgLogger logger.Logger
// Init wires up the package-level state: it registers storagePath with the
// config system and creates the "db"/"recovery" child logger.
func Init() {
	const (
		defaultStoragePath = "/tmp/recovery_data.json"
		storagePathKey     = "recovery.storagePath"
	)

	config.RegisterStringConfigVariable(defaultStoragePath, &storagePath, false, storagePathKey)
	pkgLogger = logger.NewLogger().Child("db").Child("recovery")
}
// getRecoveryData loads the persisted recovery state from storagePath.
//
// Outcomes:
//   - the file does not exist: normal-mode defaults are returned with a nil error;
//   - the file cannot be read for any other reason: the read error is returned;
//   - the file is not valid JSON: the problem is logged, the file is renamed to
//     "<storagePath>.bkp" on a best-effort basis, and normal-mode defaults are
//     returned with a nil error.
func getRecoveryData() (RecoveryDataT, error) {
	data, err := os.ReadFile(storagePath)
	if os.IsNotExist(err) {
		// Nothing persisted yet: start from the normal-mode defaults.
		return RecoveryDataT{Mode: normalMode}, nil
	}
	if err != nil {
		return RecoveryDataT{}, err
	}

	var recoveryData RecoveryDataT
	if err := json.Unmarshal(data, &recoveryData); err != nil {
		pkgLogger.Errorf("Failed to Unmarshall %s. Error: %v", storagePath, err)
		// Move the corrupt file aside so it can be inspected later. A failed
		// rename is only logged, and it must report the rename error itself
		// rather than the unmarshal error.
		if renameErr := os.Rename(storagePath, fmt.Sprintf("%s.bkp", storagePath)); renameErr != nil {
			pkgLogger.Errorf("Failed to back up: %s. Error: %v", storagePath, renameErr)
		}
		// Discard anything Unmarshal may have partially filled in.
		recoveryData = RecoveryDataT{Mode: normalMode}
	}
	return recoveryData, nil
}
// saveRecoveryData serialises recoveryData as indented JSON and writes it to
// storagePath with mode 0644. It returns the marshalling or write error, if any.
func saveRecoveryData(recoveryData RecoveryDataT) error {
	payload, marshalErr := json.MarshalIndent(&recoveryData, "", " ")
	if marshalErr != nil {
		return marshalErr
	}
	if writeErr := os.WriteFile(storagePath, payload, 0o644); writeErr != nil {
		return writeErr
	}
	return nil
}
// IsNormalMode reports whether the package-level CurrentMode equals normalMode.
func IsNormalMode() bool {
return CurrentMode == normalMode
}
// CheckOccurrences reports whether at least numTimes of the given occurrences
// fall within the last numSecs seconds. Occurrences are compared against
// time.Now().Unix(), so they are treated as Unix timestamps in seconds.
//
// Note: occurrences is sorted in ascending order in place.
func CheckOccurrences(occurrences []int64, numTimes, numSecs int) (occurred bool) {
	sort.Slice(occurrences, func(a, b int) bool {
		return occurrences[a] < occurrences[b]
	})

	cutoff := time.Now().Unix() - int64(numSecs)
	// The slice is sorted, so everything from the first timestamp at or after
	// the cutoff up to the end is "recent".
	firstRecent := sort.Search(len(occurrences), func(idx int) bool {
		return occurrences[idx] >= cutoff
	})
	return len(occurrences)-firstRecent >= numTimes
}
// getForceRecoveryMode maps the two "force" flags to a mode name.
// forceNormal takes precedence over forceDegraded; when neither is set the
// empty string is returned.
func getForceRecoveryMode(forceNormal, forceDegraded bool) string {
	if forceNormal {
		return normalMode
	}
	if forceDegraded {
		return degradedMode
	}
	return ""
}
// NewRecoveryHandler returns the handler matching recoveryData.Mode:
// a NormalModeHandler for normal mode and a DegradedModeHandler otherwise.
// An unrecognised mode is logged and recoveryData.Mode is overwritten with
// degradedMode before the degraded handler is built.
func NewRecoveryHandler(recoveryData *RecoveryDataT) RecoveryHandler {
	if recoveryData.Mode == normalMode {
		return &NormalModeHandler{recoveryData: recoveryData}
	}

	if recoveryData.Mode != degradedMode {
		// Anything that is neither normal nor degraded is treated as degraded.
		pkgLogger.Info("DB Recovery: Invalid recovery mode. Defaulting to degraded mode.")
		recoveryData.Mode = degradedMode
	}
	return &DegradedModeHandler{recoveryData: recoveryData}
}
// alertOps raises an alert saying that this instance (named by the
// INSTANCE_ID config value) entered the given mode. If the alert manager
// cannot be created the failure is logged and no alert is sent.
func alertOps(mode string) {
	instanceName := config.GetString("INSTANCE_ID", "")

	alertManager, err := alert.New()
	if err != nil {
		pkgLogger.Errorf("Unable to initialize the alertManager: %s", err.Error())
		return
	}

	message := fmt.Sprintf("Dataplane server %s entered %s mode", instanceName, mode)
	alertManager.Alert(message)
}
// sendRecoveryModeStat reports the current recovery mode on the
// "recovery.mode_normal" gauge (tagged with appType) every 10 seconds:
// 1 for normal mode, 2 for degraded mode, nothing for any other value.
// It loops forever and never returns.
func sendRecoveryModeStat(appType string) {
	const reportInterval = 10 * time.Second

	tags := stats.Tags{
		"appType": appType,
	}
	modeGauge := stats.Default.NewTaggedStat("recovery.mode_normal", stats.GaugeType, tags)

	for {
		time.Sleep(reportInterval)
		// Read the mode once per tick so both comparisons see the same value.
		if mode := CurrentMode; mode == normalMode {
			modeGauge.Gauge(1)
		} else if mode == degradedMode {
			modeGauge.Gauge(2)
		}
	}
}