Skip to content

Commit

Permalink
Merge pull request #715 from pachyderm/fix-run-empty
Browse files Browse the repository at this point in the history
Fix a bug where empty input commits trigger parent-less output commits
  • Loading branch information
derekchiang committed Aug 12, 2016
2 parents 1d5e2da + 81ed1ad commit e478bb9
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 97 deletions.
178 changes: 92 additions & 86 deletions src/client/pps/pps.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/client/pps/pps.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ enum JobState {
JOB_RUNNING = 1;
JOB_FAILURE = 2;
JOB_SUCCESS = 3;
// An empty job is a job that hasn't actually been run.
// It's a result of empty input commits.
JOB_EMPTY = 4;
}

enum Partition {
Expand Down
38 changes: 36 additions & 2 deletions src/server/pachyderm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,13 +503,47 @@ func TestPipelineWithEmptyInputs(t *testing.T) {
outCommits := listCommitResponse.CommitInfo
require.Equal(t, 1, len(outCommits))
require.Equal(t, 0, int(outCommits[0].SizeBytes))
// No job should've been created
// An empty job should've been created
jobInfos, err := c.ListJob(pipelineName, nil)
require.NoError(t, err)
require.Equal(t, 0, len(jobInfos))
require.Equal(t, 1, len(jobInfos))
require.Equal(t, ppsclient.JobState_JOB_EMPTY, jobInfos[0].State)

// Make another empty commit in the input repo
// The output commit should have the previous output commit as its parent
parentOutputCommit := outCommits[0].Commit
commit2, err := c.StartCommit(dataRepo, commit1.ID, "")
require.NoError(t, err)
require.NoError(t, c.FinishCommit(dataRepo, commit2.ID))
listCommitRequest = &pfsclient.ListCommitRequest{
Repo: []*pfsclient.Repo{outRepo},
CommitType: pfsclient.CommitType_COMMIT_TYPE_READ,
Block: true,
FromCommit: []*pfsclient.Commit{parentOutputCommit},
}
listCommitResponse, err = c.PfsAPIClient.ListCommit(
context.Background(),
listCommitRequest,
)
require.NoError(t, err)
outCommits = listCommitResponse.CommitInfo
require.Equal(t, 1, len(outCommits))
require.Equal(t, 0, int(outCommits[0].SizeBytes))
require.Equal(t, parentOutputCommit.ID, outCommits[0].ParentCommit.ID)
jobInfos, err = c.ListJob(pipelineName, nil)
require.NoError(t, err)
require.Equal(t, 2, len(jobInfos))
require.Equal(t, ppsclient.JobState_JOB_EMPTY, jobInfos[1].State)

// create a pipeline that runs with empty commits
dataRepo = uniqueString("data")
require.NoError(t, c.CreateRepo(dataRepo))
pipelineName = uniqueString("pipeline")

commit, err := c.StartCommit(dataRepo, "", "")
require.NoError(t, err)
require.NoError(t, c.FinishCommit(dataRepo, commit.ID))

outRepo = ppsserver.PipelineRepo(client.NewPipeline(pipelineName))
require.NoError(t, c.CreatePipeline(
pipelineName,
Expand Down
1 change: 1 addition & 0 deletions src/server/pps/persist/server/rethink_api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func (a *rethinkAPIServer) InspectJob(ctx context.Context, request *ppsclient.In
func(jobInfo gorethink.Term) gorethink.Term {
if request.BlockState {
return gorethink.Or(
jobInfo.Field("State").Eq(ppsclient.JobState_JOB_EMPTY),
jobInfo.Field("State").Eq(ppsclient.JobState_JOB_SUCCESS),
jobInfo.Field("State").Eq(ppsclient.JobState_JOB_FAILURE))
}
Expand Down

0 comments on commit e478bb9

Please sign in to comment.