Skip to content

Commit

Permalink
Merge pull request #694 from pachyderm/update_pipeline
Browse files Browse the repository at this point in the history
Update pipeline
  • Loading branch information
jdoliner committed Aug 15, 2016
2 parents b1e70fa + a77df28 commit 495f858
Show file tree
Hide file tree
Showing 25 changed files with 1,562 additions and 534 deletions.
28 changes: 25 additions & 3 deletions src/client/pfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ const (
CommitTypeWrite = pfs.CommitType_COMMIT_TYPE_WRITE
)

// CommitStatus alias pfs.CommitStatus_*
const (
CommitStatusNormal = pfs.CommitStatus_NORMAL
CommitStatusArchived = pfs.CommitStatus_ARCHIVED
CommitStatusCancelled = pfs.CommitStatus_CANCELLED
CommitStatusAll = pfs.CommitStatus_ALL
)

// CreateRepo creates a new Repo object in pfs with the given name. Repos are
// the top level data object in pfs and should be used to store data of a
// similar type. For example rather than having a single Repo for an entire
Expand Down Expand Up @@ -159,6 +167,19 @@ func (c APIClient) FinishCommit(repoName string, commitID string) error {
return sanitizeErr(err)
}

// ArchiveCommit marks a commit as archived. Archived commits are not listed in
// ListCommit unless commit status is set to Archived or All. Archived commits
// are not considered by FlushCommit either.
func (c APIClient) ArchiveCommit(repoName string, commitID string) error {
_, err := c.PfsAPIClient.ArchiveCommit(
context.Background(),
&pfs.ArchiveCommitRequest{
Commit: NewCommit(repoName, commitID),
},
)
return sanitizeErr(err)
}

// CancelCommit ends the process of committing data to a repo. It differs from
// FinishCommit in that the Commit will not be used as a source for downstream
// pipelines. CancelCommit is used primarily by PPS for the output commits of
Expand Down Expand Up @@ -196,12 +217,13 @@ func (c APIClient) InspectCommit(repoName string, commitID string) (*pfs.CommitI
// commitType specifies the type of commit you want returned, normally CommitTypeRead is the most useful option
// block, when set to true, will cause ListCommit to block until at least 1 new CommitInfo is available.
// Using fromCommitIDs and block you can get subscription semantics from ListCommit.
// all, when set to true, will cause ListCommit to return cancelled commits as well.
// commitStatus, controls the statuses of the returned commits. The default
// value `Normal` will filter out archived and cancelled commits.
// provenance specifies a set of provenance commits, only commits which have
// ALL of the specified commits as provenance will be returned unless
// provenance is nil in which case it is ignored.
func (c APIClient) ListCommit(repoNames []string, fromCommitIDs []string,
commitType pfs.CommitType, block bool, all bool, provenance []*pfs.Commit) ([]*pfs.CommitInfo, error) {
commitType pfs.CommitType, block bool, status pfs.CommitStatus, provenance []*pfs.Commit) ([]*pfs.CommitInfo, error) {
var repos []*pfs.Repo
for _, repoName := range repoNames {
repos = append(repos, &pfs.Repo{Name: repoName})
Expand All @@ -219,7 +241,7 @@ func (c APIClient) ListCommit(repoNames []string, fromCommitIDs []string,
Repo: repos,
FromCommit: fromCommits,
Block: block,
All: all,
Status: status,
Provenance: provenance,
},
)
Expand Down
450 changes: 287 additions & 163 deletions src/client/pfs/pfs.pb.go

Large diffs are not rendered by default.

21 changes: 19 additions & 2 deletions src/client/pfs/pfs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ message CommitInfo {
google.protobuf.Timestamp finished = 6;
uint64 size_bytes = 7;
bool cancelled = 8;
repeated Commit provenance = 9;
bool archived = 9;
repeated Commit provenance = 10;
}

message CommitInfos {
Expand Down Expand Up @@ -130,6 +131,7 @@ message DiffInfo {
uint64 size_bytes = 7;
bool cancelled = 8;
repeated Commit provenance = 9;
bool archived = 10;
}

message Shard {
Expand Down Expand Up @@ -173,16 +175,27 @@ message FinishCommitRequest {
google.protobuf.Timestamp finished = 3;
}

message ArchiveCommitRequest {
Commit commit = 1;
}

message InspectCommitRequest {
Commit commit = 1;
}

enum CommitStatus {
NORMAL = 0;
ARCHIVED = 1;
CANCELLED = 2;
ALL = 3;
}

message ListCommitRequest {
repeated Repo repo = 1;
CommitType commit_type = 2;
repeated Commit from_commit = 3;
repeated Commit provenance = 4;
bool all = 5;
CommitStatus status = 5;
bool block = 6;
}

Expand Down Expand Up @@ -275,6 +288,8 @@ service API {
rpc StartCommit(StartCommitRequest) returns (Commit) {}
// FinishCommit turns a write commit into a read commit.
rpc FinishCommit(FinishCommitRequest) returns (google.protobuf.Empty) {}
// ArchiveCommit marks a commit as archived, it will be excluded from ListCommit.
rpc ArchiveCommit(ArchiveCommitRequest) returns (google.protobuf.Empty) {}
// InspectCommit returns the info about a commit.
rpc InspectCommit(InspectCommitRequest) returns (CommitInfo) {}
// ListCommit returns info about all commits.
Expand Down Expand Up @@ -319,6 +334,8 @@ service InternalAPI {
rpc StartCommit(StartCommitRequest) returns (google.protobuf.Empty) {}
// FinishCommit turns a write commit into a read commit.
rpc FinishCommit(FinishCommitRequest) returns (google.protobuf.Empty) {}
// ArchiveCommit marks a commit as archived, it will be excluded from ListCommit.
rpc ArchiveCommit(ArchiveCommitRequest) returns (google.protobuf.Empty) {}
// InspectCommit returns the info about a commit.
rpc InspectCommit(InspectCommitRequest) returns (CommitInfo) {}
// ListCommit returns info about all commits.
Expand Down
26 changes: 26 additions & 0 deletions src/client/pps.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,15 @@ func (c APIClient) GetLogs(
// on availabe resources.
// inputs specifies a set of Repos that will be visible to the jobs during runtime.
// commits to these repos will cause the pipeline to create new jobs to process them.
// update indicates that you want to update an existing pipeline
func (c APIClient) CreatePipeline(
name string,
image string,
cmd []string,
stdin []string,
parallelism uint64,
inputs []*pps.PipelineInput,
update bool,
) error {
_, err := c.PpsAPIClient.CreatePipeline(
context.Background(),
Expand All @@ -206,6 +208,7 @@ func (c APIClient) CreatePipeline(
},
Parallelism: parallelism,
Inputs: inputs,
Update: update,
},
)
return sanitizeErr(err)
Expand Down Expand Up @@ -244,3 +247,26 @@ func (c APIClient) DeletePipeline(name string) error {
)
return sanitizeErr(err)
}

// StartPipeline restarts a stopped pipeline.
func (c APIClient) StartPipeline(name string) error {
_, err := c.PpsAPIClient.StartPipeline(
context.Background(),
&pps.StartPipelineRequest{
Pipeline: NewPipeline(name),
},
)
return sanitizeErr(err)
}

// StopPipeline prevents a pipeline from processing things, it can be restarted
// with StartPipeline.
func (c APIClient) StopPipeline(name string) error {
_, err := c.PpsAPIClient.StopPipeline(
context.Background(),
&pps.StopPipelineRequest{
Pipeline: NewPipeline(name),
},
)
return sanitizeErr(err)
}

0 comments on commit 495f858

Please sign in to comment.