Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport Fix for Spout pipelines not able to restart #8310

Merged
merged 2 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/server/pps/cmds/cmds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func TestUnrunnableJobInfo(t *testing.T) {
"pipeline", pipeline2, "inputPipeline", pipeline1).Run())
require.NoError(t, tu.PachctlBashCmd(t, c, `
pachctl wait commit data@master
sleep 10
sleep 20
# make sure that there is a not-run job
pachctl list job --raw \
| match "JOB_UNRUNNABLE"
Expand Down
48 changes: 25 additions & 23 deletions src/server/pps/server/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1796,24 +1796,24 @@ func getExpectedNumWorkers(pipelineInfo *pps.PipelineInfo) (int, error) {
// CreatePipeline implements the protobuf pps.CreatePipeline RPC
//
// Implementation note:
// - CreatePipeline always creates pipeline output branches such that the
// pipeline's spec branch is in the pipeline output branch's provenance
// - CreatePipeline will always create a new output commit, but that's done
// by CreateBranch at the bottom of the function, which sets the new output
// branch provenance, rather than commitPipelineInfoFromFileSet higher up.
// - This is because CreatePipeline calls hardStopPipeline towards the top,
// breaking the provenance connection from the spec branch to the output branch
// - For straightforward pipeline updates (e.g. new pipeline image)
// stopping + updating + starting the pipeline isn't necessary
// - However it is necessary in many slightly atypical cases (e.g. the
// pipeline input changed: if the spec commit is created while the
// output branch has its old provenance, or the output branch gets new
// provenance while the old spec commit is the HEAD of the spec branch,
// then an output commit will be created with provenance that doesn't
// match its spec's PipelineInfo.Details.Input. Another example is when
// request.Reprocess == true).
// - Rather than try to enumerate every case where we can't create a spec
// commit without stopping the pipeline, we just always stop the pipeline
// - CreatePipeline always creates pipeline output branches such that the
// pipeline's spec branch is in the pipeline output branch's provenance
// - CreatePipeline will always create a new output commit, but that's done
// by CreateBranch at the bottom of the function, which sets the new output
// branch provenance, rather than commitPipelineInfoFromFileSet higher up.
// - This is because CreatePipeline calls hardStopPipeline towards the top,
// breaking the provenance connection from the spec branch to the output branch
// - For straightforward pipeline updates (e.g. new pipeline image)
// stopping + updating + starting the pipeline isn't necessary
// - However it is necessary in many slightly atypical cases (e.g. the
// pipeline input changed: if the spec commit is created while the
// output branch has its old provenance, or the output branch gets new
// provenance while the old spec commit is the HEAD of the spec branch,
// then an output commit will be created with provenance that doesn't
// match its spec's PipelineInfo.Details.Input. Another example is when
// request.Reprocess == true).
// - Rather than try to enumerate every case where we can't create a spec
// commit without stopping the pipeline, we just always stop the pipeline
func (a *apiServer) CreatePipeline(ctx context.Context, request *pps.CreatePipelineRequest) (response *types.Empty, retErr error) {
metricsFn := metrics.ReportUserAction(ctx, a.reporter, "CreatePipeline")
defer func(start time.Time) { metricsFn(start, retErr) }(time.Now())
Expand Down Expand Up @@ -2644,11 +2644,13 @@ func (a *apiServer) StartPipeline(ctx context.Context, request *pps.StartPipelin
return errors.EnsureStack(err)
}
// restore same provenance to meta repo
if err := a.env.PFSServer.CreateBranchInTransaction(txnCtx, &pfs.CreateBranchRequest{
Branch: client.NewSystemRepo(pipelineInfo.Pipeline.Name, pfs.MetaRepoType).NewBranch(pipelineInfo.Details.OutputBranch),
Provenance: provenance,
}); err != nil {
return errors.EnsureStack(err)
if pipelineInfo.Details.Spout == nil && pipelineInfo.Details.Service == nil {
if err := a.env.PFSServer.CreateBranchInTransaction(txnCtx, &pfs.CreateBranchRequest{
Branch: client.NewSystemRepo(pipelineInfo.Pipeline.Name, pfs.MetaRepoType).NewBranch(pipelineInfo.Details.OutputBranch),
Provenance: provenance,
}); err != nil {
return errors.EnsureStack(err)
}
}

newPipelineInfo := &pps.PipelineInfo{}
Expand Down
35 changes: 35 additions & 0 deletions src/server/spout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,4 +557,39 @@ func testSpout(t *testing.T, usePachctl bool) {
}))
require.NoError(t, c.DeleteAll())
})

t.Run("SpoutRestart", func(t *testing.T) {
pipeline := tu.UniqueString("pipeline")
_, err := c.PpsAPIClient.CreatePipeline(
c.Ctx(),
&pps.CreatePipelineRequest{
Pipeline: client.NewPipeline(pipeline),
Transform: &pps.Transform{
Cmd: []string{"sleep", "infinity"},
},
Spout: &pps.Spout{},
},
)
require.NoError(t, err)
require.NoError(t, backoff.Retry(func() error {
pi, err := c.InspectPipeline(pipeline, false)
require.NoError(t, err)
if pi.State != pps.PipelineState_PIPELINE_RUNNING {
return errors.Errorf("expected pipeline state: %s, but got: %s", pps.PipelineState_PIPELINE_RUNNING, pi.State)
}
return nil
}, backoff.NewTestingBackOff()))

// stop and start spout pipeline
require.NoError(t, c.StopPipeline(pipeline))
require.NoError(t, backoff.Retry(func() error {
pi, err := c.InspectPipeline(pipeline, false)
require.NoError(t, err)
if pi.State != pps.PipelineState_PIPELINE_PAUSED {
return errors.Errorf("expected pipeline state: %s, but got: %s", pps.PipelineState_PIPELINE_PAUSED, pi.State)
}
return nil
}, backoff.NewTestingBackOff()))
require.NoError(t, c.StartPipeline(pipeline))
})
}