Skip to content

Commit

Permalink
Add dir support to kando with kopia location (#1028)
Browse files Browse the repository at this point in the history
* Add dir support to kando

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* Fix errors formatting

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* Function comment should start with func name

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
PrasadG193 and mergify[bot] committed Jul 27, 2021
1 parent 3f350bc commit 3632575
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 15 deletions.
19 changes: 12 additions & 7 deletions pkg/kando/location_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ func kopiaSnapshotFlag(cmd *cobra.Command) string {
}

func runLocationPull(cmd *cobra.Command, args []string) error {
target, err := targetWriter(args[0])
if err != nil {
return err
}
p, err := unmarshalProfileFlag(cmd)
if err != nil {
return err
Expand All @@ -73,7 +69,11 @@ func runLocationPull(cmd *cobra.Command, args []string) error {
if err = connectToKopiaServer(ctx, p); err != nil {
return err
}
return kopiaLocationPull(ctx, kopiaSnap.ID, s, target)
return kopiaLocationPull(ctx, kopiaSnap.ID, s, args[0])
}
target, err := targetWriter(args[0])
if err != nil {
return err
}
return locationPull(ctx, p, s, target)
}
Expand All @@ -90,8 +90,13 @@ func locationPull(ctx context.Context, p *param.Profile, path string, target io.
}

// kopiaLocationPull pulls the data from a kopia snapshot into the given target
func kopiaLocationPull(ctx context.Context, backupID, path string, target io.Writer) error {
return kopia.Read(ctx, backupID, path, target)
func kopiaLocationPull(ctx context.Context, backupID, path string, targetPath string) error {
switch targetPath {
case usePipeParam:
return kopia.Read(ctx, backupID, path, os.Stdout)
default:
return kopia.ReadFile(ctx, backupID, targetPath)
}
}

// connectToKopiaServer connects to the kopia server with given creds
Expand Down
21 changes: 14 additions & 7 deletions pkg/kando/location_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ func outputNameFlag(cmd *cobra.Command) string {
}

func runLocationPush(cmd *cobra.Command, args []string) error {
source, err := sourceReader(args[0])
if err != nil {
return err
}
p, err := unmarshalProfileFlag(cmd)
if err != nil {
return err
Expand All @@ -68,7 +64,11 @@ func runLocationPush(cmd *cobra.Command, args []string) error {
if err = connectToKopiaServer(ctx, p); err != nil {
return err
}
return kopiaLocationPush(ctx, s, outputName, source)
return kopiaLocationPush(ctx, s, outputName, args[0])
}
source, err := sourceReader(args[0])
if err != nil {
return err
}
return locationPush(ctx, p, s, source)
}
Expand All @@ -94,8 +94,15 @@ func locationPush(ctx context.Context, p *param.Profile, path string, source io.
}

// kopiaLocationPush pushes the data from the source using a kopia snapshot
func kopiaLocationPush(ctx context.Context, path, outputName string, source io.Reader) error {
snapInfo, err := kopia.Write(ctx, path, source)
func kopiaLocationPush(ctx context.Context, path, outputName string, sourcePath string) error {
var snapInfo *kopia.SnapshotInfo
var err error
switch sourcePath {
case usePipeParam:
snapInfo, err = kopia.Write(ctx, path, os.Stdin)
default:
snapInfo, err = kopia.WriteFile(ctx, path, sourcePath)
}
if err != nil {
return errors.Wrap(err, "Failed to push data using kopia")
}
Expand Down
113 changes: 112 additions & 1 deletion pkg/kopia/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ package kopia
import (
"context"
"io"
"os"
"path/filepath"
"sync"

"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/localfs"
"github.com/kopia/kopia/fs/virtualfs"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/restore"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -114,8 +117,80 @@ func Write(ctx context.Context, path string, source io.Reader) (*SnapshotInfo, e
return snapshotInfo, nil
}

// WriteFile creates a kopia snapshot from the given source file
func WriteFile(ctx context.Context, path string, sourcePath string) (*SnapshotInfo, error) {
password, ok := repo.GetPersistedPassword(ctx, defaultConfigFilePath)
if !ok || password == "" {
return nil, errors.New("Failed to retrieve kopia client passphrase")
}

rep, err := OpenRepository(ctx, defaultConfigFilePath, password, pushRepoPurpose)
if err != nil {
return nil, errors.Wrap(err, "Failed to open kopia repository")
}

dir, err := filepath.Abs(sourcePath)
if err != nil {
return nil, errors.Wrapf(err, "Invalid source path '%s'", sourcePath)
}

// Populate the source info with parent path as the source
sourceInfo := snapshot.SourceInfo{
UserName: rep.ClientOptions().Username,
Host: rep.ClientOptions().Hostname,
Path: filepath.Clean(dir),
}
rootDir, err := getLocalFSEntry(ctx, sourceInfo.Path)
if err != nil {
return nil, errors.Wrap(err, "Unable to get local filesystem entry")
}

// Setup kopia uploader
u := snapshotfs.NewUploader(rep)

// Create a kopia snapshot
snapID, snapshotSize, err := SnapshotSource(ctx, rep, u, sourceInfo, rootDir, "Kanister Database Backup")
if err != nil {
return nil, err
}

snapshotInfo := &SnapshotInfo{
ID: snapID,
LogicalSize: snapshotSize,
PhysicalSize: int64(0),
}
return snapshotInfo, nil
}

func getLocalFSEntry(ctx context.Context, path0 string) (fs.Entry, error) {
path, err := resolveSymlink(path0)
if err != nil {
return nil, errors.Wrap(err, "resolveSymlink")
}

e, err := localfs.NewEntry(path)
if err != nil {
return nil, errors.Wrap(err, "can't get local fs entry")
}

return e, nil
}

func resolveSymlink(path string) (string, error) {
st, err := os.Lstat(path)
if err != nil {
return "", errors.Wrap(err, "stat")
}

if (st.Mode() & os.ModeSymlink) == 0 {
return path, nil
}

// nolint:wrapcheck
return filepath.EvalSymlinks(path)
}

// Read reads a kopia snapshot with the given ID and copies it to the given target
// TODO@pavan: Support files as target
func Read(ctx context.Context, backupID, path string, target io.Writer) error {
password, ok := repo.GetPersistedPassword(ctx, defaultConfigFilePath)
if !ok || password == "" {
Expand Down Expand Up @@ -146,6 +221,42 @@ func Read(ctx context.Context, backupID, path string, target io.Writer) error {
return errors.Wrap(err, "Failed to copy snapshot data to the target")
}

// ReadFile restores a kopia snapshot with the given ID to the given target
func ReadFile(ctx context.Context, backupID, target string) error {
password, ok := repo.GetPersistedPassword(ctx, defaultConfigFilePath)
if !ok || password == "" {
return errors.New("Failed to retrieve kopia client passphrase")
}

rep, err := OpenRepository(ctx, defaultConfigFilePath, password, pullRepoPurpose)
if err != nil {
return errors.Wrap(err, "Failed to open kopia repository")
}

rootEntry, err := snapshotfs.FilesystemEntryFromIDWithPath(ctx, rep, backupID, false)
if err != nil {
return errors.Wrap(err, "Unable to get filesystem entry")
}

p, err := filepath.Abs(target)
if err != nil {
return errors.Wrap(err, "Unable to resolve path")
}
// TODO: Do we want to keep this flags configurable?
output := &restore.FilesystemOutput{
TargetPath: p,
OverwriteDirectories: true,
OverwriteFiles: true,
OverwriteSymlinks: true,
IgnorePermissionErrors: true,
}

_, err = restore.Entry(ctx, rep, output, rootEntry, restore.Options{
Parallel: 8,
})
return errors.Wrap(err, "Failed to copy snapshot data to the target")
}

// bufferPool is a pool of shared buffers used during kopia read
var bufferPool = sync.Pool{
New: func() interface{} {
Expand Down

0 comments on commit 3632575

Please sign in to comment.