forked from pachyderm/pachyderm
/
main.go
198 lines (178 loc) · 6.91 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package main
import (
"context"
"fmt"
"io"
"net/http"
_ "net/http/pprof"
"os"
"path"
"path/filepath"
"time"
"golang.org/x/sync/errgroup"
etcd "github.com/coreos/etcd/clientv3"
"github.com/pachyderm/pachyderm/src/client"
debugclient "github.com/pachyderm/pachyderm/src/client/debug"
"github.com/pachyderm/pachyderm/src/client/pkg/grpcutil"
"github.com/pachyderm/pachyderm/src/client/pkg/tracing"
"github.com/pachyderm/pachyderm/src/client/pps"
"github.com/pachyderm/pachyderm/src/client/version"
"github.com/pachyderm/pachyderm/src/client/version/versionpb"
debugserver "github.com/pachyderm/pachyderm/src/server/debug/server"
"github.com/pachyderm/pachyderm/src/server/pkg/cmdutil"
logutil "github.com/pachyderm/pachyderm/src/server/pkg/log"
"github.com/pachyderm/pachyderm/src/server/pkg/ppsutil"
"github.com/pachyderm/pachyderm/src/server/pkg/serviceenv"
"github.com/pachyderm/pachyderm/src/server/worker"
"google.golang.org/grpc"
log "github.com/sirupsen/logrus"
)
// main is the worker binary's entry point. It installs the log formatter,
// makes a best-effort copy of every file under /pach-bin/certs into
// /etc/ssl/certs (failures are logged as warnings and never abort startup),
// and then hands control to cmdutil.Main with do as the function to run.
func main() {
	log.SetFormatter(logutil.FormatterFunc(logutil.Pretty))

	// Copy the contents of /pach-bin/certs into /etc/ssl/certs. Don't return an
	// error (which would cause 'Walk()' to exit early) but do record if any certs
	// are known to be missing so we can inform the user
	copyErr := false
	if err := filepath.Walk("/pach-bin/certs", func(inPath string, info os.FileInfo, err error) error {
		if err != nil {
			log.Warnf("skipping \"%s\", could not stat path: %v", inPath, err)
			copyErr = true
			return nil // Don't try and fix any errors encountered by Walk() itself
		}
		if info.IsDir() {
			return nil // We'll just copy the children of any directories when we traverse them
		}

		// Open input file (src). It is only read, so an error from its deferred
		// Close cannot lose data and is deliberately ignored.
		in, err := os.Open(inPath)
		if err != nil {
			log.Warnf("could not read \"%s\": %v", inPath, err)
			copyErr = true
			return nil
		}
		defer in.Close()

		// Create output file (dest) and open for writing
		outRelPath, err := filepath.Rel("/pach-bin/certs", inPath)
		if err != nil {
			log.Warnf("skipping \"%s\", could not extract relative path: %v", inPath, err)
			copyErr = true
			return nil
		}
		outPath := filepath.Join("/etc/ssl/certs", outRelPath)
		outDir := filepath.Dir(outPath)
		if err := os.MkdirAll(outDir, 0755); err != nil {
			log.Warnf("skipping \"%s\", could not create directory \"%s\": %v", inPath, outDir, err)
			copyErr = true
			return nil
		}
		out, err := os.OpenFile(outPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, info.Mode())
		if err != nil {
			log.Warnf("skipping \"%s\", could not create output file \"%s\": %v", inPath, outPath, err)
			copyErr = true
			return nil
		}

		// Copy src -> dest. The output file is closed explicitly (not deferred)
		// so that a failure reported by Close is treated as a failed copy rather
		// than being silently dropped. The copy error, if any, takes precedence.
		_, err = io.Copy(out, in)
		if closeErr := out.Close(); err == nil {
			err = closeErr
		}
		if err != nil {
			log.Warnf("could not copy \"%s\" to \"%s\": %v", inPath, outPath, err)
			copyErr = true
		}
		return nil
	}); err != nil {
		// Should never happen (the walk function above always returns nil)
		log.Warnf("could not copy /pach-bin/certs to /etc/ssl/certs: %v", err)
	}
	if copyErr {
		log.Warnf("Errors were encountered while copying /pach-bin/certs to /etc/ssl/certs (see above--might result in subsequent SSL/TLS errors)")
	}
	cmdutil.Main(do, &serviceenv.WorkerFullConfiguration{})
}
// getPipelineInfo looks up the PipelineInfo proto for the pipeline this worker
// belongs to (env.PPSPipelineName).
//
// Side effect: the auth token stored alongside the pipeline in etcd is set on
// the passed pachClient. That token is needed to read the PipelineInfo from
// pfs.
//
// An error is returned if the etcd read fails, if etcd does not hold exactly
// one entry for the pipeline, if that entry cannot be unmarshalled, or if
// ppsutil.GetPipelineInfo fails.
func getPipelineInfo(pachClient *client.APIClient, env *serviceenv.ServiceEnv) (*pps.PipelineInfo, error) {
	// Give the etcd read at most 30 seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	etcdClient := env.GetEtcdClient()
	pipelineKey := path.Join(env.PPSEtcdPrefix, "pipelines", env.PPSPipelineName)
	getResp, err := etcdClient.Get(ctx, pipelineKey)
	if err != nil {
		return nil, err
	}
	if found := len(getResp.Kvs); found != 1 {
		return nil, fmt.Errorf("expected to find 1 pipeline (%s), got %d: %v", env.PPSPipelineName, found, getResp)
	}

	etcdInfo := &pps.EtcdPipelineInfo{}
	if err := etcdInfo.Unmarshal(getResp.Kvs[0].Value); err != nil {
		return nil, err
	}
	pachClient.SetAuthToken(etcdInfo.AuthToken)

	// The SpecCommitID deliberately comes from our env rather than from etcd:
	// the etcd value may be updated while this worker pod is still being
	// created, and we don't want to run the transform of one version of the
	// pipeline in the image of a different version.
	etcdInfo.SpecCommit.ID = env.PPSSpecCommitID
	return ppsutil.GetPipelineInfo(pachClient, etcdInfo)
}
// do runs the worker; main passes it to cmdutil.Main. config is handed
// unchanged to serviceenv.NewConfiguration.
//
// In order, it:
//  1. installs tracing and builds the service environment,
//  2. serves the default HTTP mux (which carries the pprof handlers) on
//     env.PProfPort in a background goroutine,
//  3. loads this worker's PipelineInfo (which also sets the auth token on
//     the pach client),
//  4. starts the worker gRPC server on env.PPSWorkerPort,
//  5. once the server's services are registered, writes this worker's IP as
//     a key in etcd, attached to a 10-second lease that is kept alive.
//
// It returns an error if any setup step fails. Otherwise it blocks until the
// gRPC server exits and returns whatever grpcutil.Serve returned.
func do(config interface{}) error {
	tracing.InstallJaegerTracerFromEnv() // must run before InitWithKube
	env := serviceenv.InitServiceEnv(serviceenv.NewConfiguration(config))

	// Expose PProf service
	go func() {
		log.Println(http.ListenAndServe(fmt.Sprintf(":%d", env.PProfPort), nil))
	}()

	// Construct a client that connects to the sidecar.
	pachClient := env.GetPachClient(context.Background())

	// Look up the pipeline this worker belongs to (this also adds the
	// pipeline's auth token to pachClient)
	pipelineInfo, err := getPipelineInfo(pachClient, env)
	if err != nil {
		return fmt.Errorf("error getting pipelineInfo: %v", err)
	}

	// Construct worker API server.
	workerRcName := ppsutil.PipelineRcName(pipelineInfo.Pipeline.Name, pipelineInfo.Version)
	apiServer, err := worker.NewAPIServer(pachClient, env.GetEtcdClient(), env.PPSEtcdPrefix, pipelineInfo, env.PodName, env.Namespace, env.StorageRoot)
	if err != nil {
		return err
	}

	// Start worker api server
	eg := errgroup.Group{}
	ready := make(chan struct{})        // closed once RegisterFunc has run
	serverExited := make(chan struct{}) // closed once grpcutil.Serve has returned
	eg.Go(func() error {
		defer close(serverExited)
		return grpcutil.Serve(
			grpcutil.ServerOptions{
				MaxMsgSize: grpcutil.MaxMsgSize,
				Port:       env.PPSWorkerPort,
				RegisterFunc: func(s *grpc.Server) error {
					defer close(ready)
					worker.RegisterWorkerServer(s, apiServer)
					versionpb.RegisterAPIServer(s, version.NewAPIServer(version.Version, version.APIServerOptions{}))
					debugclient.RegisterDebugServer(s, debugserver.NewDebugServer(env.PodName, env.GetEtcdClient(), env.PPSEtcdPrefix, env.PPSWorkerPort))
					return nil
				},
			},
		)
	})

	// Wait until server is ready, then put our IP address into etcd, so pachd can
	// discover us. Also watch for Serve returning: if it returns without ever
	// calling RegisterFunc, 'ready' is never closed, and waiting on it alone
	// would block forever and hide the error.
	select {
	case <-ready:
	case <-serverExited:
		// The server is gone, so there is nothing to advertise in etcd
		return eg.Wait()
	}
	key := path.Join(env.PPSEtcdPrefix, worker.WorkerEtcdPrefix, workerRcName, env.PPSWorkerIP)

	// Prepare to write "key" into etcd by creating lease -- if worker dies, our
	// IP will be removed from etcd
	grantCtx, cancelGrant := context.WithTimeout(pachClient.Ctx(), 10*time.Second)
	lease, err := env.GetEtcdClient().Grant(grantCtx, 10 /* seconds */)
	cancelGrant() // the request has finished; release the timeout's resources now
	if err != nil {
		return fmt.Errorf("error granting lease: %v", err)
	}

	// keepalive forever
	if _, err := env.GetEtcdClient().KeepAlive(context.Background(), lease.ID); err != nil {
		return fmt.Errorf("error with KeepAlive: %v", err)
	}

	// Actually write "key" into etcd
	putCtx, cancelPut := context.WithTimeout(context.Background(), 10*time.Second) // new ctx
	_, err = env.GetEtcdClient().Put(putCtx, key, "", etcd.WithLease(lease.ID))
	cancelPut()
	if err != nil {
		return fmt.Errorf("error putting IP address: %v", err)
	}

	// If server ever exits, return error
	return eg.Wait()
}