-
Notifications
You must be signed in to change notification settings - Fork 26
/
manager.go
151 lines (131 loc) · 3.64 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Copyright 2023 PingCAP, Inc.
// SPDX-License-Identifier: Apache-2.0
package config
import (
"context"
"path/filepath"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"github.com/pingcap/TiProxy/lib/config"
"github.com/pingcap/TiProxy/lib/util/errors"
"github.com/pingcap/TiProxy/lib/util/waitgroup"
"github.com/tidwall/btree"
"go.uber.org/zap"
)
const (
pathPrefixNamespace = "ns"
pathPrefixConfig = "config"
)
var (
ErrNoResults = errors.Errorf("has no results")
ErrFail2Update = errors.Errorf("failed to update")
)
type KVValue struct {
Key string
Value []byte
}
type ConfigManager struct {
wg waitgroup.WaitGroup
cancel context.CancelFunc
logger *zap.Logger
kv *btree.BTreeG[KVValue]
wch *fsnotify.Watcher
overlay []byte
sts struct {
sync.Mutex
listeners []chan<- *config.Config
current *config.Config
checksum uint32
}
}
func NewConfigManager() *ConfigManager {
return &ConfigManager{}
}
func (e *ConfigManager) Init(ctx context.Context, logger *zap.Logger, configFile string, overlay *config.Config) error {
var err error
var nctx context.Context
nctx, e.cancel = context.WithCancel(ctx)
e.logger = logger
// for namespace persistence
e.kv = btree.NewBTreeG(func(a, b KVValue) bool {
return a.Key < b.Key
})
// for config watch
if overlay != nil {
e.overlay, err = overlay.ToBytes()
if err != nil {
return errors.WithStack(err)
}
}
if configFile != "" {
e.wch, err = fsnotify.NewWatcher()
if err != nil {
return errors.WithStack(err)
}
// Watch the parent dir, because vim/k8s or other apps may not edit files in-place:
// e.g. k8s configmap is a symlink of a symlink to a file, which will only trigger
// a remove event for the file.
parentDir := filepath.Dir(configFile)
if err := e.reloadConfigFile(configFile); err != nil {
return errors.WithStack(err)
}
if err := e.wch.Add(parentDir); err != nil {
return errors.WithStack(err)
}
e.wg.Run(func() {
// Some apps will trigger rename/remove events, which means they will re-create/rename
// the new file to the directory. Watch possibly stopped after rename/remove events.
// So, we use a tick to repeatedly add the parent dir to re-watch files.
ticker := time.NewTicker(200 * time.Millisecond)
for {
select {
case <-nctx.Done():
return
case err := <-e.wch.Errors:
e.logger.Warn("failed to watch config file", zap.Error(err))
case ev := <-e.wch.Events:
e.handleFSEvent(ev, configFile)
case <-ticker.C:
// There may be concurrent issues:
// 1. Remove the directory and the watcher removes the directory automatically
// 2. Create the directory and the file again within a tick
// 3. Add it to the watcher again, but the CREATE event is not sent
//
// Checking the watch list still have a concurrent issue because the watcher may remove the
// directory between WatchList and Add. We'll fix it later because it's complex to fix it entirely.
exists := len(e.wch.WatchList()) > 0
if err := e.wch.Add(parentDir); err != nil {
e.logger.Warn("failed to rewatch config file", zap.Error(err))
} else if !exists {
e.logger.Info("config file reloaded", zap.Error(e.reloadConfigFile(configFile)))
}
}
}
})
} else {
if err := e.SetTOMLConfig(nil); err != nil {
return errors.WithStack(err)
}
}
return nil
}
func (e *ConfigManager) Close() error {
var wcherr error
if e.cancel != nil {
e.cancel()
e.cancel = nil
}
if e.wch != nil {
wcherr = e.wch.Close()
e.wch = nil
}
e.sts.Lock()
for _, ch := range e.sts.listeners {
close(ch)
}
e.sts.listeners = nil
e.sts.Unlock()
e.wg.Wait()
return wcherr
}