Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backup node.json with metastore backup #5467

Merged
merged 1 commit into from Jan 28, 2016
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -1,6 +1,7 @@
package backup

import (
"encoding/binary"
"encoding/json"
"errors"
"flag"
@@ -14,7 +15,6 @@ import (
"strings"
"time"

"github.com/influxdb/influxdb/services/meta"
"github.com/influxdb/influxdb/services/snapshotter"
"github.com/influxdb/influxdb/tcp"
)
@@ -228,15 +228,17 @@ func (cmd *Command) backupMetastore() error {
}

return cmd.downloadAndVerify(req, metastoreArchivePath, func(file string) error {
var data meta.Data
binData, err := ioutil.ReadFile(file)
if err != nil {
return err
}
if data.UnmarshalBinary(binData) != nil {

magic := btou64(binData[:8])
if magic != snapshotter.BackupMagicHeader {
cmd.Logger.Println("Invalid metadata blob, ensure the metadata service is running (default port 8088)")
return errors.New("invalid metadata received")
}

return nil
})
}
@@ -363,3 +365,7 @@ func retentionAndShardFromPath(path string) (retention, shard string, err error)

return a[1], a[2], nil
}

func btou64(b []byte) uint64 {
return binary.BigEndian.Uint64(b)
}
@@ -3,6 +3,7 @@ package restore
import (
"archive/tar"
"bytes"
"encoding/binary"
"errors"
"flag"
"fmt"
@@ -17,6 +18,7 @@ import (

"github.com/influxdb/influxdb/cmd/influxd/backup"
"github.com/influxdb/influxdb/services/meta"
"github.com/influxdb/influxdb/services/snapshotter"
)

// Command represents the program execution for "influxd restore".
@@ -153,9 +155,30 @@ func (cmd *Command) unpackMeta() error {
return fmt.Errorf("copy: %s", err)
}

b := buf.Bytes()
var i int

// Make sure the file is actually a meta store backup file
magic := btou64(b[:8])
if magic != snapshotter.BackupMagicHeader {
return fmt.Errorf("invalid metadata file")
}
i += 8

// Size of the meta store bytes
length := int(btou64(b[i : i+8]))
i += 8
metaBytes := b[i : i+length]
i += int(length)

// Size of the node.json bytes
length = int(btou64(b[i : i+8]))
i += 8
nodeBytes := b[i:]

// Unpack into metadata.
var data meta.Data
if err := data.UnmarshalBinary(buf.Bytes()); err != nil {
if err := data.UnmarshalBinary(metaBytes); err != nil {
return fmt.Errorf("unmarshal: %s", err)
}

@@ -164,6 +187,16 @@ func (cmd *Command) unpackMeta() error {
c.JoinPeers = nil
c.LoggingEnabled = false

// Create the meta dir
if os.MkdirAll(c.Dir, 0700); err != nil {
return err
}

// Write node.json back to meta dir
if err := ioutil.WriteFile(filepath.Join(c.Dir, "node.json"), nodeBytes, 0655); err != nil {
return err
}

// Initialize meta store.
store := meta.NewService(c)
store.RaftListener = newNopListener()
@@ -366,3 +399,14 @@ func (ln *nopListener) Close() error {
}

func (ln *nopListener) Addr() net.Addr { return &net.TCPAddr{} }

// u64tob converts a uint64 into an 8-byte slice.
func u64tob(v uint64) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, v)
return b
}

func btou64(b []byte) uint64 {
return binary.BigEndian.Uint64(b)
}
@@ -4,6 +4,7 @@ import (
"io/ioutil"
"net"
"os"
"path/filepath"
"testing"
"time"

@@ -75,6 +76,15 @@ func TestServer_BackupAndRestore(t *testing.T) {
t.Fatalf("error restoring: %s", err.Error())
}

// Make sure node.json was restored
nodePath := filepath.Join(config.Meta.Dir, "node.json")
if _, err := os.Stat(nodePath); err != nil || os.IsNotExist(err) {
t.Fatalf("node.json should exist")
}
// Need to remove the restored node.json because the metaservers it points to are to the ephemeral ports
// of itself. Opening the server below will hang indefinitely if we try to use them.
os.RemoveAll(nodePath)

// now open it up and verify we're good
s := OpenServer(config, "")
defer s.Close()
@@ -230,6 +230,7 @@ func (s *Server) appendSnapshotterService() {
srv := snapshotter.NewService()
srv.TSDBStore = s.TSDBStore
srv.MetaClient = s.MetaClient
srv.Node = s.Node
s.Services = append(s.Services, srv)
s.SnapshotterService = srv
}
@@ -1,7 +1,9 @@
package snapshotter

import (
"bytes"
"encoding"
"encoding/binary"
"encoding/json"
"fmt"
"log"
@@ -16,14 +18,22 @@ import (
"github.com/influxdb/influxdb/tsdb"
)

// MuxHeader is the header byte used for the TCP muxer.
const MuxHeader = 3
const (
// MuxHeader is the header byte used for the TCP muxer.
MuxHeader = 3

// BackupMagicHeader is the first 8 bytes used to identify and validate
// a metastore backup file
BackupMagicHeader = 0x59590101

This comment has been minimized.

Copy link
@joelegasse

joelegasse Jan 28, 2016

Contributor

I know this is already merged, but the 8 byte value of this is 0x0000000059590101. Did you intend to have this be eight non-zero bytes. Or are the four leading zero bytes intentional? Also, since this is intended to be a fixed size, would having BackupMagicHeader be a typed constant of a fixed size be better? Either [8]byte or uint64?

)

// Service manages the listener for the snapshot endpoint.
type Service struct {
wg sync.WaitGroup
err chan error

Node *influxdb.Node

MetaClient interface {
encoding.BinaryMarshaler
Database(name string) (*meta.DatabaseInfo, error)
@@ -109,12 +119,7 @@ func (s *Service) handleConn(conn net.Conn) error {
return err
}
case RequestMetastoreBackup:
// Retrieve and serialize the current meta data.
buf, err := s.MetaClient.MarshalBinary()
if err != nil {
return fmt.Errorf("marshal meta: %s", err)
}
if _, err := conn.Write(buf); err != nil {
if err := s.writeMetaStore(conn); err != nil {
return err
}
case RequestDatabaseInfo:
@@ -128,6 +133,41 @@ func (s *Service) handleConn(conn net.Conn) error {
return nil
}

func (s *Service) writeMetaStore(conn net.Conn) error {
// Retrieve and serialize the current meta data.
buf, err := s.MetaClient.MarshalBinary()
if err != nil {
return fmt.Errorf("marshal meta: %s", err)
}

var b bytes.Buffer
enc := json.NewEncoder(&b)
if err := enc.Encode(s.Node); err != nil {
return err
}

if _, err := conn.Write(u64tob(BackupMagicHeader)); err != nil {
return err
}

if _, err := conn.Write(u64tob(uint64(len(buf)))); err != nil {
return err
}

if _, err := conn.Write(buf); err != nil {
return err
}

if _, err := conn.Write(u64tob(uint64(b.Len()))); err != nil {
return err
}

if _, err := b.WriteTo(conn); err != nil {
return err
}
return nil
}

// writeDatabaseInfo will write the relative paths of all shards in the database on
// this server into the connection
func (s *Service) writeDatabaseInfo(conn net.Conn, database string) error {
@@ -247,3 +287,14 @@ type Request struct {
type Response struct {
Paths []string
}

// u64tob converts a uint64 into an 8-byte slice.
func u64tob(v uint64) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, v)
return b
}

func btou64(b []byte) uint64 {
return binary.BigEndian.Uint64(b)
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.