-
Notifications
You must be signed in to change notification settings - Fork 0
/
watcher.go
121 lines (110 loc) · 2.82 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package runtime
import (
"fmt"
"github.com/fsnotify/fsnotify"
"github.com/spf13/viper"
"github.com/stubborn-gaga-0805/prepare2go/configs/conf"
"github.com/stubborn-gaga-0805/prepare2go/internal/repo"
"github.com/stubborn-gaga-0805/prepare2go/pkg/mysql"
"github.com/stubborn-gaga-0805/prepare2go/pkg/rabbitmq"
pkgredis "github.com/stubborn-gaga-0805/prepare2go/pkg/redis"
"golang.org/x/net/context"
"os/signal"
"syscall"
)
// ConfigWatcher watches the configuration file for changes and blocks until
// SIGINT or SIGTERM is received (or ctx is cancelled). It then asks the
// listener goroutine to stop and returns.
//
// Every time viper reports a change, the top-level settings keys are walked
// and each recognised section is announced on its dedicated channel so that
// listener can process it.
func ConfigWatcher(ctx context.Context) {
	signalCtx, signalStop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
	defer signalStop()

	// Tell viper which file to watch and how to parse it.
	viper.SetConfigType("yaml")
	viper.SetConfigFile(conf.GetConfigPath())
	fmt.Println("开始监听配置文件变化...")
	viper.OnConfigChange(func(in fsnotify.Event) {
		// Walk the root keys; each known section is routed to its own channel.
		// Every send also watches signalCtx so that, once shutdown has begun
		// and nobody is receiving any more, this callback does not block
		// viper's watcher goroutine forever.
		for key := range viper.AllSettings() {
			switch key {
			case dataConfigKey:
				select {
				case dataWatcherChan <- dataConfigKey:
				case <-signalCtx.Done():
					return
				}
			case mqConfigKey:
				select {
				case mqWatcherChan <- mqConfigKey:
				case <-signalCtx.Done():
					return
				}
			}
		}
	})
	viper.WatchConfig()
	go listener(ctx)

	// Block until a termination signal arrives, then shut the listener down.
	<-signalCtx.Done()
	stopListenerChan <- true
}
// 监听变化
func listener(ctx context.Context) {
fmt.Println("\nConfig Listener Start...")
for {
select {
case key := <-dataWatcherChan:
dataWatcher(ctx, viper.Sub(key))
case key := <-mqWatcherChan:
mqWatcher(ctx, viper.Sub(key))
case <-stopListenerChan:
fmt.Println("\nConfig Listener Stopped...")
return
}
}
}
// dataWatcher inspects the data section of the configuration (v) and reloads
// the database and/or redis connections when their settings differ from the
// currently active configuration.
//
// A child section is treated as the database config when it has a "driver"
// key, and as the redis config when it has a "db" key. The actual values are
// then unmarshalled from the fixed "db" and "redis" sub-sections of v.
//
// viper's Sub returns nil when a key is absent or its value is not a map, so
// every Sub result (and v itself, which the caller obtains from Sub) is
// checked before use to avoid a nil-pointer panic.
func dataWatcher(ctx context.Context, v *viper.Viper) {
	if v == nil {
		return
	}
	for key := range v.AllSettings() {
		section := v.Sub(key)
		if section == nil {
			// Scalar or list entry: it cannot describe db or redis.
			continue
		}

		// Detect a MySQL change.
		if section.IsSet("driver") {
			dbSection := v.Sub("db")
			if dbSection == nil {
				fmt.Println("Unmarshal db err: config section \"db\" not found")
				return
			}
			var db mysql.DB
			if err := dbSection.Unmarshal(&db); err != nil {
				fmt.Printf("Unmarshal db err: %v\n", err)
				return
			}
			cfg := conf.GetConfig()
			if cfg.Data.Db.NotEquals(db) {
				fmt.Println("DB配置变动...")
				cfg.Data.Db = db
				conf.SetConfig(cfg)
				repo.ReloadDB(ctx)
			}
		}

		// Detect a redis change.
		if section.IsSet("db") {
			redisSection := v.Sub("redis")
			if redisSection == nil {
				fmt.Println("Unmarshal redis err: config section \"redis\" not found")
				return
			}
			var redisCfg pkgredis.Redis
			if err := redisSection.Unmarshal(&redisCfg); err != nil {
				fmt.Printf("Unmarshal redis err: %v\n", err)
				return
			}
			cfg := conf.GetConfig()
			if redisCfg != cfg.Data.Redis {
				fmt.Println("redis配置变动...")
				cfg.Data.Redis = redisCfg
				conf.SetConfig(cfg)
				repo.ReloadRedis(ctx)
			}
		}
	}
}
// mqWatcher inspects the message-queue section of the configuration (v) and,
// when the "rabbitMQ" settings differ from the currently active ones, stores
// the new settings in the global config.
//
// viper's Sub returns nil when a key is absent or its value is not a map, so
// both v (which the caller obtains from Sub) and the "rabbitMQ" sub-section
// are checked before use to avoid a nil-pointer panic.
func mqWatcher(ctx context.Context, v *viper.Viper) {
	if v == nil {
		return
	}
	section := v.Sub("rabbitMQ")
	if section == nil {
		fmt.Println("Unmarshal rabbitMQ err: config section \"rabbitMQ\" not found")
		return
	}

	var rabbitMQ rabbitmq.Config
	if err := section.Unmarshal(&rabbitMQ); err != nil {
		fmt.Printf("Unmarshal rabbitMQ err: %v\n", err)
		return
	}

	// Only touch the global config when something actually changed.
	cfg := conf.GetConfig()
	if rabbitMQ != cfg.MQ.RabbitMQ {
		fmt.Println("RabbitMQ配置变动")
		cfg.MQ.RabbitMQ = rabbitMQ
		conf.SetConfig(cfg)
	}
}