/
server.go
265 lines (222 loc) · 7.58 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
package principal
import (
context "context"
"crypto/tls"
"fmt"
"net/http"
"sync"
"time"
appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"
"github.com/jannfis/argocd-agent/internal/auth"
"github.com/jannfis/argocd-agent/internal/backend/kubernetes"
"github.com/jannfis/argocd-agent/internal/event"
appinformer "github.com/jannfis/argocd-agent/internal/informer/application"
"github.com/jannfis/argocd-agent/internal/issuer"
"github.com/jannfis/argocd-agent/internal/manager/application"
"github.com/jannfis/argocd-agent/internal/metrics"
"github.com/jannfis/argocd-agent/internal/queue"
"github.com/jannfis/argocd-agent/internal/tlsutil"
"github.com/jannfis/argocd-agent/internal/version"
"github.com/jannfis/argocd-agent/pkg/types"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
// Server holds the configuration and the runtime state of the principal's
// server component. Use NewServer to create a properly initialized instance.
type Server struct {
// options holds the server's configuration; it starts out as
// defaultOptions() and is modified by the ServerOption functions passed
// to NewServer.
options *ServerOptions
// tlsConfig is not assigned anywhere in this part of the file.
// NOTE(review): presumably holds the result of loadTLSConfig - confirm.
tlsConfig *tls.Config
// listener is the value returned by Listener().
listener *Listener
// server is the HTTP server, if any. Shutdown stops it and resets the
// field to nil.
server *http.Server
// grpcServer is the gRPC server, if any. Shutdown stops it and resets the
// field to nil.
grpcServer *grpc.Server
// authMethods holds the authentication methods. NewServer sets it to
// auth.NewMethods() unless an option has already provided a value.
authMethods *auth.Methods
// queues holds the send/receive queues created in NewServer.
queues *queue.SendRecvQueues
// namespace is the namespace handed to NewServer. It is passed on to the
// application informer, the Kubernetes backend and the application manager.
namespace string
// issuer is the token issuer, created in NewServer from the configured
// signing key.
issuer issuer.Issuer
noauth map[string]bool // noauth contains endpoints accessible without authentication
// ctx is the server-wide context, derived from the context passed to
// NewServer. ctxCancel cancels it and is called by Shutdown.
ctx context.Context
ctxCancel context.CancelFunc
// appManager and appInformer are created in NewServer; the informer is
// also handed to the Kubernetes backend used by the manager.
appManager *application.ApplicationManager
appInformer *appinformer.AppInformer
// watchLock is not used in this part of the file.
watchLock sync.RWMutex
// clientMap is seeded in NewServer with a single hard-coded entry.
// NOTE(review): presumably maps client credentials to a namespace - confirm.
clientMap map[string]string
// namespaceMap maps a namespace to its agent mode. It is accessed through
// agentMode and setAgentMode while holding clientLock.
namespaceMap map[string]types.AgentMode
// clientLock guards namespaceMap.
// NOTE(review): presumably guards clientMap as well - confirm.
clientLock sync.RWMutex
// events is the event source, created in Start from the configured
// server name.
events *event.EventSource
// version holds the name and version information reported by the server.
version *version.Version
}
// noAuthEndpoints is the set of endpoints that are available without the need
// for the request to be authenticated. NewServer assigns this map to the
// noauth field of every Server it creates, so all servers share the same map.
// NOTE(review): the keys look like full gRPC method names - confirm.
var noAuthEndpoints = map[string]bool{
"/versionapi.Version/Version": true,
"/authapi.Authentication/Authenticate": true,
}
// waitForSyncedDuration is the duration handed to the application informer's
// EnsureSynced call in Start.
// NOTE(review): presumably the maximum time to wait for the initial sync - confirm.
const waitForSyncedDuration = 1 * time.Second
// NewServer creates a new Server for the given namespace, using appClient to
// access Argo CD applications. The server's configuration starts out as
// defaultOptions() and is then modified by each of opts, in order.
//
// The server-wide context is derived from ctx and is cancelled by Shutdown.
// If NewServer fails, that derived context is cancelled before returning so
// that it does not linger until ctx itself is done.
//
// An error is returned when one of opts fails, when no JWT signing key has
// been configured, or when the token issuer cannot be created.
func NewServer(ctx context.Context, appClient appclientset.Interface, namespace string, opts ...ServerOption) (*Server, error) {
	s := &Server{
		options:   defaultOptions(),
		queues:    queue.NewSendRecvQueues(),
		namespace: namespace,
		noauth:    noAuthEndpoints,
		version:   version.New("argocd-agent", "principal"),
	}

	s.ctx, s.ctxCancel = context.WithCancel(ctx)

	for _, o := range opts {
		if err := o(s); err != nil {
			// Release the derived context, nobody will call Shutdown.
			s.ctxCancel()
			return nil, err
		}
	}

	// Options may have provided authentication methods already
	if s.authMethods == nil {
		s.authMethods = auth.NewMethods()
	}

	if s.options.signingKey == nil {
		s.ctxCancel()
		return nil, fmt.Errorf("unexpected missing JWT signing key")
	}

	var err error
	s.issuer, err = issuer.NewIssuer("argocd-agent-server", issuer.WithRSAPrivateKey(s.options.signingKey))
	if err != nil {
		s.ctxCancel()
		return nil, err
	}

	informerOpts := []appinformer.AppInformerOption{
		appinformer.WithNamespaces(s.options.namespaces...),
		appinformer.WithNewAppCallback(s.newAppCallback),
		appinformer.WithUpdateAppCallback(s.updateAppCallback),
		appinformer.WithDeleteAppCallback(s.deleteAppCallback),
	}

	managerOpts := []application.ApplicationManagerOption{
		application.WithAllowUpsert(true),
	}

	// Metrics are only collected when a metrics port has been configured
	if s.options.metricsPort > 0 {
		informerOpts = append(informerOpts, appinformer.WithMetrics(metrics.NewApplicationWatcherMetrics()))
		managerOpts = append(managerOpts, application.WithMetrics(metrics.NewApplicationClientMetrics()))
	}

	s.appInformer = appinformer.NewAppInformer(s.ctx, appClient,
		s.namespace,
		informerOpts...,
	)

	s.appManager = application.NewApplicationManager(kubernetes.NewKubernetesBackend(appClient, s.namespace, s.appInformer, true), s.namespace,
		managerOpts...,
	)

	// Both maps are seeded with a single hard-coded entry for "argocd" in
	// autonomous mode.
	s.clientMap = map[string]string{
		`{"clientID":"argocd","mode":"autonomous"}`: "argocd",
	}
	s.namespaceMap = map[string]types.AgentMode{
		"argocd": types.AgentModeAutonomous,
	}

	return s, nil
}
// Start starts the Server s and its listeners in their own go routines. Any
// error during startup, before the go routines are running, will be returned
// immediately. Errors during the runtime will be propagated via errch.
//
// The gRPC service is only started when enabled in the server's options, and
// the metrics server only when a metrics port is configured. The event
// processor runs on the server-wide context, while the gRPC service and the
// application informer use ctx.
//
// Start returns an error if the gRPC service or the event processor cannot be
// started, or if the application informer does not report being synced.
func (s *Server) Start(ctx context.Context, errch chan error) error {
	log().Infof("Starting %s (server) v%s (ns=%s, allowed_namespaces=%v)", s.version.Name(), s.version.Version(), s.namespace, s.options.namespaces)

	if s.options.serveGRPC {
		if err := s.serveGRPC(ctx, errch); err != nil {
			return err
		}
	}

	if s.options.metricsPort > 0 {
		metrics.StartMetricsServer(metrics.WithListener("", s.options.metricsPort))
	}

	// A failure to start the event processor must be reported to the caller
	// instead of being silently turned into a successful start.
	if err := s.StartEventProcessor(s.ctx); err != nil {
		return fmt.Errorf("unable to start event processor: %w", err)
	}

	// The application informer lives in its own go routine
	go func() {
		s.appManager.Application.StartInformer(ctx)
	}()

	s.events = event.NewEventSource(s.options.serverName)

	if err := s.appInformer.EnsureSynced(waitForSyncedDuration); err != nil {
		return fmt.Errorf("unable to sync informer: %w", err)
	}

	log().Infof("Informer synced and ready")

	return nil
}
// Shutdown stops the server s after cancelling the server-wide context.
//
// When an HTTP server is set, it is shut down gracefully if a grace period
// is configured, and closed immediately otherwise. The gRPC server is only
// stopped when no HTTP server is set. The field of the server that was
// stopped is reset to nil.
//
// An error is returned if neither server is set, or if stopping the HTTP
// server fails.
func (s *Server) Shutdown() error {
	log().Debugf("Shutdown requested")

	// Cancel server-wide context
	s.ctxCancel()

	switch {
	case s.server != nil:
		var stopErr error
		if s.options.gracePeriod > 0 {
			// Give clients at most the grace period to finish up
			graceCtx, graceCancel := context.WithTimeout(context.Background(), s.options.gracePeriod)
			defer graceCancel()
			log().Infof("Server shutdown requested, allowing client connections to shut down for %v", s.options.gracePeriod)
			stopErr = s.server.Shutdown(graceCtx)
		} else {
			log().Infof("Closing server")
			stopErr = s.server.Close()
		}
		s.server = nil
		return stopErr
	case s.grpcServer != nil:
		log().Infof("Shutting down server")
		s.grpcServer.Stop()
		s.grpcServer = nil
		return nil
	default:
		return fmt.Errorf("no server running")
	}
}
// loadTLSConfig will configure and return a tls.Config object that can be
// used by the server's listener. It will use options set in the server for
// configuring the returned object.
//
// The server certificate is loaded from the configured certificate and key
// files if both paths are set. Otherwise, the certificate and key configured
// in memory are used. If neither pair is configured, or the certificate
// cannot be loaded, an error is returned instead of a configuration holding
// an empty certificate.
func (s *Server) loadTLSConfig() (*tls.Config, error) {
	var (
		cert tls.Certificate
		err  error
	)

	switch {
	case s.options.tlsCertPath != "" && s.options.tlsKeyPath != "":
		cert, err = tlsutil.TlsCertFromFile(s.options.tlsCertPath, s.options.tlsKeyPath, false)
	case s.options.tlsCert != nil && s.options.tlsKey != nil:
		cert, err = tlsutil.TlsCertFromX509(s.options.tlsCert, s.options.tlsKey)
	default:
		// Without this, the zero value of cert would end up in the config
		err = fmt.Errorf("no TLS certificate and key configured")
	}
	if err != nil {
		return nil, fmt.Errorf("unable to load TLS config: %w", err)
	}

	tlsConfig := &tls.Config{
		Certificates: []tls.Certificate{cert},
	}

	// If the server is configured to require client certificates, clients
	// must present a certificate that verifies against the configured root
	// CA pool.
	if s.options.requireClientCerts {
		log().Infof("This server will require TLS client certs as part of authentication")
		tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
		tlsConfig.ClientCAs = s.options.rootCa
	}

	return tlsConfig, nil
}
// Listener returns the listener of Server s. NewServer itself does not
// assign a listener.
// NOTE(review): presumably the listener is set up once the server starts
// listening, so the result may be nil before that - confirm.
func (s *Server) Listener() *Listener {
return s.listener
}
// TokenIssuer returns the token issuer of Server s. For servers created by
// NewServer, this is the issuer built from the configured signing key.
func (s *Server) TokenIssuer() issuer.Issuer {
return s.issuer
}
// log returns a logrus entry that has the field "module" set to "server".
func log() *logrus.Entry {
return logrus.WithField("module", "server")
}
// AuthMethods returns the authentication methods of Server s. For servers
// created by NewServer, the returned value is never nil.
func (s *Server) AuthMethods() *auth.Methods {
return s.authMethods
}
// Queues returns the send/receive queues of Server s, which are created in
// NewServer.
func (s *Server) Queues() *queue.SendRecvQueues {
return s.queues
}
// AppManager returns the application manager of Server s, which is created
// in NewServer.
func (s *Server) AppManager() *application.ApplicationManager {
return s.appManager
}
// agentMode looks up the agent mode recorded for the given namespace. If no
// mode is recorded for it, types.AgentModeUnknown is returned. The lookup is
// performed while holding the client lock for reading.
func (s *Server) agentMode(namespace string) types.AgentMode {
	s.clientLock.RLock()
	recorded, known := s.namespaceMap[namespace]
	s.clientLock.RUnlock()

	if !known {
		return types.AgentModeUnknown
	}
	return recorded
}
// setAgentMode records mode as the agent mode for the given namespace,
// replacing any mode previously recorded for it. The update is performed
// while holding the client lock for writing.
func (s *Server) setAgentMode(namespace string, mode types.AgentMode) {
	s.clientLock.Lock()
	defer s.clientLock.Unlock()

	// Writing to a nil map panics. The map is nil on a Server that was not
	// created by NewServer, so create it on first use.
	if s.namespaceMap == nil {
		s.namespaceMap = make(map[string]types.AgentMode)
	}
	s.namespaceMap[namespace] = mode
}