-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.go
191 lines (172 loc) · 7.75 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package main
import (
	"context"
	"fmt"
	"time"

	elastic "github.com/olivere/elastic/v7"  // Elasticsearch client
	"go.uber.org/zap"                        // Logging
	kingpin "gopkg.in/alecthomas/kingpin.v2" // Command line args parser

	"github.com/mintel/elasticsearch-asg/cmd" // Common logging setup func
)
// Snapshot naming and connection defaults.
const (
	// SnapshotFormat is the time.Time.Format layout used to build snapshot
	// names, and the layout used to parse times back out of existing snapshot
	// names. It separates every field with dashes because Elasticsearch
	// snapshot names may not contain spaces.
	SnapshotFormat = "2006-01-02-15-04-05"

	// defaultURL is the Elasticsearch URL used when no URL argument is given
	// on the command line.
	defaultURL = "http://localhost:9200"
)
// Command line options. Each one is a pointer that kingpin.Parse() fills in.
var (
	// esURL is the Elasticsearch endpoint, given as a positional argument.
	esURL = kingpin.Arg("url", "Elasticsearch URL. Default: "+defaultURL).
		Default(defaultURL).
		URL()

	// windows maps keep-for durations to snapshot frequencies (both ISO 8601
	// durations), e.g. P1M=PT1H. At least one --window is required.
	windows = kingpin.Flag("window", "Snapshot frequency + TTL. May be set multiple times. ISO 8601 Duration string format. Example: `--window P1M=PT1H` == keep hourly snapshots for 1 month.").
		PlaceHolder("P1M=PT1H").
		Required().
		StringMap()

	// delete enables pruning of snapshots the schedule no longer keeps.
	// Off unless explicitly requested.
	delete = kingpin.Flag("delete", "If set, clean up old snapshots. This is false by default for safety's sake.").
		Short('d').
		Bool()

	// repoName names the snapshot repository to write snapshots into.
	repoName = kingpin.Flag("repo", "Name of the snapshot repository.").
			Default("backups").
			String()

	// repoType, when non-empty, makes main create the repository with this
	// type before taking snapshots.
	repoType = kingpin.Flag("type", "If set, create a repository of this type before creating snapshots. See also: '--settings'").
			String()

	// repoSettings are the settings passed when creating the repository.
	repoSettings = kingpin.Flag("settings", "Use these settings creating the snapshot repository. May be set multiple times. Example: `--type=s3 --settings bucket=my_bucket`").
			StringMap()
)

