Skip to content

Commit

Permalink
re-implement metadata handling to preserve nanosecs as well (#3402)
Browse files Browse the repository at this point in the history
  • Loading branch information
harshavardhana committed Sep 12, 2020
1 parent 8f2ffbf commit 93209e8
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 137 deletions.
81 changes: 25 additions & 56 deletions cmd/client-fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cmd
import (
"context"
"errors"
"fmt"
"io"
"os"
"path"
Expand All @@ -41,6 +42,7 @@ import (
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/minio/minio-go/v7/pkg/lifecycle"
"github.com/minio/minio-go/v7/pkg/replication"
"github.com/minio/minio/pkg/console"
)

// filesystem client
Expand All @@ -49,9 +51,10 @@ type fsClient struct {
}

const (
partSuffix = ".part.minio"
slashSeperator = "/"
metadataKey = "X-Amz-Meta-Mc-Attrs"
partSuffix = ".part.minio"
slashSeperator = "/"
metadataKey = "X-Amz-Meta-Mc-Attrs"
metadataKeyS3Cmd = "X-Amz-Meta-S3cmd-Attrs"
)

var ( // GOOS specific ignore list.
Expand Down Expand Up @@ -225,49 +228,33 @@ func (f *fsClient) Watch(ctx context.Context, options WatchOptions) (*WatchObjec
func preserveAttributes(fd *os.File, attr map[string]string) *probe.Error {
if val, ok := attr["mode"]; ok {
mode, e := strconv.ParseUint(val, 0, 32)
if e != nil {
return probe.NewError(e)
}

// Attempt to change the file mode.
if e := fd.Chmod(os.FileMode(mode)); e != nil {
return probe.NewError(e)
if e == nil {
// Attempt to change the file mode.
if e = fd.Chmod(os.FileMode(mode)); e != nil {
return probe.NewError(e)
}
}
}

var uid, gid int
var gidExists, uidExists bool
var e error
if val, ok := attr["uid"]; ok {
uid, e = strconv.Atoi(val)
if e != nil {
return probe.NewError(e)
uid = -1
}
uidExists = true
}

if val, ok := attr["gid"]; ok {
gid, e = strconv.Atoi(val)
if e != nil {
return probe.NewError(e)
gid = -1
}
gidExists = true
}

// Attempt to change the owner.
if gidExists && uidExists {
if e := fd.Chown(uid, gid); e != nil {
return probe.NewError(e)
}
} else if uidExists {
if e := fd.Chown(uid, -1); e != nil {
return probe.NewError(e)
}

} else {
if e := fd.Chown(-1, gid); e != nil {
return probe.NewError(e)
}
if e = fd.Chown(uid, gid); e != nil {
return probe.NewError(e)
}

return nil
Expand Down Expand Up @@ -312,14 +299,14 @@ func (f *fsClient) put(ctx context.Context, reader io.Reader, size int64, meta m

attr := make(map[string]string)
if _, ok := meta[metadataKey]; ok && preserve {
attr, e = parseAttribute(meta[metadataKey])
attr, e = parseAttribute(meta)
if e != nil {
tmpFile.Close()
return 0, probe.NewError(e)
}
if err := preserveAttributes(tmpFile, attr); err != nil {
tmpFile.Close()
return 0, err.Trace(objectPath)
err := preserveAttributes(tmpFile, attr)
if err != nil {
console.Println(console.Colorize("Error", fmt.Sprintf("unable to preserve attributes, continuing to copy the content %s\n", err.ToGoError())))
}
}

Expand Down Expand Up @@ -370,28 +357,12 @@ func (f *fsClient) put(ctx context.Context, reader io.Reader, size int64, meta m
}

if len(attr) != 0 && preserve {
var atime, mtime int64
var e error
var atimeChanged, mtimeChanged bool
if val, ok := attr["atime"]; ok {
atime, e = strconv.ParseInt(val, 10, 64)
if e != nil {
return totalWritten, probe.NewError(e)
}
atimeChanged = true
}

if val, ok := attr["mtime"]; ok {
mtime, e = strconv.ParseInt(val, 10, 64)
if e != nil {
return totalWritten, probe.NewError(e)
}
mtimeChanged = true
atime, mtime, err := parseAtimeMtime(attr)
if err != nil {
return totalWritten, err.Trace()
}

// Attempt to change the access and modify time
if atimeChanged && mtimeChanged {
if e := os.Chtimes(objectPath, time.Unix(atime, 0), time.Unix(mtime, 0)); e != nil {
if !atime.IsZero() && !mtime.IsZero() {
if e := os.Chtimes(objectPath, atime, mtime); e != nil {
return totalWritten, probe.NewError(e)
}
}
Expand Down Expand Up @@ -600,9 +571,7 @@ func readDir(dirname string) ([]os.FileInfo, error) {
if e != nil {
return nil, e
}
if e = f.Close(); e != nil {
return nil, e
}
defer f.Close()
sort.Sort(byDirName(list))
return list, nil
}
Expand Down
12 changes: 7 additions & 5 deletions cmd/client-s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ func (c *S3Client) Watch(ctx context.Context, options WatchOptions) (*WatchObjec
for notificationInfo := range eventsCh {
if notificationInfo.Err != nil {
var perr *probe.Error
if nErr, ok := notificationInfo.Err.(minio.ErrorResponse); ok && nErr.Code == "APINotSupported" {
if minio.ToErrorResponse(notificationInfo.Err).Code == "NotImplemented" {
perr = probe.NewError(APINotImplemented{
API: "Watch",
APIType: c.GetURL().String(),
Expand Down Expand Up @@ -1341,10 +1341,7 @@ func (c *S3Client) listObjectWrapper(ctx context.Context, bucket, object string,
// https://github.com/minio/mc/issues/3073
return c.api.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: object, Recursive: isRecursive, UseV1: true})
}
if metadata {
return c.api.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: object, Recursive: isRecursive})
}
return c.api.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: object, Recursive: isRecursive})
return c.api.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: object, Recursive: isRecursive, WithMetadata: metadata})
}

func (c *S3Client) statIncompleteUpload(ctx context.Context, bucket, object string) (*ClientContent, *probe.Error) {
Expand Down Expand Up @@ -2004,6 +2001,11 @@ func (c *S3Client) objectInfo2ClientContent(bucket string, entry minio.ObjectInf
content.ReplicationStatus = entry.ReplicationStatus
for k, v := range entry.UserMetadata {
content.UserMetadata[k] = v
attr, _ := parseAttribute(content.UserMetadata)
_, mtime, _ := parseAtimeMtime(attr)
if !mtime.IsZero() {
content.Time = mtime
}
}
for k := range entry.Metadata {
content.Metadata[k] = entry.Metadata.Get(k)
Expand Down
12 changes: 7 additions & 5 deletions cmd/difference.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,15 @@ func activeActiveModTimeUpdated(src, dst *ClientContent) bool {
return false
}

srcActualModTime := src.Time
dstActualModTime := dst.Time

srcModTime := getSourceModTimeKey(src.UserMetadata)
dstModTime := getSourceModTimeKey(dst.UserMetadata)
if srcModTime == "" && dstModTime == "" {
// No active-active mirror context found, consider src & dst as similar
return false
// No active-active mirror context found, fallback to modTimes presented
// by the client content
return srcActualModTime.After(dstActualModTime)
}

var srcOriginLastModified, dstOriginLastModified time.Time
Expand All @@ -118,12 +122,10 @@ func activeActiveModTimeUpdated(src, dst *ClientContent) bool {
}
}

srcActualModTime := src.Time
if !srcOriginLastModified.IsZero() && srcOriginLastModified.After(src.Time) {
srcActualModTime = srcOriginLastModified
}

dstActualModTime := dst.Time
if !dstOriginLastModified.IsZero() && dstOriginLastModified.After(dst.Time) {
dstActualModTime = dstOriginLastModified
}
Expand Down Expand Up @@ -265,7 +267,7 @@ func differenceInternal(ctx context.Context, sourceClnt, targetClnt Client, sour
}
continue
}
if (srcType.IsRegular() && tgtType.IsRegular()) && srcSize != tgtSize {
if srcSize != tgtSize {
// Regular files differing in size.
diffCh <- diffMessage{
FirstURL: srcCtnt.URL.String(),
Expand Down
38 changes: 23 additions & 15 deletions cmd/mirror-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ EXAMPLES:
{{.Prompt}} {{.HelpName}} -a backup/ s3/archive
15. Cross mirror between sites in a active-active deployment.
Site-A: {{.Prompt}} {{.HelpName}} --watch --active-active siteA siteB
Site-B: {{.Prompt}} {{.HelpName}} --watch --active-active siteB siteA
Site-A: {{.Prompt}} {{.HelpName}} --active-active siteA siteB
Site-B: {{.Prompt}} {{.HelpName}} --active-active siteB siteA
`,
}

Expand Down Expand Up @@ -719,13 +719,6 @@ func getEventPathURLWin(srcURL, eventPath string) string {

// runMirror - mirrors all buckets to another S3 server
func runMirror(ctx context.Context, cancelMirror context.CancelFunc, srcURL, dstURL string, cli *cli.Context, encKeyDB map[string][]prefixSSEPair) bool {
// This is kept for backward compatibility, `--force` means
// --overwrite.
isOverwrite := cli.Bool("force")
if !isOverwrite {
isOverwrite = cli.Bool("overwrite")
}

// Parse metadata.
userMetadata := make(map[string]string)
if cli.String("attr") != "" {
Expand All @@ -745,13 +738,25 @@ func runMirror(ctx context.Context, cancelMirror context.CancelFunc, srcURL, dst
(srcClt.GetURL().Type == objectStorage &&
srcClt.GetURL().Path == string(srcClt.GetURL().Separator))

// Create a new mirror job and execute it
mj := newMirrorJob(srcURL, dstURL, mirrorOptions{
// This is kept for backward compatibility, `--force` means
// --overwrite.
isOverwrite := cli.Bool("force")
if !isOverwrite {
isOverwrite = cli.Bool("overwrite")
}

isWatch := cli.Bool("watch") || cli.Bool("multi-master") || cli.Bool("active-active")

// preserve is also expected to be overwritten if necessary
isMetadata := cli.Bool("a") || isWatch || len(userMetadata) > 0
isOverwrite = isOverwrite || isMetadata

mopts := mirrorOptions{
isFake: cli.Bool("fake"),
isRemove: cli.Bool("remove"),
isOverwrite: isOverwrite,
isWatch: cli.Bool("watch") || cli.Bool("multi-master") || cli.Bool("active-active"),
isMetadata: cli.Bool("a") || cli.Bool("multi-master") || cli.Bool("active-active") || len(userMetadata) > 0,
isWatch: isWatch,
isMetadata: isMetadata,
md5: cli.Bool("md5"),
disableMultipart: cli.Bool("disable-multipart"),
excludeOptions: cli.StringSlice("exclude"),
Expand All @@ -760,8 +765,11 @@ func runMirror(ctx context.Context, cancelMirror context.CancelFunc, srcURL, dst
storageClass: cli.String("storage-class"),
userMetadata: userMetadata,
encKeyDB: encKeyDB,
activeActive: cli.Bool("multi-master") || cli.Bool("active-active"),
})
activeActive: isWatch,
}

// Create a new mirror job and execute it
mj := newMirrorJob(srcURL, dstURL, mopts)

if mirrorAllBuckets {
// Synchronize buckets using dirDifference function
Expand Down

0 comments on commit 93209e8

Please sign in to comment.