Skip to content

Commit

Permalink
Adding rollbackcopy subcommand.
Browse files Browse the repository at this point in the history
  • Loading branch information
retroflexer committed Jun 11, 2020
1 parent abe09ec commit 3abeeef
Show file tree
Hide file tree
Showing 9 changed files with 638 additions and 0 deletions.
28 changes: 28 additions & 0 deletions bindata/etcd/pod.yaml
Expand Up @@ -233,11 +233,39 @@ ${COMPUTED_ENV_VARS}
name: cert-dir
- mountPath: /var/lib/etcd/
name: data-dir
- name: rollbackcopier
image: ${OPERATOR_IMAGE}
imagePullPolicy: IfNotPresent
terminationMessagePolicy: FallbackToLogsOnError
command:
- /bin/sh
- -c
- |
#!/bin/sh
set -euo pipefail
export ETCD_NAME=${NODE_NODE_ENVVAR_NAME_ETCD_NAME}
exec cluster-etcd-operator rollbackcopy
resources:
requests:
memory: 60Mi
cpu: 30m
securityContext:
privileged: true
volumeMounts:
- mountPath: /etc/kubernetes
name: etc-kubernetes
env:
${COMPUTED_ENV_VARS}
hostNetwork: true
priorityClassName: system-node-critical
tolerations:
- operator: "Exists"
volumes:
- hostPath:
path: /etc/kubernetes
name: etc-kubernetes
- hostPath:
path: /etc/kubernetes/manifests
name: static-pod-dir
Expand Down
2 changes: 2 additions & 0 deletions cmd/cluster-etcd-operator/main.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/openshift/cluster-etcd-operator/pkg/cmd/mount"
operatorcmd "github.com/openshift/cluster-etcd-operator/pkg/cmd/operator"
"github.com/openshift/cluster-etcd-operator/pkg/cmd/render"
"github.com/openshift/cluster-etcd-operator/pkg/cmd/rollbackcopy"
"github.com/openshift/cluster-etcd-operator/pkg/cmd/staticpodcontroller"
"github.com/openshift/cluster-etcd-operator/pkg/cmd/staticsynccontroller"
"github.com/openshift/cluster-etcd-operator/pkg/cmd/waitforceo"
Expand Down Expand Up @@ -65,6 +66,7 @@ func NewSSCSCommand() *cobra.Command {
cmd.AddCommand(staticsynccontroller.NewStaticSyncCommand(os.Stderr))
cmd.AddCommand(staticpodcontroller.NewStaticPodCommand(os.Stderr))
cmd.AddCommand(mount.NewMountCommand(os.Stderr))
cmd.AddCommand(rollbackcopy.NewRollbackCopy(os.Stderr))
cmd.AddCommand(waitforceo.NewWaitForCeoCommand(os.Stderr))
cmd.AddCommand(waitforkube.NewWaitForKubeCommand(os.Stderr))

Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -19,6 +19,7 @@ require (
github.com/vincent-petithory/dataurl v0.0.0-20191104211930-d1553a71de50
go.etcd.io/etcd v0.0.0-20200401174654-e694b7bb0875
google.golang.org/grpc v1.26.0
gopkg.in/yaml.v2 v2.2.8
k8s.io/api v0.18.3
k8s.io/apimachinery v0.18.3
k8s.io/client-go v0.18.3
Expand Down
102 changes: 102 additions & 0 deletions pkg/cmd/rollbackcopy/backuputils.go
@@ -0,0 +1,102 @@
package rollbackcopy

import (
"fmt"
"log"
"os"
"path/filepath"
"time"
)

// This backup mimics the functionality of cluster-backup.sh.

// backupResourcePodList lists the static pod names whose latest revision
// under <configDir>/static-pod-resources is looked up by
// archiveLatestResources and added to the backup tarball.
var backupResourcePodList = []string{
"kube-apiserver-pod",
"kube-controller-manager-pod",
"kube-scheduler-pod",
"etcd-pod",
}

// archiveLatestResources resolves the latest revision of every pod named in
// backupResourcePodList under <configDir>/static-pod-resources and packs the
// resulting paths into the tarball backupFile via createTarball.
//
// It returns the first error reported by findTheLatestRevision or
// createTarball; on a lookup failure no tarball is attempted.
func archiveLatestResources(configDir, backupFile string) error {
	log.Println("In backup, backupFile is", backupFile)

	// Every pod is searched for in the same directory, so build it once.
	resourceDir := filepath.Join(configDir, "static-pod-resources")

	revisions := make([]string, 0, len(backupResourcePodList))
	for i := range backupResourcePodList {
		pod := backupResourcePodList[i]
		revision, err := findTheLatestRevision(resourceDir, pod)
		if err != nil {
			return err
		}
		revisions = append(revisions, revision)
		log.Println("The latest revision for podName", pod, " ", revision)
	}
	log.Println("Paths=", revisions)

	if err := createTarball(backupFile, revisions, configDir); err != nil {
		log.Println("Got error creating tar", err)
		return err
	}
	return nil
}

// backup takes an etcd snapshot plus an archive of the latest static pod
// resources and rotates them into <configDir>/rollbackcopy.
//
// The new backup is first assembled in rollbackcopy/tmp. If versionChanged
// reports a difference between rollbackcopy/latest and the new backup, the
// existing "latest" is moved to "prev" (replacing any older "prev") before
// the new backup becomes "latest"; otherwise "latest" is simply replaced.
//
// It returns an error when an upgrade is in progress, when the snapshot or
// archive cannot be produced, or when any step of the directory rotation
// fails. The tmp directory is always removed on return.
func backup(configDir string) error {
	if upgradeInProgress() {
		return fmt.Errorf("Upgrade is in progress. Skipping backup!")
	}

	rollbackDir := filepath.Join(configDir, "rollbackcopy")
	tmpBackupDir := filepath.Join(rollbackDir, "tmp")
	// Clean up the staging directory on every exit path. After a successful
	// rename to "latest" the path no longer exists and this is a no-op.
	defer os.RemoveAll(tmpBackupDir)

	if err := checkAndCreateDir(tmpBackupDir); err != nil {
		return err
	}

	// Trying to match the output file formats with the formats of the current cluster-backup.sh script
	dateString := time.Now().Format("2006-01-02_150405")
	outputArchive := "static_kuberesources_" + dateString + ".tar.gz"
	snapshotOutFile := "snapshot_" + dateString + ".db"

	// Save snapshot
	if err := SaveSnapshot(filepath.Join(tmpBackupDir, snapshotOutFile)); err != nil {
		return err
	}

	// Save the corresponding static pod resources
	if err := archiveLatestResources(configDir, filepath.Join(tmpBackupDir, outputArchive)); err != nil {
		return err
	}

	// Write the version
	version := backupVersion{os.Getenv("OPERATOR_IMAGE_VERSION")}
	putVersion(&version, tmpBackupDir)

	latestDir := filepath.Join(rollbackDir, "latest")

	if versionChanged(latestDir, tmpBackupDir) {
		fmt.Println("Version changed")
		prevDir := filepath.Join(rollbackDir, "prev")
		if err := os.RemoveAll(prevDir); err != nil {
			return fmt.Errorf("removing previous backup %s: %w", prevDir, err)
		}
		// A missing "latest" (e.g. the very first backup) is not an error:
		// there is simply nothing to preserve as "prev".
		if err := os.Rename(latestDir, prevDir); err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("moving %s to %s: %w", latestDir, prevDir, err)
		}
	} else if err := os.RemoveAll(latestDir); err != nil {
		return fmt.Errorf("removing stale backup %s: %w", latestDir, err)
	}

	// Only report success once the new backup is actually in place; otherwise
	// the deferred cleanup above would silently discard it.
	if err := os.Rename(tmpBackupDir, latestDir); err != nil {
		return fmt.Errorf("moving %s to %s: %w", tmpBackupDir, latestDir, err)
	}

	log.Println("Backed up!")
	return nil
}

// upgradeInProgress reports whether a cluster upgrade is currently running.
// The current implementation unconditionally answers false.
// NOTE(review): looks like a stub awaiting real upgrade detection — confirm.
func upgradeInProgress() bool {
	const inProgress = false
	return inProgress
}

// checkLeadership reports whether the etcd member called name is the current
// leader, as determined by IsLeader. Any error from IsLeader is logged and
// treated as "not the leader".
func checkLeadership(name string) bool {
	isLeader, err := IsLeader(name)
	if err != nil {
		// log.Print inserts no space between a string operand and the error,
		// which ran the message and the error text together; format explicitly.
		log.Printf("Failed to check leadership: %v", err)
		return false
	}
	return isLeader
}
117 changes: 117 additions & 0 deletions pkg/cmd/rollbackcopy/etcdclientutils.go
@@ -0,0 +1,117 @@
package rollbackcopy

import (
"context"
"fmt"
"google.golang.org/grpc"
"io"
"k8s.io/klog"
"log"
"os"
"time"

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/pkg/transport"
)

// getEtcdClient creates an etcd v3 client for the given endpoints.
//
// TLS material is taken from the files named by the ETCDCTL_CERT,
// ETCDCTL_KEY and ETCDCTL_CACERT environment variables. The dial blocks until
// the connection is up, bounded by a 2 second dial timeout.
//
// It returns an error if the TLS configuration cannot be built or the client
// cannot be created. The caller owns the returned client and must Close it.
func getEtcdClient(endpoints []string) (*clientv3.Client, error) {
	tlsInfo := transport.TLSInfo{
		CertFile:      os.Getenv("ETCDCTL_CERT"),
		KeyFile:       os.Getenv("ETCDCTL_KEY"),
		TrustedCAFile: os.Getenv("ETCDCTL_CACERT"),
	}
	tlsConfig, err := tlsInfo.ClientConfig()
	if err != nil {
		// Previously this error was overwritten by the clientv3.New call
		// below, hiding unreadable or invalid certificate files.
		return nil, fmt.Errorf("failed to build TLS config for etcd client: %w", err)
	}

	cfg := clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 2 * time.Second,
		DialOptions: []grpc.DialOption{
			grpc.WithBlock(), // block until the underlying connection is up
		},
		TLS: tlsConfig,
	}

	cli, err := clientv3.New(cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to make etcd client for endpoints %v: %w", endpoints, err)
	}
	return cli, nil
}

