Skip to content

Commit

Permalink
command: append object error to multierror to return non-zero exit co…
Browse files Browse the repository at this point in the history
…de error (#328)

We create two error objects instead of one to avoid a data race. Since a goroutine runs concurrently and writes to the merror object, a single merror object could be subject to a data race in the cp, rm and select commands. We therefore create one error object for the goroutine and one for the main routine, and append them together at the end of execution.

Tested #304 's first case, it returns exit code 1.

Resolves #304
  • Loading branch information
ocakhasan committed Aug 5, 2021
1 parent d80fc02 commit 9defce6
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 15 deletions.
20 changes: 20 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,25 @@
# Changelog

## not released yet

#### Features

- Added new `--raw` flag to `cp` and `rm` commands. It disables wildcard operations. It is useful when an object whose key contains glob characters needs to be downloaded as-is, without wildcard expansion. ([#235](https://github.com/peak/s5cmd/issues/235))
- Added new `--cache-control` and `--expires` flags to `cp` and `rm` commands. It adds support for setting cache control and expires header to S3 objects. ([#318](https://github.com/peak/s5cmd/pull/318)) [@tombokombo](https://github.com/tombokombo)
- Added new `select` command. It allows to select JSON records from objects using SQL expressions. ([#299](https://github.com/peak/s5cmd/issues/299)) [@skeggse](https://github.com/skeggse)
- Added new `--force-glacier-transfer` flag to `cp` command. This flag forces transfer of GLACIER objects whether they are restored or not. ([#206](https://github.com/peak/s5cmd/issues/206))
- Added new `--source-region` and `--destination-region` flags to `cp` command. They allow overriding the bucket region. ([#262](https://github.com/peak/s5cmd/issues/262)) [@kemege](https://github.com/kemege)
- Added new `rb` command which allows users to remove buckets from command line. ([#303](https://github.com/peak/s5cmd/issues/303)).

#### Improvements

- Added new installation option MacPorts. ([#311](https://github.com/peak/s5cmd/pull/311)) [@manojkarthick](https://github.com/manojkarthick)

#### Bugfixes

- Fixed a bug where errors did not result in a non-zero exit code. ([#304](https://github.com/peak/s5cmd/issues/304))
- Changed the order of precedence in URL expansion in the file system. Glob (*) expansion now has precedence over directory expansion. ([#322](https://github.com/peak/s5cmd/pull/322))

## v1.3.0 - 1 Jul 2021

#### Features
Expand Down
14 changes: 10 additions & 4 deletions command/cp.go
Expand Up @@ -290,9 +290,13 @@ func (c Copy) Run(ctx context.Context) error {

waiter := parallel.NewWaiter()

// create two different error objects instead of single object to avoid the
// data race for merror object, since there is a goroutine running,
// there might be a data race for a single error object.
var (
merror error
errDoneCh = make(chan bool)
merrorWaiter error // for the errors from waiter
merrorObjects error // for the errors from object channel
errDoneCh = make(chan bool)
)

go func() {
Expand All @@ -305,7 +309,7 @@ func (c Copy) Run(ctx context.Context) error {
os.Exit(1)
}
printError(c.fullCommand, c.op, err)
merror = multierror.Append(merror, err)
merrorWaiter = multierror.Append(merrorWaiter, err)
}
}()

Expand All @@ -321,12 +325,14 @@ func (c Copy) Run(ctx context.Context) error {
}

if err := object.Err; err != nil {
merrorObjects = multierror.Append(merrorObjects, err)
printError(c.fullCommand, c.op, err)
continue
}

if object.StorageClass.IsGlacier() && !c.forceGlacierTransfer {
err := fmt.Errorf("object '%v' is on Glacier storage", object)
merrorObjects = multierror.Append(merrorObjects, err)
printError(c.fullCommand, c.op, err)
continue
}
Expand All @@ -351,7 +357,7 @@ func (c Copy) Run(ctx context.Context) error {
waiter.Wait()
<-errDoneCh

return merror
return multierror.Append(merrorWaiter, merrorObjects).ErrorOrNil()
}

func (c Copy) prepareCopyTask(
Expand Down
14 changes: 11 additions & 3 deletions command/rm.go
Expand Up @@ -97,6 +97,14 @@ func (d Delete) Run(ctx context.Context) error {

objch := expandSources(ctx, client, false, srcurls...)

// create two different error objects instead of single object to avoid the
// data race for merror object, since there is a goroutine running,
// there might be a data race for a single error object.
var (
merrorObjects error
merrorResult error
)

// do object->url transformation
urlch := make(chan *url.URL)
go func() {
Expand All @@ -108,6 +116,7 @@ func (d Delete) Run(ctx context.Context) error {
}

if err := object.Err; err != nil {
merrorObjects = multierror.Append(merrorObjects, err)
printError(d.fullCommand, d.op, err)
continue
}
Expand All @@ -117,14 +126,13 @@ func (d Delete) Run(ctx context.Context) error {

resultch := client.MultiDelete(ctx, urlch)

var merror error
for obj := range resultch {
if err := obj.Err; err != nil {
if errorpkg.IsCancelation(obj.Err) {
continue
}

merror = multierror.Append(merror, obj.Err)
merrorResult = multierror.Append(merrorResult, obj.Err)
printError(d.fullCommand, d.op, obj.Err)
continue
}
Expand All @@ -136,7 +144,7 @@ func (d Delete) Run(ctx context.Context) error {
log.Info(msg)
}

return merror
return multierror.Append(merrorResult, merrorObjects).ErrorOrNil()
}

// newSources creates object URL list from given sources.
Expand Down
14 changes: 11 additions & 3 deletions command/select.go
Expand Up @@ -114,7 +114,13 @@ func (s Select) Run(ctx context.Context) error {
return err
}

var merror error
// create two different error objects instead of single object to avoid the
// data race for merror object, since there is a goroutine running,
// there might be a data race for a single error object.
var (
merrorWaiter error
merrorObjects error
)

waiter := parallel.NewWaiter()
errDoneCh := make(chan bool)
Expand All @@ -125,7 +131,7 @@ func (s Select) Run(ctx context.Context) error {
defer close(errDoneCh)
for err := range waiter.Err() {
printError(s.fullCommand, s.op, err)
merror = multierror.Append(merror, err)
merrorWaiter = multierror.Append(merrorWaiter, err)
}
}()

Expand Down Expand Up @@ -156,12 +162,14 @@ func (s Select) Run(ctx context.Context) error {
}

if err := object.Err; err != nil {
merrorObjects = multierror.Append(merrorObjects, err)
printError(s.fullCommand, s.op, err)
continue
}

if object.StorageClass.IsGlacier() {
err := fmt.Errorf("object '%v' is on Glacier storage", object)
merrorObjects = multierror.Append(merrorObjects, err)
printError(s.fullCommand, s.op, err)
continue
}
Expand All @@ -174,7 +182,7 @@ func (s Select) Run(ctx context.Context) error {
<-errDoneCh
<-writeDoneCh

return merror
return multierror.Append(merrorWaiter, merrorObjects).ErrorOrNil()
}

func (s Select) prepareTask(ctx context.Context, client *storage.S3, url *url.URL, resultCh chan<- json.RawMessage) func() error {
Expand Down
26 changes: 26 additions & 0 deletions e2e/cp_test.go
Expand Up @@ -3249,6 +3249,7 @@ func TestCopyMultipleS3ObjectsToS3WithRawMode(t *testing.T) {
"file*.txt": "this is a test file 1",
}

// assert s3 objects in destination.
for filename, content := range expectedFiles {
assert.Assert(t, ensureS3Object(s3client, destBucket, filename, content))
}
Expand Down Expand Up @@ -3346,3 +3347,28 @@ func TestCopyRawModeAllowDestinationWithoutPrefix(t *testing.T) {
t.Errorf("testfile*.txt not exist in S3 bucket %v\n", dst)
}
}

// TestCopyExpectExitCode1OnUnreachableHost verifies that `cp` exits with a
// non-zero status code when the S3 endpoint is unreachable, instead of
// silently succeeding. Regression test for #304.
func TestCopyExpectExitCode1OnUnreachableHost(t *testing.T) {
	t.Parallel()

	const bucket = "bucket"

	// Point s5cmd at an endpoint that does not exist so that every S3
	// request fails.
	_, s5cmd, cleanup := setup(t, withEndpointURL("nonExistingEndpointURL"))
	defer cleanup()

	folderLayout := []fs.PathOp{
		fs.WithFile("testfile.txt", "this is a test file 1"),
	}

	workdir := fs.NewDir(t, "somedir", folderLayout...)
	defer workdir.Remove()

	// fmt.Sprintf already yields a slash-separated S3 URL, so the previous
	// filepath.ToSlash conversion was a no-op and has been removed.
	src := fmt.Sprintf("s3://%s/*", bucket)
	dst := fmt.Sprintf("%v/", workdir.Path())

	// "-r 0" sets the retry count to zero so the command fails fast
	// instead of retrying against the unreachable endpoint.
	cmd := s5cmd("-r", "0", "cp", src, dst)
	result := icmd.RunCmd(cmd)

	// The unreachable endpoint must surface as exit code 1.
	result.Assert(t, icmd.Expected{ExitCode: 1})
}
20 changes: 15 additions & 5 deletions e2e/util_test.go
Expand Up @@ -55,7 +55,8 @@ func init() {
}

type setupOpts struct {
s3backend string
s3backend string
endpointURL string
}

type option func(*setupOpts)
Expand All @@ -66,6 +67,12 @@ func withS3Backend(backend string) option {
}
}

// withEndpointURL returns a setup option that overrides the endpoint the
// test server would normally provide, letting tests point s5cmd at an
// arbitrary (possibly unreachable) host.
func withEndpointURL(url string) option {
	return func(o *setupOpts) { o.endpointURL = url }
}

func setup(t *testing.T, options ...option) (*s3.S3, func(...string) icmd.Cmd, func()) {
t.Helper()

Expand All @@ -77,7 +84,7 @@ func setup(t *testing.T, options ...option) (*s3.S3, func(...string) icmd.Cmd, f
option(opts)
}

endpoint, workdir, cleanup := server(t, opts.s3backend)
endpoint, workdir, cleanup := server(t, opts)

client := s3client(t, storage.Options{
Endpoint: endpoint,
Expand All @@ -87,7 +94,7 @@ func setup(t *testing.T, options ...option) (*s3.S3, func(...string) icmd.Cmd, f
return client, s5cmd(workdir, endpoint), cleanup
}

func server(t *testing.T, s3backend string) (string, string, func()) {
func server(t *testing.T, setupOptions *setupOpts) (string, string, func()) {
t.Helper()

// testdir := fs.NewDir() tries to create a new directory which
Expand All @@ -109,13 +116,15 @@ func server(t *testing.T, s3backend string) (string, string, func()) {
s3LogLevel = "info" // aws has no level other than 'debug'
}

endpoint, dbcleanup := s3ServerEndpoint(t, testdir, s3LogLevel, s3backend)
endpoint, dbcleanup := s3ServerEndpoint(t, testdir, s3LogLevel, setupOptions.s3backend)

cleanup := func() {
testdir.Remove()
dbcleanup()
}

if setupOptions.endpointURL != "" {
endpoint = setupOptions.endpointURL
}
return endpoint, workdir, cleanup
}

Expand Down Expand Up @@ -158,6 +167,7 @@ func s5cmd(workdir, endpoint string) func(args ...string) icmd.Cmd {
)
cmd.Env = env
cmd.Dir = workdir

return cmd
}
}
Expand Down

0 comments on commit 9defce6

Please sign in to comment.