forked from pachyderm/pachyderm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
107 lines (98 loc) · 2.62 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package main
import (
"fmt"
"io"
"os"
"strings"
"github.com/pachyderm/pachyderm/src/client"
ppsclient "github.com/pachyderm/pachyderm/src/client/pps"
"github.com/pachyderm/pachyderm/src/server/pfs/fuse"
ppsserver "github.com/pachyderm/pachyderm/src/server/pps"
"github.com/spf13/cobra"
"go.pedge.io/env"
"go.pedge.io/pkg/exec"
"golang.org/x/net/context"
)
type appEnv struct {
PachydermAddress string `env:"PACHD_PORT_650_TCP_ADDR,required"`
}
func main() {
env.Main(do, &appEnv{})
}
func do(appEnvObj interface{}) error {
appEnv := appEnvObj.(*appEnv)
rootCmd := &cobra.Command{
Use: os.Args[0] + " job-id",
Short: `Pachyderm job-shim, coordinates with ppsd to create an output commit and run user work.`,
Long: `Pachyderm job-shim, coordinates with ppsd to create an output commit and run user work.`,
Run: func(cmd *cobra.Command, args []string) {
ppsClient, err := ppsserver.NewInternalJobAPIClientFromAddress(fmt.Sprintf("%v:650", appEnv.PachydermAddress))
if err != nil {
errorAndExit(err.Error())
}
response, err := ppsClient.StartJob(
context.Background(),
&ppsserver.StartJobRequest{
Job: &ppsclient.Job{
ID: args[0],
}})
if err != nil {
fmt.Fprintf(os.Stderr, "%s\n", err.Error())
os.Exit(0)
}
pfsClient, err := client.NewFromAddress(fmt.Sprintf("%v:650", appEnv.PachydermAddress))
if err != nil {
errorAndExit(err.Error())
}
mounter := fuse.NewMounter(appEnv.PachydermAddress, pfsClient)
ready := make(chan bool)
go func() {
if err := mounter.Mount(
"/pfs",
nil,
response.CommitMounts,
ready,
); err != nil {
errorAndExit(err.Error())
}
}()
<-ready
defer func() {
if err := mounter.Unmount("/pfs"); err != nil {
errorAndExit(err.Error())
}
}()
var readers []io.Reader
for _, line := range response.Transform.Stdin {
readers = append(readers, strings.NewReader(line+"\n"))
}
io := pkgexec.IO{
Stdin: io.MultiReader(readers...),
Stdout: os.Stdout,
Stderr: os.Stderr,
}
success := true
if err := pkgexec.RunIO(io, response.Transform.Cmd...); err != nil {
fmt.Fprintf(os.Stderr, "%s\n", err.Error())
success = false
}
if _, err := ppsClient.FinishJob(
context.Background(),
&ppsserver.FinishJobRequest{
Job: &ppsclient.Job{
ID: args[0],
},
Index: response.Index,
Success: success,
},
); err != nil {
errorAndExit(err.Error())
}
},
}
return rootCmd.Execute()
}
func errorAndExit(format string, args ...interface{}) {
fmt.Fprintf(os.Stderr, "%s\n", fmt.Sprintf(format, args...))
os.Exit(1)
}