fix handling of tentative cursor presence if protection strategy doesn't use it #714

Merged
74 changes: 50 additions & 24 deletions endpoint/endpoint.go
@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"path"
"strings"

"github.com/kr/pretty"
"github.com/pkg/errors"
@@ -233,6 +234,21 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
//
// Note further that a resuming send, due to the idempotent nature of func CreateReplicationCursor and HoldStep,
// will never lose its step holds because we just (idempotently re-)created them above, before attempting the cleanup.
destroyTypes := AbstractionTypeSet{
AbstractionStepHold: true,
AbstractionTentativeReplicationCursorBookmark: true,
}
// The replication planner can also pick an endpoint zfs abstraction as FromVersion.
// Keep it, so that the replication will succeed.
//
// NB: there is no abstraction for snapshots, so, we only need to check bookmarks.
if sendArgs.FromVersion != nil && sendArgs.FromVersion.IsBookmark() {
dp, err := zfs.NewDatasetPath(sendArgs.FS)
if err != nil {
panic(err) // sendArgs is validated, this shouldn't happen
}
liveAbs = append(liveAbs, destroyTypes.ExtractBookmark(dp, sendArgs.FromVersion))
}
func() {
ctx, endSpan := trace.WithSpan(ctx, "cleanup-stale-abstractions")
defer endSpan()
@@ -245,35 +261,45 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
return keep
}
check := func(obsoleteAbs []Abstraction) {
	// Ensure that we don't delete `From` or `To`.
	// Regardless of whether they are in AbstractionTypeSet or not.
	// And produce a nice error message in case we do, to aid debugging the resulting panic.
	//
	// This is especially important for `From`. We could break incremental replication
	// if we deleted the last common filesystem version between sender and receiver.
	type Problem struct {
		sendArgsWhat string
		fullpath     string
		obsoleteAbs  Abstraction
	}
	problems := make([]Problem, 0)
	checkFullpaths := make(map[string]string, 2)
	checkFullpaths["ToVersion"] = sendArgs.ToVersion.FullPath(sendArgs.FS)
	if sendArgs.FromVersion != nil {
		checkFullpaths["FromVersion"] = sendArgs.FromVersion.FullPath(sendArgs.FS)
	}
	for _, a := range obsoleteAbs {
		for what, fullpath := range checkFullpaths {
			if a.GetFullPath() == fullpath && a.GetType().IsSnapshotOrBookmark() {
				problems = append(problems, Problem{
					sendArgsWhat: what,
					fullpath:     fullpath,
					obsoleteAbs:  a,
				})
			}
		}
	}
	if len(problems) == 0 {
		return
	}
	var msg strings.Builder
	fmt.Fprintf(&msg, "cleaning up send stale would destroy send args:\n")
	fmt.Fprintf(&msg, "  SendArgs: %s\n", pretty.Sprint(sendArgs))
	for _, check := range problems {
		fmt.Fprintf(&msg, "would delete %s %s because it was deemed an obsolete abstraction: %s\n",
			check.sendArgsWhat, check.fullpath, check.obsoleteAbs)
	}
	panic(msg.String())
}
abstractionsCacheSingleton.TryBatchDestroy(ctx, s.jobId, sendArgs.FS, destroyTypes, keep, check)
}()
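
For reference, a minimal self-contained sketch of the guard pattern that `check` implements above: compare every destroy candidate against the full paths of the step's `From`/`To` versions, collect all collisions, and fail once with an aggregated message. The paths and types below are invented stand-ins, not zrepl's API.

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Full paths the current send step depends on (assumed example values).
	mustKeep := map[string]string{
		"FromVersion": "pool/fs#zrepl_CURSORTENTATIVE_G_badf00d_J_job",
		"ToVersion":   "pool/fs@zrepl_20240101_000000",
	}
	// Abstractions the cleanup pass considers stale.
	obsolete := []string{
		"pool/fs#zrepl_CURSORTENTATIVE_G_badf00d_J_job", // the step's From!
		"pool/fs#some_unrelated_stale_bookmark",
	}
	var msg strings.Builder
	for _, path := range obsolete {
		for what, keep := range mustKeep {
			if path == keep {
				// Collect instead of failing fast, so the final panic
				// lists every offending abstraction at once.
				fmt.Fprintf(&msg, "would delete %s %s\n", what, path)
			}
		}
	}
	if msg.Len() > 0 {
		panic(msg.String())
	}
}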
6 changes: 6 additions & 0 deletions endpoint/endpoint_guarantees.go
@@ -89,6 +89,8 @@ func ReplicationGuaranteeFromKind(k ReplicationGuaranteeKind) ReplicationGuarant

type ReplicationGuaranteeNone struct{}

func (g ReplicationGuaranteeNone) String() string { return "none" }

func (g ReplicationGuaranteeNone) Kind() ReplicationGuaranteeKind {
return ReplicationGuaranteeKindNone
}
@@ -107,6 +109,8 @@ func (g ReplicationGuaranteeNone) SenderPostRecvConfirmed(ctx context.Context, j

type ReplicationGuaranteeIncremental struct{}

func (g ReplicationGuaranteeIncremental) String() string { return "incremental" }

func (g ReplicationGuaranteeIncremental) Kind() ReplicationGuaranteeKind {
return ReplicationGuaranteeKindIncremental
}
@@ -144,6 +148,8 @@ func (g ReplicationGuaranteeIncremental) SenderPostRecvConfirmed(ctx context.Con

type ReplicationGuaranteeResumability struct{}

func (g ReplicationGuaranteeResumability) String() string { return "resumability" }

func (g ReplicationGuaranteeResumability) Kind() ReplicationGuaranteeKind {
return ReplicationGuaranteeKindResumability
}
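
The three guarantee strategies above now implement fmt.Stringer alongside Kind(). The codebase pairs closed enums like ReplicationGuaranteeKind with exhaustive switches that panic on unknown values (the same idiom appears in IsSnapshotOrBookmark in the endpoint_zfs_abstraction.go hunk below). A standalone sketch of that idiom, with all names invented for illustration:

package main

import "fmt"

type Kind int

const (
	KindNone Kind = iota
	KindIncremental
	KindResumability
)

func (k Kind) String() string {
	switch k {
	case KindNone:
		return "none"
	case KindIncremental:
		return "incremental"
	case KindResumability:
		return "resumability"
	default:
		panic(fmt.Sprintf("this is supposed to be an exhaustive match, got %d", int(k)))
	}
}

// usesStepHolds lists every kind explicitly; an unknown kind panics at
// runtime instead of silently falling through to a default answer.
func usesStepHolds(k Kind) bool {
	switch k {
	case KindResumability:
		return true
	case KindIncremental, KindNone:
		return false
	default:
		panic(fmt.Sprintf("this is supposed to be an exhaustive match, got %v", k))
	}
}

func main() {
	fmt.Println(KindIncremental, usesStepHolds(KindIncremental)) // incremental false
}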
51 changes: 50 additions & 1 deletion endpoint/endpoint_zfs_abstraction.go
@@ -31,7 +31,7 @@ const (
AbstractionReplicationCursorBookmarkV2 AbstractionType = "replication-cursor-bookmark-v2"
)

var AbstractionTypesAll = AbstractionTypeSet{
AbstractionStepHold: true,
AbstractionLastReceivedHold: true,
AbstractionTentativeReplicationCursorBookmark: true,
@@ -181,6 +181,38 @@ func (s AbstractionTypeSet) Validate() error {
return nil
}

// Use the `BookmarkExtractor()` method of each abstraction type in this set
// to try to extract an abstraction from the given FilesystemVersion.
//
// Abstraction types in this set that don't have a bookmark extractor are skipped.
//
// Panics if more than one abstraction type matches.
func (s AbstractionTypeSet) ExtractBookmark(dp *zfs.DatasetPath, v *zfs.FilesystemVersion) Abstraction {
matched := make(AbstractionTypeSet, 1)
var matchedAbs Abstraction
for absType := range s {
extractor := absType.BookmarkExtractor()
if extractor == nil {
continue
}
abstraction := extractor(dp, *v)
if abstraction != nil {
matched[absType] = true
matchedAbs = abstraction
}
}
if len(matched) == 0 {
return nil
}
if len(matched) == 1 {
if matchedAbs == nil {
panic("loop above should always set matchedAbs if there is a match")
}
return matchedAbs
}
panic(fmt.Sprintf("abstraction types extractors should not overlap: %s", matched))
}

type BookmarkExtractor func(fs *zfs.DatasetPath, v zfs.FilesystemVersion) Abstraction

// returns nil if the abstraction type is not bookmark-based
@@ -238,6 +270,23 @@ func (t AbstractionType) BookmarkNamer() func(fs string, guid uint64, jobId JobI
}
}

func (t AbstractionType) IsSnapshotOrBookmark() bool {
switch t {
case AbstractionTentativeReplicationCursorBookmark:
return true
case AbstractionReplicationCursorBookmarkV1:
return true
case AbstractionReplicationCursorBookmarkV2:
return true
case AbstractionStepHold:
return false
case AbstractionLastReceivedHold:
return false
default:
panic(fmt.Sprintf("unimpl: %q", t))
}
}

type ListZFSHoldsAndBookmarksQuery struct {
FS ListZFSHoldsAndBookmarksQueryFilesystemFilter
// What abstraction types should match (any contained in the set)
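
A compact model of the ExtractBookmark contract added above, with toy types standing in for zrepl's (none of these names exist in the codebase): types without a bookmark extractor are skipped, no match returns the zero value, and overlapping matches panic because the bookmark namespaces of abstraction types must stay disjoint.

package main

import (
	"fmt"
	"strings"
)

type extractor func(bookmark string) (abstraction string, ok bool)

func extractUnique(extractors map[string]extractor, bookmark string) string {
	var matched string
	var matchedTypes []string
	for typ, ex := range extractors {
		if ex == nil {
			continue // this type is hold-based, not bookmark-based
		}
		if abs, ok := ex(bookmark); ok {
			matched = abs
			matchedTypes = append(matchedTypes, typ)
		}
	}
	switch len(matchedTypes) {
	case 0:
		return "" // no abstraction type recognizes this bookmark
	case 1:
		return matched
	default:
		panic(fmt.Sprintf("abstraction types extractors should not overlap: %v", matchedTypes))
	}
}

func main() {
	extractors := map[string]extractor{
		"step-hold": nil, // holds have no bookmark extractor
		"tentative-replication-cursor-bookmark": func(b string) (string, bool) {
			if strings.HasPrefix(b, "#zrepl_CURSORTENTATIVE_") {
				return "tentative-cursor:" + b, true
			}
			return "", false
		},
	}
	fmt.Println(extractUnique(extractors, "#zrepl_CURSORTENTATIVE_G_badf00d_J_job"))
}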
1 change: 1 addition & 0 deletions platformtest/tests/generated_cases.go


95 changes: 92 additions & 3 deletions platformtest/tests/replication.go
@@ -248,7 +248,10 @@ func implReplicationIncrementalCleansUpStaleAbstractions(ctx *platformtest.Conte
require.NoError(ctx, err)
snap2Hold, err := endpoint.HoldStep(ctx, sfs, snap2, jobId) // no shadow
require.NoError(ctx, err)
// create artificial tentative cursor
snap3TentativeCursor, err := endpoint.CreateTentativeReplicationCursor(ctx, sfs, snap3, jobId)
require.NoError(ctx, err)
return []endpoint.Abstraction{snap2Cursor, snap1Hold, snap2Hold, snap3TentativeCursor}
}
createArtificalStaleAbstractions(sjid)
ojidSendAbstractions := createArtificalStaleAbstractions(ojid)
@@ -333,21 +336,29 @@ func implReplicationIncrementalCleansUpStaleAbstractions(ctx *platformtest.Conte
require.NoError(ctx, err)
snap2OjidCursorName, err := endpoint.ReplicationCursorBookmarkName(sfs, snap2.Guid, ojid)
require.NoError(ctx, err)
snap3SjidTentativeCursorName, err := endpoint.TentativeReplicationCursorBookmarkName(sfs, snap3.Guid, sjid)
require.NoError(ctx, err)
snap3OjidTentativeCursorName, err := endpoint.TentativeReplicationCursorBookmarkName(sfs, snap3.Guid, ojid)
require.NoError(ctx, err)
var bmNames []string
for _, bm := range sBms {
bmNames = append(bmNames, bm.Name)
}

if invalidateCacheBeforeSecondReplication {
require.Len(ctx, sBms, 4)
require.Contains(ctx, bmNames, snap5SjidCursorName)
require.Contains(ctx, bmNames, snap2OjidCursorName)
require.Contains(ctx, bmNames, snap3OjidTentativeCursorName)
require.Contains(ctx, bmNames, "2")
} else {
require.Len(ctx, sBms, 6)
ctx.Logf("%s", pretty.Sprint(sBms))
require.Contains(ctx, bmNames, snap5SjidCursorName)
require.Contains(ctx, bmNames, snap2SjidCursorName)
require.Contains(ctx, bmNames, snap2OjidCursorName)
require.Contains(ctx, bmNames, snap3SjidTentativeCursorName)
require.Contains(ctx, bmNames, snap3OjidTentativeCursorName)
require.Contains(ctx, bmNames, "2")
}
}
@@ -370,6 +381,84 @@ func implReplicationIncrementalCleansUpStaleAbstractions(ctx *platformtest.Conte

}

func ReplicationIncrementalHandlesFromVersionEqTentativeCursorCorrectly(ctx *platformtest.Context) {

platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
CREATEROOT
+ "sender"
+ "sender@1"
+ "receiver"
R zfs create -p "${ROOTDS}/receiver/${ROOTDS}"
`)

sjid := endpoint.MustMakeJobID("sender-job")
rjid := endpoint.MustMakeJobID("receiver-job")

sfs := ctx.RootDataset + "/sender"
rfsRoot := ctx.RootDataset + "/receiver"

rep := replicationInvocation{
sjid: sjid,
rjid: rjid,
sfs: sfs,
rfsRoot: rfsRoot,
// It doesn't really matter what guarantee we use here, as the second replication will configure another.
// But, in the real world, the only way for a stale tentative cursor to appear is if the guarantee is set to
// incremental replication and we crash before converting the tentative cursor into a regular cursor.
guarantee: pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeIncrementalReplication),
}

// Do initial replication to set up the test.
rep1 := rep.Do(ctx)
ctx.Logf("\n%s", pretty.Sprint(rep1))
sfsDs := mustDatasetPath(sfs)
snap1_sender := mustGetFilesystemVersion(ctx, sfs+"@1")
snap1_replicationCursor_name, err := endpoint.ReplicationCursorBookmarkName(sfs, snap1_sender.Guid, sjid)
require.NoError(ctx, err)
snap1_replicationCursor := mustGetFilesystemVersion(ctx, sfs+"#"+snap1_replicationCursor_name)

// The second replication will be done with a guarantee kind that doesn't create tentative cursors by itself.
// So, it would generally be right to clean up any tentative cursors on sfs since they're stale abstractions.
// However, if the cursor is used as the `from` version in any send step, we must not destroy it, as that
// would break incremental replication.
// NB: we only need to test the first step as all subsequent steps will be snapshot->snapshot.
rep.guarantee = pdu.ReplicationConfigProtectionWithKind(pdu.ReplicationGuaranteeKind_GuaranteeNothing)
// create the artificial cursor
snap1_tentativeCursor, err := endpoint.CreateTentativeReplicationCursor(ctx, sfs, snap1_sender, sjid)
require.NoError(ctx, err)
endpoint.AbstractionsCacheInvalidate(sfs)
// remove other bookmarks of snap1, and snap1 itself, to force the replication planner to use the tentative cursor
err = zfs.ZFSDestroyFilesystemVersion(ctx, sfsDs, &snap1_sender)
require.NoError(ctx, err)
err = zfs.ZFSDestroyFilesystemVersion(ctx, sfsDs, &snap1_replicationCursor)
require.NoError(ctx, err)
versions, err := zfs.ZFSListFilesystemVersions(ctx, sfsDs, zfs.ListFilesystemVersionsOptions{})
require.NoError(ctx, err)
require.Len(ctx, versions, 1)
require.Equal(ctx, versions[0].Guid, snap1_tentativeCursor.GetFilesystemVersion().Guid)
// create another snapshot so that replication does one incremental step `tentative_cursor` -> `@2`
mustSnapshot(ctx, sfs+"@2")
mustGetFilesystemVersion(ctx, sfs+"@2")
// do the replication
rep2 := rep.Do(ctx)
ctx.Logf("\n%s", pretty.Sprint(rep2))

// Ensure that the tentative cursor was used.
require.Len(ctx, rep2.Attempts, 1)
require.Equal(ctx, rep2.Attempts[0].State, report.AttemptDone)
require.Len(ctx, rep2.Attempts[0].Filesystems, 1)
require.Nil(ctx, rep2.Attempts[0].Filesystems[0].Error())
require.Len(ctx, rep2.Attempts[0].Filesystems[0].Steps, 1)
require.EqualValues(ctx, rep2.Attempts[0].Filesystems[0].CurrentStep, 1)
require.Equal(ctx, rep2.Attempts[0].Filesystems[0].Steps[0].Info.From, snap1_tentativeCursor.GetFilesystemVersion().RelName())

// Ensure that the tentative cursor was destroyed as part of SendPost.
_, err = zfs.ZFSGetFilesystemVersion(ctx, snap1_replicationCursor.FullPath(sfs))
_, ok := err.(*zfs.DatasetDoesNotExist)
require.True(ctx, ok)
}
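
The test's final assertion uses a direct type assertion on the error, which only works as long as zfs.ZFSGetFilesystemVersion returns the error unwrapped. A self-contained illustration of the difference between that and errors.As, with a local stand-in error type:

package main

import (
	"errors"
	"fmt"
)

type DatasetDoesNotExist struct{ Path string }

func (e *DatasetDoesNotExist) Error() string { return e.Path + " does not exist" }

func main() {
	wrapped := fmt.Errorf("get version: %w", &DatasetDoesNotExist{Path: "pool/fs#bm"})

	_, ok := wrapped.(*DatasetDoesNotExist) // type assertion misses wrapped errors
	fmt.Println("type assertion:", ok)      // false

	var dne *DatasetDoesNotExist
	fmt.Println("errors.As:", errors.As(wrapped, &dne)) // true
}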

type PartialSender struct {
*endpoint.Sender
failAfterByteCount int64