-
-
Notifications
You must be signed in to change notification settings - Fork 2.2k
/
filer_replication.go
148 lines (127 loc) · 4.16 KB
/
filer_replication.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package command
import (
"context"
"strings"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/replication"
"github.com/seaweedfs/seaweedfs/weed/replication/sink"
"github.com/seaweedfs/seaweedfs/weed/replication/sub"
"github.com/seaweedfs/seaweedfs/weed/util"
)
func init() {
cmdFilerReplicate.Run = runFilerReplicate // break init cycle
}
var cmdFilerReplicate = &Command{
UsageLine: "filer.replicate",
Short: "replicate file changes to another destination",
Long: `replicate file changes to another destination
filer.replicate listens on filer notifications. If any file is updated, it will fetch the updated content,
and write to the other destination.
Run "weed scaffold -config=replication" to generate a replication.toml file and customize the parameters.
`,
}
func runFilerReplicate(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadConfiguration("replication", true)
util.LoadConfiguration("notification", true)
config := util.GetViper()
var notificationInput sub.NotificationInput
validateOneEnabledInput(config)
for _, input := range sub.NotificationInputs {
if config.GetBool("notification." + input.GetName() + ".enabled") {
if err := input.Initialize(config, "notification."+input.GetName()+"."); err != nil {
glog.Fatalf("Failed to initialize notification input for %s: %+v",
input.GetName(), err)
}
glog.V(0).Infof("Configure notification input to %s", input.GetName())
notificationInput = input
break
}
}
if notificationInput == nil {
println("No notification is defined in notification.toml file.")
println("Please follow 'weed scaffold -config=notification' to see example notification configurations.")
return true
}
// avoid recursive replication
if config.GetBool("notification.source.filer.enabled") && config.GetBool("notification.sink.filer.enabled") {
if config.GetString("source.filer.grpcAddress") == config.GetString("sink.filer.grpcAddress") {
fromDir := config.GetString("source.filer.directory")
toDir := config.GetString("sink.filer.directory")
if strings.HasPrefix(toDir, fromDir) {
glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir)
}
}
}
dataSink := findSink(config)
if dataSink == nil {
println("no data sink configured in replication.toml:")
for _, sk := range sink.Sinks {
println(" " + sk.GetName())
}
return true
}
replicator := replication.NewReplicator(config, "source.filer.", dataSink)
for {
key, m, onSuccessFn, onFailureFn, err := notificationInput.ReceiveMessage()
if err != nil {
glog.Errorf("receive %s: %+v", key, err)
if onFailureFn != nil {
onFailureFn()
}
continue
}
if key == "" {
// long poll received no messages
if onSuccessFn != nil {
onSuccessFn()
}
continue
}
if m.OldEntry != nil && m.NewEntry == nil {
glog.V(1).Infof("delete: %s", key)
} else if m.OldEntry == nil && m.NewEntry != nil {
glog.V(1).Infof("add: %s", key)
} else {
glog.V(1).Infof("modify: %s", key)
}
if err = replicator.Replicate(context.Background(), key, m); err != nil {
glog.Errorf("replicate %s: %+v", key, err)
if onFailureFn != nil {
onFailureFn()
}
} else {
glog.V(1).Infof("replicated %s", key)
if onSuccessFn != nil {
onSuccessFn()
}
}
}
}
func findSink(config *util.ViperProxy) sink.ReplicationSink {
var dataSink sink.ReplicationSink
for _, sk := range sink.Sinks {
if config.GetBool("sink." + sk.GetName() + ".enabled") {
if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil {
glog.Fatalf("Failed to initialize sink for %s: %+v",
sk.GetName(), err)
}
glog.V(0).Infof("Configure sink to %s", sk.GetName())
dataSink = sk
break
}
}
return dataSink
}
func validateOneEnabledInput(config *util.ViperProxy) {
enabledInput := ""
for _, input := range sub.NotificationInputs {
if config.GetBool("notification." + input.GetName() + ".enabled") {
if enabledInput == "" {
enabledInput = input.GetName()
} else {
glog.Fatalf("Notification input is enabled for both %s and %s", enabledInput, input.GetName())
}
}
}
}