Skip to content

Commit

Permalink
[CORE-448] Spread alias commits from output branches to meta branches. (
Browse files Browse the repository at this point in the history
#8095) (#8116)

* Spread alias commits from output branches to meta branches.

This eliminates a large class of bugs.

* Skip alias meta commits

* Update TestChainedPipelines

* fix TestChainedPipelinesNoDelay

Co-authored-by: acohen4 <alonco124@gmail.com>

Co-authored-by: PFedak <pzfedak@gmail.com>
  • Loading branch information
acohen4 and PFedak committed Aug 30, 2022
1 parent 98afbd5 commit 77f598a
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 11 deletions.
13 changes: 10 additions & 3 deletions src/server/pachyderm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3851,6 +3851,13 @@ func TestPipelineThatSymlinks(t *testing.T) {
}

// TestChainedPipelines tracks https://github.com/pachyderm/pachyderm/v2/issues/797
// DAG:
//
// A
// |
// B D
// | /
// C
func TestChainedPipelines(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration tests in short mode")
Expand Down Expand Up @@ -3911,7 +3918,7 @@ func TestChainedPipelines(t *testing.T) {

commitInfos, err := c.WaitCommitSetAll(commitInfo.Commit.ID)
require.NoError(t, err)
require.Equal(t, 7, len(commitInfos))
require.Equal(t, 8, len(commitInfos))

var buf bytes.Buffer
require.NoError(t, c.GetFile(commitInfo.Commit, "bFile", &buf))
Expand Down Expand Up @@ -4005,7 +4012,7 @@ func TestChainedPipelinesNoDelay(t *testing.T) {
require.NoError(t, err)
commitInfos, err := c.WaitCommitSetAll(commitInfo.Commit.ID)
require.NoError(t, err)
require.Equal(t, 9, len(commitInfos))
require.Equal(t, 11, len(commitInfos))

eCommit2, err := c.StartCommit(eRepo, "master")
require.NoError(t, err)
Expand All @@ -4014,7 +4021,7 @@ func TestChainedPipelinesNoDelay(t *testing.T) {

commitInfos, err = c.WaitCommitSetAll(eCommit2.ID)
require.NoError(t, err)
require.Equal(t, 10, len(commitInfos))
require.Equal(t, 11, len(commitInfos))

// Get number of jobs triggered in pipeline D
jobInfos, err := c.ListJob(dPipeline, nil, -1, true)
Expand Down
21 changes: 21 additions & 0 deletions src/server/pfs/server/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,27 @@ func (d *driver) propagateBranches(txnCtx *txncontext.TransactionContext, branch
// Update the cached branch head
provOfSubvBI.Head.ID = txnCtx.CommitSetID
}
// if this is a pipeline output branch, we need to also create an alias commit on the meta branch
// to maintain pipeline system invariants.
if provOfSubvBI.Branch.Repo.Type == pfs.UserRepoType {
metaBranch := client.NewSystemRepo(provOfSubvBI.Branch.Repo.Name, pfs.MetaRepoType).
NewBranch(provOfSubvBI.Branch.Name)
metaBI, err := getBranchInfo(metaBranch)
if err != nil {
if col.IsErrNotFound(err) {
// no corresponding meta branch, so not a pipeline. Ignore
continue
}
return err
}
// create the alias if necessary, just like above
if metaBI.Head.ID != txnCtx.CommitSetID {
if _, err := d.aliasCommit(txnCtx, metaBI.Head, metaBI.Head.Branch); err != nil {
return err
}
metaBI.Head.ID = txnCtx.CommitSetID
}
}
}

if subvBI.Head.ID == txnCtx.CommitSetID {
Expand Down
18 changes: 10 additions & 8 deletions src/server/worker/pipeline/transform/pending_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,16 @@ func (pj *pendingJob) load() error {
if err != nil {
return errors.EnsureStack(err)
}
outputCI, err := pachClient.InspectCommit(pj.baseMetaCommit.Branch.Repo.Name,
pj.baseMetaCommit.Branch.Name, pj.baseMetaCommit.ID)
if err != nil {
return errors.EnsureStack(err)
}
// both commits must have succeeded - a validation error will only show up in the output
if metaCI.Error == "" && outputCI.Error == "" {
break
if metaCI.Origin.Kind == pfs.OriginKind_AUTO {
outputCI, err := pachClient.InspectCommit(pj.baseMetaCommit.Branch.Repo.Name,
pj.baseMetaCommit.Branch.Name, pj.baseMetaCommit.ID)
if err != nil {
return errors.EnsureStack(err)
}
// both commits must have succeeded - a validation error will only show up in the output
if metaCI.Error == "" && outputCI.Error == "" {
break
}
}
pj.baseMetaCommit = metaCI.ParentCommit
}
Expand Down

0 comments on commit 77f598a

Please sign in to comment.