Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added flag to skip file errors during copy #23

Merged
merged 1 commit into from
Jun 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions cmd/skbn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ func NewRootCmd(args []string) *cobra.Command {
}

type cpCmd struct {
src string
dst string
parallel int
bufferSize float64
src string
dst string
parallel int
bufferSize float64
skipErrorFiles bool

out io.Writer
}
Expand All @@ -52,7 +53,7 @@ func NewCpCmd(out io.Writer) *cobra.Command {
Short: "Copy files or directories Kubernetes and Cloud storage",
Long: ``,
Run: func(cmd *cobra.Command, args []string) {
if err := skbn.Copy(c.src, c.dst, c.parallel, c.bufferSize); err != nil {
if err := skbn.Copy(c.src, c.dst, c.parallel, c.bufferSize, c.skipErrorFiles); err != nil {
log.Fatal(err)
}
},
Expand Down
2 changes: 1 addition & 1 deletion examples/code/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func main() {
bufferSize := 1.0 // 1GB of in memory buffer size

start := time.Now()
if err := skbn.Copy(src, dst, parallel, bufferSize); err != nil {
if err := skbn.Copy(src, dst, parallel, bufferSize, false); err != nil {
log.Fatal(err)
}
elapsed := time.Since(start)
Expand Down
28 changes: 18 additions & 10 deletions pkg/skbn/skbn.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/maorfr/skbn/pkg/utils"

"github.com/djherbis/buffer"
"gopkg.in/djherbis/nio.v2"
)

// FromToPair is a pair of FromPath and ToPath
Expand All @@ -21,7 +20,7 @@ type FromToPair struct {
}

// Copy copies files from src to dst
func Copy(src, dst string, parallel int, bufferSize float64) error {
func Copy(src, dst string, parallel int, bufferSize float64, skipErrorFiles bool) error {
srcPrefix, srcPath := utils.SplitInTwo(src, "://")
dstPrefix, dstPath := utils.SplitInTwo(dst, "://")

Expand All @@ -37,7 +36,7 @@ func Copy(src, dst string, parallel int, bufferSize float64) error {
if err != nil {
return err
}
err = PerformCopy(srcClient, dstClient, srcPrefix, dstPrefix, fromToPaths, parallel, bufferSize)
err = PerformCopy(srcClient, dstClient, srcPrefix, dstPrefix, fromToPaths, parallel, bufferSize, skipErrorFiles)
if err != nil {
return err
}
Expand Down Expand Up @@ -103,7 +102,7 @@ func GetFromToPaths(srcClient interface{}, srcPrefix, srcPath, dstPath string) (
}

// PerformCopy performs the actual copy action
func PerformCopy(srcClient, dstClient interface{}, srcPrefix, dstPrefix string, fromToPaths []FromToPair, parallel int, bufferSize float64) error {
func PerformCopy(srcClient, dstClient interface{}, srcPrefix, dstPrefix string, fromToPaths []FromToPair, parallel int, bufferSize float64, skipErrorFiles bool) error {
// Execute in parallel
totalFiles := len(fromToPaths)
if parallel == 0 {
Expand All @@ -115,7 +114,7 @@ func PerformCopy(srcClient, dstClient interface{}, srcPrefix, dstPrefix string,
currentLine := 0
for _, ftp := range fromToPaths {

if len(errc) != 0 {
if !skipErrorFiles && len(errc) != 0 {
break
}

Expand All @@ -127,39 +126,48 @@ func PerformCopy(srcClient, dstClient interface{}, srcPrefix, dstPrefix string,

go func(srcClient, dstClient interface{}, srcPrefix, fromPath, dstPrefix, toPath, currentLinePadded string, totalFiles int) {

if len(errc) != 0 {
if !skipErrorFiles && len(errc) != 0 {
return
}

newBufferSize := (int64)(bufferSize * 1024 * 1024) // may not be super accurate
buf := buffer.New(newBufferSize)
pr, pw := nio.Pipe(buf)
fileErrorChannel := make(chan error, 1)

log.Printf("[%s/%d] copy: %s://%s -> %s://%s", currentLinePadded, totalFiles, srcPrefix, fromPath, dstPrefix, toPath)

go func() {
defer pw.Close()
if len(errc) != 0 {
if !skipErrorFiles && len(errc) != 0 {
return
}
err := Download(srcClient, srcPrefix, fromPath, pw)
if err != nil {
log.Println(err, fmt.Sprintf(" src: file: %s", fromPath))
errc <- err
fileErrorChannel <- err
if !skipErrorFiles {
errc <- err
}
}
}()

go func() {
defer pr.Close()
defer bwg.Done()
if len(errc) != 0 {
if len(fileErrorChannel) != 0 {
return
}
if !skipErrorFiles && len(errc) != 0 {
return
}
defer log.Printf("[%s/%d] done: %s://%s -> %s://%s", currentLinePadded, totalFiles, srcPrefix, fromPath, dstPrefix, toPath)
err := Upload(dstClient, dstPrefix, toPath, fromPath, pr)
if err != nil {
log.Println(err, fmt.Sprintf(" dst: file: %s", toPath))
errc <- err
if !skipErrorFiles {
errc <- err
}
}
}()
}(srcClient, dstClient, srcPrefix, ftp.FromPath, dstPrefix, ftp.ToPath, currentLinePadded, totalFiles)
Expand Down