Skip to content

Commit

Permalink
testing: improvements to TestIndexBlobManagerStress test
Browse files Browse the repository at this point in the history
- better logging to be able to trace the root cause in case of a failure
- prevented concurrent compaction which is unsafe:

The sequence:

1. A creates contentA1 in INDEX-1
2. B creates contentB1 in INDEX-2
3. A deletes contentA1 in INDEX-3
4. B does compaction, but is not seeing INDEX-3 (due to eventual
   consistency (EC) or simply because B started its read before #3
   completed), so it writes INDEX-4==merge(INDEX-1,INDEX-2)
   * INDEX-4 has contentA1 as active
5. A does compaction but it's not seeing INDEX-4 yet (due to EC
   or because read started before #4), so it drops contentA1, writes
   INDEX-5=merge(INDEX-1,INDEX-2,INDEX-3)
   * INDEX-5 does not have contentA1
6. C sees INDEX-4 and INDEX-5, and merge(INDEX-4,INDEX-5)
   contains contentA1, which is wrong because contentA1 has been deleted
   (and there's no record of the deletion anywhere in the system)
  • Loading branch information
jkowalski committed May 31, 2020
1 parent b78f5b7 commit e8ba34a
Showing 1 changed file with 44 additions and 16 deletions.
60 changes: 44 additions & 16 deletions repo/content/index_blob_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"math/rand"
"os"
"runtime"
"strings"
"testing"
"time"
Expand All @@ -31,10 +30,8 @@ var (
fakeStoreStartTime = time.Date(2020, 1, 1, 10, 0, 0, 0, time.UTC)
)

// the values here must be O(minutes), so that with all actors racing in the stress test they won't inadvertently
// advance the time so far that blob storage does not even get a chance to react in time.
const (
testIndexBlobDeleteAge = 10 * time.Minute
testIndexBlobDeleteAge = 1 * time.Minute
testEventualConsistencySettleTime = 45 * time.Second
)

Expand Down Expand Up @@ -128,10 +125,10 @@ var actionsTestIndexBlobManagerStress = []struct {
weight int
}{
{actionWrite, 10},
{actionRead, 100},
{actionCompact, 100},
{actionRead, 10},
{actionCompact, 10},
{actionDelete, 10},
{actionUndelete, 15},
{actionUndelete, 10},
{actionCompactAndDropDeleted, 10},
}

Expand Down Expand Up @@ -161,6 +158,11 @@ func TestIndexBlobManagerStress(t *testing.T) {

rand.Seed(time.Now().UnixNano())

for i := range actionsTestIndexBlobManagerStress {
actionsTestIndexBlobManagerStress[i].weight = rand.Intn(100)
t.Logf("weight[%v] = %v", i, actionsTestIndexBlobManagerStress[i].weight)
}

var (
fakeTimeFunc = faketime.AutoAdvance(fakeLocalStartTime, 100*time.Millisecond)
deadline time.Time // when (according to fakeTimeFunc should the test finish)
Expand All @@ -182,20 +184,20 @@ func TestIndexBlobManagerStress(t *testing.T) {

var eg errgroup.Group

numActors := 2 * runtime.NumCPU()
numActors := 2

for actorID := 0; actorID < numActors; actorID++ {
actorID := actorID
loggedSt := logging.NewWrapper(st, func(m string, args ...interface{}) {
t.Logf(m, args...)
}, fmt.Sprintf("actor[%v]:", actorID))
t.Logf(fmt.Sprintf("@%v actor[%v]:", fakeTimeFunc().Format("150405.000"), actorID)+m, args...)
}, "")
contentPrefix := fmt.Sprintf("a%v", actorID)

eg.Go(func() error {
numWritten := 0
deletedContents := map[string]bool{}
ctx := testlogging.ContextWithLevelAndPrefixFunc(t, testlogging.LevelDebug, func() string {
return fmt.Sprintf("actor[%v]@%v:", actorID, fakeTimeFunc().Format("150405.000"))
return fmt.Sprintf("@%v actor[%v]:", fakeTimeFunc().Format("150405.000"), actorID)
})

m := newIndexBlobManagerForTesting(t, loggedSt, fakeTimeFunc)
Expand Down Expand Up @@ -224,11 +226,21 @@ func TestIndexBlobManagerStress(t *testing.T) {
}

case actionCompact:
// compaction by more than one actor is unsafe, do it only if actorID == 0
if actorID != 0 {
continue
}

if err := fakeCompaction(ctx, m, false); err != nil {
return errors.Wrapf(err, "actor[%v] compaction error", actorID)
}

case actionCompactAndDropDeleted:
// compaction by more than one actor is unsafe, do it only if actorID == 0
if actorID != 0 {
continue
}

if err := fakeCompaction(ctx, m, true); err != nil {
return errors.Wrapf(err, "actor[%v] compaction error", actorID)
}
Expand Down Expand Up @@ -346,6 +358,10 @@ type fakeContentIndexEntry struct {
}

func verifyFakeContentsWritten(ctx context.Context, m indexBlobManager, numWritten int, contentPrefix string, deletedContents map[string]bool) error {
if numWritten == 0 {
return nil
}

log(ctx).Debugf("verifyFakeContentsWritten()")
defer log(ctx).Debugf("finished verifyFakeContentsWritten()")

Expand Down Expand Up @@ -374,17 +390,21 @@ func verifyFakeContentsWritten(ctx context.Context, m indexBlobManager, numWritt
}

func fakeCompaction(ctx context.Context, m indexBlobManager, dropDeleted bool) error {
log(ctx).Debugf("fakeCompaction()")
defer log(ctx).Debugf("finished fakeCompaction()")
log(ctx).Debugf("fakeCompaction(dropDeleted=%v)", dropDeleted)
defer log(ctx).Debugf("finished fakeCompaction(dropDeleted=%v)", dropDeleted)

allContents, allBlobs, err := getAllFakeContents(ctx, m)
if err != nil {
return errors.Wrap(err, "error getting contents")
}

dropped := map[string]fakeContentIndexEntry{}

if dropDeleted {
for cid, e := range allContents {
if e.Deleted {
dropped[cid] = e

delete(allContents, cid)
}
}
Expand All @@ -399,6 +419,10 @@ func fakeCompaction(ctx context.Context, m indexBlobManager, dropDeleted bool) e
return errors.Wrap(err, "unable to write index")
}

for cid, e := range dropped {
log(ctx).Debugf("dropped deleted %v %v from %v", cid, e, outputBM)
}

var (
inputs []blob.Metadata
outputs = []blob.Metadata{outputBM}
Expand All @@ -425,13 +449,13 @@ func fakeContentID(prefix string, n int) string {
}

func deleteFakeContents(ctx context.Context, m indexBlobManager, prefix string, numWritten int, deleted map[string]bool, timeFunc func() time.Time) error {
log(ctx).Debugf("deleteFakeContents()")
defer log(ctx).Debugf("finished deleteFakeContents()")

if numWritten == 0 {
return nil
}

log(ctx).Debugf("deleteFakeContents()")
defer log(ctx).Debugf("finished deleteFakeContents()")

count := rand.Intn(10) + 5

ndx := map[string]fakeContentIndexEntry{}
Expand Down Expand Up @@ -460,6 +484,10 @@ func deleteFakeContents(ctx context.Context, m indexBlobManager, prefix string,
}

func undeleteFakeContents(ctx context.Context, m indexBlobManager, deleted map[string]bool, timeFunc func() time.Time) error {
if len(deleted) == 0 {
return nil
}

log(ctx).Debugf("undeleteFakeContents()")
defer log(ctx).Debugf("finished undeleteFakeContents()")

Expand Down

0 comments on commit e8ba34a

Please sign in to comment.