From a40e6c771fd566d092e58a6f1940161e76f959bc Mon Sep 17 00:00:00 2001 From: Alexandre Lamarre Date: Fri, 24 Nov 2023 17:28:35 -0500 Subject: [PATCH 1/4] simple snapshot manager implementation --- pkg/snapshot/manager.go | 479 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 479 insertions(+) create mode 100644 pkg/snapshot/manager.go diff --git a/pkg/snapshot/manager.go b/pkg/snapshot/manager.go new file mode 100644 index 000000000..138a27722 --- /dev/null +++ b/pkg/snapshot/manager.go @@ -0,0 +1,479 @@ +package snapshot + +import ( + "archive/zip" + "context" + "crypto/tls" + "crypto/x509" + "encoding/base64" + "encoding/pem" + "fmt" + "io" + "log/slog" + "net/http" + "os" + "path" + "path/filepath" + "strings" + "time" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + + "emperror.dev/errors" + "github.com/rancher/opni/pkg/logger" +) + +const ( + maxConcurrentSnapshots = 1 + compressedExtension = ".zip" + metadataDir = ".metadata" +) + +const ( + successfulSnapshotStatus SnapshotStatus = "successful" + failedSnapshotStatus SnapshotStatus = "failed" +) + +type SnapshotStatus string + +type SnapshotConfig struct { + SnapshotDir string + SnapshotName string + + DataDir string + + Retention int + + Compression *CompressionConfig + S3 *S3Config +} + +func (s *SnapshotConfig) compressionEnabled() bool { + return s.Compression != nil +} + +func (s *SnapshotConfig) s3Enabled() bool { + return s.S3 != nil +} + +type S3Config struct { + Endpoint string `json:"-"` + EndpointCA string `json:"-"` + SkipSSLVerify bool `json:"-"` + AccessKey string `json:"-"` + SecretKey string `json:"-"` + BucketName string `json:"-"` + Region string `json:"-"` + Folder string `json:"-"` + Timeout time.Duration `json:"-"` + Insecure bool `json:"-"` +} + +type CompressionConfig struct { + Type string +} + +type AbstractSnapshotter interface { + Save(ctx context.Context, snapshotPath string) error +} + +type AbstractSnapshotManager interface { + 
Save(ctx context.Context) error + List(ctx context.Context) error +} + +type SnapshotManager struct { + lg *slog.Logger + config SnapshotConfig + impl AbstractSnapshotter +} + +var _ AbstractSnapshotManager = (*SnapshotManager)(nil) + +func NewSnapshotManager( + impl AbstractSnapshotter, +) *SnapshotManager { + return &SnapshotManager{ + impl: impl, + } +} + +func (s *SnapshotManager) List(ctx context.Context) error { + return nil +} + +func (s *SnapshotManager) Save(ctx context.Context) error { + nodeName := "TODO" + now := time.Now().Round(time.Second) + snapshotName := fmt.Sprintf( + "%s-%s-%d", + s.config.SnapshotName, + nodeName, + now.Unix(), + ) + snapshotDir, err := s.snapshotDir(true) + if err != nil { + return err + } + snapshotPath := filepath.Join(snapshotDir, snapshotName) + var sf *snapshotFile + if err := s.impl.Save(ctx, snapshotPath); err != nil { + sf = &snapshotFile{ + Name: snapshotName, + Location: "", + CreatedAt: now, + Status: failedSnapshotStatus, + Message: base64.StdEncoding.EncodeToString([]byte(err.Error())), + Size: 0, + // metadataSource: extraMetadata, + } + } + + if sf == nil { + if s.config.compressionEnabled() { + zipPath, err := s.compressSnapshot( + s.config.SnapshotDir, snapshotName, snapshotPath, now, + ) + if err != nil { + return errors.Wrap(err, "failed to compress etcd snapshot") + } + snapshotPath = zipPath + s.lg.Info("compressed snapshot") + } + f, err := os.Stat(snapshotPath) + if err != nil { + return errors.Wrap(err, "unable to retrieve snapshot information from local snapshot") + } + sf = &snapshotFile{ + Name: f.Name(), + Location: "file://" + snapshotPath, + CreatedAt: now, + Status: successfulSnapshotStatus, + Size: f.Size(), + Compressed: s.config.compressionEnabled(), + } + + // TODO : persist snapshot metadata file locally + + // TODO : check/prune retention limits + if err := s.retention(s.config.Retention, s.config.SnapshotName, s.config.SnapshotDir); err != nil { + return errors.Wrap(err, "failed to apply 
local snapshot retention policy") + } + + if s.config.s3Enabled() { + // TODO : init client here + sf = &snapshotFile{ + Name: filepath.Base(snapshotPath), + CreatedAt: now, + Message: base64.StdEncoding.EncodeToString([]byte(err.Error())), + Size: 0, + Status: failedSnapshotStatus, + S3: &s3Config{ + Endpoint: s.config.S3.Endpoint, + EndpointCA: s.config.S3.EndpointCA, + SkipSSLVerify: s.config.S3.SkipSSLVerify, + Bucket: s.config.S3.BucketName, + Region: s.config.S3.Region, + Folder: s.config.S3.Folder, + Insecure: s.config.S3.Insecure, + }, + } + // if init succeeds here, try upload + sf, err := s.uploadS3(ctx, snapshotPath, now) + if err != nil { + s.lg.With(logger.Err(err)).Error( + "Error received during snapshot upload %s", err) + } else { + s.lg.Info("S3 upload complete") + } + + if err := s.retentionS3(); err != nil { + s.lg.With(logger.Err(err)).Error( + "failed to apply s3 snapshot retention policy", + ) + } + + // TODO : persist metadata file locally + fmt.Println(sf) + // either it is snapshot md or s3 failure record + } + } + return nil +} + +// snapshotDir ensures that the snapshot directory exists, and then returns its path. +func (s *SnapshotManager) snapshotDir(create bool) (string, error) { + if s.config.SnapshotDir == "" { + // we have to create the snapshot dir if we are using + // the default snapshot dir if it doesn't exist + defaultSnapshotDir := filepath.Join(s.config.DataDir, "db", "snapshots") + s, err := os.Stat(defaultSnapshotDir) + if err != nil { + if create && os.IsNotExist(err) { + if err := os.MkdirAll(defaultSnapshotDir, 0700); err != nil { + return "", err + } + return defaultSnapshotDir, nil + } + return "", err + } + if s.IsDir() { + return defaultSnapshotDir, nil + } + } + return s.config.SnapshotDir, nil +} + +// isValidCertificate checks to see if the given +// byte slice is a valid x509 certificate. 
+func isValidCertificate(c []byte) bool { + p, _ := pem.Decode(c) + if p == nil { + return false + } + if _, err := x509.ParseCertificates(p.Bytes); err != nil { + return false + } + return true +} + +func setTransportCA(tr http.RoundTripper, endpointCA string, insecureSkipVerify bool) (http.RoundTripper, error) { + // TODO : we probably don't need this + ca, err := readS3EndpointCA(endpointCA) + if err != nil { + return tr, err + } + if !isValidCertificate(ca) { + return tr, errors.New("endpoint-ca is not a valid x509 certificate") + } + + certPool := x509.NewCertPool() + certPool.AppendCertsFromPEM(ca) + + tr.(*http.Transport).TLSClientConfig = &tls.Config{ + RootCAs: certPool, + InsecureSkipVerify: insecureSkipVerify, + } + + return tr, nil +} + +func readS3EndpointCA(endpointCA string) ([]byte, error) { + ca, err := base64.StdEncoding.DecodeString(endpointCA) + if err != nil { + return os.ReadFile(endpointCA) + } + return ca, nil +} + +func (s *SnapshotManager) initS3Client() (*minio.Client, error) { + if s.config.S3.BucketName == "" { + return nil, errors.New("s3 bucket name was not set") + } + tr := http.DefaultTransport + + switch { + case s.config.S3.EndpointCA != "": + trCA, err := setTransportCA(tr, s.config.S3.EndpointCA, s.config.S3.SkipSSLVerify) + if err != nil { + return nil, err + } + tr = trCA + case s.config.S3.SkipSSLVerify: + tr.(*http.Transport).TLSClientConfig = &tls.Config{ + InsecureSkipVerify: s.config.S3.SkipSSLVerify, + } + } + + creds := credentials.NewStaticV4(s.config.S3.AccessKey, s.config.S3.SecretKey, "") + opt := minio.Options{ + Creds: creds, + Secure: !s.config.S3.Insecure, + Region: s.config.S3.Region, + Transport: tr, + BucketLookup: minio.BucketLookupAuto, + } + c, err := minio.New(s.config.S3.Endpoint, &opt) + if err != nil { + return nil, err + } + return c, nil +} + +func (s *SnapshotManager) uploadS3(ctx context.Context, snapshotPath string, now time.Time) (*snapshotFile, error) { + s.lg.Info("Uploading snapshot to 
s3://%s/%s", s.config.S3.BucketName, snapshotPath) + + basename := filepath.Base(snapshotPath) + metadata := filepath.Join(filepath.Dir(snapshotPath), "..", metadataDir, basename) + snapshotKey := path.Join(s.config.S3.Folder, basename) + metadataKey := path.Join(s.config.S3.Folder, metadataDir, basename) + + sf := &snapshotFile{ + Name: basename, + Location: fmt.Sprintf("s3://%s/%s", s.config.S3.BucketName, snapshotKey), + CreatedAt: now, + S3: &s3Config{ + Endpoint: s.config.S3.Endpoint, + EndpointCA: s.config.S3.EndpointCA, + SkipSSLVerify: s.config.S3.SkipSSLVerify, + Bucket: s.config.S3.BucketName, + Region: s.config.S3.Region, + Folder: s.config.S3.Folder, + Insecure: s.config.S3.Insecure, + }, + Compressed: strings.HasSuffix(snapshotPath, compressedExtension), + } + + client, err := s.initS3Client() + if err != nil { + return nil, errors.Wrap(err, "failed to initialize s3 client") + } + + uploadInfo, err := s.uploadS3Snapshot(client, ctx, snapshotKey, snapshotPath) + if err != nil { + sf.Status = failedSnapshotStatus + sf.Message = base64.StdEncoding.EncodeToString([]byte(err.Error())) + } else { + sf.Status = successfulSnapshotStatus + sf.Size = uploadInfo.Size + } + if _, err := s.uploadS3Metadata(client, ctx, metadataKey, metadata); err != nil { + s.lg.With(logger.Err(err)).Warn("failed to upload snapshot metadata to S3") + } else { + s.lg.With("bucket", s.config.S3.BucketName, "key", metadataKey).Info( + "Uploaded snapshot metadata", + ) + } + return sf, nil +} + +func (s *SnapshotManager) uploadS3Snapshot( + client *minio.Client, + ctx context.Context, + key, path string, +) (minio.UploadInfo, error) { + opts := minio.PutObjectOptions{ + NumThreads: 2, + UserMetadata: map[string]string{ + // TODO : put meaningful info here + }, + } + if strings.HasSuffix(key, compressedExtension) { + opts.ContentType = "application/zip" + } else { + opts.ContentType = "application/octet-stream" + } + ctxca, ca := context.WithTimeout(ctx, s.config.S3.Timeout) + defer 
ca() + return client.FPutObject(ctxca, s.config.S3.BucketName, key, path, opts) +} + +func (s *SnapshotManager) uploadS3Metadata( + client *minio.Client, + ctx context.Context, + key, path string, +) (minio.UploadInfo, error) { + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + return minio.UploadInfo{}, nil + } + } + + opts := minio.PutObjectOptions{ + NumThreads: 2, + ContentType: "application/json", + UserMetadata: map[string]string{ + // TODO : meaningful md here + }, + } + ctxca, ca := context.WithTimeout(ctx, s.config.S3.Timeout) + defer ca() + return client.FPutObject(ctxca, s.config.S3.BucketName, key, path, opts) +} + +func (s *SnapshotManager) retention(retention int, snapshotPrefix string, snapshotDir string) error { + return nil +} + +func (s *SnapshotManager) retentionS3() error { + return nil +} + +func (s *SnapshotManager) compressSnapshot( + snapshotDir, snapshotName, snapshotPath string, + now time.Time, +) (string, error) { + s.lg.Info(fmt.Sprintf("Compressing etcd snapshot file : %s", snapshotName)) + + zippedSnapshotName := snapshotName + compressedExtension + zipPath := filepath.Join(snapshotDir, zippedSnapshotName) + + zf, err := os.Create(zipPath) + if err != nil { + return "", err + } + defer zf.Close() + + zipWriter := zip.NewWriter(zf) + defer zipWriter.Close() + + uncompressedPath := filepath.Join(snapshotDir, snapshotName) + fileToZip, err := os.Open(uncompressedPath) + if err != nil { + os.Remove(zipPath) + return "", err + } + defer fileToZip.Close() + + info, err := fileToZip.Stat() + if err != nil { + os.Remove(zipPath) + return "", err + } + + header, err := zip.FileInfoHeader(info) + if err != nil { + os.Remove(zipPath) + return "", err + } + + header.Name = snapshotName + header.Method = zip.Deflate + header.Modified = now + + writer, err := zipWriter.CreateHeader(header) + if err != nil { + os.Remove(zipPath) + return "", err + } + _, err = io.Copy(writer, fileToZip) + + return zipPath, err +} + +// 
snapshotFile represents a single snapshot and it's +// metadata. +type snapshotFile struct { + Name string `json:"name"` + // Location contains the full path of the snapshot. For + // local paths, the location will be prefixed with "file://". + Location string `json:"location,omitempty"` + Metadata string `json:"metadata,omitempty"` + Message string `json:"message,omitempty"` + CreatedAt time.Time `json:"createdAt,omitempty"` + Size int64 `json:"size,omitempty"` + Status SnapshotStatus `json:"status,omitempty"` + S3 *s3Config `json:"s3Config,omitempty"` + Compressed bool `json:"compressed"` +} + +type s3Config struct { + Endpoint string `json:"endpoint,omitempty"` + EndpointCA string `json:"endpointCA,omitempty"` + SkipSSLVerify bool `json:"skipSSLVerify,omitempty"` + Bucket string `json:"bucket,omitempty"` + Region string `json:"region,omitempty"` + Folder string `json:"folder,omitempty"` + Insecure bool `json:"insecure,omitempty"` +} From 955e55583b3a04b37f56916ab398271e25d7c1a9 Mon Sep 17 00:00:00 2001 From: Alexandre Lamarre Date: Mon, 27 Nov 2023 16:18:26 -0500 Subject: [PATCH 2/4] snapshot manager --- pkg/snapshot/manager.go | 493 +++++++++++++++++++++------- pkg/snapshot/manager_test.go | 228 +++++++++++++ pkg/snapshot/snapshot_suite_test.go | 13 + 3 files changed, 617 insertions(+), 117 deletions(-) create mode 100644 pkg/snapshot/manager_test.go create mode 100644 pkg/snapshot/snapshot_suite_test.go diff --git a/pkg/snapshot/manager.go b/pkg/snapshot/manager.go index 138a27722..fe83164fc 100644 --- a/pkg/snapshot/manager.go +++ b/pkg/snapshot/manager.go @@ -6,9 +6,11 @@ import ( "crypto/tls" "crypto/x509" "encoding/base64" + "encoding/json" "encoding/pem" "fmt" "io" + "io/fs" "log/slog" "net/http" "os" @@ -19,42 +21,68 @@ import ( "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/rancher/opni/pkg/validation" + "github.com/samber/lo" "emperror.dev/errors" "github.com/rancher/opni/pkg/logger" + + // TODO : one or 
more inflight transactions + "golang.org/x/sync/semaphore" ) const ( maxConcurrentSnapshots = 1 compressedExtension = ".zip" - metadataDir = ".metadata" + _metadataDir = ".metadata" +) + +const ( + LocationPrefixLocal = "file://" + LocationPrefixS3 = "s3://" ) const ( - successfulSnapshotStatus SnapshotStatus = "successful" - failedSnapshotStatus SnapshotStatus = "failed" + SnapshotStatusSuccessful SnapshotStatus = "successful" + SnapshotStatusFailed SnapshotStatus = "failed" +) + +const ( + CompressionZip CompressionType = "zip" ) type SnapshotStatus string +type CompressionType string + type SnapshotConfig struct { SnapshotDir string SnapshotName string DataDir string - + // 0 indicates no retention limit is enforced Retention int Compression *CompressionConfig S3 *S3Config } -func (s *SnapshotConfig) compressionEnabled() bool { - return s.Compression != nil +func (c *SnapshotConfig) Validate() error { + if c.SnapshotName == "" { + return validation.Error("snapshot name required") + } + if c.DataDir == "" { + return validation.Error("data dir required") + } + return nil +} + +func (c *SnapshotConfig) compressionEnabled() bool { + return c.Compression != nil } -func (s *SnapshotConfig) s3Enabled() bool { - return s.S3 != nil +func (c *SnapshotConfig) s3Enabled() bool { + return c.S3 != nil } type S3Config struct { @@ -71,45 +99,164 @@ type S3Config struct { } type CompressionConfig struct { - Type string + Type CompressionType } -type AbstractSnapshotter interface { +type Snapshotter interface { Save(ctx context.Context, snapshotPath string) error } -type AbstractSnapshotManager interface { +type Restorer interface { + Restore(ctx context.Context, path string) error +} + +type BackupRestore interface { + Snapshotter + Restorer +} + +type SnapshotManager interface { Save(ctx context.Context) error - List(ctx context.Context) error + List(ctx context.Context) ([]SnapshotMetadata, error) + Restore(ctx context.Context, snapMd SnapshotMetadata) error } -type 
SnapshotManager struct { +type snapshotManager struct { lg *slog.Logger config SnapshotConfig - impl AbstractSnapshotter + impl BackupRestore + + s3Client *minio.Client + semaphore *semaphore.Weighted } -var _ AbstractSnapshotManager = (*SnapshotManager)(nil) +var _ SnapshotManager = (*snapshotManager)(nil) func NewSnapshotManager( - impl AbstractSnapshotter, -) *SnapshotManager { - return &SnapshotManager{ - impl: impl, + impl BackupRestore, + config SnapshotConfig, + lg *slog.Logger, +) SnapshotManager { + return &snapshotManager{ + impl: impl, + config: config, + lg: lg, + semaphore: semaphore.NewWeighted(1), } } -func (s *SnapshotManager) List(ctx context.Context) error { - return nil +// TODO : we may not need the default snapshot dir behaviour +func (s *snapshotManager) snapshotDir(create bool) (string, error) { + if s.config.SnapshotDir == "" { + // we have to create the snapshot dir if we are using + // the default snapshot dir if it doesn't exist + defaultSnapshotDir := filepath.Join(s.config.DataDir, "db", "snapshots") + s, err := os.Stat(defaultSnapshotDir) + if err != nil { + if create && os.IsNotExist(err) { + if err := os.MkdirAll(defaultSnapshotDir, 0700); err != nil { + return "", err + } + return defaultSnapshotDir, nil + } + return "", err + } + if s.IsDir() { + return defaultSnapshotDir, nil + } + } + return s.config.SnapshotDir, nil +} + +func (s *snapshotManager) metadataDir(create bool) (string, error) { + snapDir, err := s.snapshotDir(create) + if err != nil { + return snapDir, err + } + + metadataDir := filepath.Join(snapDir, _metadataDir) + _, err = os.Stat(metadataDir) + if err != nil { + if create && os.IsNotExist(err) { + if err := os.MkdirAll(metadataDir, 0700); err != nil { + return "", err + } + } + } + + return metadataDir, nil } -func (s *SnapshotManager) Save(ctx context.Context) error { - nodeName := "TODO" +func (s *snapshotManager) commitMetadata(sf *SnapshotMetadata) error { + metadataDir, err := s.metadataDir(true) + if err != 
nil { + return err + } + mdPath := filepath.Join(metadataDir, sf.Name) + data, err := json.Marshal(sf) + if err != nil { + return err + } + s.lg.With("path", mdPath).Info("commiting metadata") + return os.WriteFile(mdPath, data, 0700) +} + +func (s *snapshotManager) List(ctx context.Context) ([]SnapshotMetadata, error) { + metadataDir, err := s.metadataDir(false) + if err != nil { + // TODO : maybe do some error handling here and return appropriate status codes + return []SnapshotMetadata{}, err + } + s.lg.With("path", metadataDir).Info("listing local metadata") + fSys := os.DirFS(metadataDir) + res := []SnapshotMetadata{} + err = fs.WalkDir(fSys, ".", func(path string, d fs.DirEntry, _ error) error { + s.lg.With("path", path).Info("checking") + if path == "" { + return nil + } + if d != nil && !d.IsDir() { + data, err := os.ReadFile(filepath.Join(metadataDir, path)) + if err != nil { + s.lg.With(logger.Err(err)).Error("failed to read snapshot metadata contents") + return err + } + var sf SnapshotMetadata + if err := json.Unmarshal(data, &sf); err != nil { + s.lg.With(logger.Err(err)).Warn("failed to unmarshal snapshot file contents, skipping") + } else { + res = append(res, sf) + } + } + return nil + }) + // TODO : don't necessarily return error here + if err != nil { + return nil, err + } + + // TODO : check S3 data here if config is defined + return res, nil +} + +func (s *snapshotManager) compressionConfig() *compressionMetadata { + return lo.Ternary[*compressionMetadata]( + s.config.compressionEnabled(), + &compressionMetadata{ + Type: CompressionZip, + }, + nil, + ) +} + +func (s *snapshotManager) Save(ctx context.Context) error { + if ack := s.semaphore.TryAcquire(1); !ack { + return errors.New("snapshot already in progress") + } now := time.Now().Round(time.Second) snapshotName := fmt.Sprintf( - "%s-%s-%d", + "%s-opni-%d", s.config.SnapshotName, - nodeName, now.Unix(), ) snapshotDir, err := s.snapshotDir(true) @@ -117,16 +264,21 @@ func (s *SnapshotManager) 
Save(ctx context.Context) error { return err } snapshotPath := filepath.Join(snapshotDir, snapshotName) - var sf *snapshotFile + lg := s.lg.With("snapshot", snapshotName, "path", snapshotPath) + var sf *SnapshotMetadata if err := s.impl.Save(ctx, snapshotPath); err != nil { - sf = &snapshotFile{ + sf = &SnapshotMetadata{ Name: snapshotName, Location: "", CreatedAt: now, - Status: failedSnapshotStatus, + Status: SnapshotStatusFailed, Message: base64.StdEncoding.EncodeToString([]byte(err.Error())), Size: 0, - // metadataSource: extraMetadata, + } + + if err := s.commitMetadata(sf); err != nil { + lg.With(logger.Err(err)).Error("failed to commit metadata after failed snapshot") + return err } } @@ -139,92 +291,86 @@ func (s *SnapshotManager) Save(ctx context.Context) error { return errors.Wrap(err, "failed to compress etcd snapshot") } snapshotPath = zipPath - s.lg.Info("compressed snapshot") + lg.Info("compressed snapshot") } f, err := os.Stat(snapshotPath) if err != nil { return errors.Wrap(err, "unable to retrieve snapshot information from local snapshot") } - sf = &snapshotFile{ - Name: f.Name(), - Location: "file://" + snapshotPath, - CreatedAt: now, - Status: successfulSnapshotStatus, - Size: f.Size(), - Compressed: s.config.compressionEnabled(), + sf = &SnapshotMetadata{ + Name: f.Name(), + Location: LocationPrefixLocal + snapshotPath, + CreatedAt: now, + Status: SnapshotStatusSuccessful, + Size: f.Size(), + Compressed: s.config.compressionEnabled(), + CompressionMetadata: s.compressionConfig(), } - // TODO : persist snapshot metadata file locally + if err := s.commitMetadata(sf); err != nil { + lg.Warn("failed to commit metadata after successful snapshot") + } - // TODO : check/prune retention limits + // TODO : check/prune based on retention limits if err := s.retention(s.config.Retention, s.config.SnapshotName, s.config.SnapshotDir); err != nil { return errors.Wrap(err, "failed to apply local snapshot retention policy") } if s.config.s3Enabled() { - // TODO 
: init client here - sf = &snapshotFile{ - Name: filepath.Base(snapshotPath), - CreatedAt: now, - Message: base64.StdEncoding.EncodeToString([]byte(err.Error())), - Size: 0, - Status: failedSnapshotStatus, - S3: &s3Config{ - Endpoint: s.config.S3.Endpoint, - EndpointCA: s.config.S3.EndpointCA, - SkipSSLVerify: s.config.S3.SkipSSLVerify, - Bucket: s.config.S3.BucketName, - Region: s.config.S3.Region, - Folder: s.config.S3.Folder, - Insecure: s.config.S3.Insecure, - }, - } - // if init succeeds here, try upload - sf, err := s.uploadS3(ctx, snapshotPath, now) - if err != nil { - s.lg.With(logger.Err(err)).Error( - "Error received during snapshot upload %s", err) + if err := s.preRunS3(); err != nil { + sf = &SnapshotMetadata{ + Name: filepath.Base(snapshotPath), + CreatedAt: now, + Message: base64.StdEncoding.EncodeToString([]byte(err.Error())), + Size: 0, + Status: SnapshotStatusFailed, + S3: &s3Metadata{ + Endpoint: s.config.S3.Endpoint, + EndpointCA: s.config.S3.EndpointCA, + SkipSSLVerify: s.config.S3.SkipSSLVerify, + Bucket: s.config.S3.BucketName, + Region: s.config.S3.Region, + Folder: s.config.S3.Folder, + Insecure: s.config.S3.Insecure, + }, + } } else { - s.lg.Info("S3 upload complete") - } + // if init succeeds here, try upload + sf, err = s.uploadS3(ctx, snapshotPath, now) + if err != nil { + lg.With(logger.Err(err)).Error( + "Error received during snapshot upload %s", err) + } else { + lg.Info("S3 upload complete") + } - if err := s.retentionS3(); err != nil { - s.lg.With(logger.Err(err)).Error( - "failed to apply s3 snapshot retention policy", - ) + if err := s.retentionS3(); err != nil { + lg.With(logger.Err(err)).Error( + "failed to apply s3 snapshot retention policy", + ) + } } - - // TODO : persist metadata file locally - fmt.Println(sf) - // either it is snapshot md or s3 failure record + } + if err := s.commitMetadata(sf); err != nil { + lg.Warn("failed to persist metadata locally") } } return nil } -// snapshotDir ensures that the snapshot 
directory exists, and then returns its path. -func (s *SnapshotManager) snapshotDir(create bool) (string, error) { - if s.config.SnapshotDir == "" { - // we have to create the snapshot dir if we are using - // the default snapshot dir if it doesn't exist - defaultSnapshotDir := filepath.Join(s.config.DataDir, "db", "snapshots") - s, err := os.Stat(defaultSnapshotDir) +func (s *snapshotManager) preRunS3() error { + if s.s3Client == nil { + client, err := s.initS3Client() if err != nil { - if create && os.IsNotExist(err) { - if err := os.MkdirAll(defaultSnapshotDir, 0700); err != nil { - return "", err - } - return defaultSnapshotDir, nil - } - return "", err - } - if s.IsDir() { - return defaultSnapshotDir, nil + return err } + s.s3Client = client } - return s.config.SnapshotDir, nil + return nil } +// snapshotDir ensures that the snapshot directory exists, and then returns its path. + // isValidCertificate checks to see if the given // byte slice is a valid x509 certificate. func isValidCertificate(c []byte) bool { @@ -267,7 +413,7 @@ func readS3EndpointCA(endpointCA string) ([]byte, error) { return ca, nil } -func (s *SnapshotManager) initS3Client() (*minio.Client, error) { +func (s *snapshotManager) initS3Client() (*minio.Client, error) { if s.config.S3.BucketName == "" { return nil, errors.New("s3 bucket name was not set") } @@ -301,19 +447,19 @@ func (s *SnapshotManager) initS3Client() (*minio.Client, error) { return c, nil } -func (s *SnapshotManager) uploadS3(ctx context.Context, snapshotPath string, now time.Time) (*snapshotFile, error) { - s.lg.Info("Uploading snapshot to s3://%s/%s", s.config.S3.BucketName, snapshotPath) +func (s *snapshotManager) uploadS3(ctx context.Context, snapshotPath string, now time.Time) (*SnapshotMetadata, error) { + s.lg.Info(fmt.Sprintf("Uploading snapshot to s3://%s/%s", s.config.S3.BucketName, snapshotPath)) basename := filepath.Base(snapshotPath) - metadata := filepath.Join(filepath.Dir(snapshotPath), "..", metadataDir, 
basename) + metadata := filepath.Join(filepath.Dir(snapshotPath), "..", _metadataDir, basename) snapshotKey := path.Join(s.config.S3.Folder, basename) - metadataKey := path.Join(s.config.S3.Folder, metadataDir, basename) + metadataKey := path.Join(s.config.S3.Folder, _metadataDir, basename) - sf := &snapshotFile{ + sf := &SnapshotMetadata{ Name: basename, - Location: fmt.Sprintf("s3://%s/%s", s.config.S3.BucketName, snapshotKey), + Location: LocationPrefixS3 + fmt.Sprintf("%s/%s", s.config.S3.BucketName, snapshotKey), CreatedAt: now, - S3: &s3Config{ + S3: &s3Metadata{ Endpoint: s.config.S3.Endpoint, EndpointCA: s.config.S3.EndpointCA, SkipSSLVerify: s.config.S3.SkipSSLVerify, @@ -322,7 +468,8 @@ func (s *SnapshotManager) uploadS3(ctx context.Context, snapshotPath string, now Folder: s.config.S3.Folder, Insecure: s.config.S3.Insecure, }, - Compressed: strings.HasSuffix(snapshotPath, compressedExtension), + Compressed: strings.HasSuffix(snapshotPath, compressedExtension), + CompressionMetadata: s.compressionConfig(), } client, err := s.initS3Client() @@ -332,10 +479,10 @@ func (s *SnapshotManager) uploadS3(ctx context.Context, snapshotPath string, now uploadInfo, err := s.uploadS3Snapshot(client, ctx, snapshotKey, snapshotPath) if err != nil { - sf.Status = failedSnapshotStatus + sf.Status = SnapshotStatusFailed sf.Message = base64.StdEncoding.EncodeToString([]byte(err.Error())) } else { - sf.Status = successfulSnapshotStatus + sf.Status = SnapshotStatusSuccessful sf.Size = uploadInfo.Size } if _, err := s.uploadS3Metadata(client, ctx, metadataKey, metadata); err != nil { @@ -348,7 +495,7 @@ func (s *SnapshotManager) uploadS3(ctx context.Context, snapshotPath string, now return sf, nil } -func (s *SnapshotManager) uploadS3Snapshot( +func (s *snapshotManager) uploadS3Snapshot( client *minio.Client, ctx context.Context, key, path string, @@ -369,7 +516,7 @@ func (s *SnapshotManager) uploadS3Snapshot( return client.FPutObject(ctxca, s.config.S3.BucketName, key, path, 
opts) } -func (s *SnapshotManager) uploadS3Metadata( +func (s *snapshotManager) uploadS3Metadata( client *minio.Client, ctx context.Context, key, path string, @@ -392,15 +539,17 @@ func (s *SnapshotManager) uploadS3Metadata( return client.FPutObject(ctxca, s.config.S3.BucketName, key, path, opts) } -func (s *SnapshotManager) retention(retention int, snapshotPrefix string, snapshotDir string) error { +func (s *snapshotManager) retention(retention int, snapshotPrefix string, snapshotDir string) error { + // TODO return nil } -func (s *SnapshotManager) retentionS3() error { +func (s *snapshotManager) retentionS3() error { + // TODO return nil } -func (s *SnapshotManager) compressSnapshot( +func (s *snapshotManager) compressSnapshot( snapshotDir, snapshotName, snapshotPath string, now time.Time, ) (string, error) { @@ -452,23 +601,119 @@ func (s *SnapshotManager) compressSnapshot( return zipPath, err } -// snapshotFile represents a single snapshot and it's -// metadata. -type snapshotFile struct { +func (s *snapshotManager) Restore(ctx context.Context, snapMd SnapshotMetadata) error { + if err := snapMd.Validate(); err != nil { + return err + } + switch { + case strings.HasPrefix(snapMd.Location, LocationPrefixLocal): + return s.restoreFromFile(ctx, snapMd) + case strings.HasPrefix(snapMd.Location, LocationPrefixS3): + return s.restoreFromS3(ctx, snapMd) + default: + return errors.New( + fmt.Sprintf( + "unimplemented encoded storage type in '%s' for restore", + snapMd.Location, + )) + } +} + +func (s *snapshotManager) restoreFromFile(ctx context.Context, snapMd SnapshotMetadata) error { + path := strings.TrimPrefix(snapMd.Location, LocationPrefixLocal) + + if snapMd.Compressed { + var err error + path, err = s.decompress(path) + if err != nil { + return err + } + } + + return s.impl.Restore(ctx, path) + +} + +func (s *snapshotManager) restoreFromS3(ctx context.Context, snapMd SnapshotMetadata) error { + if !s.config.s3Enabled() { + return errors.New("s3 is not enabled 
in snapshot manager despite having an s3 snapshot") + } + client, err := s.initS3Client() + if err != nil { + return err + } + opts := minio.GetObjectOptions{} + exists, err := client.BucketExists(ctx, s.config.S3.BucketName) + if err != nil { + return errors.Wrap(err, "failed to verify bucket existence") + } + if !exists { + return fmt.Errorf("bucket '%s' does not exist", s.config.S3.BucketName) + } + path := strings.TrimPrefix(snapMd.Location, LocationPrefixS3) + + if err := client.FGetObject(ctx, s.config.S3.BucketName, path, path, opts); err != nil { + return err + } + + if snapMd.Compressed { + var err error + path, err = s.decompress(path) + if err != nil { + return errors.Wrap(err, "failed to decompress downloaded S3 snapshot") + } + } + + return s.impl.Restore(ctx, path) +} + +func (s *snapshotManager) decompress(path string) (string, error) { + zf, err := zip.OpenReader(path) + if err != nil { + return "", err + } + defer zf.Close() + + if len(zf.File) != 1 { + return "", errors.New("expected only one file") + } + fileToExtract := zf.File[0] + fileReader, err := fileToExtract.Open() + if err != nil { + return "", err + } + defer fileReader.Close() + + extractedFileP := strings.TrimSuffix(path, string(CompressionZip)) + extractedFile, err := os.Create(extractedFileP) + if err != nil { + s.lg.Error(err.Error()) + } + defer extractedFile.Close() + + if _, err := io.Copy(extractedFile, fileReader); err != nil { + return "", err + } + return extractedFileP, nil +} + +// SnapshotMetadata represents a single snapshot and its metadata. +type SnapshotMetadata struct { Name string `json:"name"` // Location contains the full path of the snapshot. For // local paths, the location will be prefixed with "file://". 
- Location string `json:"location,omitempty"` - Metadata string `json:"metadata,omitempty"` - Message string `json:"message,omitempty"` - CreatedAt time.Time `json:"createdAt,omitempty"` - Size int64 `json:"size,omitempty"` - Status SnapshotStatus `json:"status,omitempty"` - S3 *s3Config `json:"s3Config,omitempty"` - Compressed bool `json:"compressed"` + Location string `json:"location,omitempty"` + Metadata string `json:"metadata,omitempty"` + Message string `json:"message,omitempty"` + CreatedAt time.Time `json:"createdAt,omitempty"` + Size int64 `json:"size,omitempty"` + Status SnapshotStatus `json:"status,omitempty"` + S3 *s3Metadata `json:"s3Metadata,omitempty"` + Compressed bool `json:"compressed"` + CompressionMetadata *compressionMetadata `json:"compressionMetadata"` } -type s3Config struct { +type s3Metadata struct { Endpoint string `json:"endpoint,omitempty"` EndpointCA string `json:"endpointCA,omitempty"` SkipSSLVerify bool `json:"skipSSLVerify,omitempty"` @@ -477,3 +722,17 @@ type s3Config struct { Folder string `json:"folder,omitempty"` Insecure bool `json:"insecure,omitempty"` } + +type compressionMetadata struct { + Type CompressionType `json:"type"` +} + +func (c *SnapshotMetadata) Validate() error { + if c.Location == "" { + return errors.New("snapshot location required") + } + if c.Compressed && c.CompressionMetadata == nil { + return errors.New("mismatched compression fields") + } + return nil +} diff --git a/pkg/snapshot/manager_test.go b/pkg/snapshot/manager_test.go new file mode 100644 index 000000000..19b1857e6 --- /dev/null +++ b/pkg/snapshot/manager_test.go @@ -0,0 +1,228 @@ +package snapshot_test + +import ( + "context" + "encoding/base64" + "fmt" + "os" + "strings" + + . "github.com/onsi/ginkgo/v2" + .
"github.com/onsi/gomega" + "github.com/rancher/opni/pkg/logger" + "github.com/rancher/opni/pkg/snapshot" + _ "github.com/rancher/opni/pkg/test/setup" + "github.com/rancher/opni/pkg/test/testruntime" +) + +type testSnapshotter struct { + contents []byte +} + +var _ snapshot.Snapshotter = (*testSnapshotter)(nil) + +func (t *testSnapshotter) Save(_ context.Context, path string) error { + return os.WriteFile(path, t.contents, 0700) +} + +func (t *testSnapshotter) Restore(_ context.Context, path string) error { + data, err := os.ReadFile(path) + if err != nil { + return err + } + t.contents = data + return nil +} + +func (t *testSnapshotter) data() []byte { + return t.contents +} + +func newTestSnapshotter(contents []byte) *testSnapshotter { // test double seeded with the bytes that Save writes out + return &testSnapshotter{contents: contents} // previously the argument was dropped, so every snapshot was written empty +} + +var _ = Describe("Snapshot Manager", Ordered, Label("unit"), func() { + var ctx context.Context + var snapshotDir string + BeforeAll(func() { + dir, err := os.MkdirTemp("/tmp", "snapshots-*") + Expect(err).To(Succeed()) + snapshotDir = dir + ctx = context.TODO() + DeferCleanup(func() { + os.RemoveAll(snapshotDir) + }) + }) + + AfterEach(func() { + Expect(removeAllContents(snapshotDir)).To(Succeed()) + }) + + It("should create local snapshots", func() { + cfg := snapshot.SnapshotConfig{ + SnapshotDir: snapshotDir, + DataDir: snapshotDir, + SnapshotName: "my-snapshot", + } + Expect(cfg.Validate()).To(Succeed()) + + s := newTestSnapshotter([]byte("data")) + backedData := make([]byte, len(s.data())) // copy only fills len(dst) bytes, so the destination must be sized first + copy(backedData, s.data()) + + m := snapshot.NewSnapshotManager( + s, + cfg, + logger.New(), + ) + + By("verifying the snapshots are successful") + Expect(m.Save(ctx)).To(Succeed()) + + items, err := m.List(ctx) + Expect(err).To(Succeed()) + Expect(items).To(HaveLen(1)) + + By("veryfying the identifying snapshot metadata is correct") + snapMd := items[0] + Expect(snapMd.Name).To(HavePrefix("my-snapshot")) + Expect(snapMd.Compressed).To(BeFalse()) + Expect(snapMd.Status).To(Equal(snapshot.SnapshotStatusSuccessful)) + finfo,
err := os.Stat(strings.TrimPrefix(snapMd.Location, "file://")) + Expect(err).To(Succeed()) + Expect(finfo.Size()).NotTo(BeZero()) // Equal(0) compared int64 with int and could never match + Expect(finfo.Size()).To(Equal(snapMd.Size)) + + By("verifying we can restore from the snapshot metadata") + s.contents = []byte("newer data") + Expect(m.Restore(ctx, snapMd)).To(Succeed()) + Expect(s.data()).To(Equal(backedData)) + }) + + It("should create compressed (zip) local snapshots", func() { + cfg := snapshot.SnapshotConfig{ + SnapshotDir: snapshotDir, + DataDir: snapshotDir, + SnapshotName: "compr-snapshot", + Compression: &snapshot.CompressionConfig{ + Type: snapshot.CompressionZip, + }, + } + Expect(cfg.Validate()).To(Succeed()) + + uncompressedSize := 10000 + s := newTestSnapshotter(simpleTestData(uncompressedSize)) + backedData := make([]byte, len(s.data())) // copy only fills len(dst) bytes, so the destination must be sized first + copy(backedData, s.data()) + + m := snapshot.NewSnapshotManager( + s, + cfg, + logger.New(), + ) + By("verifying that compressed snapshots are successful") + Expect(m.Save(ctx)).To(Succeed()) + + items, err := m.List(ctx) + Expect(err).To(Succeed()) + Expect(items).To(HaveLen(1)) + + By("verifying the snapshot has the correct identifying metadata") + snapMd := items[0] + Expect(snapMd.Name).To(HavePrefix("compr-snapshot")) + Expect(snapMd.Compressed).To(BeTrue()) + Expect(snapMd.Status).To(Equal(snapshot.SnapshotStatusSuccessful)) + finfo, err := os.Stat(strings.TrimPrefix(snapMd.Location, "file://")) + Expect(err).To(Succeed()) + Expect(finfo.Size()).To(BeNumerically("<", uncompressedSize)) + + By("verifying we can restore from a compressed snapshot") + s.contents = []byte("newer data") + Expect(m.Restore(ctx, snapMd)).To(Succeed()) + Expect(s.data()).To(Equal(backedData)) + }) + + It("should be able to upload snapshots to S3", func() { + testruntime.IfCI( + func() { + Skip("Skipping S3 uploads in CI") + }, + ) + accessKeyId := os.Getenv("AWS_ACCESS_KEY_ID") + secretAccessKey := os.Getenv("AWS_SECRET_ACCESS_KEY") + bucket := os.Getenv("AWS_SNAPSHOT_BUCKET") + if accessKeyId == "" ||
secretAccessKey == "" || bucket == "" { + Fail("Insufficient config for running S3 suite") + } + + s := newTestSnapshotter( + []byte("hello from remote"), + ) + + m := snapshot.NewSnapshotManager( + s, + snapshot.SnapshotConfig{ + SnapshotDir: snapshotDir, + SnapshotName: "remote-backup", + Retention: 0, + S3: &snapshot.S3Config{ + Endpoint: "s3.us-east-2.amazonaws.com", + AccessKey: accessKeyId, + SecretKey: secretAccessKey, + Region: "us-east-2", + Folder: "snapshots", + BucketName: bucket, + }, + }, + logger.New(), + ) + + Expect(m.Save(ctx)).To(Succeed()) + items, err := m.List(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(items).To(HaveLen(1)) + + snapMd := items[0] + msg, err := base64.StdEncoding.DecodeString(snapMd.Message) + Expect(err).To(Succeed()) + Expect(snapMd.Status).To(Equal(snapshot.SnapshotStatusSuccessful), string(msg)) + }) +}) + +func removeAllContents(dirPath string) error { + // Read the contents of the directory + entries, err := os.ReadDir(dirPath) + if err != nil { + return err + } + + // Iterate through each entry and remove it + for _, entry := range entries { + entryPath := fmt.Sprintf("%s/%s", dirPath, entry.Name()) + + // Remove files + if entry.IsDir() { + // Remove directories recursively + err = os.RemoveAll(entryPath) + if err != nil { + return err + } + } else { + err = os.Remove(entryPath) + if err != nil { + return err + } + } + } + + return nil +} + +func simpleTestData(n int) []byte { + ret := make([]byte, n) + for i := 0; i < len(ret); i++ { + ret[i] = 'a' + } + return ret +} diff --git a/pkg/snapshot/snapshot_suite_test.go b/pkg/snapshot/snapshot_suite_test.go new file mode 100644 index 000000000..4be216249 --- /dev/null +++ b/pkg/snapshot/snapshot_suite_test.go @@ -0,0 +1,13 @@ +package snapshot_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +func TestSnapshot(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Snapshot Suite") +} From ba16b3cd4ad582a917eefaec18388b00d6005515 Mon Sep 17 00:00:00 2001 From: Alexandre Lamarre Date: Tue, 28 Nov 2023 16:38:44 -0500 Subject: [PATCH 3/4] backup restore backend implementations --- go.mod | 19 +- go.sum | 28 +-- pkg/storage/etcd/etcd_suite_test.go | 39 ++++ pkg/storage/etcd/snapshot.go | 129 +++++++++++++ pkg/storage/jetstream/jetstream_suite_test.go | 3 + pkg/storage/jetstream/snapshot.go | 169 ++++++++++++++++++ pkg/test/conformance/storage/snapshot.go | 158 ++++++++++++++++ 7 files changed, 522 insertions(+), 23 deletions(-) create mode 100644 pkg/storage/etcd/snapshot.go create mode 100644 pkg/storage/jetstream/snapshot.go create mode 100644 pkg/test/conformance/storage/snapshot.go diff --git a/go.mod b/go.mod index 6305e1366..c97e8f9e3 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,7 @@ require ( github.com/jwalton/go-supportscolor v1.1.0 github.com/karlseguin/ccache/v3 v3.0.3 github.com/kazegusuri/channelzcli v0.0.0-20230307031351-17bac34c51ca - github.com/klauspost/compress v1.16.7 + github.com/klauspost/compress v1.17.0 github.com/kralicky/gpkg v0.0.0-20231114180450-2f4bff8c5588 github.com/kralicky/kmatch v0.0.0-20230301203314-20f658a0e56c github.com/kralicky/protols v0.0.0-20231102164430-669d20c5e11a @@ -73,8 +73,9 @@ require ( github.com/mholt/archiver/v4 v4.0.0-alpha.8 github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 github.com/muesli/termenv v0.15.2 - github.com/nats-io/nats.go v1.28.0 - github.com/nats-io/nkeys v0.4.4 + github.com/nats-io/jsm.go v0.1.0 + github.com/nats-io/nats.go v1.30.0 + github.com/nats-io/nkeys v0.4.5 github.com/nsf/jsondiff v0.0.0-20230430225905-43f6cf3098c1 github.com/olebedev/when v0.0.0-20221205223600-4d190b02b8d8 github.com/onsi/biloba v0.1.5 @@ -202,7 +203,7 @@ require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect 
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230512164433-5d1fd1a340c9 // indirect - github.com/antonmedv/expr v1.15.0 // indirect + github.com/antonmedv/expr v1.15.2 // indirect github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect @@ -408,7 +409,7 @@ require ( github.com/miekg/dns v1.1.55 // indirect github.com/minio/highwayhash v1.0.2 // indirect github.com/minio/md5-simd v1.1.2 // indirect - github.com/minio/minio-go/v7 v7.0.63 // indirect + github.com/minio/minio-go/v7 v7.0.63 github.com/minio/sha256-simd v1.0.1 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect @@ -431,8 +432,8 @@ require ( github.com/muesli/reflow v0.3.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect - github.com/nats-io/jwt/v2 v2.3.0 // indirect - github.com/nats-io/nats-server/v2 v2.8.4 + github.com/nats-io/jwt/v2 v2.5.2 // indirect + github.com/nats-io/nats-server/v2 v2.10.0 github.com/nats-io/nuid v1.0.1 // indirect github.com/ncw/swift v1.0.53 // indirect github.com/nwaples/rardecode/v2 v2.0.0-beta.2 // indirect @@ -517,10 +518,10 @@ require ( go.etcd.io/bbolt v1.3.7 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.9 // indirect go.etcd.io/etcd/client/v2 v2.305.9 // indirect - go.etcd.io/etcd/etcdutl/v3 v3.5.9 // indirect + go.etcd.io/etcd/etcdutl/v3 v3.5.9 go.etcd.io/etcd/pkg/v3 v3.5.9 // indirect go.etcd.io/etcd/raft/v3 v3.5.9 // indirect - go.etcd.io/etcd/server/v3 v3.5.9 // indirect + go.etcd.io/etcd/server/v3 v3.5.9 go.mongodb.org/mongo-driver v1.12.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector v0.84.0 // indirect diff --git a/go.sum b/go.sum index 
32132d5e3..db1f0e007 100644 --- a/go.sum +++ b/go.sum @@ -696,8 +696,8 @@ github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuW github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230512164433-5d1fd1a340c9 h1:goHVqTbFX3AIo0tzGr14pgfAW2ZfPChKO21Z9MGf/gk= github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230512164433-5d1fd1a340c9/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM= -github.com/antonmedv/expr v1.15.0 h1:sBHNMx1i+b1lZfkBFGhicvSLW6RLnca3R0B7jWrk8iM= -github.com/antonmedv/expr v1.15.0/go.mod h1:0E/6TxnOlRNp81GMzX9QfDPAmHo2Phg00y4JUv1ihsE= +github.com/antonmedv/expr v1.15.2 h1:afFXpDWIC2n3bF+kTZE1JvFo+c34uaM3sTqh8z0xfdU= +github.com/antonmedv/expr v1.15.2/go.mod h1:0E/6TxnOlRNp81GMzX9QfDPAmHo2Phg00y4JUv1ihsE= github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0= github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= @@ -1779,8 +1779,8 @@ github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0 github.com/klauspost/compress v1.4.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= -github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= +github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod 
h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= @@ -2070,21 +2070,22 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= +github.com/nats-io/jsm.go v0.1.0 h1:H2gYCee/iyBDjUftPOr5fEPWAcG/+fyVl89IWiy6AC4= +github.com/nats-io/jsm.go v0.1.0/go.mod h1:snnYORje42cEDCX5QygzeoVA2KiWVbiIJbLfGIvXW08= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= -github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= -github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU= +github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= -github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4= -github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4= +github.com/nats-io/nats-server/v2 v2.10.0 h1:rcU++Hzo+wARxtJugrV3J5z5iGdHeVG8tT8Chb3bKDg= +github.com/nats-io/nats-server/v2 v2.10.0/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= -github.com/nats-io/nats.go v1.28.0 
h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c= -github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= +github.com/nats-io/nats.go v1.30.0 h1:bj/rVsRCrFXxmm9mJiDhb74UKl2HhKpDwKRBtvCjZjc= +github.com/nats-io/nats.go v1.30.0/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= -github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= -github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= -github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= +github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= @@ -2841,7 +2842,6 @@ golang.org/x/crypto v0.0.0-20200414173820-0848c9571904/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201208171446-5f87f3452ae9/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= -golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod 
h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= diff --git a/pkg/storage/etcd/etcd_suite_test.go b/pkg/storage/etcd/etcd_suite_test.go index 2c459b4bf..33c065fb1 100644 --- a/pkg/storage/etcd/etcd_suite_test.go +++ b/pkg/storage/etcd/etcd_suite_test.go @@ -2,10 +2,14 @@ package etcd_test import ( "context" + "crypto/tls" + "fmt" + "path/filepath" "testing" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/rancher/opni/pkg/config/v1beta1" "github.com/rancher/opni/pkg/logger" "github.com/rancher/opni/pkg/storage" "github.com/rancher/opni/pkg/storage/etcd" @@ -13,8 +17,10 @@ import ( . "github.com/rancher/opni/pkg/test/conformance/storage" _ "github.com/rancher/opni/pkg/test/setup" "github.com/rancher/opni/pkg/test/testruntime" + "github.com/rancher/opni/pkg/util" "github.com/rancher/opni/pkg/util/future" "github.com/samber/lo" + clientv3 "go.etcd.io/etcd/client/v3" ) func TestEtcd(t *testing.T) { @@ -22,12 +28,38 @@ func TestEtcd(t *testing.T) { RunSpecs(t, "Etcd Storage Suite") } +func clientFromConfig( + ctx context.Context, + conf *v1beta1.EtcdStorageSpec, +) (*clientv3.Client, *clientv3.Config, error) { + var tlsConfig *tls.Config + if conf.Certs != nil { + var err error + tlsConfig, err = util.LoadClientMTLSConfig(*conf.Certs) + if err != nil { + return nil, nil, fmt.Errorf("failed to load client TLS config: %w", err) + } + } + + clientConfig := &clientv3.Config{ + Endpoints: conf.Endpoints, + TLS: tlsConfig, + Context: context.WithoutCancel(ctx), + } + cli, err := clientv3.New(*clientConfig) + if err != nil { + return nil, clientConfig, fmt.Errorf("failed to create etcd client: %w", err) + } + return cli, clientConfig, nil +} + var store = future.New[*etcd.EtcdStore]() var lmF = future.New[storage.LockManager]() var lmSet = future.New[lo.Tuple3[ storage.LockManager, storage.LockManager, storage.LockManager, ]]() +var snapshotter = future.New[*etcd.Snapshotter]() var _ = BeforeSuite(func() { testruntime.IfIntegration(func() { @@ -68,7 
+100,14 @@ var _ = BeforeSuite(func() { Expect(err).NotTo(HaveOccurred()) store.Set(client) + + cli, config, err := clientFromConfig(context.Background(), env.EtcdConfig()) Expect(err).To(Succeed()) + dataDir := filepath.Join(env.GetTempDirectory(), "etcd") + + s := etcd.NewSnapshotter(config, cli, dataDir, logger.New()) + + snapshotter.Set(s) DeferCleanup(env.Stop, "Test Suite Finished") }) }) diff --git a/pkg/storage/etcd/snapshot.go b/pkg/storage/etcd/snapshot.go new file mode 100644 index 000000000..1a8ec10bb --- /dev/null +++ b/pkg/storage/etcd/snapshot.go @@ -0,0 +1,129 @@ +package etcd + +import ( + "context" + "fmt" + "log/slog" + "strings" + + "github.com/rancher/opni/pkg/logger" + "go.uber.org/zap" + + opnisnapshot "github.com/rancher/opni/pkg/snapshot" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/etcdutl/v3/snapshot" + "go.etcd.io/etcd/server/v3/datadir" +) + +const ( + defaultName = "default" + defaultInitialAdvertisePeerURLs = "http://localhost:2380" +) + +type Snapshotter struct { + lg *slog.Logger + + clientConfig *clientv3.Config + client *clientv3.Client + + dataDir string +} + +func NewSnapshotter( + clientConfig *clientv3.Config, + client *clientv3.Client, + dataDir string, + lg *slog.Logger, +) *Snapshotter { + return &Snapshotter{ + lg: lg, + clientConfig: clientConfig, + client: client, + dataDir: dataDir, + } +} + +var _ opnisnapshot.BackupRestore = (*Snapshotter)(nil) + +func (s *Snapshotter) Save(ctx context.Context, path string) error { + endpoint := s.clientConfig.Endpoints[0] + status, err := s.client.Status(ctx, endpoint) + if err != nil { + return err + } + if status.IsLearner { + s.lg.Warn("Unable to take snapshot : not supported for learner") + return nil + } + + if err := snapshot.NewV3(zap.NewNop()).Save( + ctx, *s.clientConfig, path, + ); err != nil { + s.lg.With(logger.Err(err)).Error("failed to take etcd snapshot") + return err + } + + return nil +} + +func (s *Snapshotter) Restore(ctx context.Context, path string) 
error { + return s.restoreFunc( + path, + //FIXME: the following are auto assigned defaults + //TODO : add link to reference + initialClusterFromName(defaultName), + "etcd-cluster", + "", + "", + defaultInitialAdvertisePeerURLs, + defaultName, + false, + ) +} +func (s *Snapshotter) restoreFunc( + snapshotPath string, + // other thingies + restoreCluster string, + restoreClusterToken string, + restoreDataDir string, + restoreWalDir string, + restorePeerURLs string, + restoreName string, + skipHashCheck bool, +) error { + // TODO : this kinda janky + sp := snapshot.NewV3(zap.NewNop()) + + dataDir := restoreDataDir + if dataDir == "" { + dataDir = "restore.etcd" + // dataDir = filepath.Join(s.dataDir, "..", restoreName+".etcd") + } + + walDir := restoreWalDir + if walDir == "" { + walDir = datadir.ToWalDir(dataDir) + } + + if err := sp.Restore(snapshot.RestoreConfig{ + SnapshotPath: snapshotPath, + Name: restoreName, + OutputDataDir: dataDir, + OutputWALDir: walDir, + PeerURLs: strings.Split(restorePeerURLs, ","), + InitialCluster: restoreCluster, + InitialClusterToken: restoreClusterToken, + SkipHashCheck: skipHashCheck, + }); err != nil { + return err + } + return nil +} + +func initialClusterFromName(name string) string { + n := name + if name == "" { + n = defaultName + } + return fmt.Sprintf("%s=http://localhost:2380", n) +} diff --git a/pkg/storage/jetstream/jetstream_suite_test.go b/pkg/storage/jetstream/jetstream_suite_test.go index d3eadef42..55bcd81ec 100644 --- a/pkg/storage/jetstream/jetstream_suite_test.go +++ b/pkg/storage/jetstream/jetstream_suite_test.go @@ -29,6 +29,8 @@ var lmSetF = future.New[lo.Tuple3[ ]]() var store = future.New[*jetstream.JetStreamStore]() +var snapshotter = future.New[*jetstream.Snapshotter]() + var _ = BeforeSuite(func() { testruntime.IfIntegration(func() { env := test.Environment{} @@ -95,6 +97,7 @@ var _ = Describe("Jetstream Keyring Store", Ordered, Label("integration", "slow" var _ = Describe("Jetstream KV Store", Ordered, 
Label("integration", "slow"), KeyValueStoreTestSuite(store, NewBytes, Equal)) var _ = Describe("Jetstream Lock Manager", Ordered, Label("integration", "slow"), LockManagerTestSuite(lmF, lmSetF)) +var _ = XDescribe("Jetstream Backup Restore", Ordered, Label("integration", "slow"), SnapshotSuiteTest(snapshotter)) var _ = Context("Error Codes", func() { Specify("Nats KeyNotFound errors should be equal to ErrNotFound", func() { diff --git a/pkg/storage/jetstream/snapshot.go b/pkg/storage/jetstream/snapshot.go new file mode 100644 index 000000000..c32453518 --- /dev/null +++ b/pkg/storage/jetstream/snapshot.go @@ -0,0 +1,169 @@ +// Implementation references the nats CLI implementation : +// https://github.com/nats-io/natscli/blob/875003bdaf4b55a263b1221cf1a64d1b9d482412/cli/account_command.go#L66-L76 +package jetstream + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "os" + "path/filepath" + "time" + + "github.com/pkg/errors" + + "github.com/dustin/go-humanize" + "github.com/nats-io/jsm.go" + "github.com/nats-io/jsm.go/api" + "github.com/rancher/opni/pkg/logger" + "github.com/rancher/opni/pkg/snapshot" +) + +type Snapshotter struct { + mgr *jsm.Manager + + lg *slog.Logger +} + +var _ snapshot.BackupRestore = (*Snapshotter)(nil) + +func NewSnapshotter( + mgr *jsm.Manager, + lg *slog.Logger, +) *Snapshotter { + return &Snapshotter{ + mgr: mgr, + lg: lg, + } +} + +func (s *Snapshotter) Save(ctx context.Context, path string) error { + streams, missing, err := s.mgr.Streams(nil) + if err != nil { + return err + } + if len(missing) > 0 { + return fmt.Errorf("could not obtain stream information for %d streams", len(missing)) + } + + if len(streams) == 0 { + return fmt.Errorf("no streams found") + } + + totalSize := uint64(0) + totalConsumers := 0 + for _, s := range streams { + state, _ := s.LatestState() + totalConsumers += state.Consumers + totalSize += state.Bytes + } + + if err := os.MkdirAll(path, 0700); err != nil { + return err + } + + for _, stream := 
range streams { + where := filepath.Join(path, stream.Name()) + err := s.backupStream(ctx, stream, where) + if errors.Is(err, jsm.ErrMemoryStreamNotSupported) { + s.lg.With(logger.Err(err)).Warn(fmt.Sprintf("backup of %s failed", stream.Name())) + } else if err != nil { + return fmt.Errorf("backup failed : %w", err) + } + } + return nil +} + +func (s *Snapshotter) backupStream(ctx context.Context, stream *jsm.Stream, outputPath string) error { + fp, err := stream.SnapshotToDirectory(ctx, outputPath, jsm.SnapshotHealthCheck()) + if err != nil { + return err + } + + s.lg.Info( + fmt.Sprintf( + "Received %s compressed data in %d chunks for stream %q in %v, %s uncompressed \n", + humanize.IBytes(fp.BytesReceived()), + fp.ChunksReceived(), + stream.Name(), + fp.EndTime().Sub(fp.StartTime()).Round(time.Millisecond), + humanize.IBytes(fp.UncompressedBytesReceived()), + ), + ) + + return nil +} + +func (s *Snapshotter) Restore(ctx context.Context, path string) error { + streams, err := s.mgr.StreamNames(nil) + if err != nil { + return err + } + existingStreams := map[string]struct{}{} + for _, n := range streams { + existingStreams[n] = struct{}{} + } + de, err := os.ReadDir(path) + if err != nil { + return err + } + for _, d := range de { + if !d.IsDir() { + return fmt.Errorf("expected a directory") + } + if _, ok := existingStreams[d.Name()]; ok { + return fmt.Errorf("stream already exists") + } + if _, err := os.Stat(filepath.Join(path, d.Name(), "backup.json")); err != nil { + return errors.Wrap(err, "expected backup.json") + } + } + s.lg.Info(fmt.Sprintf("restoring backup of all %d streams in directory : %q", len(de), path)) + for _, d := range de { + restorePath := filepath.Join(path, d.Name()) + if err := s.restoreStream(ctx, restorePath); err != nil { + return err + } + } + return nil +} + +func (s *Snapshotter) restoreStream(ctx context.Context, path string) error { // restores one stream from the backup directory at path, using the config stored in its backup.json + var bm api.JSApiStreamRestoreRequest + bmj, err := os.ReadFile(filepath.Join(path,
"backup.json")) + if err != nil { + return errors.Wrap(err, "restore failed : backup.json not found") + } + if err := json.Unmarshal(bmj, &bm); err != nil { + return errors.Wrap(err, "restore failed") + } + + known, err := s.mgr.IsKnownStream(bm.Config.Name) + if err != nil { + return errors.Wrap(err, "could not check if the stream already exists") + } + if known { + return fmt.Errorf("Stream %q already exist", bm.Config.Name) // err is nil on this path, and errors.Wrap(nil, ...) returns nil, which reported success + } + + opts := []jsm.SnapshotOption{ + jsm.SnapshotDebug(), + jsm.RestoreConfiguration(bm.Config), + } + + fp, _, err := s.mgr.RestoreSnapshotFromDirectory(ctx, bm.Config.Name, path, opts...) + if err != nil { + return errors.Wrap(err, "restore failed") + } + s.lg.Info(fmt.Sprintf( + "Restored stream %q in %v\n", + bm.Config.Name, + fp.EndTime().Sub(fp.StartTime()).Round(time.Second), + )) + + if _, err := s.mgr.LoadStream(bm.Config.Name); err != nil { + return err + } + return nil +} diff --git a/pkg/test/conformance/storage/snapshot.go b/pkg/test/conformance/storage/snapshot.go new file mode 100644 index 000000000..8b32d98a4 --- /dev/null +++ b/pkg/test/conformance/storage/snapshot.go @@ -0,0 +1,158 @@ +package conformance_storage + +import ( + "context" + "fmt" + "os" + "strings" + + . "github.com/onsi/ginkgo/v2" + .
"github.com/onsi/gomega" + "github.com/rancher/opni/pkg/logger" + "github.com/rancher/opni/pkg/snapshot" + "github.com/rancher/opni/pkg/util/future" +) + +func SnapshotSuiteTest[T snapshot.BackupRestore]( + implF future.Future[T], +) func() { + return func() { + var ctx context.Context + var snapshotDir string + var impl snapshot.BackupRestore + BeforeAll(func() { + dir, err := os.MkdirTemp("/tmp", "snapshots-*") + Expect(err).To(Succeed()) + snapshotDir = dir + ctx = context.TODO() + impl = implF.Get() + DeferCleanup(func() { + os.RemoveAll(snapshotDir) + }) + }) + + AfterEach(func() { + Expect(removeAllContents(snapshotDir)).To(Succeed()) // an Expect with no matcher asserts nothing, so cleanup errors were ignored + }) + + Context("filesystem snapshots", func() { + It("should snapshot/restore", func() { + cfg := snapshot.SnapshotConfig{ + SnapshotDir: snapshotDir, + DataDir: snapshotDir, + SnapshotName: "my-snapshot", + } + Expect(cfg.Validate()).To(Succeed()) + + m := snapshot.NewSnapshotManager( + impl, + cfg, + logger.New(), + ) + + By("verifying the snapshots are successful") + Expect(m.Save(ctx)).To(Succeed()) + + items, err := m.List(ctx) + Expect(err).To(Succeed()) + Expect(items).To(HaveLen(1)) + + By("verifying the identifying snapshot metadata is correct") + snapMd := items[0] + Expect(snapMd.Name).To(HavePrefix("my-snapshot")) + Expect(snapMd.Compressed).To(BeFalse()) + Expect(snapMd.Status).To(Equal(snapshot.SnapshotStatusSuccessful)) + finfo, err := os.Stat(strings.TrimPrefix(snapMd.Location, "file://")) + Expect(err).To(Succeed()) + Expect(finfo.Size()).NotTo(BeZero()) // Equal(0) compared int64 with int and could never match + Expect(finfo.Size()).To(Equal(snapMd.Size)) + + By("verifying we can restore from the snapshot metadata") + Expect(m.Restore(ctx, snapMd)).To(Succeed()) + }) + It("should snapshot/restore with compression", func() { + cfg := snapshot.SnapshotConfig{ + SnapshotDir: snapshotDir, + DataDir: snapshotDir, + SnapshotName: "compr-snapshot", + Compression: &snapshot.CompressionConfig{ + Type: snapshot.CompressionZip, + }, + } + Expect(cfg.Validate()).To(Succeed()) + + m :=
snapshot.NewSnapshotManager( + impl, + cfg, + logger.New(), + ) + + By("verifying the snapshots are successful") + Expect(m.Save(ctx)).To(Succeed()) + + items, err := m.List(ctx) + Expect(err).To(Succeed()) + Expect(items).To(HaveLen(1)) + + By("verifying the identifying snapshot metadata is correct") + snapMd := items[0] + Expect(snapMd.Name).To(HavePrefix("compr-snapshot")) + Expect(snapMd.Compressed).To(BeTrue()) + Expect(snapMd.Status).To(Equal(snapshot.SnapshotStatusSuccessful)) + finfo, err := os.Stat(strings.TrimPrefix(snapMd.Location, "file://")) + Expect(err).To(Succeed()) + Expect(finfo.Size()).NotTo(Equal(0)) + Expect(finfo.Size()).To(Equal(snapMd.Size)) + + By("verifying we can restore from the snapshot metadata") + Expect(m.Restore(ctx, snapMd)).To(Succeed()) + }) + + It("should enforce the configured retention limit", func() { + + }) + }) + + Context("s3 snapshots", func() { + It("should backup/restore to/from s3", func() { + + }) + It("should backup/restore to/from s3 w/ compression", func() { + + }) + It("should enforce the configured retention policy", func() { + + }) + }) + + } +} + +func removeAllContents(dirPath string) error { + // Read the contents of the directory + entries, err := os.ReadDir(dirPath) + if err != nil { + return err + } + + // Iterate through each entry and remove it + for _, entry := range entries { + entryPath := fmt.Sprintf("%s/%s", dirPath, entry.Name()) + + // Remove files + if entry.IsDir() { + // Remove directories recursively + err = os.RemoveAll(entryPath) + if err != nil { + return err + } + } else { + err = os.Remove(entryPath) + if err != nil { + return err + } + } + } + + return nil +} From 653dcb435cb158ecf25b08739607bd86bf8bfe8d Mon Sep 17 00:00:00 2001 From: Alexandre Lamarre Date: Tue, 28 Nov 2023 16:57:59 -0500 Subject: [PATCH 4/4] [WIP] snapping and restoring tests --- go.mod | 2 +- internal/lock/lock.go | 2 +- pkg/storage/etcd/etcd_suite_test.go | 1 + pkg/storage/jetstream/jetstream_suite_test.go | 17 
++++++++++++----- pkg/storage/jetstream/util.go | 12 ++++++------ .../benchmark/storage/storage_suite_test.go | 2 +- 6 files changed, 22 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index c97e8f9e3..eff3c005c 100644 --- a/go.mod +++ b/go.mod @@ -270,7 +270,7 @@ require ( github.com/docker/go-metrics v0.0.1 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 // indirect - github.com/dustin/go-humanize v1.0.1 // indirect + github.com/dustin/go-humanize v1.0.1 github.com/dvsekhvalnov/jose2go v1.5.0 // indirect github.com/ecordell/optgen v0.0.9 // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect diff --git a/internal/lock/lock.go b/internal/lock/lock.go index 2c85c6f85..fc3abfed2 100644 --- a/internal/lock/lock.go +++ b/internal/lock/lock.go @@ -352,7 +352,7 @@ func getLockManager(ctx context.Context, config *LockBackendConfig) (storage.Loc return lm, nil } if config.Jetstream != nil { - js, err := jetstream.AcquireJetstreamConn(context.Background(), config.Jetstream, logger.New().WithGroup("js")) + _, js, err := jetstream.AcquireJetstreamConn(context.Background(), config.Jetstream, logger.New().WithGroup("js")) if err != nil { return nil, err } diff --git a/pkg/storage/etcd/etcd_suite_test.go b/pkg/storage/etcd/etcd_suite_test.go index 33c065fb1..6bbf34ea3 100644 --- a/pkg/storage/etcd/etcd_suite_test.go +++ b/pkg/storage/etcd/etcd_suite_test.go @@ -118,3 +118,4 @@ var _ = Describe("Etcd RBAC Store", Ordered, Label("integration", "slow"), RBACS var _ = Describe("Etcd Keyring Store", Ordered, Label("integration", "slow"), KeyringStoreTestSuite(store)) var _ = Describe("Etcd KV Store", Ordered, Label("integration", "slow"), KeyValueStoreTestSuite(store, NewBytes, Equal)) var _ = Describe("Etcd Lock Manager", Ordered, Label("integration", "slow"), LockManagerTestSuite(lmF, lmSet)) +var _ = Describe("Etcd Backup Restore", Ordered, Label("integration", "slow"), 
SnapshotSuiteTest(snapshotter)) diff --git a/pkg/storage/jetstream/jetstream_suite_test.go b/pkg/storage/jetstream/jetstream_suite_test.go index 55bcd81ec..176302460 100644 --- a/pkg/storage/jetstream/jetstream_suite_test.go +++ b/pkg/storage/jetstream/jetstream_suite_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/nats-io/jsm.go" "github.com/nats-io/nats.go" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -43,7 +44,7 @@ var _ = BeforeSuite(func() { Expect(err).NotTo(HaveOccurred()) store.Set(s) - js, err := jetstream.AcquireJetstreamConn( + nc, js, err := jetstream.AcquireJetstreamConn( context.Background(), env.JetStreamConfig(), logger.NewNop(), @@ -58,21 +59,21 @@ var _ = BeforeSuite(func() { Expect(err).NotTo(HaveOccurred()) lmF.Set(lm) - js1, err := jetstream.AcquireJetstreamConn( + _, js1, err := jetstream.AcquireJetstreamConn( context.Background(), env.JetStreamConfig(), logger.NewNop(), ) Expect(err).NotTo(HaveOccurred()) - js2, err := jetstream.AcquireJetstreamConn( + _, js2, err := jetstream.AcquireJetstreamConn( context.Background(), env.JetStreamConfig(), logger.NewNop(), ) Expect(err).NotTo(HaveOccurred()) - js3, err := jetstream.AcquireJetstreamConn( + _, js3, err := jetstream.AcquireJetstreamConn( context.Background(), env.JetStreamConfig(), logger.New(), @@ -86,6 +87,12 @@ var _ = BeforeSuite(func() { lmSetF.Set(lo.Tuple3[storage.LockManager, storage.LockManager, storage.LockManager]{ A: x, B: y, C: z, }) + mgr, err := jsm.New(nc) + Expect(err).NotTo(HaveOccurred()) + + sn := jetstream.NewSnapshotter(mgr, logger.New()) + snapshotter.Set(sn) + DeferCleanup(env.Stop, "Test Suite Finished") }) }) @@ -97,7 +104,7 @@ var _ = Describe("Jetstream Keyring Store", Ordered, Label("integration", "slow" var _ = Describe("Jetstream KV Store", Ordered, Label("integration", "slow"), KeyValueStoreTestSuite(store, NewBytes, Equal)) var _ = Describe("Jetstream Lock Manager", Ordered, Label("integration", "slow"), 
LockManagerTestSuite(lmF, lmSetF)) -var _ = XDescribe("Jetstream Backup Restore", Ordered, Label("integration", "slow"), SnapshotSuiteTest(snapshotter)) +var _ = Describe("Jetstream Backup Restore", Ordered, Label("integration", "slow"), SnapshotSuiteTest(snapshotter)) var _ = Context("Error Codes", func() { Specify("Nats KeyNotFound errors should be equal to ErrNotFound", func() { diff --git a/pkg/storage/jetstream/util.go b/pkg/storage/jetstream/util.go index ae696fb2c..6af37efa2 100644 --- a/pkg/storage/jetstream/util.go +++ b/pkg/storage/jetstream/util.go @@ -12,10 +12,10 @@ import ( "github.com/rancher/opni/pkg/logger" ) -func AcquireJetstreamConn(ctx context.Context, conf *v1beta1.JetStreamStorageSpec, lg *slog.Logger) (nats.JetStreamContext, error) { +func AcquireJetstreamConn(ctx context.Context, conf *v1beta1.JetStreamStorageSpec, lg *slog.Logger) (*nats.Conn, nats.JetStreamContext, error) { nkeyOpt, err := nats.NkeyOptionFromSeed(conf.NkeySeedPath) if err != nil { - return nil, err + return nil, nil, err } nc, err := nats.Connect(conf.Endpoint, nkeyOpt, @@ -40,7 +40,7 @@ func AcquireJetstreamConn(ctx context.Context, conf *v1beta1.JetStreamStorageSpe }), ) if err != nil { - return nil, err + return nil, nil, err } ctrl := backoff.Exponential( @@ -56,16 +56,16 @@ func AcquireJetstreamConn(ctx context.Context, conf *v1beta1.JetStreamStorageSpe } select { case <-ctrl.Done(): - return nil, ctx.Err() + return nil, nil, ctx.Err() case <-ctrl.Next(): } } js, err := nc.JetStream(nats.Context(ctx)) if err != nil { - return nil, err + return nil, nil, err } - return js, nil + return nc, js, nil } // Takes a prefix path and replaces invalid elements for jetstream with their valid identifiers diff --git a/pkg/test/benchmark/storage/storage_suite_test.go b/pkg/test/benchmark/storage/storage_suite_test.go index cc4a66a17..07096701a 100644 --- a/pkg/test/benchmark/storage/storage_suite_test.go +++ b/pkg/test/benchmark/storage/storage_suite_test.go @@ -43,7 +43,7 @@ var 
_ = BeforeSuite(func() { } lmsJ := make([]*jetstream.LockManager, 7) for i := 0; i < 7; i++ { - js, err := jetstream.AcquireJetstreamConn(context.Background(), env.JetStreamConfig(), logger.New().WithGroup("js")) + _, js, err := jetstream.AcquireJetstreamConn(context.Background(), env.JetStreamConfig(), logger.New().WithGroup("js")) Expect(err).To(Succeed()) j := jetstream.NewLockManager(context.Background(), js, "test", logger.NewNop())