-
Notifications
You must be signed in to change notification settings - Fork 567
/
pps.go
99 lines (91 loc) · 1.59 KB
/
pps.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package pps
import (
"io"
"go.pedge.io/proto/stream"
"golang.org/x/net/context"
)
// NewJob returns a pointer to a newly allocated Job whose ID field is jobID.
func NewJob(jobID string) *Job {
	job := new(Job)
	job.ID = jobID
	return job
}
// NewPipeline returns a pointer to a newly allocated Pipeline whose Name
// field is pipelineName.
func NewPipeline(pipelineName string) *Pipeline {
	pipeline := new(Pipeline)
	pipeline.Name = pipelineName
	return pipeline
}
// CreateJob assembles a CreateJobRequest from its arguments and passes it to
// client.CreateJob, returning that call's results unchanged.
//
// image, cmd and stdin populate the request's Transform; shards and inputs
// are copied to the request fields of the same name. If parentJobID is
// non-empty, ParentJob is set to NewJob(parentJobID); otherwise it is a nil
// *Job. The call is made with context.Background().
func CreateJob(
	client APIClient,
	image string,
	cmd []string,
	stdin []string,
	shards uint64,
	inputs []*JobInput,
	parentJobID string,
) (*Job, error) {
	// An empty parent ID means "no parent": keep the pointer nil.
	var parent *Job
	if parentJobID != "" {
		parent = NewJob(parentJobID)
	}

	transform := &Transform{
		Image: image,
		Cmd:   cmd,
		Stdin: stdin,
	}
	request := &CreateJobRequest{
		Transform: transform,
		Shards:    shards,
		Inputs:    inputs,
		ParentJob: parent,
	}
	return client.CreateJob(context.Background(), request)
}
// GetLogs calls client.GetLogs for the job identified by jobID, using
// context.Background(), and hands the resulting streaming client to
// protostream.WriteFromStreamingBytesClient together with writer.
//
// An error from opening the stream is returned as is; otherwise the result
// of the protostream call is returned.
func GetLogs(client APIClient, jobID string, writer io.Writer) error {
	request := &GetLogsRequest{Job: NewJob(jobID)}
	logStream, err := client.GetLogs(context.Background(), request)
	if err != nil {
		return err
	}
	return protostream.WriteFromStreamingBytesClient(logStream, writer)
}
// CreatePipeline assembles a CreatePipelineRequest from its arguments and
// passes it to client.CreatePipeline using context.Background().
//
// name becomes the request's Pipeline via NewPipeline; image, cmd and stdin
// populate its Transform; shards and inputs are copied to the request fields
// of the same name. The response value is discarded and only the error from
// the call is returned.
func CreatePipeline(
	client APIClient,
	name string,
	image string,
	cmd []string,
	stdin []string,
	shards uint64,
	inputs []*PipelineInput,
) error {
	transform := &Transform{
		Image: image,
		Cmd:   cmd,
		Stdin: stdin,
	}
	request := &CreatePipelineRequest{
		Pipeline:  NewPipeline(name),
		Transform: transform,
		Shards:    shards,
		Inputs:    inputs,
	}
	if _, err := client.CreatePipeline(context.Background(), request); err != nil {
		return err
	}
	return nil
}
// DeletePipeline calls client.DeletePipeline, using context.Background(),
// with a DeletePipelineRequest whose Pipeline is NewPipeline(name). The
// response value is discarded and only the error from the call is returned.
func DeletePipeline(client APIClient, name string) error {
	request := &DeletePipelineRequest{Pipeline: NewPipeline(name)}
	if _, err := client.DeletePipeline(context.Background(), request); err != nil {
		return err
	}
	return nil
}