fix: pick valid FileInfo additionally based on dataDir (#12116)
* fix: pick valid FileInfo additionally based on dataDir

Historically we have always relied on modTime being consistent
and identical across disks; we can now additionally use the
dataDir value as a reference.

A dataDir is the same for an object at a given point in time
for a given version, so when, say, a `null` version is
overwritten in quorum, we do not mistakenly pick up the wrong
FileInfos (see the sketch below).

* make sure not to preserve fi.Data

Signed-off-by: Harshavardhana <harsha@minio.io>
harshavardhana committed Apr 22, 2021
1 parent cebada2 commit a7acfa6
Showing 8 changed files with 186 additions and 120 deletions.
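
Before the per-file diffs, here is a minimal, self-contained Go sketch of the two-field quorum vote the commit message describes, under simplified assumptions (plain slices as inputs, zero time standing in for the sentinel, an illustrative helper name). The authoritative implementation is `commonTime` in `cmd/erasure-healing-common.go` below.

```go
package main

import (
	"fmt"
	"time"
)

// commonModTimeAndDataDir sketches the two-field quorum vote: pick the
// maximally occurring modTime AND the maximally occurring dataDir,
// rather than trusting modTime alone. The function name is illustrative.
func commonModTimeAndDataDir(modTimes []time.Time, dataDirs []string) (time.Time, string) {
	timeCounts := make(map[int64]int, len(modTimes))
	dirCounts := make(map[string]int, len(dataDirs))
	for _, t := range modTimes {
		if !t.IsZero() { // zero time stands in for the sentinel here
			timeCounts[t.UnixNano()]++
		}
	}
	for _, d := range dataDirs {
		if d != "" { // disks that returned no metadata contribute nothing
			dirCounts[d]++
		}
	}
	var modTime time.Time
	maxima := 0
	for nano, count := range timeCounts {
		if count > maxima {
			maxima = count
			modTime = time.Unix(0, nano)
		}
	}
	var dataDir string
	maxima = 0
	for dir, count := range dirCounts {
		if count > maxima {
			maxima = count
			dataDir = dir
		}
	}
	return modTime, dataDir
}

func main() {
	now := time.Now()
	// Two disks agree on dataDir "uuid-a"; one carries a stale "uuid-b".
	modTime, dataDir := commonModTimeAndDataDir(
		[]time.Time{now, now, now},
		[]string{"uuid-a", "uuid-a", "uuid-b"},
	)
	fmt.Println(modTime.Equal(now), dataDir) // true uuid-a
}
```

Two FileInfos can agree on modTime yet point at different dataDirs (for example, a `null` version overwritten in quorum); the second vote disambiguates exactly that case.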
54 changes: 45 additions & 9 deletions cmd/erasure-healing-common.go
@@ -25,9 +25,11 @@ import (
 )
 
 // commonTime returns a maximally occurring time from a list of time.
-func commonTime(modTimes []time.Time) (modTime time.Time, count int) {
+func commonTime(modTimes []time.Time, dataDirs []string) (modTime time.Time, dataDir string) {
 	var maxima int // Counter for remembering max occurrence of elements.
-	timeOccurenceMap := make(map[int64]int)
+
+	timeOccurenceMap := make(map[int64]int, len(modTimes))
+	dataDirOccurenceMap := make(map[string]int, len(dataDirs))
 	// Ignore the uuid sentinel and count the rest.
 	for _, time := range modTimes {
 		if time.Equal(timeSentinel) {
@@ -36,6 +38,13 @@ func commonTime(modTimes []time.Time) (modTime time.Time, count int) {
 		timeOccurenceMap[time.UnixNano()]++
 	}
 
+	for _, dataDir := range dataDirs {
+		if dataDir == "" {
+			continue
+		}
+		dataDirOccurenceMap[dataDir]++
+	}
+
 	// Find the common cardinality from previously collected
 	// occurrences of elements.
 	for nano, count := range timeOccurenceMap {
@@ -46,8 +55,18 @@ func commonTime(modTimes []time.Time) (modTime time.Time, count int) {
 		}
 	}
 
+	// Find the common cardinality from the previously collected
+	// occurrences of elements.
+	var dmaxima int
+	for ddataDir, count := range dataDirOccurenceMap {
+		if count > dmaxima {
+			dmaxima = count
+			dataDir = ddataDir
+		}
+	}
+
 	// Return the collected common uuid.
-	return modTime, maxima
+	return modTime, dataDir
 }
 
 // Beginning of unix time is treated as sentinel value here.
@@ -101,24 +120,33 @@ func listObjectModtimes(partsMetadata []FileInfo, errs []error) (modTimes []time
 // - a slice of disks where disk having 'older' xl.meta (or nothing)
 // are set to nil.
 // - latest (in time) of the maximally occurring modTime(s).
-func listOnlineDisks(disks []StorageAPI, partsMetadata []FileInfo, errs []error) (onlineDisks []StorageAPI, modTime time.Time) {
+func listOnlineDisks(disks []StorageAPI, partsMetadata []FileInfo, errs []error) (onlineDisks []StorageAPI, modTime time.Time, dataDir string) {
 	onlineDisks = make([]StorageAPI, len(disks))
 
 	// List all the file commit ids from parts metadata.
 	modTimes := listObjectModtimes(partsMetadata, errs)
 
+	dataDirs := make([]string, len(partsMetadata))
+	for idx, fi := range partsMetadata {
+		if errs[idx] != nil {
+			continue
+		}
+		dataDirs[idx] = fi.DataDir
+	}
+
 	// Reduce list of UUIDs to a single common value.
-	modTime, _ = commonTime(modTimes)
+	modTime, dataDir = commonTime(modTimes, dataDirs)
 
 	// Create a new online disks slice, which have common uuid.
 	for index, t := range modTimes {
-		if t.Equal(modTime) {
+		if partsMetadata[index].IsValid() && t.Equal(modTime) && partsMetadata[index].DataDir == dataDir {
 			onlineDisks[index] = disks[index]
 		} else {
 			onlineDisks[index] = nil
 		}
 	}
-	return onlineDisks, modTime
+
+	return onlineDisks, modTime, dataDir
 }
 
 // Returns the latest updated FileInfo files and error in case of failure.
@@ -131,16 +159,24 @@ func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []err
 	// List all the file commit ids from parts metadata.
 	modTimes := listObjectModtimes(partsMetadata, errs)
 
+	dataDirs := make([]string, len(partsMetadata))
+	for idx, fi := range partsMetadata {
+		if errs[idx] != nil {
+			continue
+		}
+		dataDirs[idx] = fi.DataDir
+	}
+
 	// Count all latest updated FileInfo values
 	var count int
 	var latestFileInfo FileInfo
 
 	// Reduce list of UUIDs to a single common value - i.e. the last updated Time
-	modTime, _ := commonTime(modTimes)
+	modTime, dataDir := commonTime(modTimes, dataDirs)
 
 	// Interate through all the modTimes and count the FileInfo(s) with latest time.
 	for index, t := range modTimes {
-		if t.Equal(modTime) && partsMetadata[index].IsValid() {
+		if partsMetadata[index].IsValid() && t.Equal(modTime) && dataDir == partsMetadata[index].DataDir {
 			latestFileInfo = partsMetadata[index]
 			count++
 		}
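
The `listOnlineDisks` change above matters when disks agree on modTime but not on dataDir. Here is a toy sketch of the filtering step, assuming stand-in types (`disk` and `fileInfo` below are illustrative, not MinIO's `StorageAPI` and `FileInfo`): a disk whose metadata carries the quorum modTime but a stale dataDir is now dropped from the online set.

```go
package main

import (
	"fmt"
	"time"
)

type disk struct{ name string }

type fileInfo struct {
	ModTime time.Time
	DataDir string
}

// keepOnlineDisks mirrors the filtering step in listOnlineDisks: a disk
// stays online only when its metadata matches both the quorum modTime
// and the quorum dataDir; every other slot is left nil.
func keepOnlineDisks(disks []*disk, metas []fileInfo, modTime time.Time, dataDir string) []*disk {
	online := make([]*disk, len(disks))
	for i, meta := range metas {
		if meta.ModTime.Equal(modTime) && meta.DataDir == dataDir {
			online[i] = disks[i]
		}
	}
	return online
}

func main() {
	now := time.Now()
	disks := []*disk{{"d0"}, {"d1"}, {"d2"}}
	metas := []fileInfo{
		{now, "uuid-a"},
		{now, "uuid-a"},
		{now, "uuid-b"}, // same modTime, stale dataDir: dropped
	}
	for i, d := range keepOnlineDisks(disks, metas, now, "uuid-a") {
		fmt.Println(i, d) // slot 2 prints <nil>
	}
}
```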
48 changes: 27 additions & 21 deletions cmd/erasure-healing-common_test.go
@@ -82,7 +82,7 @@ func TestCommonTime(t *testing.T) {
 	// common modtime. Tests fail if modtime does not match.
 	for i, testCase := range testCases {
 		// Obtain a common mod time from modTimes slice.
-		ctime, _ := commonTime(testCase.times)
+		ctime, _ := commonTime(testCase.times, nil)
 		if !testCase.time.Equal(ctime) {
 			t.Fatalf("Test case %d, expect to pass but failed. Wanted modTime: %s, got modTime: %s\n", i+1, testCase.time, ctime)
 		}
@@ -181,8 +181,8 @@ func TestListOnlineDisks(t *testing.T) {
 	z := obj.(*erasureServerPools)
 	erasureDisks := z.serverPools[0].sets[0].getDisks()
 	for i, test := range testCases {
+		test := test
 		t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) {
-
 			_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{})
 			if err != nil {
 				t.Fatalf("Failed to putObject %v", err)
@@ -196,7 +196,7 @@ func TestListOnlineDisks(t *testing.T) {
 
 			for j := range partsMetadata {
 				if errs[j] != nil {
-					t.Fatalf("Test %d: expected error to be nil: %s", i+1, errs[j])
+					t.Fatalf("expected error to be nil: %s", errs[j])
 				}
 				partsMetadata[j].ModTime = test.modTimes[j]
 			}
@@ -215,8 +215,7 @@ func TestListOnlineDisks(t *testing.T) {
 					tamperedIndex = index
 					dErr := erasureDisks[index].Delete(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"), false)
 					if dErr != nil {
-						t.Fatalf("Test %d: Failed to delete %s - %v", i+1,
-							filepath.Join(object, "part.1"), dErr)
+						t.Fatalf("Failed to delete %s - %v", filepath.Join(object, "part.1"), dErr)
 					}
 					break
 				}
Expand All @@ -242,19 +241,22 @@ func TestListOnlineDisks(t *testing.T) {

}

onlineDisks, modTime := listOnlineDisks(erasureDisks, partsMetadata, test.errs)
onlineDisks, modTime, dataDir := listOnlineDisks(erasureDisks, partsMetadata, test.errs)
if !modTime.Equal(test.expectedTime) {
t.Fatalf("Test %d: Expected modTime to be equal to %v but was found to be %v",
i+1, test.expectedTime, modTime)
t.Fatalf("Expected modTime to be equal to %v but was found to be %v",
test.expectedTime, modTime)
}
if fi.DataDir != dataDir {
t.Fatalf("Expected dataDir to be equal to %v but was found to be %v",
fi.DataDir, dataDir)
}

availableDisks, newErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata, test.errs, bucket, object, madmin.HealDeepScan)
test.errs = newErrs

if test._tamperBackend != noTamper {
if tamperedIndex != -1 && availableDisks[tamperedIndex] != nil {
t.Fatalf("Test %d: disk (%v) with part.1 missing is not a disk with available data",
i+1, erasureDisks[tamperedIndex])
t.Fatalf("disk (%v) with part.1 missing is not a disk with available data",
erasureDisks[tamperedIndex])
}
}
})
@@ -354,22 +356,22 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
 	z := obj.(*erasureServerPools)
 	erasureDisks := z.serverPools[0].sets[0].getDisks()
 	for i, test := range testCases {
+		test := test
 		t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) {
-
 			_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{})
 			if err != nil {
 				t.Fatalf("Failed to putObject %v", err)
 			}
 
 			partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", true)
-			_, err := getLatestFileInfo(ctx, partsMetadata, errs)
+			fi, err := getLatestFileInfo(ctx, partsMetadata, errs)
 			if err != nil {
 				t.Fatalf("Failed to getLatestFileInfo %v", err)
 			}
 
 			for j := range partsMetadata {
 				if errs[j] != nil {
-					t.Fatalf("Test %d: expected error to be nil: %s", i+1, errs[j])
+					t.Fatalf("expected error to be nil: %s", errs[j])
 				}
 				partsMetadata[j].ModTime = test.modTimes[j]
 			}
@@ -392,8 +394,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
 					tamperedIndex = index
 					dErr := erasureDisks[index].Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false)
 					if dErr != nil {
-						t.Fatalf("Test %d: Failed to delete %s - %v", i+1,
-							pathJoin(object, xlStorageFormatFile), dErr)
+						t.Fatalf("Failed to delete %s - %v", pathJoin(object, xlStorageFormatFile), dErr)
 					}
 					break
 				}
@@ -424,19 +425,24 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
 				t.Fatalf("Failed to getLatestFileInfo %v", err)
 			}
 
-			onlineDisks, modTime := listOnlineDisks(erasureDisks, partsMetadata, test.errs)
+			onlineDisks, modTime, dataDir := listOnlineDisks(erasureDisks, partsMetadata, test.errs)
 			if !modTime.Equal(test.expectedTime) {
-				t.Fatalf("Test %d: Expected modTime to be equal to %v but was found to be %v",
-					i+1, test.expectedTime, modTime)
+				t.Fatalf("Expected modTime to be equal to %v but was found to be %v",
+					test.expectedTime, modTime)
 			}
 
+			if fi.DataDir != dataDir {
+				t.Fatalf("Expected dataDir to be equal to %v but was found to be %v",
+					fi.DataDir, dataDir)
+			}
+
 			availableDisks, newErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata, test.errs, bucket, object, madmin.HealDeepScan)
 			test.errs = newErrs
 
 			if test._tamperBackend != noTamper {
 				if tamperedIndex != -1 && availableDisks[tamperedIndex] != nil {
-					t.Fatalf("Test %d: disk (%v) with part.1 missing is not a disk with available data",
-						i+1, erasureDisks[tamperedIndex])
+					t.Fatalf("disk (%v) with part.1 missing is not a disk with available data",
+						erasureDisks[tamperedIndex])
 				}
 			}
 		})
6 changes: 3 additions & 3 deletions cmd/erasure-healing.go
@@ -260,7 +260,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
 
 	// List of disks having latest version of the object er.meta
 	// (by modtime).
-	latestDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
+	latestDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
 
 	// List of disks having all parts as per latest er.meta.
 	availableDisks, dataErrs := disksWithAllParts(ctx, latestDisks, partsMetadata, errs, bucket, object, scanMode)
@@ -350,7 +350,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
 
 	// Latest FileInfo for reference. If a valid metadata is not
 	// present, it is as good as object not found.
-	latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, result.DataBlocks)
+	latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, result.DataBlocks)
 	if err != nil {
 		return result, toObjectErr(err, bucket, object, versionID)
 	}
@@ -471,7 +471,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
 			Algorithm: checksumAlgo,
 			Hash:      bitrotWriterSum(writers[i]),
 		})
-		if len(inlineBuffers) > 0 {
+		if len(inlineBuffers) > 0 && inlineBuffers[i] != nil {
 			partsMetadata[i].Data = inlineBuffers[i].Bytes()
 		} else {
 			partsMetadata[i].Data = nil
44 changes: 37 additions & 7 deletions cmd/erasure-metadata-utils.go
@@ -148,11 +148,15 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve
 	return metadataArray, g.Wait()
 }
 
+// shuffleDisksAndPartsMetadataByIndex this function should be always used by GetObjectNInfo()
+// and CompleteMultipartUpload code path, it is not meant to be used with PutObject,
+// NewMultipartUpload metadata shuffling.
 func shuffleDisksAndPartsMetadataByIndex(disks []StorageAPI, metaArr []FileInfo, fi FileInfo) (shuffledDisks []StorageAPI, shuffledPartsMetadata []FileInfo) {
 	shuffledDisks = make([]StorageAPI, len(disks))
 	shuffledPartsMetadata = make([]FileInfo, len(disks))
-	var inconsistent int
 	distribution := fi.Erasure.Distribution
+
+	var inconsistent int
 	for i, meta := range metaArr {
 		if disks[i] == nil {
 			// Assuming offline drives as inconsistent,
@@ -161,6 +165,14 @@ func shuffleDisksAndPartsMetadataByIndex(disks []StorageAPI, metaArr []FileInfo,
 			inconsistent++
 			continue
 		}
+		if !meta.IsValid() {
+			inconsistent++
+			continue
+		}
+		if len(fi.Data) != len(meta.Data) {
+			inconsistent++
+			continue
+		}
 		// check if erasure distribution order matches the index
 		// position if this is not correct we discard the disk
 		// and move to collect others
@@ -180,18 +192,36 @@ func shuffleDisksAndPartsMetadataByIndex(disks []StorageAPI, metaArr []FileInfo,
 	}
 
 	// fall back to original distribution based order.
-	return shuffleDisksAndPartsMetadata(disks, metaArr, distribution)
+	return shuffleDisksAndPartsMetadata(disks, metaArr, fi)
 }
 
-// Return shuffled partsMetadata depending on distribution.
-func shuffleDisksAndPartsMetadata(disks []StorageAPI, partsMetadata []FileInfo, distribution []int) (shuffledDisks []StorageAPI, shuffledPartsMetadata []FileInfo) {
-	if distribution == nil {
-		return disks, partsMetadata
-	}
+// Return shuffled partsMetadata depending on fi.Distribution.
+// additional validation is attempted and invalid metadata is
+// automatically skipped only when fi.ModTime is non-zero
+// indicating that this is called during read-phase
+func shuffleDisksAndPartsMetadata(disks []StorageAPI, partsMetadata []FileInfo, fi FileInfo) (shuffledDisks []StorageAPI, shuffledPartsMetadata []FileInfo) {
 	shuffledDisks = make([]StorageAPI, len(disks))
 	shuffledPartsMetadata = make([]FileInfo, len(partsMetadata))
+	distribution := fi.Erasure.Distribution
+
+	init := fi.ModTime.IsZero()
 	// Shuffle slice xl metadata for expected distribution.
 	for index := range partsMetadata {
+		if disks[index] == nil {
+			continue
+		}
+		if !init && !partsMetadata[index].IsValid() {
+			// Check for parts metadata validity for only
+			// fi.ModTime is not empty - ModTime is always set,
+			// if object was ever written previously.
+			continue
+		}
+		if !init && len(fi.Data) != len(partsMetadata[index].Data) {
+			// Check for length of data parts only when
+			// fi.ModTime is not empty - ModTime is always set,
+			// if object was ever written previously.
+			continue
+		}
 		blockIndex := distribution[index]
 		shuffledPartsMetadata[blockIndex-1] = partsMetadata[index]
 		shuffledDisks[blockIndex-1] = disks[index]
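
The shuffle itself is an inverse permutation: `fi.Erasure.Distribution` records, for each disk index, the 1-based block position that disk's shard occupies. A minimal sketch of just that re-ordering step (illustrative function name, strings standing in for shards):

```go
package main

import "fmt"

// shuffleByDistribution re-orders items into canonical block order:
// distribution[index] is the 1-based block position held by that index.
func shuffleByDistribution(items []string, distribution []int) []string {
	shuffled := make([]string, len(items))
	for index, item := range items {
		blockIndex := distribution[index]
		shuffled[blockIndex-1] = item
	}
	return shuffled
}

func main() {
	// Disk 0 holds block 2, disk 1 holds block 3, disk 2 holds block 1.
	fmt.Println(shuffleByDistribution(
		[]string{"shard-on-disk0", "shard-on-disk1", "shard-on-disk2"},
		[]int{2, 3, 1},
	)) // [shard-on-disk2 shard-on-disk0 shard-on-disk1]
}
```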
10 changes: 6 additions & 4 deletions cmd/erasure-metadata.go
@@ -232,15 +232,17 @@ func (fi FileInfo) ObjectToPartOffset(ctx context.Context, offset int64) (partIn
 	return 0, 0, InvalidRange{}
 }
 
-func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time.Time, quorum int) (xmv FileInfo, e error) {
+func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time.Time, dataDir string, quorum int) (xmv FileInfo, e error) {
 	metaHashes := make([]string, len(metaArr))
 	h := sha256.New()
 	for i, meta := range metaArr {
-		if meta.IsValid() && meta.ModTime.Equal(modTime) {
+		if meta.IsValid() && meta.ModTime.Equal(modTime) && meta.DataDir == dataDir {
 			for _, part := range meta.Parts {
 				h.Write([]byte(fmt.Sprintf("part.%d", part.Number)))
 			}
 			h.Write([]byte(fmt.Sprintf("%v", meta.Erasure.Distribution)))
+			// make sure that length of Data is same
+			h.Write([]byte(fmt.Sprintf("%v", len(meta.Data))))
 			metaHashes[i] = hex.EncodeToString(h.Sum(nil))
 			h.Reset()
 		}
@@ -278,8 +280,8 @@ func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time.
 
 // pickValidFileInfo - picks one valid FileInfo content and returns from a
 // slice of FileInfo.
-func pickValidFileInfo(ctx context.Context, metaArr []FileInfo, modTime time.Time, quorum int) (xmv FileInfo, e error) {
-	return findFileInfoInQuorum(ctx, metaArr, modTime, quorum)
+func pickValidFileInfo(ctx context.Context, metaArr []FileInfo, modTime time.Time, dataDir string, quorum int) (xmv FileInfo, e error) {
+	return findFileInfoInQuorum(ctx, metaArr, modTime, dataDir, quorum)
 }
 
 // writeUniqueFileInfo - writes unique `xl.meta` content for each disk concurrently.
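
`findFileInfoInQuorum` reduces each candidate FileInfo to a fingerprint over the fields that must agree (part numbers, erasure distribution, and now the length of inline data), then asks whether any fingerprint reaches quorum. A rough sketch of that counting step under simplified assumptions (plain string fingerprints; `pickInQuorum` is a hypothetical helper, not the real API):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// pickInQuorum is a hypothetical reduction of findFileInfoInQuorum:
// count identical fingerprints and accept one only if it reaches quorum.
func pickInQuorum(fingerprints []string, quorum int) (string, error) {
	counts := make(map[string]int, len(fingerprints))
	for _, fp := range fingerprints {
		if fp != "" { // empty means no valid metadata on that disk
			counts[fp]++
		}
	}
	for fp, n := range counts {
		if n >= quorum {
			return fp, nil
		}
	}
	return "", fmt.Errorf("no fingerprint reached quorum %d", quorum)
}

func main() {
	h := func(s string) string {
		sum := sha256.Sum256([]byte(s)) // fingerprint the agreeing fields
		return hex.EncodeToString(sum[:])
	}
	// Three disks agree on (parts, distribution, data length); one differs.
	fps := []string{h("part.1|dist|42"), h("part.1|dist|42"), h("part.1|dist|42"), h("part.1|dist|0")}
	fp, err := pickInQuorum(fps, 3)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("quorum fingerprint:", fp[:8])
}
```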