forked from pshima/consul-snapshot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
backup.go
189 lines (156 loc) · 5.11 KB
/
backup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package backup
import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"os"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
consulapi "github.com/hashicorp/consul/api"
"github.com/pshima/consul-snapshot/config"
"github.com/pshima/consul-snapshot/consul"
"github.com/pshima/consul-snapshot/health"
)
// Backup is the backup itself including configuration and data
type Backup struct {
	StartTime      int64          // unix epoch seconds captured when the backup run begins
	JSONData       []byte         // JSON-marshalled consul key data (set by KeysToJSON)
	LocalFileName  string         // file name of the local gzip, e.g. consul.backup.<ts>.gz
	LocalFilePath  string         // directory the local gzip is written to (copied from Config.TmpDir)
	RemoteFilePath string         // S3 object key, backups/<year>/<month>/<day>/<file>
	Config         config.Config  // parsed runtime configuration
	Client         *consul.Consul // consul client wrapper used to list keys and write the ack key
}
// Runner is the main runner for a backup. With t == "test" it performs a
// single synchronous backup pass; otherwise it loops forever, running one
// backup per configured interval. Always returns 0.
func Runner(t string) int {
	// Start up the http server health checks
	go health.StartServer()

	conf := config.ParseConfig()
	client := &consul.Consul{Client: *consul.Client()}

	if t == "test" {
		doWork(conf, client, t)
	} else {
		log.Printf("[DEBUG] Backup starting on interval: %v", conf.BackupInterval)
		ticker := time.NewTicker(conf.BackupInterval)
		// Release the ticker's resources if the loop ever exits
		// (time.NewTicker must be stopped to free its timer).
		defer ticker.Stop()
		for range ticker.C {
			doWork(conf, client, t)
		}
	}
	return 0
}
// doWork performs one complete backup pass: list keys from consul, marshal
// them to JSON, write a local gzip file, and — outside of test runs — upload
// the file to S3 and run post-processing.
func doWork(conf config.Config, client *consul.Consul, t string) {
	b := &Backup{
		Config: conf,
		Client: client,
	}

	// Loop over and over at interval time.
	b.StartTime = time.Now().Unix()

	testing := t == "test"
	if testing {
		b.LocalFileName = "acceptancetest.gz"
	}

	stamp := fmt.Sprintf("%v", b.StartTime)
	log.Printf("[INFO] Starting Backup At: %s", stamp)

	log.Print("[INFO] Listing keys from consul")
	b.Client.ListKeys()

	log.Printf("[INFO] Converting %v keys to JSON", b.Client.KeyDataLen)
	b.KeysToJSON()

	log.Print("[INFO] Writing Local Backup File")
	b.writeBackupLocal()

	if testing {
		log.Print("[INFO] Skipping remote backup during testing")
	} else {
		log.Print("[INFO] Writing Backup to Remote File")
		b.writeBackupRemote()
	}

	if testing {
		log.Print("[INFO] Skipping post processing during testing")
	} else {
		log.Print("[INFO] Running post processing")
		b.postProcess()
	}

	log.Print("[INFO] Backup completed successfully")
}
// KeysToJSON used to marshall the data and put it on a Backup object
func (b *Backup) KeysToJSON() {
	// Encode the key data gathered by ListKeys; a marshal failure is fatal
	// because the backup would otherwise be empty.
	data, err := json.Marshal(b.Client.KeyData)
	if err != nil {
		log.Fatalf("[ERR] Could not encode keys to json!: %v", err)
	}
	b.JSONData = data
}
// writeBackupLocal writes the marshalled key data as a gzipped file in the
// temporary dir, naming it consul.backup.<unix-ts>.gz unless LocalFileName
// was preset (the test path). Any I/O error is fatal so a truncated or
// corrupt backup is never produced silently.
func (b *Backup) writeBackupLocal() {
	// Create a filename with a unix timestamp
	startString := fmt.Sprintf("%v", b.StartTime)
	filename := fmt.Sprintf("consul.backup.%s.gz", startString)
	if b.LocalFileName == "" {
		b.LocalFileName = filename
	}
	b.LocalFilePath = b.Config.TmpDir
	filepath := fmt.Sprintf("%v/%v", b.LocalFilePath, b.LocalFileName)

	// Write the json to a gzip
	handle, err := os.OpenFile(filepath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		log.Fatalf("[ERR] Could not open file for writing!: %v", err)
	}

	// Create a new gzip writer
	gz := gzip.NewWriter(handle)

	// Actually write the json to the file. JSONData is already a []byte,
	// so no conversion is needed.
	bytesWritten, err := gz.Write(b.JSONData)
	if err != nil {
		log.Fatalf("[ERR] Could not write data to file!: %v", err)
	}
	log.Printf("[DEBUG] Wrote %v bytes to file, %v", bytesWritten, filepath)

	// Close the gzip writer first: Close flushes buffered compressed data
	// and writes the gzip footer, so an error here means the file on disk
	// is incomplete — treat it as fatal instead of ignoring it.
	if err := gz.Close(); err != nil {
		log.Fatalf("[ERR] Could not finalize gzip stream!: %v", err)
	}
	if err := handle.Close(); err != nil {
		log.Fatalf("[ERR] Could not close backup file!: %v", err)
	}
}
// Write the local backup file to S3.
// There are no tests for this remote operation
func (b *Backup) writeBackupRemote() {
	s3Conn := session.New(&aws.Config{Region: aws.String(string(b.Config.S3Region))})

	localPath := fmt.Sprintf("%v/%v", b.LocalFilePath, b.LocalFileName)

	// Key the object by the backup's start date: backups/<year>/<month>/<day>/<file>
	ts := time.Unix(b.StartTime, 0)
	b.RemoteFilePath = fmt.Sprintf("backups/%v/%d/%v/%v", ts.Year(), ts.Month(), ts.Day(), b.LocalFileName)

	// re-read the compressed file. There is probably a better way to do this
	contents, err := ioutil.ReadFile(localPath)
	if err != nil {
		log.Fatalf("[ERR] Could not read compressed file!: %v", err)
	}

	// Create the params to pass into the actual uploader
	params := &s3manager.UploadInput{
		Bucket: &b.Config.S3Bucket,
		Key:    &b.RemoteFilePath,
		Body:   bytes.NewReader(contents),
	}

	log.Printf("[INFO] Uploading %v/%v to S3 in %v", string(b.Config.S3Bucket), b.RemoteFilePath, string(b.Config.S3Region))
	if _, err = s3manager.NewUploader(s3Conn).Upload(params); err != nil {
		log.Fatalf("[ERR] Could not upload to S3!: %v", err)
	}
}
// Run post processing on the backup, acking the key and removing and temp files.
// There are no tests for the remote operation.
func (b *Backup) postProcess() {
	// Mark a key in consul for our last backup time.
	lastbackup := &consulapi.KVPair{
		Key:   "service/consul-snapshot/lastbackup",
		Value: []byte(fmt.Sprintf("%v", b.StartTime)),
	}
	if _, err := b.Client.Client.KV().Put(lastbackup, &consulapi.WriteOptions{}); err != nil {
		log.Fatalf("[ERR] Failed writing last backup timestamp to consul: %v", err)
	}

	// Best-effort cleanup of the local temp file; a failure is only logged.
	tmpfile := fmt.Sprintf("%v/%v", b.LocalFilePath, b.LocalFileName)
	if err := os.Remove(tmpfile); err != nil {
		log.Printf("Unable to remove temporary backup file: %v", err)
	}
}