Skip to content

Commit

Permalink
feat: add API and command to save etcd snapshot (backup)
Browse files Browse the repository at this point in the history
This adds a simple API and `talosctl etcd snapshot` command to stream
snapshot of etcd from one of the control plane nodes to the local file.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
  • Loading branch information
smira authored and talos-bot committed Apr 2, 2021
1 parent 61b694b commit e664362
Show file tree
Hide file tree
Showing 10 changed files with 827 additions and 510 deletions.
9 changes: 9 additions & 0 deletions api/machine/machine.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ service MachineService {
returns (EtcdLeaveClusterResponse);
rpc EtcdForfeitLeadership(EtcdForfeitLeadershipRequest)
returns (EtcdForfeitLeadershipResponse);

// EtcdSnapshot method creates etcd data snapshot (backup) from the local etcd instance
// and streams it back to the client.
//
// This method is available only on control plane nodes (which run etcd).
rpc EtcdSnapshot(EtcdSnapshotRequest) returns (stream common.Data);

rpc GenerateConfiguration(GenerateConfigurationRequest)
returns (GenerateConfigurationResponse);
rpc Hostname(google.protobuf.Empty) returns (HostnameResponse);
Expand Down Expand Up @@ -762,6 +769,8 @@ message EtcdMemberList {
}
message EtcdMemberListResponse { repeated EtcdMemberList messages = 1; }

message EtcdSnapshotRequest {}

// rpc generateConfiguration

message RouteConfig {
Expand Down
73 changes: 72 additions & 1 deletion cmd/talosctl/cmd/talos/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ package talos

import (
"context"
"crypto/sha256"
"fmt"
"io"
"os"
"strings"
"sync"
"text/tabwriter"

"github.com/spf13/cobra"

"github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers"
"github.com/talos-systems/talos/pkg/cli"
"github.com/talos-systems/talos/pkg/machinery/api/machine"
"github.com/talos-systems/talos/pkg/machinery/client"
Expand Down Expand Up @@ -117,7 +121,74 @@ var etcdMemberListCmd = &cobra.Command{
},
}

var etcdSnapshotCmd = &cobra.Command{
Use: "snapshot <path>",
Short: "Stream snapshot of the etcd node to the path.",
Long: ``,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
return WithClient(func(ctx context.Context, c *client.Client) error {
if err := helpers.FailIfMultiNodes(ctx, "etcd snapshot"); err != nil {
return err
}

dbPath := args[0]
partPath := dbPath + ".part"

defer os.RemoveAll(partPath) //nolint:errcheck

dest, err := os.OpenFile(partPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600)
if err != nil {
return fmt.Errorf("error creating temporary file: %w", err)
}

defer dest.Close() //nolint:errcheck

r, errCh, err := c.EtcdSnapshot(ctx, &machine.EtcdSnapshotRequest{})
if err != nil {
return fmt.Errorf("error reading file: %w", err)
}

defer r.Close() //nolint:errcheck

var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
for err := range errCh {
fmt.Fprintln(os.Stderr, err.Error())
}
}()

defer wg.Wait()

size, err := io.Copy(dest, r)
if err != nil {
return fmt.Errorf("error reading: %w", err)
}

if err = dest.Sync(); err != nil {
return fmt.Errorf("failed to fsync: %w", err)
}

// this check is from https://github.com/etcd-io/etcd/blob/client/v3.5.0-alpha.0/client/v3/snapshot/v3_snapshot.go#L46
if (size % 512) != sha256.Size {
return fmt.Errorf("sha256 checksum not found (size %d)", size)
}

if err = os.Rename(partPath, dbPath); err != nil {
return fmt.Errorf("error renaming to final location: %w", err)
}

fmt.Printf("etcd snapshot saved to %q (%d bytes)\n", dbPath, size)

return nil
})
},
}

func init() {
etcdCmd.AddCommand(etcdLeaveCmd, etcdForfeitLeadershipCmd, etcdMemberListCmd, etcdMemberRemoveCmd)
etcdCmd.AddCommand(etcdLeaveCmd, etcdForfeitLeadershipCmd, etcdMemberListCmd, etcdMemberRemoveCmd, etcdSnapshotCmd)
addCommand(etcdCmd)
}
1 change: 1 addition & 0 deletions internal/app/apid/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func Main() {
"/machine.MachineService/Copy",
"/machine.MachineService/DiskUsage",
"/machine.MachineService/Dmesg",
"/machine.MachineService/EtcdSnapshot",
"/machine.MachineService/Events",
"/machine.MachineService/Kubeconfig",
"/machine.MachineService/List",
Expand Down
33 changes: 33 additions & 0 deletions internal/app/machined/internal/server/v1alpha1/v1alpha1_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1785,6 +1785,39 @@ func (s *Server) EtcdForfeitLeadership(ctx context.Context, in *machine.EtcdForf
return reply, nil
}

// EtcdSnapshot implements the machine.MachineServer interface.
func (s *Server) EtcdSnapshot(in *machine.EtcdSnapshotRequest, srv machine.MachineService_EtcdSnapshotServer) error {
client, err := etcd.NewLocalClient()
if err != nil {
return fmt.Errorf("failed to create etcd client: %w", err)
}

//nolint:errcheck
defer client.Close()

rd, err := client.Snapshot(srv.Context())
if err != nil {
return fmt.Errorf("failed reading etcd snapshot: %w", err)
}

ctx, cancel := context.WithCancel(srv.Context())
defer cancel()

chunker := stream.NewChunker(ctx, rd)
chunkCh := chunker.Read()

for data := range chunkCh {
err := srv.SendMsg(&common.Data{Bytes: data})
if err != nil {
cancel()

return err
}
}

return nil
}

// RemoveBootkubeInitializedKey implements machine.MachineService.
//
// Temporary API only used when converting from self-hosted to Talos-managed control plane.
Expand Down
21 changes: 21 additions & 0 deletions internal/integration/cli/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
package cli

import (
"io/ioutil"
"os"
"path/filepath"
"regexp"

"github.com/talos-systems/talos/internal/integration/base"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
)
Expand Down Expand Up @@ -39,6 +44,22 @@ func (suite *EtcdSuite) TestForfeitLeadership() {
)
}

// TestSnapshot tests etcd snapshot (backup).
func (suite *EtcdSuite) TestSnapshot() {
tempDir, err := ioutil.TempDir("", "talos")
suite.Require().NoError(err)

defer func() {
suite.Assert().NoError(os.RemoveAll(tempDir))
}()

dbPath := filepath.Join(tempDir, "snapshot.db")

suite.RunCLI([]string{"etcd", "snapshot", dbPath, "--nodes", suite.RandomDiscoveredNode(machine.TypeControlPlane)},
base.StdoutShouldMatch(regexp.MustCompile(`etcd snapshot saved to .+\d+ bytes.+`)),
)
}

func init() {
allSuites = append(allSuites, new(EtcdSuite))
}
Loading

0 comments on commit e664362

Please sign in to comment.