forked from pachyderm/pachyderm
/
main.go
166 lines (140 loc) · 5.36 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package main
import (
"context"
"fmt"
"net/http"
_ "net/http/pprof"
"path"
"time"
"golang.org/x/sync/errgroup"
etcd "github.com/coreos/etcd/clientv3"
"github.com/pachyderm/pachyderm/src/client"
"github.com/pachyderm/pachyderm/src/client/pkg/grpcutil"
"github.com/pachyderm/pachyderm/src/client/pps"
"github.com/pachyderm/pachyderm/src/client/version"
"github.com/pachyderm/pachyderm/src/server/pkg/cmdutil"
"github.com/pachyderm/pachyderm/src/server/pkg/ppsutil"
"github.com/pachyderm/pachyderm/src/server/worker"
"google.golang.org/grpc"
log "github.com/sirupsen/logrus"
)
// appEnv stores the environment variables that this worker needs. Each field's
// `env` tag names the variable it is read from; all of them are marked
// required.
type appEnv struct {
	// Address of etcd, so that worker can write its own IP there for discovery
	EtcdAddress string `env:"ETCD_PORT_2379_TCP_ADDR,required"`
	// Address for connecting to pachd (so this can download input data)
	PachdAddress string `env:"PACHD_PORT_650_TCP_ADDR,required"`
	// Prefix in etcd for all pachd-related records
	PPSPrefix string `env:"PPS_ETCD_PREFIX,required"`
	// worker gets its own IP here, via the k8s downward API. It then writes that
	// IP back to etcd so that pachd can discover it
	PPSWorkerIP string `env:"PPS_WORKER_IP,required"`
	// The name of the pipeline that this worker belongs to
	PPSPipelineName string `env:"PPS_PIPELINE_NAME,required"`
	// The ID of the commit that contains the pipeline spec.
	PPSSpecCommitID string `env:"PPS_SPEC_COMMIT,required"`
	// The name of this pod
	PodName string `env:"PPS_POD_NAME,required"`
	// The namespace in which Pachyderm is deployed
	Namespace string `env:"PPS_NAMESPACE,required"`
}
func main() {
cmdutil.Main(do, &appEnv{})
}
// getPipelineInfo gets the PipelineInfo proto describing the pipeline that this
// worker is part of.
//
// It reads the pipeline's EtcdPipelineInfo record from etcd at
// <PPSPrefix>/pipelines/<PPSPipelineName> (with a 30s timeout), pins its spec
// commit to the one given in the worker's environment, and then resolves the
// full PipelineInfo through ppsutil.GetPipelineInfo.
//
// getPipelineInfo has the side effect of adding auth to the passed pachClient
// (the token stored in the etcd record), which is necessary to get the
// PipelineInfo from pfs.
//
// An error is returned if the etcd lookup fails, if it does not yield exactly
// one record, if the record cannot be unmarshalled, or if the record has no
// spec commit.
func getPipelineInfo(etcdClient *etcd.Client, pachClient *client.APIClient, appEnv *appEnv) (*pps.PipelineInfo, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	key := path.Join(appEnv.PPSPrefix, "pipelines", appEnv.PPSPipelineName)
	resp, err := etcdClient.Get(ctx, key)
	if err != nil {
		return nil, fmt.Errorf("error reading pipeline %q from etcd: %v", appEnv.PPSPipelineName, err)
	}
	if len(resp.Kvs) != 1 {
		return nil, fmt.Errorf("expected to find 1 pipeline (%s), got %d: %v", appEnv.PPSPipelineName, len(resp.Kvs), resp)
	}
	var pipelinePtr pps.EtcdPipelineInfo
	if err := pipelinePtr.Unmarshal(resp.Kvs[0].Value); err != nil {
		return nil, fmt.Errorf("error unmarshalling etcd record for pipeline %q: %v", appEnv.PPSPipelineName, err)
	}
	pachClient.SetAuthToken(pipelinePtr.AuthToken)
	// SpecCommit is a message pointer; a record without one would otherwise
	// cause a nil-pointer panic on the assignment below.
	if pipelinePtr.SpecCommit == nil {
		return nil, fmt.Errorf("etcd record for pipeline %q has no spec commit", appEnv.PPSPipelineName)
	}
	// Notice we use the SpecCommitID from our env, not from etcd. This is
	// because the value in etcd might get updated while the worker pod is
	// being created and we don't want to run the transform of one version of
	// the pipeline in the image of a different version.
	pipelinePtr.SpecCommit.ID = appEnv.PPSSpecCommitID
	return ppsutil.GetPipelineInfo(pachClient, appEnv.PPSPipelineName, &pipelinePtr)
}
// do is the worker's entry point; main passes it to cmdutil.Main, which calls
// it with the *appEnv it was given.
//
// It:
//  1. serves net/http (including the pprof handlers registered by the blank
//     import) on :651 in the background; that server's exit is only logged,
//  2. connects to pachd through the sidecar at localhost:650 and to etcd,
//  3. loads this worker's PipelineInfo,
//  4. starts the worker gRPC server on client.PPSWorkerPort,
//  5. once the service is registered, writes PPSWorkerIP into etcd under a
//     10-second lease that is kept alive in the background, so pachd can
//     discover this worker.
//
// It then blocks until the gRPC server exits and returns that server's error.
// If the server exits before the service is registered, its error (or a
// generic one, if it returned nil) is returned instead of blocking forever.
func do(appEnvObj interface{}) error {
	go func() {
		log.Println(http.ListenAndServe(":651", nil))
	}()
	appEnv := appEnvObj.(*appEnv)

	// Construct a client that connects to the sidecar.
	pachClient, err := client.NewFromAddress("localhost:650")
	if err != nil {
		return fmt.Errorf("error constructing pachClient: %v", err)
	}

	// Get etcd client, so we can register our IP (so pachd can discover us)
	etcdClient, err := etcd.New(etcd.Config{
		Endpoints:   []string{fmt.Sprintf("%s:2379", appEnv.EtcdAddress)},
		DialOptions: client.EtcdDialOptions(),
	})
	if err != nil {
		return fmt.Errorf("error constructing etcdClient: %v", err)
	}

	pipelineInfo, err := getPipelineInfo(etcdClient, pachClient, appEnv)
	if err != nil {
		return fmt.Errorf("error getting pipelineInfo: %v", err)
	}

	// Construct worker API server.
	workerRcName := ppsutil.PipelineRcName(pipelineInfo.Pipeline.Name, pipelineInfo.Version)
	apiServer, err := worker.NewAPIServer(pachClient, etcdClient, appEnv.PPSPrefix, pipelineInfo, appEnv.PodName, appEnv.Namespace)
	if err != nil {
		return err
	}

	// Start worker api server. ready is closed once the worker service has been
	// registered; serveDone is closed when Serve returns, so a server that
	// fails before registering cannot leave us waiting on ready forever.
	var eg errgroup.Group
	ready := make(chan struct{})
	serveDone := make(chan struct{})
	eg.Go(func() error {
		defer close(serveDone)
		return grpcutil.Serve(
			func(s *grpc.Server) {
				worker.RegisterWorkerServer(s, apiServer)
				close(ready)
			},
			grpcutil.ServeOptions{
				Version:    version.Version,
				MaxMsgSize: grpcutil.MaxMsgSize,
			},
			grpcutil.ServeEnv{
				GRPCPort: client.PPSWorkerPort,
			},
		)
	})

	// Wait until server is ready, then put our IP address into etcd, so pachd can
	// discover us
	select {
	case <-ready:
	case <-serveDone:
		if err := eg.Wait(); err != nil {
			return err
		}
		return fmt.Errorf("worker gRPC server exited before it was ready")
	}
	key := path.Join(appEnv.PPSPrefix, "workers", workerRcName, appEnv.PPSWorkerIP)

	// Prepare to write "key" into etcd by creating lease -- if worker dies, our
	// IP will be removed from etcd
	ctx, cancel := context.WithTimeout(pachClient.Ctx(), 10*time.Second)
	defer cancel()
	resp, err := etcdClient.Grant(ctx, 10 /* seconds */)
	if err != nil {
		return fmt.Errorf("error granting lease: %v", err)
	}
	// keepalive forever
	keepAliveCh, err := etcdClient.KeepAlive(context.Background(), resp.ID)
	if err != nil {
		return fmt.Errorf("error with KeepAlive: %v", err)
	}
	// clientv3 expects callers to consume the keepalive responses. Drain them,
	// and log if the stream ends, because after that the lease is no longer
	// refreshed and our IP will stop being advertised.
	go func() {
		for range keepAliveCh {
		}
		log.Errorf("etcd keepalive for lease %x ended; worker IP %s may no longer be advertised", resp.ID, appEnv.PPSWorkerIP)
	}()

	// Actually write "key" into etcd
	ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) // new ctx
	defer cancel()
	if _, err := etcdClient.Put(ctx, key, "", etcd.WithLease(resp.ID)); err != nil {
		return fmt.Errorf("error putting IP address: %v", err)
	}

	// If server ever exits, return error
	return eg.Wait()
}