-
Notifications
You must be signed in to change notification settings - Fork 566
/
server.go
144 lines (133 loc) · 5.15 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package server
import (
"context"
"path"
"github.com/pachyderm/pachyderm/v2/src/internal/client"
"github.com/pachyderm/pachyderm/v2/src/internal/collection"
"github.com/pachyderm/pachyderm/v2/src/internal/log"
loki "github.com/pachyderm/pachyderm/v2/src/internal/lokiutil/client"
"github.com/pachyderm/pachyderm/v2/src/internal/metrics"
"github.com/pachyderm/pachyderm/v2/src/internal/pachconfig"
"github.com/pachyderm/pachyderm/v2/src/internal/pachsql"
"github.com/pachyderm/pachyderm/v2/src/internal/pctx"
"github.com/pachyderm/pachyderm/v2/src/internal/ppsdb"
"github.com/pachyderm/pachyderm/v2/src/internal/serviceenv"
"github.com/pachyderm/pachyderm/v2/src/internal/task"
txnenv "github.com/pachyderm/pachyderm/v2/src/internal/transactionenv"
authserver "github.com/pachyderm/pachyderm/v2/src/server/auth"
pfsserver "github.com/pachyderm/pachyderm/v2/src/server/pfs"
ppsiface "github.com/pachyderm/pachyderm/v2/src/server/pps"
etcd "go.etcd.io/etcd/client/v3"
"k8s.io/client-go/kubernetes"
)
// Env contains the dependencies needed to create an API Server
type Env struct {
DB *pachsql.DB
TxnEnv *txnenv.TransactionEnv
Listener collection.PostgresListener
KubeClient kubernetes.Interface
EtcdClient *etcd.Client
EtcdPrefix string
TaskService task.Service
// TODO: make this just a *loki.Client
// This is not a circular dependency
GetLokiClient func() (*loki.Client, error)
PFSServer pfsserver.APIServer
AuthServer authserver.APIServer
// TODO: This should just be a pach client for the needed services.
// serviceenv blocks until everything is done though, so we can't get it until after setup is done.
GetPachClient func(context.Context) *client.APIClient
Reporter *metrics.Reporter
BackgroundContext context.Context
Config pachconfig.Configuration
PachwInSidecar bool
}
func EnvFromServiceEnv(senv serviceenv.ServiceEnv, txnEnv *txnenv.TransactionEnv, reporter *metrics.Reporter) Env {
etcdPrefix := path.Join(senv.Config().EtcdPrefix, senv.Config().PPSEtcdPrefix)
return Env{
DB: senv.GetDBClient(),
TxnEnv: txnEnv,
Listener: senv.GetPostgresListener(),
KubeClient: senv.GetKubeClient(),
EtcdClient: senv.GetEtcdClient(),
EtcdPrefix: etcdPrefix,
TaskService: senv.GetTaskService(etcdPrefix),
GetLokiClient: senv.GetLokiClient,
PFSServer: senv.PfsServer(),
AuthServer: senv.AuthServer(),
GetPachClient: senv.GetPachClient,
Reporter: reporter,
BackgroundContext: pctx.Child(senv.Context(), "PPS"),
Config: *senv.Config(),
PachwInSidecar: senv.Config().PachwInSidecars,
}
}
// NewAPIServer creates an APIServer and runs the master loop in the background
func NewAPIServer(env Env) (ppsiface.APIServer, error) {
srv, err := NewAPIServerNoMaster(env)
if err != nil {
return nil, err
}
apiServer := (srv).(*apiServer)
if env.Config.EnablePreflightChecks {
apiServer.validateKube(env.BackgroundContext)
} else {
log.Error(env.BackgroundContext, "Preflight checks are disabled. This is not recommended.")
}
go apiServer.master(env.BackgroundContext)
go apiServer.worker(env.BackgroundContext)
return apiServer, nil
}
// NewAPIServerNoMaster creates an APIServer without running the master
// loop in the background.
func NewAPIServerNoMaster(env Env) (ppsiface.APIServer, error) {
config := env.Config
apiServer := &apiServer{
env: env,
txnEnv: env.TxnEnv,
etcdPrefix: env.EtcdPrefix,
namespace: config.Namespace,
workerImage: config.WorkerImage,
workerSidecarImage: config.WorkerSidecarImage,
workerImagePullPolicy: config.WorkerImagePullPolicy,
storageRoot: config.StorageRoot,
storageBackend: config.StorageBackend,
storageHostPath: config.StorageHostPath,
imagePullSecrets: config.ImagePullSecrets,
reporter: env.Reporter,
workerUsesRoot: config.WorkerUsesRoot,
pipelines: ppsdb.Pipelines(env.DB, env.Listener),
jobs: ppsdb.Jobs(env.DB, env.Listener),
clusterDefaults: ppsdb.ClusterDefaults(env.DB, env.Listener),
workerGrpcPort: config.PPSWorkerPort,
port: config.Port,
peerPort: config.PeerPort,
gcPercent: config.GCPercent,
}
return apiServer, nil
}
// NewSidecarAPIServer creates an APIServer that has limited functionalities
// and is meant to be run as a worker sidecar. It cannot, for instance,
// create pipelines.
func NewSidecarAPIServer(
env Env,
namespace string,
workerGrpcPort uint16,
peerPort uint16,
) (*apiServer, error) {
apiServer := &apiServer{
env: env,
txnEnv: env.TxnEnv,
etcdPrefix: env.EtcdPrefix,
reporter: env.Reporter,
namespace: namespace,
workerUsesRoot: true,
pipelines: ppsdb.Pipelines(env.DB, env.Listener),
jobs: ppsdb.Jobs(env.DB, env.Listener),
clusterDefaults: ppsdb.ClusterDefaults(env.DB, env.Listener),
workerGrpcPort: workerGrpcPort,
peerPort: peerPort,
}
go apiServer.ServeSidecarS3G(pctx.Child(env.BackgroundContext, "s3gateway", pctx.WithServerID()))
return apiServer, nil
}