// IsLeader reports whether the etcd member called name is the cluster leader.
//
// It connects to the local etcd at localhost:2379, finds the member whose
// Name equals name in the member list, and queries that member's status on
// its first client URL. The member is the leader when the member ID in the
// status response header equals the reported leader ID.
//
// Errors:
//   - any error from creating the client, listing members, or fetching status;
//   - "EtcdMemberNotStarted" when the matching member advertises no client URLs;
//   - "EtcdMemberStatusUnknown" when no member is called name.
func IsLeader(name string) (bool, error) {
	cli, err := getEtcdClient([]string{"localhost:2379"})
	if err != nil {
		return false, err
	}
	defer cli.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	membersResp, err := cli.MemberList(ctx)
	if err != nil {
		return false, err
	}
	for _, member := range membersResp.Members {
		if member.Name != name {
			continue
		}
		// The status call below indexes ClientURLs[0], so an empty list must
		// be rejected regardless of the member name; the former
		// `&& member.Name == ""` condition let named members with no client
		// URLs through and panicked with an index out of range.
		if len(member.ClientURLs) == 0 {
			return false, fmt.Errorf("EtcdMemberNotStarted")
		}

		resp, err := cli.Status(ctx, member.ClientURLs[0])
		if err != nil {
			klog.Errorf("error getting etcd member %s status: %#v", member.Name, err)
			return false, err
		}
		return resp.Header.MemberId == resp.Leader, nil
	}
	return false, fmt.Errorf("EtcdMemberStatusUnknown")
}

