Skip to content

Commit

Permalink
Add support for replication of object tags, retention
Browse files Browse the repository at this point in the history
and legalhold metadata
  • Loading branch information
Poorna Krishnamoorthy committed Nov 19, 2020
1 parent 240622a commit d9547b3
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 38 deletions.
27 changes: 15 additions & 12 deletions cmd/bucket-object-lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"math"
"net/http"

xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth"
objectlock "github.com/minio/minio/pkg/bucket/object/lock"
"github.com/minio/minio/pkg/bucket/policy"
"github.com/minio/minio/pkg/bucket/replication"
)

// BucketObjectLockSys - map of bucket and retention configuration.
Expand Down Expand Up @@ -245,13 +247,13 @@ func enforceRetentionBypassForPut(ctx context.Context, r *http.Request, bucket,
// For objects in "Compliance" mode, retention date cannot be shortened, and mode cannot be altered.
// For objects with legal hold header set, the s3:PutObjectLegalHold permission is expected to be set
// Both legal hold and retention can be applied independently on an object
func checkPutObjectLockAllowed(ctx context.Context, r *http.Request, bucket, object string, getObjectInfoFn GetObjectInfoFn, retentionPermErr, legalHoldPermErr APIErrorCode) (objectlock.RetMode, objectlock.RetentionDate, objectlock.ObjectLegalHold, APIErrorCode) {
func checkPutObjectLockAllowed(ctx context.Context, rq *http.Request, bucket, object string, getObjectInfoFn GetObjectInfoFn, retentionPermErr, legalHoldPermErr APIErrorCode) (objectlock.RetMode, objectlock.RetentionDate, objectlock.ObjectLegalHold, APIErrorCode) {
var mode objectlock.RetMode
var retainDate objectlock.RetentionDate
var legalHold objectlock.ObjectLegalHold

retentionRequested := objectlock.IsObjectLockRetentionRequested(r.Header)
legalHoldRequested := objectlock.IsObjectLockLegalHoldRequested(r.Header)
retentionRequested := objectlock.IsObjectLockRetentionRequested(rq.Header)
legalHoldRequested := objectlock.IsObjectLockLegalHoldRequested(rq.Header)

retentionCfg, err := globalBucketObjectLockSys.Get(bucket)
if err != nil {
Expand All @@ -267,25 +269,24 @@ func checkPutObjectLockAllowed(ctx context.Context, r *http.Request, bucket, obj
return mode, retainDate, legalHold, ErrNone
}

opts, err := getOpts(ctx, r, bucket, object)
opts, err := getOpts(ctx, rq, bucket, object)
if err != nil {
return mode, retainDate, legalHold, toAPIErrorCode(ctx, err)
}

if opts.VersionID != "" {
replica := rq.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String()

if opts.VersionID != "" && !replica {
if objInfo, err := getObjectInfoFn(ctx, bucket, object, opts); err == nil {
r := objectlock.GetObjectRetentionMeta(objInfo.UserDefined)

t, err := objectlock.UTCNowNTP()
if err != nil {
logger.LogIf(ctx, err)
return mode, retainDate, legalHold, ErrObjectLocked
}

if r.Mode == objectlock.RetCompliance && r.RetainUntilDate.After(t) {
return mode, retainDate, legalHold, ErrObjectLocked
}

mode = r.Mode
retainDate = r.RetainUntilDate
legalHold = objectlock.GetObjectLegalHoldMeta(objInfo.UserDefined)
Expand All @@ -298,17 +299,17 @@ func checkPutObjectLockAllowed(ctx context.Context, r *http.Request, bucket, obj

if legalHoldRequested {
var lerr error
if legalHold, lerr = objectlock.ParseObjectLockLegalHoldHeaders(r.Header); lerr != nil {
if legalHold, lerr = objectlock.ParseObjectLockLegalHoldHeaders(rq.Header); lerr != nil {
return mode, retainDate, legalHold, toAPIErrorCode(ctx, err)
}
}

if retentionRequested {
legalHold, err := objectlock.ParseObjectLockLegalHoldHeaders(r.Header)
legalHold, err := objectlock.ParseObjectLockLegalHoldHeaders(rq.Header)
if err != nil {
return mode, retainDate, legalHold, toAPIErrorCode(ctx, err)
}
rMode, rDate, err := objectlock.ParseObjectLockRetentionHeaders(r.Header)
rMode, rDate, err := objectlock.ParseObjectLockRetentionHeaders(rq.Header)
if err != nil {
return mode, retainDate, legalHold, toAPIErrorCode(ctx, err)
}
Expand All @@ -317,7 +318,9 @@ func checkPutObjectLockAllowed(ctx context.Context, r *http.Request, bucket, obj
}
return rMode, rDate, legalHold, ErrNone
}

if replica { // replica inherits retention metadata only from source
return "", objectlock.RetentionDate{}, legalHold, ErrNone
}
if !retentionRequested && retentionCfg.Validity > 0 {
if retentionPermErr != ErrNone {
return mode, retainDate, legalHold, retentionPermErr
Expand Down
111 changes: 99 additions & 12 deletions cmd/bucket-replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"
"time"

minio "github.com/minio/minio-go/v7"
miniogo "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/minio/minio-go/v7/pkg/tags"
Expand Down Expand Up @@ -241,6 +242,45 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA
}
}

func getCopyObjMetadata(oi ObjectInfo, dest replication.Destination) map[string]string {
meta := make(map[string]string, len(oi.UserDefined))
for k, v := range oi.UserDefined {
if k == xhttp.AmzBucketReplicationStatus {
continue
}
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
continue
}
meta[k] = v
}
if oi.ContentEncoding != "" {
meta[xhttp.ContentEncoding] = oi.ContentEncoding
}
if oi.ContentType != "" {
meta[xhttp.ContentType] = oi.ContentType
}
tag, err := tags.ParseObjectTags(oi.UserTags)
if err != nil {
return nil
}
if tag != nil {
meta[xhttp.AmzObjectTagging] = tag.String()
meta[xhttp.AmzTagDirective] = "REPLACE"
}
sc := dest.StorageClass
if sc == "" {
sc = oi.StorageClass
}
meta[xhttp.AmzStorageClass] = sc
if oi.UserTags != "" {
meta[xhttp.AmzObjectTagging] = oi.UserTags
}
meta[xhttp.MinIOSourceMTime] = oi.ModTime.Format(time.RFC3339)
meta[xhttp.MinIOSourceETag] = oi.ETag
meta[xhttp.AmzBucketReplicationStatus] = replication.Replica.String()
return meta
}

func putReplicationOpts(ctx context.Context, dest replication.Destination, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions) {
meta := make(map[string]string)
for k, v := range objInfo.UserDefined {
Expand Down Expand Up @@ -294,6 +334,53 @@ func putReplicationOpts(ctx context.Context, dest replication.Destination, objIn
return
}

type replicationAction string

const (
replicateMetadata replicationAction = "metadata"
replicateNone replicationAction = "none"
replicateAll replicationAction = "all"
)

// returns replicationAction by comparing metadata between source and target
func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationAction {
// needs full replication
if oi1.ETag != oi2.ETag ||
oi1.VersionID != oi2.VersionID ||
oi1.Size != oi2.Size ||
oi1.DeleteMarker != oi2.IsDeleteMarker {
return replicateAll
}

if !oi1.ModTime.Equal(oi2.LastModified) ||
oi1.ContentType != oi2.ContentType ||
oi1.StorageClass != oi2.StorageClass {
return replicateMetadata
}
if oi1.ContentEncoding != "" {
enc, ok := oi2.UserMetadata[xhttp.ContentEncoding]
if !ok || enc != oi1.ContentEncoding {
return replicateMetadata
}
}
for k, v := range oi2.UserMetadata {
oi2.Metadata[k] = []string{v}
}
if len(oi2.Metadata) != len(oi1.UserDefined) {
return replicateMetadata
}
for k1, v1 := range oi1.UserDefined {
if v2, ok := oi2.Metadata[k1]; !ok || v1 != strings.Join(v2, "") {
return replicateMetadata
}
}
t, _ := tags.MapToObjectTags(oi2.UserTags)
if t.String() != oi1.UserTags {
return replicateMetadata
}
return replicateNone
}

// replicateObject replicates the specified version of the object to destination bucket
// The source object is then updated to reflect the replication status.
func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLayer) {
Expand Down Expand Up @@ -330,16 +417,11 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
return
}

// if heal encounters a pending replication status, either replication
// has failed due to server shutdown or crawler and PutObject replication are in contention.
healPending := objInfo.ReplicationStatus == replication.Pending

// In the rare event that replication is in pending state either due to
// server shut down/crash before replication completed or healing and PutObject
// race - do an additional stat to see if the version ID exists
if healPending {
_, err := tgt.StatObject(ctx, dest.Bucket, object, miniogo.StatObjectOptions{VersionID: objInfo.VersionID})
if err == nil {
rtype := replicateAll
oi, err := tgt.StatObject(ctx, dest.Bucket, object, miniogo.StatObjectOptions{VersionID: objInfo.VersionID})
if err == nil {
rtype = getReplicationAction(objInfo, oi)
if rtype == replicateNone {
gr.Close()
// object with same VersionID already exists, replication kicked off by
// PutObject might have completed.
Expand Down Expand Up @@ -367,8 +449,13 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
headerSize += len(k) + len(v)
}
r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b, target.BandwidthLimit)

_, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts)
if rtype == replicateAll {
_, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts)
} else {
// replicate metadata for object tagging/copy with metadata replacement
dstOpts := miniogo.PutObjectOptions{Internal: miniogo.AdvancedPutOptions{SourceVersionID: objInfo.VersionID}}
_, err = tgt.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), dstOpts)
}
r.Close()
if err != nil {
replicationStatus = replication.Failed
Expand Down
8 changes: 7 additions & 1 deletion cmd/erasure-object.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,12 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
}
modTime = UTCNow()
}

fi.VersionID = versionID // set any new versionID we might have created
fi.ModTime = modTime // set modTime for the new versionID
if !dstOpts.MTime.IsZero() {
modTime = dstOpts.MTime
fi.ModTime = dstOpts.MTime
}

srcInfo.UserDefined["etag"] = srcInfo.ETag

Expand Down Expand Up @@ -1096,6 +1099,9 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
if tags != "" {
fi.Metadata[xhttp.AmzObjectTagging] = tags
}
for k, v := range opts.UserDefined {
fi.Metadata[k] = v
}
metaArr[i].Metadata = fi.Metadata
}

Expand Down
1 change: 1 addition & 0 deletions cmd/erasure-server-sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ func (z *erasureServerSets) CopyObject(ctx context.Context, srcBucket, srcObject
UserDefined: srcInfo.UserDefined,
Versioned: dstOpts.Versioned,
VersionID: dstOpts.VersionID,
MTime: dstOpts.MTime,
}

return z.serverSets[zoneIdx].PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
Expand Down
3 changes: 1 addition & 2 deletions cmd/erasure-sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,10 +774,8 @@ func (s *erasureSets) CopyObject(ctx context.Context, srcBucket, srcObject, dstB
dstSet := s.getHashedSet(dstObject)

cpSrcDstSame := srcSet == dstSet

// Check if this request is only metadata update.
if cpSrcDstSame && srcInfo.metadataOnly {

// Version ID is set for the destination and source == destination version ID.
// perform an in-place update.
if dstOpts.VersionID != "" && srcOpts.VersionID == dstOpts.VersionID {
Expand All @@ -803,6 +801,7 @@ func (s *erasureSets) CopyObject(ctx context.Context, srcBucket, srcObject, dstB
UserDefined: srcInfo.UserDefined,
Versioned: dstOpts.Versioned,
VersionID: dstOpts.VersionID,
MTime: dstOpts.MTime,
}

return dstSet.putObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
Expand Down
2 changes: 1 addition & 1 deletion cmd/gateway/s3/gateway-s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func (l *s3Objects) CopyObject(ctx context.Context, srcBucket string, srcObject
srcInfo.UserDefined[k] = v[0]
}

if _, err = l.Client.CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo.UserDefined); err != nil {
if _, err = l.Client.CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo.UserDefined, miniogo.PutObjectOptions{}); err != nil {
return objInfo, minio.ErrorRespToObjectError(err, srcBucket, srcObject)
}
return l.GetObjectInfo(ctx, dstBucket, dstObject, dstOpts)
Expand Down

0 comments on commit d9547b3

Please sign in to comment.