Skip to content

Commit

Permalink
Fixed how blob storage PutBlob errors are handled in content.Manager (#117)
Browse files Browse the repository at this point in the history

* Fixed how blob storage PutBlob errors are handled in content.Manager

In order to guarantee that all index entries have corresponding
pack blobs, we must ensure that `content.Manager.Flush` will
not succeed unless all pending writes have completed.

Added test that simulates various patterns of PutBlob failures and
ensures that data remains durable despite those, assuming all calls
to `WriteContent()` and `Flush()` are retried.

* addressed review feedback
  • Loading branch information
jkowalski committed Oct 2, 2019
1 parent 3b1177c commit d0e4d4f
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 20 deletions.
68 changes: 48 additions & 20 deletions repo/content/content_manager.go
Expand Up @@ -70,6 +70,7 @@ type Manager struct {

pendingPacks map[blob.ID]*pendingPackInfo
writingPacks []*pendingPackInfo // list of packs that are being written
failedPacks []*pendingPackInfo // list of packs that failed to write, will be retried
packIndexBuilder packIndexBuilder // contents that are in index currently being built (all packs saved but not committed)

disableIndexFlushCount int
Expand Down Expand Up @@ -152,8 +153,22 @@ func (bm *Manager) addToPackUnlocked(ctx context.Context, contentID ID, data []b
bm.cond.Wait()
}

// see if we have any packs that have failed previously
// retry writing them now.
//
// we're making a copy of bm.failedPacks since bm.writePackAndAddToIndex()
// will remove from it on success.
fp := append([]*pendingPackInfo(nil), bm.failedPacks...)
for _, pp := range fp {
if err := bm.writePackAndAddToIndex(ctx, pp, true); err != nil {
bm.unlock()
return errors.Wrap(err, "error writing previously failed pack")
}
}

if bm.timeNow().After(bm.flushPackIndexesAfter) {
if err := bm.flushPackIndexesLocked(ctx); err != nil {
bm.unlock()
return err
}
}
Expand Down Expand Up @@ -298,41 +313,54 @@ func (bm *Manager) finishAllPacksLocked(ctx context.Context) error {
return nil
}

func (bm *Manager) writePackAndAddToIndex(ctx context.Context, pp *pendingPackInfo, lockHeld bool) error {
func (bm *Manager) writePackAndAddToIndex(ctx context.Context, pp *pendingPackInfo, holdingLock bool) error {
packFileIndex, err := bm.prepareAndWritePackInternal(ctx, pp)

if !holdingLock {
bm.lock()
defer func() {
bm.cond.Broadcast()
bm.unlock()
}()
}

// after finishing writing, remove from both writingPacks and failedPacks
bm.writingPacks = removePendingPack(bm.writingPacks, pp)
bm.failedPacks = removePendingPack(bm.failedPacks, pp)

if err == nil {
// success, add pack index builder entries to index.
for _, info := range packFileIndex {
bm.packIndexBuilder.Add(*info)
}
return nil
}

// failure - add to failedPacks slice again
bm.failedPacks = append(bm.failedPacks, pp)
return errors.Wrap(err, "error writing pack")
}

// prepareAndWritePackInternal generates a random pack blob name, prepares the
// pack data content for pp and writes it to blob storage.
//
// It returns the pack index builder entries describing the written pack so
// the caller can merge them into the shared index while holding the manager
// lock; this function itself must be called WITHOUT holding the lock (it
// performs blob I/O via writePackFileNotLocked).
func (bm *Manager) prepareAndWritePackInternal(ctx context.Context, pp *pendingPackInfo) (packIndexBuilder, error) {
	// random 16-byte suffix makes the pack blob name unique.
	contentID := make([]byte, 16)
	if _, err := cryptorand.Read(contentID); err != nil {
		return nil, errors.Wrap(err, "unable to read crypto bytes")
	}

	packFile := blob.ID(fmt.Sprintf("%v%x", pp.prefix, contentID))

	contentData, packFileIndex, err := bm.preparePackDataContent(ctx, pp, packFile)
	if err != nil {
		return nil, errors.Wrap(err, "error preparing data content")
	}

	// an empty pack (e.g. all contents deduplicated away) needs no blob write.
	if len(contentData) > 0 {
		if err := bm.writePackFileNotLocked(ctx, packFile, contentData); err != nil {
			return nil, errors.Wrap(err, "can't save pack data content")
		}
		formatLog.Debugf("wrote pack file: %v (%v bytes)", packFile, len(contentData))
	}

	return packFileIndex, nil
}

func removePendingPack(slice []*pendingPackInfo, pp *pendingPackInfo) []*pendingPackInfo {
Expand Down
118 changes: 118 additions & 0 deletions repo/content/content_manager_test.go
Expand Up @@ -25,6 +25,7 @@ import (

const (
	// maxPackSize is the pack blob size limit used by test content managers;
	// tests size their contents relative to it to force pack boundaries.
	maxPackSize = 2000
	// maxRetries bounds how many times the retry helpers re-attempt
	// WriteContent/Flush before giving up.
	maxRetries = 100
)

// fakeTime is the deterministic starting clock value used by test managers.
var fakeTime = time.Date(2017, 1, 1, 0, 0, 0, 0, time.UTC)
Expand Down Expand Up @@ -573,6 +574,84 @@ func verifyAllDataPresent(t *testing.T, data map[blob.ID][]byte, contentIDs map[
}
}

// TestHandleWriteErrors simulates sequences of PutBlob failures and verifies
// that data written through the content manager remains durable, provided all
// calls to WriteContent() and Flush() are retried until they succeed. It also
// pins the exact number of retries each fault pattern is expected to cause.
func TestHandleWriteErrors(t *testing.T) {
	ctx := context.Background()

	// genFaults(S0,F0,S1,F1,...,) generates a list of faults
	// where success is returned Sn times followed by failure returned Fn times
	genFaults := func(counts ...int) []*blobtesting.Fault {
		var result []*blobtesting.Fault

		for i, cnt := range counts {
			if i%2 == 0 {
				// even positions: cnt successes. Repeat counts additional
				// repetitions, hence cnt-1; NOTE(review): cnt==0 yields
				// Repeat:-1 — presumably FaultyStorage treats that as
				// "no successes", confirm against blobtesting.
				result = append(result, &blobtesting.Fault{
					Repeat: cnt - 1,
				})
			} else {
				// odd positions: cnt consecutive PutBlob errors.
				result = append(result, &blobtesting.Fault{
					Repeat: cnt - 1,
					Err:    errors.Errorf("some write error"),
				})
			}
		}
		return result
	}

	// simulate a stream of PutBlob failures, write some contents followed by flush
	// count how many times we retried writes/flushes
	// also, verify that all the data is durable
	cases := []struct {
		faults               []*blobtesting.Fault // failures to simulate
		numContents          int                  // how many contents to write
		contentSize          int                  // size of each content
		expectedFlushRetries int
		expectedWriteRetries int
	}{
		{faults: genFaults(0, 10, 10, 10, 10, 10, 10, 10, 10, 10), numContents: 5, contentSize: maxPackSize, expectedWriteRetries: 10, expectedFlushRetries: 0},
		{faults: genFaults(1, 2), numContents: 1, contentSize: maxPackSize, expectedWriteRetries: 0, expectedFlushRetries: 2},
		{faults: genFaults(1, 2), numContents: 10, contentSize: maxPackSize, expectedWriteRetries: 2, expectedFlushRetries: 0},
		// 2 failures, 2 successes (pack blobs), 1 failure (flush), 1 success (flush)
		{faults: genFaults(0, 2, 2, 1, 1, 1, 1), numContents: 2, contentSize: maxPackSize, expectedWriteRetries: 2, expectedFlushRetries: 1},
		{faults: genFaults(0, 2, 2, 1, 1, 1, 1), numContents: 4, contentSize: maxPackSize / 2, expectedWriteRetries: 2, expectedFlushRetries: 1},
	}

	for n, tc := range cases {
		tc := tc // capture range variable for the subtest closure
		t.Run(fmt.Sprintf("case-%v", n), func(t *testing.T) {
			data := blobtesting.DataMap{}
			keyTime := map[blob.ID]time.Time{}
			st := blobtesting.NewMapStorage(data, keyTime, nil)

			// set up fake storage whose PutBlob fails according to tc.faults,
			// layered on top of working map-based storage
			fs := &blobtesting.FaultyStorage{
				Base: st,
				Faults: map[string][]*blobtesting.Fault{
					"PutBlob": tc.faults,
				},
			}

			bm := newTestContentManagerWithStorage(fs, nil)
			writeRetries := 0
			var cids []ID
			for i := 0; i < tc.numContents; i++ {
				cid, retries := writeContentWithRetriesAndVerify(ctx, t, bm, seededRandomData(i, tc.contentSize))
				writeRetries += retries
				cids = append(cids, cid)
			}
			if got, want := flushWithRetries(ctx, t, bm), tc.expectedFlushRetries; got != want {
				t.Errorf("invalid # of flush retries %v, wanted %v", got, want)
			}
			if got, want := writeRetries, tc.expectedWriteRetries; got != want {
				t.Errorf("invalid # of write retries %v, wanted %v", got, want)
			}
			// open a fresh manager over the underlying (non-faulty) storage
			// and verify every content is durable and readable.
			bm2 := newTestContentManagerWithStorage(st, nil)
			for i, cid := range cids {
				verifyContent(ctx, t, bm2, cid, seededRandomData(i, tc.contentSize))
			}
		})
	}
}

func TestRewriteNonDeleted(t *testing.T) {
const stepBehaviors = 3

Expand Down Expand Up @@ -1193,6 +1272,45 @@ func writeContentAndVerify(ctx context.Context, t *testing.T, bm *Manager, b []b

return contentID
}
// flushWithRetries calls bm.Flush, retrying up to maxRetries times on error,
// and returns the number of retries that were needed. If the flush still
// fails after exhausting the retries, the error is reported via t.Errorf.
func flushWithRetries(ctx context.Context, t *testing.T, bm *Manager) int {
	t.Helper()

	retries := 0
	for {
		err := bm.Flush(ctx)
		if err == nil {
			return retries
		}
		if retries >= maxRetries {
			t.Errorf("err: %v", err)
			return retries
		}
		log.Warningf("flush failed %v, retrying", err)
		retries++
	}
}

// writeContentWithRetriesAndVerify writes b via bm.WriteContent, retrying up
// to maxRetries times on failure. It verifies that the returned content ID
// matches the expected hash of b and that the content reads back correctly,
// then returns the ID along with the number of retries performed.
func writeContentWithRetriesAndVerify(ctx context.Context, t *testing.T, bm *Manager, b []byte) (contentID ID, retryCount int) {
	t.Helper()

	var err error
	for attempt := 0; ; attempt++ {
		contentID, err = bm.WriteContent(ctx, b, "")
		if err == nil || attempt >= maxRetries {
			break
		}
		log.Warningf("WriteContent failed %v, retrying", err)
		retryCount++
	}
	if err != nil {
		t.Errorf("err: %v", err)
	}

	if got, want := contentID, ID(hashValue(b)); got != want {
		t.Errorf("invalid content ID for %x, got %v, want %v", b, got, want)
	}

	verifyContent(ctx, t, bm, contentID, b)

	return contentID, retryCount
}

func seededRandomData(seed, length int) []byte {
b := make([]byte, length)
Expand Down

0 comments on commit d0e4d4f

Please sign in to comment.