// logger is the process-wide logger; main assigns it after parsing flags.
// XXX: I don't like a global logger var like this. Refactor to derive logger from context.
var logger *zap.Logger
// main parses the command line and connects to Elasticsearch. If --type is
// set, it also makes sure the snapshot repository exists. It then loops
// forever, sleeping until the next scheduled time and starting a goroutine
// that creates a snapshot and, with --delete, prunes old ones.
func main() {
	kingpin.CommandLine.Help = "Create and clean up Elasticsearch snapshots on a schedule."
	kingpin.Parse()

	// Dereference the --repo flag once so everything below, including the
	// snapshot goroutines, works with a plain string.
	repository := *repoName

	logger = cmd.SetupLogging().With(zap.String("snapshot_repository", repository))
	// Flush buffered logs if main ever returns normally. The loop below never
	// exits, so this is only a safeguard; logger.Fatal() performs its own Sync().
	// See: https://github.com/uber-go/zap/blob/master/FAQ.md#why-include-dedicated-panic-and-fatal-log-levels
	// The closure makes ignoring Sync()'s error explicit so linters stay quiet.
	defer func() { _ = logger.Sync() }()

	// Turn each --window keepFor=every pair into a schedule entry.
	schedule := make(SnapshotWindows, 0, len(*windows))
	for keepFor, every := range *windows {
		window, parseErr := NewSnapshotWindow(every, keepFor)
		if parseErr != nil {
			logger.Fatal("error parsing snapshot window",
				zap.String("keepFor", keepFor),
				zap.String("every", every),
				zap.Error(parseErr),
			)
		}
		schedule = append(schedule, window)
	}

	ctx := context.Background()

	// Create the Elasticsearch client.
	esClient, err := elastic.DialContext(ctx, elastic.SetURL((*esURL).String()))
	if err != nil {
		logger.Fatal("error creating Elasticsearch client", zap.Error(err))
	}

	// Only manage the repository itself when --type was given.
	if repoType != nil && *repoType != "" {
		if err := ensureSnapshotRepo(ctx, esClient, *repoType, repository, *repoSettings); err != nil {
			logger.Fatal("error ensuring snapshot repository exists", zap.Error(err))
		}
	}

	for {
		at := schedule.Next()
		time.Sleep(time.Until(at)) // Wait until the snapshot is due.

		// Snapshot work runs in its own goroutine. Per
		// https://www.elastic.co/guide/en/elasticsearch/reference/7.0/modules-snapshots.html
		// only one snapshot process can run in the cluster at a time, and a
		// shard being snapshotted cannot be moved to another node until the
		// snapshot finishes (this can interfere with rebalancing and
		// allocation filtering). If this goroutine is still running when the
		// next one starts, Elasticsearch will probably return an error and
		// snapshooter will exit.
		go func(t time.Time) {
			logger.Debug("starting snapshot create/delete goroutine")
			if err := createSnapshot(ctx, esClient, repository, t); err != nil {
				logger.Fatal("error while creating new snapshot", zap.Error(err))
			}
			// Old snapshots are only cleaned up when --delete is set.
			if *delete {
				if err := deleteOldSnapshots(ctx, esClient, repository, schedule); err != nil {
					logger.Fatal("error while deleting old snapshots", zap.Error(err))
				}
			}
		}(at)
	}
}
// ensureSnapshotRepo ensures an Elasticsearch snapshot repository called name
// with type rType exists.
//
// If no repository called name exists, it is created with type rType and the
// given settings. If one exists but its type is not rType, an error is
// returned and the repository is left untouched. The settings of an existing
// repository are not compared or updated.
//
// All Elasticsearch calls use ctx. Errors are logged and returned.
func ensureSnapshotRepo(ctx context.Context, client *elastic.Client, rType, name string, settings map[string]string) error {
	resp, err := client.SnapshotGetRepository(name).Repository(name).Do(ctx)
	if err != nil && !elastic.IsNotFound(err) {
		// Unexpected error while checking if snapshot repository exists.
		logger.Error("error checking for existing snapshot repository", zap.Error(err))
		return err
	}

	// At this point err is either nil or a 404. Indexing a nil response map is
	// safe and reports "not present".
	repo, exists := resp[name]
	if err != nil || !exists {
		// Snapshot repository doesn't exist. Create it.
		create := client.SnapshotCreateRepository(name).Type(rType)
		for k, v := range settings {
			create = create.Setting(k, v)
		}
		if _, err := create.Do(ctx); err != nil {
			logger.Error("error creating snapshot repository", zap.Error(err))
			return err
		}
		return nil
	}

	if repo.Type != rType {
		// Snapshot repository exists, but is of the wrong type e.g. fs != s3.
		// err is nil on this path, so build a real error; returning err here
		// would report success.
		logger.Error(
			"snapshot repository exists, but is the wrong type",
			zap.String("want_type", rType),
			zap.String("got_type", repo.Type),
		)
		return fmt.Errorf("snapshot repository %q has type %q, want %q", name, repo.Type, rType)
	}
	return nil
}
// createSnapshot creates a new Elasticsearch snapshot in repoName for the
// given time and waits for Elasticsearch to finish it.
//
// The snapshot is named now.UTC() formatted with SnapshotFormat.
//
// now must be within one second of time.Now(). If it is further away, this
// func panics, since that would mean the caller's scheduling is off.
// Errors from Elasticsearch are logged and returned.
func createSnapshot(ctx context.Context, client *elastic.Client, repoName string, now time.Time) error {
	// Sanity-check now: it should be pretty close to time.Now().
	// Panic only when the drift is MORE than one second in either direction.
	if d := time.Since(now); d < -time.Second || d > time.Second {
		panic("now is not within one second of the current time")
	}

	// Format in UTC. SnapshotFormat carries no zone, and deleteOldSnapshots
	// reads names back with time.Parse, which treats zone-less input as UTC.
	// Formatting in UTC keeps that round trip exact on hosts whose local
	// zone is not UTC.
	snapshotName := now.UTC().Format(SnapshotFormat)
	logger.Info("creating snapshot", zap.String("snapshot", snapshotName))
	if _, err := client.SnapshotCreate(repoName, snapshotName).WaitForCompletion(true).Do(ctx); err != nil {
		logger.Error("error creating snapshot",
			zap.String("snapshot", snapshotName),
			zap.Error(err),
		)
		return err
	}
	return nil
}
// deleteOldSnapshots deletes Elasticsearch snapshots in repoName that
// schedule no longer wants to keep.
//
// Each snapshot's time is parsed from its name using SnapshotFormat.
// Snapshots whose names do not match that layout were not created by
// snapshooter. They are logged and skipped, and are never deleted.
//
// Errors from Elasticsearch are logged and returned to the caller rather than
// terminating the process here. The first failed deletion stops processing.
func deleteOldSnapshots(ctx context.Context, client *elastic.Client, repoName string, schedule SnapshotWindows) error {
	resp, err := client.SnapshotGet(repoName).Do(ctx)
	if err != nil {
		logger.Error("error getting existing snapshots", zap.Error(err))
		return err
	}
	for _, s := range resp.Snapshots {
		t, err := time.Parse(SnapshotFormat, s.Snapshot)
		if err != nil {
			// Foreign snapshot (e.g. created manually). Leave it alone instead
			// of aborting the whole cleanup or the daemon.
			logger.Warn("skipping snapshot whose name is not a snapshooter timestamp",
				zap.String("snapshot", s.Snapshot),
				zap.Error(err),
			)
			continue
		}
		if schedule.Keep(t) {
			continue
		}
		logger.Info("deleting snapshot", zap.String("snapshot", s.Snapshot))
		if _, err := client.SnapshotDelete(repoName, s.Snapshot).Do(ctx); err != nil {
			logger.Error("error deleting old snapshot",
				zap.String("snapshot", s.Snapshot),
				zap.Error(err),
			)
			return err
		}
	}
	return nil
}