Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
retroflexer
committed
Jun 11, 2020
1 parent
abe09ec
commit 1a28818
Showing
9 changed files
with
658 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
package rollbackcopy | ||
|
||
import ( | ||
"fmt" | ||
"log" | ||
"os" | ||
"path/filepath" | ||
"time" | ||
) | ||
|
||
//This backup mimics the functionality of cluster-backup.sh | ||
|
||
var backupResourcePodList = []string{ | ||
"kube-apiserver-pod", | ||
"kube-controller-manager-pod", | ||
"kube-scheduler-pod", | ||
"etcd-pod", | ||
} | ||
|
||
func archiveLatestResources(configDir, backupFile string) error { | ||
log.Println("In backup, backupFile is", backupFile) | ||
|
||
paths := []string{} | ||
for _, podName := range backupResourcePodList { | ||
latestPod, err := findTheLatestRevision(filepath.Join(configDir, "static-pod-resources"), podName) | ||
if err != nil { | ||
return err | ||
} | ||
paths = append(paths, latestPod) | ||
log.Println("The latest revision for podName", podName, " ", latestPod) | ||
} | ||
log.Println("Paths=", paths) | ||
|
||
err := createTarball(backupFile, paths, configDir) | ||
if err != nil { | ||
log.Println("Got error creating tar", err) | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
func backup(configDir string) error { | ||
|
||
if upgradeInProgress() { | ||
return fmt.Errorf("Upgrade is in progress. Skipping backup!") | ||
} | ||
|
||
tmpBackupDir := filepath.Join(configDir, "rollbackcopy", "tmp") | ||
defer os.RemoveAll(tmpBackupDir) | ||
|
||
if err := checkAndCreateDir(tmpBackupDir); err != nil { | ||
return err | ||
} | ||
|
||
// Trying to match the output file formats with the formats of the current cluster-backup.sh script | ||
dateString := time.Now().Format("2006-01-02_150405") | ||
outputArchive := "static_kuberesources_" + dateString + ".tar.gz" | ||
snapshotOutFile := "snapshot_" + dateString + ".db" | ||
|
||
// Save snapshot | ||
if err := SaveSnapshot(filepath.Join(tmpBackupDir, snapshotOutFile)); err != nil { | ||
return err | ||
} | ||
|
||
// Save the corresponding static pod resources | ||
if err := archiveLatestResources(configDir, filepath.Join(tmpBackupDir, outputArchive)); err != nil { | ||
return err | ||
} | ||
|
||
// Write the version | ||
version := backupVersion{os.Getenv("OPERATOR_IMAGE_VERSION")} | ||
putVersion(&version, tmpBackupDir) | ||
|
||
latestDir := filepath.Join(configDir, "rollbackcopy", "latest") | ||
|
||
if versionChanged(latestDir, tmpBackupDir) { | ||
fmt.Println("Version changed") | ||
prevDir := filepath.Join(configDir, "rollbackcopy", "prev") | ||
os.RemoveAll(prevDir) | ||
os.Rename(latestDir, prevDir) | ||
os.Rename(tmpBackupDir, latestDir) | ||
} else { | ||
os.RemoveAll(latestDir) | ||
os.Rename(tmpBackupDir, latestDir) | ||
} | ||
|
||
log.Println("Backed up!") | ||
return nil | ||
} | ||
|
||
func upgradeInProgress() bool { | ||
return false | ||
} | ||
|
||
func checkLeadership(name string) bool { | ||
flag, err := IsLeader(name) | ||
if err != nil { | ||
log.Print("Failed to check leadership", err) | ||
return false | ||
} | ||
return flag | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
package rollbackcopy | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"google.golang.org/grpc" | ||
"io" | ||
"k8s.io/klog" | ||
"log" | ||
"os" | ||
"time" | ||
|
||
"go.etcd.io/etcd/clientv3" | ||
"go.etcd.io/etcd/pkg/transport" | ||
) | ||
|
||
func getEtcdClient(endpoints []string) (*clientv3.Client, error) { | ||
dialOptions := []grpc.DialOption{ | ||
grpc.WithBlock(), // block until the underlying connection is up | ||
} | ||
|
||
tlsInfo := transport.TLSInfo{ | ||
CertFile: os.Getenv("ETCDCTL_CERT"), | ||
KeyFile: os.Getenv("ETCDCTL_KEY"), | ||
TrustedCAFile: os.Getenv("ETCDCTL_CACERT"), | ||
} | ||
tlsConfig, err := tlsInfo.ClientConfig() | ||
|
||
cfg := &clientv3.Config{ | ||
DialOptions: dialOptions, | ||
Endpoints: endpoints, | ||
DialTimeout: 2 * time.Second, | ||
TLS: tlsConfig, | ||
} | ||
|
||
cli, err := clientv3.New(*cfg) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to make etcd client for endpoints %v: %w", endpoints, err) | ||
} | ||
return cli, err | ||
} | ||
|
||
func IsLeader(name string) (bool, error) { | ||
cli, err := getEtcdClient([]string{"localhost:2379"}) | ||
if err != nil { | ||
return false, err | ||
} | ||
defer cli.Close() | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
membersResp, err := cli.MemberList(ctx) | ||
if err != nil { | ||
return false, err | ||
} | ||
for _, member := range membersResp.Members { | ||
if member.Name != name { | ||
continue | ||
} | ||
if len(member.ClientURLs) == 0 && member.Name == "" { | ||
return false, fmt.Errorf("EtcdMemberNotStarted") | ||
} | ||
|
||
resp, err := cli.Status(ctx, member.ClientURLs[0]) | ||
if err != nil { | ||
klog.Errorf("error getting etcd member %s status: %#v", member.Name, err) | ||
return false, err | ||
} | ||
return resp.Header.MemberId == resp.Leader, nil | ||
} | ||
return false, fmt.Errorf("EtcdMemberStatusUnknown") | ||
} | ||
|
||
func SaveSnapshot(dbPath string) error { | ||
cli, err := getEtcdClient([]string{"localhost:2379"}) | ||
if err != nil { | ||
return err | ||
} | ||
defer cli.Close() | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
partpath := dbPath + ".part" | ||
defer os.RemoveAll(partpath) | ||
|
||
var f *os.File | ||
f, err = os.OpenFile(partpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) | ||
if err != nil { | ||
return fmt.Errorf("could not open %s (%v)", partpath, err) | ||
} | ||
|
||
now := time.Now() | ||
var rd io.ReadCloser | ||
rd, err = cli.Snapshot(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if _, err := io.Copy(f, rd); err != nil { | ||
return err | ||
} | ||
if err := f.Close(); err != nil { | ||
return err | ||
} | ||
log.Println( | ||
"fetched snapshot", | ||
"took", time.Since(now), | ||
) | ||
|
||
if err := os.Rename(partpath, dbPath); err != nil { | ||
return fmt.Errorf("could not rename %s to %s (%v)", partpath, dbPath, err) | ||
} | ||
log.Println("saved snapshot to path", dbPath) | ||
return nil | ||
} |
Oops, something went wrong.