// SaveSnapshot streams a snapshot from the local etcd at localhost:2379 into
// the file dbPath.
//
// The data is first written to dbPath + ".part" (mode 0600) and renamed to
// dbPath only after the whole stream has been copied and the file closed, so
// dbPath never holds a partially written snapshot. The ".part" file is
// removed on every failure path.
//
// It returns an error if the client cannot be created, the part file cannot
// be opened, the snapshot stream fails, or the final rename fails.
func SaveSnapshot(dbPath string) error {
	cli, err := getEtcdClient([]string{"localhost:2379"})
	if err != nil {
		return err
	}
	defer cli.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	partpath := dbPath + ".part"
	// After a successful rename the part file no longer exists and this is a
	// no-op; on failure it discards the incomplete snapshot.
	defer os.RemoveAll(partpath)

	f, err := os.OpenFile(partpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
	if err != nil {
		return fmt.Errorf("could not open %s (%w)", partpath, err)
	}
	// Release the descriptor on the error paths below. On the success path the
	// file is closed explicitly (so the error is checked) and this second
	// Close merely returns an ignored "already closed" error.
	defer f.Close()

	now := time.Now()
	var rd io.ReadCloser
	rd, err = cli.Snapshot(ctx)
	if err != nil {
		return err
	}
	// The snapshot stream was previously never closed.
	defer rd.Close()

	if _, err := io.Copy(f, rd); err != nil {
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	log.Println(
		"fetched snapshot",
		"took", time.Since(now),
	)

	if err := os.Rename(partpath, dbPath); err != nil {
		return fmt.Errorf("could not rename %s to %s (%w)", partpath, dbPath, err)
	}
	log.Println("saved snapshot to path", dbPath)
	return nil
}

0 comments on commit 3abeeef

Please sign in to comment.