-
Notifications
You must be signed in to change notification settings - Fork 566
/
api_server.go
1852 lines (1641 loc) · 65.5 KB
/
api_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package server
import (
"context"
"database/sql"
"fmt"
"net/http"
"os"
"path"
"strings"
"time"
"github.com/ghodss/yaml"
"google.golang.org/protobuf/proto"
"go.uber.org/zap"
"github.com/pachyderm/pachyderm/v2/src/auth"
enterpriseclient "github.com/pachyderm/pachyderm/v2/src/enterprise"
"github.com/pachyderm/pachyderm/v2/src/identity"
"github.com/pachyderm/pachyderm/v2/src/internal/authdb"
"github.com/pachyderm/pachyderm/v2/src/internal/backoff"
"github.com/pachyderm/pachyderm/v2/src/internal/client"
col "github.com/pachyderm/pachyderm/v2/src/internal/collection"
"github.com/pachyderm/pachyderm/v2/src/internal/dbutil"
"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/grpcutil"
"github.com/pachyderm/pachyderm/v2/src/internal/keycache"
"github.com/pachyderm/pachyderm/v2/src/internal/log"
internalauth "github.com/pachyderm/pachyderm/v2/src/internal/middleware/auth"
"github.com/pachyderm/pachyderm/v2/src/internal/pachsql"
"github.com/pachyderm/pachyderm/v2/src/internal/pctx"
"github.com/pachyderm/pachyderm/v2/src/internal/protoutil"
txnenv "github.com/pachyderm/pachyderm/v2/src/internal/transactionenv"
"github.com/pachyderm/pachyderm/v2/src/internal/transactionenv/txncontext"
"github.com/pachyderm/pachyderm/v2/src/internal/uuid"
"github.com/pachyderm/pachyderm/v2/src/pfs"
"github.com/pachyderm/pachyderm/v2/src/pps"
)
// Storage keys and timing parameters used by the auth server.
const (
	// cleanupIntervalHours is the length, in hours, of the interval between
	// cleanups of expired auth tokens.
	cleanupIntervalHours = 24

	// configKey is the key in the config collection that maps to the auth
	// configuration. It is the only key in that collection (due to
	// implementation details of our config library, an empty key cannot be
	// used).
	configKey = "config"

	// oidcAuthnPrefix is the etcd object prefix for OIDC data.
	oidcAuthnPrefix = "/oidc-authns"
)
// DefaultOIDCConfig is the default (zero-valued) config for the auth API
// server.
var DefaultOIDCConfig auth.OIDCConfig
// apiServer implements the public interface of the Pachyderm auth system,
// including all RPCs defined in the protobuf spec.
type apiServer struct {
auth.UnimplementedAPIServer
// env holds the configuration and service dependencies (database, etcd
// client, transaction environment, sibling servers) used by the RPCs.
env Env
// configCache caches the auth config stored under configKey. It is only
// populated by NewAuthServer when watchesEnabled is true.
configCache *keycache.Cache
// clusterRoleBindingCache caches the cluster-level role binding. It is only
// populated by NewAuthServer when watchesEnabled is true.
clusterRoleBindingCache *keycache.Cache
// roleBindings is a collection of resource name -> role binding mappings.
roleBindings col.PostgresCollection
// members is a collection of username -> groups mappings.
members col.PostgresCollection
// groups is a collection of group -> usernames mappings.
groups col.PostgresCollection
// collection containing the auth config (under the key configKey)
authConfig col.PostgresCollection
// oidcStates contains the set of OIDC nonces for requests that are in progress
oidcStates col.EtcdCollection
// public addresses the fact that pachd in full mode initializes two auth
// servers: one that exposes a public API, possibly over TLS, and one that
// exposes a private API, for internal services. Only one of these can launch
// the OIDC callback web server.
public bool
// watchesEnabled controls whether we cache the auth config and cluster role bindings
// in the auth service, or whether we look them up each time. Watches are expensive in
// postgres, so we can't afford to have each sidecar run watches. Pipelines always have
// direct access to a repo anyways, so the cluster role bindings don't affect their access,
// and the OIDC server doesn't run in the sidecar so the config doesn't matter.
watchesEnabled bool
}
// NewAuthServer returns an implementation of auth.APIServer.
//
// When public is true the OIDC HTTP server is launched in a background
// goroutine; requireNoncriticalServers is forwarded to waitForError and decides
// whether a failure of that server terminates the process. When watchesEnabled
// is true, caches for the auth config and the cluster role binding are created
// and their watches are started in background goroutines. An error is returned
// only if deleteExpiredTokensRoutine fails.
func NewAuthServer(env Env, public, requireNoncriticalServers, watchesEnabled bool) (*apiServer, error) {
	srv := &apiServer{
		env:            env,
		public:         public,
		watchesEnabled: watchesEnabled,
	}
	srv.oidcStates = col.NewEtcdCollection(
		env.EtcdClient,
		path.Join(oidcAuthnPrefix),
		nil,
		&auth.SessionInfo{},
		nil,
		nil,
	)
	srv.authConfig = authdb.AuthConfigCollection(env.DB, env.Listener)
	srv.roleBindings = authdb.RoleBindingCollection(env.DB, env.Listener)
	srv.members = authdb.MembersCollection(env.DB, env.Listener)
	srv.groups = authdb.GroupsCollection(env.DB, env.Listener)

	if public {
		// The OIDC service won't respond to anything until the config is set.
		go waitForError("OIDC HTTP Server", requireNoncriticalServers, srv.serveOIDC)
	}

	if watchesEnabled {
		bgCtx := env.BackgroundContext
		srv.configCache = keycache.NewCache(bgCtx, srv.authConfig.ReadOnly(bgCtx), configKey, &DefaultOIDCConfig)
		srv.clusterRoleBindingCache = keycache.NewCache(bgCtx, srv.roleBindings.ReadOnly(bgCtx), auth.ClusterRoleBindingKey, &auth.RoleBinding{})
		// Keep the cached auth config up to date.
		go srv.configCache.Watch()
		// Keep the cached cluster role binding up to date.
		go srv.clusterRoleBindingCache.Watch()
	}

	if err := srv.deleteExpiredTokensRoutine(env.BackgroundContext); err != nil {
		return nil, err
	}
	return srv, nil
}
// ActivationScope is an additional service to activate auth for.
type ActivationScope int

const (
	ActivationScopePFS ActivationScope = iota // Activate auth for PFS.
	ActivationScopePPS                        // Activate auth for PPS.
)

// String implements fmt.Stringer.
//
// Known scopes render as their short service name ("PFS", "PPS"). An unknown
// value renders as "invalid auth scope N" rather than panicking, so that
// logging or formatting a bad value can never crash the process.
func (s ActivationScope) String() string {
	switch s { //exhaustive:enforce
	case ActivationScopePFS:
		return "PFS"
	case ActivationScopePPS:
		return "PPS"
	}
	return fmt.Sprintf("invalid auth scope %d", int(s))
}
// ActivateAuthEverywhere activates auth, and auth for each of the given scopes
// (PFS and/or PPS), in a single write transaction. It is only exposed publicly
// for testing; do not call it from outside this package.
//
// If auth is already active, the root token is rotated to rootToken instead and
// the transaction body returns without activating any of the scopes.
func (a *apiServer) ActivateAuthEverywhere(ctx context.Context, scopes []ActivationScope, rootToken string) error {
	activateAll := func(txCtx *txncontext.TransactionContext) error {
		log.Debug(ctx, "attempting to activate auth")
		_, activateErr := a.activateInTransaction(ctx, txCtx, &auth.ActivateRequest{
			RootToken: rootToken,
		})
		switch {
		case activateErr == nil:
			// Freshly activated; continue with the per-service scopes below.
		case errors.Is(activateErr, auth.ErrAlreadyActivated):
			log.Debug(ctx, "auth already active; rotating root token")
			_, rotateErr := a.rotateRootTokenInTransaction(txCtx,
				&auth.RotateRootTokenRequest{
					RootToken: rootToken,
				})
			return errors.Wrap(rotateErr, "rotate root token")
		default:
			return errors.Wrap(activateErr, "activate auth")
		}

		for _, scope := range scopes {
			switch scope { //exhaustive:enforce
			case ActivationScopePFS:
				log.Debug(ctx, "attempting to activate PFS auth")
				_, err := a.env.GetPfsServer().ActivateAuthInTransaction(ctx, txCtx, &pfs.ActivateAuthRequest{})
				if err != nil {
					return errors.Wrap(err, "activate auth for pfs")
				}
			case ActivationScopePPS:
				log.Debug(ctx, "attempting to activate PPS auth")
				_, err := a.env.GetPpsServer().ActivateAuthInTransaction(txCtx, &pps.ActivateAuthRequest{})
				if err != nil {
					return errors.Wrap(err, "activate auth for pps")
				}
			}
		}
		return nil
	}

	if err := a.env.TxnEnv.WithWriteContext(ctx, activateAll); err != nil {
		return errors.Wrapf(err, "activate auth everywhere (%v)", scopes)
	}
	log.Info(ctx, "activated auth ok", zap.Stringers("scopes", scopes))
	return nil
}
// EnvBootstrap configures the auth server from the values in a.env.Config. It
// does nothing unless Config.ActivateAuth is set. All calls are made as the
// internal user. The steps, in order, are:
//
//  1. If Config.AuthRootToken is non-empty, activate auth with that root token
//     (also for PFS and PPS when Config.PachdSpecificConfiguration is set).
//  2. If both Config.AuthConfig and Config.IdentityClients are non-empty, parse
//     them as YAML, create (or delete and re-create) each OIDC client, either on
//     the local identity server or, when Config.EnterpriseMember is set, on the
//     enterprise server, and then store the auth configuration.
//  3. If Config.AuthClusterRoleBindings is non-empty, reconcile the cluster role
//     bindings with it: principals that exist but are not listed are unset
//     (except those with the pach or internal prefix), and every listed
//     principal is set to its listed roles.
//
// Errors from steps 2 and 3 are wrapped with "configure the auth server via
// environment".
func (a *apiServer) EnvBootstrap(ctx context.Context) error {
if !a.env.Config.ActivateAuth {
return nil
}
log.Info(ctx, "Started to configure auth server via environment")
ctx = internalauth.AsInternalUser(ctx, authdb.InternalUser)
// handle auth activation
if rootToken := a.env.Config.AuthRootToken; rootToken != "" {
var scopes []ActivationScope
if a.env.Config.PachdSpecificConfiguration != nil {
// If this is pachd and not the enterprise server, activate auth for PFS and
// PPS.
scopes = []ActivationScope{ActivationScopePFS, ActivationScopePPS}
}
if err := a.ActivateAuthEverywhere(ctx, scopes, rootToken); err != nil {
return errors.Wrapf(err, "activate auth via environment")
}
}
// The remaining steps run inside a closure so that every error they return
// gets the same wrapping below.
if err := func() error {
// handle oidc clients & this cluster's auth config
if a.env.Config.AuthConfig != "" && a.env.Config.IdentityClients != "" {
log.Info(ctx, "attempting to add or update oidc clients")
var config auth.OIDCConfig
var clients []*identity.OIDCClient
if err := yaml.Unmarshal([]byte(a.env.Config.AuthConfig), &config); err != nil {
return errors.Wrapf(err, "unmarshal auth config: %q", a.env.Config.AuthConfig)
}
// The client secret comes from its own config value, overriding whatever
// was parsed from the YAML.
config.ClientSecret = a.env.Config.AuthClientSecret
if err := yaml.Unmarshal([]byte(a.env.Config.IdentityClients), &clients); err != nil {
return errors.Wrapf(err, "unmarshal identity clients: %q", a.env.Config.IdentityClients)
}
if a.env.Config.IdentityAdditionalClients != "" {
var extras []*identity.OIDCClient
if err := yaml.Unmarshal([]byte(a.env.Config.IdentityAdditionalClients), &extras); err != nil {
return errors.Wrapf(err, "unmarshal extra identity clients: %q", a.env.Config.IdentityAdditionalClients)
}
clients = append(clients, extras...)
}
for _, c := range clients {
log.Info(ctx, "adding oidc client", zap.String("id", c.Id), zap.Bool("via_enterprise_server", a.env.Config.EnterpriseMember))
if c.Id == config.ClientId { // c represents pachd
c.Secret = config.ClientSecret
if a.env.Config.TrustedPeers != "" {
var tps []string
if err := yaml.Unmarshal([]byte(a.env.Config.TrustedPeers), &tps); err != nil {
return errors.Wrapf(err, "unmarshal trusted peers: %q", a.env.Config.TrustedPeers)
}
c.TrustedPeers = append(c.TrustedPeers, tps...)
log.Info(ctx, "adding additional pachd trusted peers configured via environment", zap.Strings("trusted_peers", c.TrustedPeers), zap.String("id", c.Id))
}
}
// The console and Determined clients take their secrets from dedicated
// config values.
if c.Id == a.env.Config.ConsoleOAuthID {
c.Secret = a.env.Config.ConsoleOAuthSecret
}
if c.Id == a.env.Config.DeterminedOAuthID {
c.Secret = a.env.Config.DeterminedOAuthSecret
}
if !a.env.Config.EnterpriseMember {
// Not an enterprise member: register the client with the local identity
// server.
if _, err := a.env.GetIdentityServer().CreateOIDCClient(ctx, &identity.CreateOIDCClientRequest{Client: c}); err != nil {
if !identity.IsErrAlreadyExists(err) {
return errors.Wrapf(err, "create oidc client %q", c.Name)
}
// recreate the client because updating the client secret is not supported by the dex API
if _, err := a.env.GetIdentityServer().DeleteOIDCClient(ctx, &identity.DeleteOIDCClientRequest{Id: c.Id}); err != nil {
return errors.Wrapf(err, "delete oidc client %q", c.Name)
}
if _, err := a.env.GetIdentityServer().CreateOIDCClient(ctx, &identity.CreateOIDCClientRequest{Client: c}); err != nil {
return errors.Wrapf(err, "update oidc client %q", c.Name)
}
}
} else {
// Enterprise member: register the client with the enterprise server,
// authenticating with the configured enterprise server token.
// NOTE(review): a new client is created for every OIDC client in the
// loop and is not closed in this function; confirm whether it needs to
// be.
ec, err := client.NewFromURI(ctx, a.env.Config.EnterpriseServerAddress)
if err != nil {
return errors.Wrapf(err, "connect to enterprise server")
}
ec.SetAuthToken(a.env.Config.EnterpriseServerToken)
if _, err = ec.IdentityAPIClient.CreateOIDCClient(ec.Ctx(), &identity.CreateOIDCClientRequest{Client: c}); err != nil {
if !identity.IsErrAlreadyExists(err) {
return errors.Wrapf(err, "create oidc client %q", c.Name)
}
// recreate the client because updating the client secret is not supported by the dex API
if _, err := ec.IdentityAPIClient.DeleteOIDCClient(ec.Ctx(), &identity.DeleteOIDCClientRequest{Id: c.Id}); err != nil {
return errors.Wrapf(err, "delete oidc client %q", c.Name)
}
if _, err := ec.IdentityAPIClient.CreateOIDCClient(ec.Ctx(), &identity.CreateOIDCClientRequest{Client: c}); err != nil {
return errors.Wrapf(err, "update oidc client %q", c.Name)
}
}
}
}
log.Info(ctx, "setting auth configuration", log.Proto("config", &config))
if _, err := a.SetConfiguration(ctx, &auth.SetConfigurationRequest{Configuration: &config}); err != nil {
return err
}
}
// cluster role bindings
if a.env.Config.AuthClusterRoleBindings != "" {
log.Info(ctx, "setting up cluster role bindings")
// roleBinding maps each principal to the list of roles it should hold.
var roleBinding map[string][]string
if err := yaml.Unmarshal([]byte(a.env.Config.AuthClusterRoleBindings), &roleBinding); err != nil {
return errors.Wrapf(err, "unmarshal auth cluster role bindings: %q", a.env.Config.AuthClusterRoleBindings)
}
existing, err := a.GetRoleBinding(ctx, &auth.GetRoleBindingRequest{
Resource: &auth.Resource{Type: auth.ResourceType_CLUSTER},
})
if err != nil {
return errors.Wrapf(err, "get cluster role bindings")
}
// First pass: unset existing principals that are absent from the
// configured bindings.
for p := range existing.Binding.Entries {
// `pach:` user role bindings cannot be modified
if strings.HasPrefix(p, auth.PachPrefix) || strings.HasPrefix(p, auth.InternalPrefix) {
log.Info(ctx, "skipping role binding because pach: role bindings cannot be modified", zap.String("principal", p))
continue
}
if _, ok := roleBinding[p]; !ok {
rsc := &auth.Resource{Type: auth.ResourceType_CLUSTER}
log.Debug(ctx, "unsetting existing role binding", log.Proto("resource", rsc), zap.String("principal", p))
// The request carries no Roles, which is how the binding is unset.
if _, err := a.ModifyRoleBinding(ctx, &auth.ModifyRoleBindingRequest{
Resource: rsc,
Principal: p,
}); err != nil {
return errors.Wrapf(err, "unset principal cluster role bindings for principal %q", p)
}
}
}
// Second pass: apply the configured roles for every listed principal.
for p, r := range roleBinding {
rsc := &auth.Resource{Type: auth.ResourceType_CLUSTER}
log.Info(ctx, "modifying existing role binding", log.Proto("resource", rsc), zap.String("principal", p), zap.Strings("roles", r))
if _, err := a.ModifyRoleBinding(ctx, &auth.ModifyRoleBindingRequest{
Resource: rsc,
Principal: p,
Roles: r,
}); err != nil {
return errors.Wrapf(err, "modify cluster role bindings")
}
}
}
return nil
}(); err != nil {
return errors.Wrapf(errors.EnsureStack(err), "configure the auth server via environment")
}
log.Info(ctx, "Successfully configured auth server via environment")
return nil
}
// waitForError runs cb, the blocking entry point of the server called name,
// and reports its result once cb returns. A result matching
// http.ErrServerClosed is treated as a normal shutdown and ignored. Any other
// result is logged as an error and, when required is true, the process exits
// with status 20.
func waitForError(name string, required bool, cb func() error) {
	err := cb()
	if errors.Is(err, http.ErrServerClosed) {
		return
	}
	if !required {
		log.Error(pctx.TODO(), "error setting up and/or running server", zap.String("server", name), zap.Error(err))
		return
	}
	log.Error(pctx.TODO(), "error setting up and/or running server (use --require-critical-servers-only deploy flag to ignore errors from noncritical servers)", zap.String("server", name), zap.Error(err))
	os.Exit(20)
}
// getClusterRoleBinding returns the current cluster role binding, or
// auth.ErrNotActivated if auth is not activated. With watches enabled the
// value comes from the local cache; otherwise (as in the worker sidecar) it is
// read from postgres on every call.
func (a *apiServer) getClusterRoleBinding(ctx context.Context) (*auth.RoleBinding, error) {
	if !a.watchesEnabled {
		stored := new(auth.RoleBinding)
		err := a.roleBindings.ReadOnly(ctx).Get(auth.ClusterRoleBindingKey, stored)
		switch {
		case err == nil:
			return stored, nil
		case col.IsErrNotFound(err):
			return nil, auth.ErrNotActivated
		default:
			return nil, errors.EnsureStack(err)
		}
	}

	cached, isBinding := a.clusterRoleBindingCache.Load().(*auth.RoleBinding)
	if !isBinding {
		return nil, errors.New("cached cluster binding had unexpected type")
	}
	// An empty cached binding means no cluster binding has been stored.
	if len(cached.Entries) == 0 {
		return nil, auth.ErrNotActivated
	}
	return cached, nil
}
// isActive returns nil when the cluster role binding can be retrieved (auth is
// activated), and the lookup error (e.g. auth.ErrNotActivated) otherwise.
func (a *apiServer) isActive(ctx context.Context) error {
	if _, err := a.getClusterRoleBinding(ctx); err != nil {
		return err
	}
	return nil
}
// isActiveInTransaction is the transactional counterpart of isActive: it
// returns nil when the cluster role binding can be retrieved within txnCtx, and
// the lookup error (e.g. auth.ErrNotActivated) otherwise.
func (a *apiServer) isActiveInTransaction(txnCtx *txncontext.TransactionContext) error {
	if _, err := a.getClusterRoleBindingInTransaction(txnCtx); err != nil {
		return err
	}
	return nil
}
// getClusterRoleBindingInTransaction attempts to get the current cluster role
// bindings from within a transaction, and returns auth.ErrNotActivated if auth
// is not activated.
//
// The local cache is used only when watches are enabled and this transaction
// is not itself activating auth (txnCtx.AuthBeingActivated). Otherwise the
// binding is read through txnCtx.SqlTx; presumably this is so that a
// transaction which has just written the binding sees its own write before the
// cache catches up.
func (a *apiServer) getClusterRoleBindingInTransaction(txnCtx *txncontext.TransactionContext) (*auth.RoleBinding, error) {
	if a.watchesEnabled && !txnCtx.AuthBeingActivated.Load() {
		bindings, ok := a.clusterRoleBindingCache.Load().(*auth.RoleBinding)
		if !ok {
			return nil, errors.New("cached cluster binding had unexpected type")
		}
		// Test emptiness with len, as getClusterRoleBinding does, so that an
		// empty but non-nil map is also reported as "not activated".
		if len(bindings.Entries) == 0 {
			return nil, auth.ErrNotActivated
		}
		return bindings, nil
	}

	var binding auth.RoleBinding
	if err := a.roleBindings.ReadWrite(txnCtx.SqlTx).Get(auth.ClusterRoleBindingKey, &binding); err != nil {
		if col.IsErrNotFound(err) {
			return nil, auth.ErrNotActivated
		}
		return nil, errors.EnsureStack(err)
	}
	return &binding, nil
}
// getEnterpriseTokenState asks the enterprise server for the state of the
// cluster's enterprise token. It fails if the enterprise server is not yet
// available, or if the state request itself returns an error.
func (a *apiServer) getEnterpriseTokenState(ctx context.Context) (enterpriseclient.State, error) {
	if a.env.GetEnterpriseServer() == nil {
		return 0, errors.New("Enterprise Server not yet initialized")
	}
	stateReq := &enterpriseclient.GetStateRequest{}
	stateResp, err := a.env.GetEnterpriseServer().GetState(ctx, stateReq)
	if err == nil {
		return stateResp.State, nil
	}
	return 0, errors.Wrapf(grpcutil.ScrubGRPC(err), "could not get Enterprise status")
}
// expiredEnterpriseCheck enforces that users cannot log in while the cluster's
// enterprise token is not active. Principals accepted by
// userHasExpiredClusterAccessCheck (for example the root user) can still
// access the cluster.
func (a *apiServer) expiredEnterpriseCheck(ctx context.Context, username string) error {
	tokenState, err := a.getEnterpriseTokenState(ctx)
	if err != nil {
		return errors.Wrapf(err, "error confirming Pachyderm Enterprise token")
	}
	if tokenState != enterpriseclient.State_ACTIVE {
		// Enterprise is inactive: only privileged principals may proceed.
		if accessErr := a.userHasExpiredClusterAccessCheck(ctx, username); accessErr != nil {
			return errors.Wrapf(accessErr, "Pachyderm Enterprise is not active in this "+
				"cluster (until Pachyderm Enterprise is re-activated or Pachyderm "+
				"auth is deactivated, only cluster admins can perform any operations)")
		}
	}
	return nil
}
// userHasExpiredClusterAccessCheck returns nil if username may keep using the
// cluster while the enterprise license is not active, and an error otherwise.
// Access is kept by the root user, by any principal with the pipeline or
// internal prefix, and by any user holding the cluster admin role.
func (a *apiServer) userHasExpiredClusterAccessCheck(ctx context.Context, username string) error {
	switch {
	case username == auth.RootUser,
		strings.HasPrefix(username, auth.PipelinePrefix),
		strings.HasPrefix(username, auth.InternalPrefix):
		return nil
	}

	admin, err := a.hasClusterRole(ctx, username, auth.ClusterAdminRole)
	if err != nil {
		return err
	}
	if admin {
		return nil
	}
	return errors.Errorf("user: %v, is not priviledged to operate while Enterprise License is disabled", username)
}
// hasClusterRole reports whether username is bound to role in the cluster role
// binding. A role counts as held when it is present as a key in the
// principal's role map, regardless of the boolean stored for it. Any error
// from retrieving the cluster role binding (including auth.ErrNotActivated) is
// returned unchanged.
func (a *apiServer) hasClusterRole(ctx context.Context, username string, role string) (bool, error) {
	bindings, err := a.getClusterRoleBinding(ctx)
	if err != nil {
		return false, err
	}
	roles, ok := bindings.Entries[username]
	if !ok || roles == nil {
		// No entry (or an empty one) for this principal.
		return false, nil
	}
	// Look the role up directly instead of scanning every key of the map.
	_, hasRole := roles.Roles[role]
	return hasRole, nil
}
// Activate implements the protobuf auth.Activate RPC. It activates auth in a
// write transaction and then blocks until this server observes auth as active
// (or ctx is cancelled).
func (a *apiServer) Activate(ctx context.Context, req *auth.ActivateRequest) (*auth.ActivateResponse, error) {
	var activated *auth.ActivateResponse
	activate := func(txCtx *txncontext.TransactionContext) (err error) {
		activated, err = a.activateInTransaction(ctx, txCtx, req)
		return err
	}
	if err := a.env.TxnEnv.WithWriteContext(ctx, activate); err != nil {
		return nil, err
	}

	// Wait until the clusterRoleBinding watcher has updated the local cache
	// (changing the activation state), so that Activate() is less likely to
	// race with subsequent calls that expect auth to be activated.
	propagated := func() error {
		if err := a.isActive(ctx); err != nil {
			return errors.Errorf("auth activation hasn't fully propagated")
		}
		return nil
	}
	waitErr := backoff.RetryUntilCancel(ctx, propagated, backoff.RetryEvery(100*time.Millisecond), nil)
	if waitErr != nil {
		return nil, waitErr
	}
	return activated, nil
}
// activateInTransaction activates auth within the transaction txCtx. It
// requires the enterprise token to be active and auth not to be activated
// already (auth.ErrAlreadyActivated is returned otherwise). It writes the
// initial cluster role binding, stores the root token (req.RootToken, or a
// newly generated one if that is empty) for the root user, marks the
// transaction as activating auth, and returns the root token in the response.
func (a *apiServer) activateInTransaction(ctx context.Context, txCtx *txncontext.TransactionContext, req *auth.ActivateRequest) (*auth.ActivateResponse, error) {
// TODO(acohen4) 2.3: disable RPC if a.env.Config.AuthRootToken != ""
// If the cluster's Pachyderm Enterprise token isn't active, the auth system
// cannot be activated
state, err := a.getEnterpriseTokenState(ctx)
if err != nil {
return nil, errors.Wrapf(err, "error confirming Pachyderm Enterprise token")
}
if state != enterpriseclient.State_ACTIVE {
return nil, errors.Errorf("Pachyderm Enterprise is not active in this " +
"cluster, and the Pachyderm auth API is an Enterprise-level feature")
}
// Activating an already activated auth service should fail, because
// otherwise anyone can just activate the service again and set
// themselves as an admin.
if err := a.isActiveInTransaction(txCtx); err == nil {
return nil, auth.ErrAlreadyActivated
}
// If a root token was supplied in the request, use it. Otherwise generate a
// new random token. Either way the token is returned in the response.
pachToken := req.RootToken
if pachToken == "" {
pachToken = uuid.NewWithoutDashes()
}
// Store a new Pachyderm token (as the caller is authenticating) and
// initialize the root user as a cluster admin
roleBindings := a.roleBindings.ReadWrite(txCtx.SqlTx)
// The initial binding makes the root and internal users cluster admins and
// gives all cluster users the project creator role.
if err := roleBindings.Put(auth.ClusterRoleBindingKey, &auth.RoleBinding{
Entries: map[string]*auth.Roles{
auth.RootUser: {Roles: map[string]bool{auth.ClusterAdminRole: true}},
authdb.InternalUser: {Roles: map[string]bool{auth.ClusterAdminRole: true}},
auth.AllClusterUsersSubject: {Roles: map[string]bool{auth.ProjectCreatorRole: true}},
},
}); err != nil {
return nil, errors.Wrap(err, "add cluster role binding")
}
// Only the hash of the root token is stored.
if err := a.insertAuthTokenNoTTLInTransaction(txCtx, auth.HashToken(pachToken), auth.RootUser); err != nil {
return nil, errors.Wrap(err, "insert root token")
}
// Flag the transaction so that later cluster-binding lookups in it read
// through the transaction instead of the cache (see
// getClusterRoleBindingInTransaction).
txCtx.AuthBeingActivated.Store(true)
return &auth.ActivateResponse{PachToken: pachToken}, nil
}
// RotateRootToken implements the protobuf auth.RotateRootToken RPC. The RPC is
// rejected when the root token is configured in the environment
// (Config.AuthRootToken); otherwise the rotation runs in a write transaction.
func (a *apiServer) RotateRootToken(ctx context.Context, req *auth.RotateRootTokenRequest) (*auth.RotateRootTokenResponse, error) {
	if a.env.Config.AuthRootToken != "" {
		return nil, errors.New("RotateRootToken() is disabled when the root token is configured in the environment")
	}

	var rotated *auth.RotateRootTokenResponse
	rotate := func(txCtx *txncontext.TransactionContext) (err error) {
		rotated, err = a.rotateRootTokenInTransaction(txCtx, req)
		return err
	}
	if err := a.env.TxnEnv.WithWriteContext(ctx, rotate); err != nil {
		return nil, err
	}
	return rotated, nil
}
// rotateRootTokenInTransaction replaces the root user's auth token within the
// transaction txCtx: all existing tokens for the root user are deleted, and a
// new one (req.RootToken, or a freshly generated token if that is empty) is
// stored without a TTL and returned in the response.
func (a *apiServer) rotateRootTokenInTransaction(txCtx *txncontext.TransactionContext, req *auth.RotateRootTokenRequest) (resp *auth.RotateRootTokenResponse, retErr error) {
	// Revoke root's existing auth token(s) first.
	_, err := a.deleteAuthTokensForSubjectInTransaction(txCtx.SqlTx, auth.RootUser)
	if err != nil {
		return nil, err
	}

	// Prefer the token supplied by the caller; fall back to a random one.
	newToken := req.RootToken
	if newToken == "" {
		newToken = uuid.NewWithoutDashes()
	}

	err = a.insertAuthTokenNoTTLInTransaction(txCtx, auth.HashToken(newToken), auth.RootUser)
	if err != nil {
		return nil, err
	}
	return &auth.RotateRootTokenResponse{RootToken: newToken}, nil
}
// Deactivate implements the protobuf auth.Deactivate RPC. In a single database
// transaction it deletes all role bindings, all auth tokens, all member and
// group records, and the auth config. It then waits until this server no
// longer observes auth as active, or until ctx is cancelled, in which case the
// error from the wait is returned even though the deletion has already been
// committed.
func (a *apiServer) Deactivate(ctx context.Context, req *auth.DeactivateRequest) (resp *auth.DeactivateResponse, retErr error) {
	if err := dbutil.WithTx(ctx, a.env.DB, func(ctx context.Context, sqlTx *pachsql.Tx) error {
		if err := a.roleBindings.ReadWrite(sqlTx).DeleteAll(); err != nil {
			return errors.EnsureStack(err)
		}
		if err := a.deleteAllAuthTokens(ctx, sqlTx); err != nil {
			return errors.EnsureStack(err)
		}
		if err := a.members.ReadWrite(sqlTx).DeleteAll(); err != nil {
			return errors.EnsureStack(err)
		}
		if err := a.groups.ReadWrite(sqlTx).DeleteAll(); err != nil {
			return errors.EnsureStack(err)
		}
		if err := a.authConfig.ReadWrite(sqlTx).DeleteAll(); err != nil {
			return errors.EnsureStack(err)
		}
		return nil
	}); err != nil {
		return nil, err
	}

	// Wait until the clusterRoleBinding watcher sees the deleted role binding,
	// so that Deactivate() is less likely to race with subsequent calls that
	// expect auth to be deactivated. The wait is bound to ctx (as in
	// Activate) so that a cancelled request cannot poll forever.
	if err := backoff.RetryUntilCancel(ctx, func() error {
		if err := a.isActive(ctx); err == nil {
			return errors.Errorf("auth still activated")
		}
		return nil
	}, backoff.RetryEvery(100*time.Millisecond), nil); err != nil {
		return nil, err
	}
	return &auth.DeactivateResponse{}, nil
}
// Authenticate implements the protobuf auth.Authenticate RPC. It verifies the
// credential presented in req (an OIDC state or an ID token), checks that the
// resulting user may log in given the enterprise state, and returns a newly
// stored Pachyderm token for that user. It fails if auth is not active or if
// req carries neither credential.
func (a *apiServer) Authenticate(ctx context.Context, req *auth.AuthenticateRequest) (resp *auth.AuthenticateResponse, retErr error) {
	if err := a.isActive(ctx); err != nil {
		return nil, err
	}

	// issueToken generates and stores a Pachyderm token for username that
	// lives for ttlSecs seconds.
	issueToken := func(username string, ttlSecs int64) (string, error) {
		issued, err := a.generateAndInsertAuthToken(ctx, username, ttlSecs)
		if err != nil {
			return "", errors.Wrapf(err, "error storing auth token for user \"%s\"", username)
		}
		return issued, nil
	}

	// Verify whatever credential the user has presented, and write a new
	// Pachyderm token for the user that the credential belongs to.
	var pachToken string
	switch {
	case req.OidcState != "":
		// Determine the caller's Pachyderm/OIDC user info (email).
		email, err := a.OIDCStateToEmail(ctx, req.OidcState)
		if err != nil {
			return nil, err
		}
		username := auth.UserPrefix + email
		if err := a.expiredEnterpriseCheck(ctx, username); err != nil {
			return nil, err
		}
		pachToken, err = issueToken(username, int64(60*a.env.Config.SessionDurationMinutes))
		if err != nil {
			return nil, err
		}

	case req.IdToken != "":
		// Determine the caller's Pachyderm/OIDC user info (email).
		idToken, claims, err := a.validateIDToken(ctx, req.IdToken)
		if err != nil {
			return nil, err
		}
		username := auth.UserPrefix + claims.Email
		if err := a.expiredEnterpriseCheck(ctx, username); err != nil {
			return nil, err
		}
		// Sync the user's group membership from the groups claim.
		if err := a.syncGroupMembership(ctx, claims); err != nil {
			return nil, err
		}
		// The Pachyderm token lives for the default session duration, or for
		// the remaining lifetime of the ID token when that is shorter.
		ttlSecs := int64(60 * a.env.Config.SessionDurationMinutes)
		if remaining := int64(time.Until(idToken.Expiry).Seconds()); remaining <= ttlSecs {
			ttlSecs = remaining
		}
		pachToken, err = issueToken(username, ttlSecs)
		if err != nil {
			return nil, err
		}

	default:
		return nil, errors.Errorf("unrecognized authentication mechanism (old pachd?)")
	}

	log.Info(ctx, "Authentication checks successful, now returning pachToken")
	// Return the new Pachyderm token to the caller.
	return &auth.AuthenticateResponse{
		PachToken: pachToken,
	}, nil
}
// evaluateRoleBindingInTransaction evaluates which of the requested permissions
// principal holds on resource, within the transaction txnCtx, and returns the
// resulting authorizeRequest. Bindings are evaluated from the widest scope to
// the narrowest, stopping as soon as every permission is satisfied:
//
//  1. for spec repos, a hard-coded binding that gives all cluster users the
//     repo reader role;
//  2. the cluster role binding;
//  3. for repos, the role binding of the repo's project;
//  4. the role binding of the resource itself.
//
// auth.ErrNoRoleBinding is returned when the project's or the resource's role
// binding does not exist.
//
// Note that resource is modified in place: a SPEC_REPO resource whose
// permissions are not all satisfied by step 1 has its Type changed to REPO.
func (a *apiServer) evaluateRoleBindingInTransaction(txnCtx *txncontext.TransactionContext, principal string, resource *auth.Resource, permissions map[auth.Permission]bool) (*authorizeRequest, error) {
request := newAuthorizeRequest(principal, permissions, a.getGroupsInTransaction)
// Special-case making spec repos world-readable, because the alternative breaks reading pipelines.
// TODO: 2.0 - should we make this a user-configurable cluster binding instead of hard-coding it?
if resource.Type == auth.ResourceType_SPEC_REPO {
if err := request.evaluateRoleBinding(txnCtx, &auth.RoleBinding{
Entries: map[string]*auth.Roles{
auth.AllClusterUsersSubject: {
Roles: map[string]bool{
auth.RepoReaderRole: true,
},
},
},
}); err != nil {
return nil, err
}
// If the user only requested reader access, we can return early.
// Otherwise we just treat this like a request about the associated user repo.
if request.isSatisfied() {
return request, nil
}
// This changes the caller's resource, not a copy.
resource.Type = auth.ResourceType_REPO
}
// Check the permissions at the cluster level
binding, err := a.getClusterRoleBindingInTransaction(txnCtx)
if err != nil {
return nil, err
}
if err := request.evaluateRoleBinding(txnCtx, binding); err != nil {
return nil, err
}
// If all the permissions are satisfied by the cached cluster binding don't
// retrieve the resource bindings. If the resource in question is the whole
// cluster we should also exit early
if request.isSatisfied() || resource.Type == auth.ResourceType_CLUSTER {
return request, nil
}
// if resource is a repo, then we should check project level permissions as well
var roleBinding auth.RoleBinding
if resource.Type == auth.ResourceType_REPO {
repo, err := authRepoResourceToRepo(resource)
if err != nil {
return nil, err
}
projectResource := &auth.Resource{Type: auth.ResourceType_PROJECT, Name: repo.Project.Name}
projectKey := authdb.ResourceKey(projectResource)
if err := a.roleBindings.ReadWrite(txnCtx.SqlTx).Get(projectKey, &roleBinding); err != nil {
if col.IsErrNotFound(err) {
return nil, &auth.ErrNoRoleBinding{Resource: projectResource}
}
return nil, errors.Wrapf(err, "error getting role bindings for %s", projectKey)
}
if err := request.evaluateRoleBinding(txnCtx, &roleBinding); err != nil {
return nil, err
}
if request.isSatisfied() {
return request, nil
}
}
// Get the role bindings for the resource to check
// NOTE(review): for repos, roleBinding is reused here after having held the
// project binding above; confirm that Get fully overwrites it.
if err := a.roleBindings.ReadWrite(txnCtx.SqlTx).Get(authdb.ResourceKey(resource), &roleBinding); err != nil {
if col.IsErrNotFound(err) {
return nil, &auth.ErrNoRoleBinding{
Resource: resource,
}
}
return nil, errors.Wrapf(err, "error getting role bindings for %s \"%s\"", resource.Type, resource.Name)
}
if err := request.evaluateRoleBinding(txnCtx, &roleBinding); err != nil {
return nil, err
}
return request, nil
}
// AuthorizeInTransaction is the transactional counterpart of Authorize: it does
// the same work but can run in a `pachsql.Tx`.
//
// The principal is taken from txnCtx.WhoAmI, and the permissions listed in req
// are evaluated against req.Resource. The response carries that principal
// together with the evaluator's satisfied / missing results.
func (a *apiServer) AuthorizeInTransaction(
	txnCtx *txncontext.TransactionContext,
	req *auth.AuthorizeRequest,
) (resp *auth.AuthorizeResponse, retErr error) {
	caller, err := txnCtx.WhoAmI()
	if err != nil {
		return nil, err
	}

	// The evaluator takes the requested permissions as a set keyed by permission.
	wanted := make(map[auth.Permission]bool)
	for _, perm := range req.Permissions {
		wanted[perm] = true
	}

	evaluated, err := a.evaluateRoleBindingInTransaction(txnCtx, caller.Username, req.Resource, wanted)
	if err != nil {
		return nil, err
	}

	result := &auth.AuthorizeResponse{Principal: caller.Username}
	result.Authorized = evaluated.isSatisfied()
	result.Missing = evaluated.missing()
	result.Satisfied = evaluated.satisfiedPermissions
	return result, nil
}
// Authorize implements the protobuf auth.Authorize RPC.
//
// The check itself is delegated to AuthorizeInTransaction, which is invoked
// through a.env.TxnEnv.WithReadContext. Any error from that call is returned
// as-is with a nil response.
func (a *apiServer) Authorize(
	ctx context.Context,
	req *auth.AuthorizeRequest,
) (resp *auth.AuthorizeResponse, retErr error) {
	var out *auth.AuthorizeResponse
	check := func(txnCtx *txncontext.TransactionContext) (err error) {
		out, err = a.AuthorizeInTransaction(txnCtx, req)
		return err
	}
	if err := a.env.TxnEnv.WithReadContext(ctx, check); err != nil {
		return nil, err
	}
	return out, nil
}
// GetPermissionsForPrincipal evaluates every permission enumerated in
// auth.Permission_name for req.Principal against req.Resource, and returns the
// roles and satisfied permissions filtered by the resource's type.
//
// The evaluation runs through a.env.TxnEnv.WithReadContext; a failure there is
// wrapped with "cannot evaluate role binding".
func (a *apiServer) GetPermissionsForPrincipal(ctx context.Context, req *auth.GetPermissionsForPrincipalRequest) (*auth.GetPermissionsResponse, error) {
	// Request the full permission set so the result shows everything the
	// principal holds, not just a caller-chosen subset.
	everyPermission := make(map[auth.Permission]bool)
	for code := range auth.Permission_name {
		everyPermission[auth.Permission(code)] = true
	}

	var evaluated *authorizeRequest
	evaluate := func(txnCtx *txncontext.TransactionContext) (err error) {
		evaluated, err = a.evaluateRoleBindingInTransaction(txnCtx, req.Principal, req.Resource, everyPermission)
		return err
	}
	if err := a.env.TxnEnv.WithReadContext(ctx, evaluate); err != nil {
		return nil, errors.Wrap(err, "cannot evaluate role binding")
	}

	out := &auth.GetPermissionsResponse{}
	out.Roles = evaluated.rolesForResourceType(req.Resource.Type)
	out.Permissions = evaluated.satisfiedForResourceType(req.Resource.Type)
	return out, nil
}
// GetPermissions implements the protobuf auth.GetPermissions RPC.
//
// It resolves the authenticated caller from ctx and forwards to
// GetPermissionsForPrincipal with that caller's subject and req.Resource.
// Errors from either step are wrapped with a short description of the step.
func (a *apiServer) GetPermissions(ctx context.Context, req *auth.GetPermissionsRequest) (resp *auth.GetPermissionsResponse, retErr error) {
	caller, err := a.getAuthenticatedUser(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "cannot get authenticated user")
	}

	principalReq := &auth.GetPermissionsForPrincipalRequest{
		Principal: caller.Subject,
		Resource:  req.Resource,
	}
	perms, err := a.GetPermissionsForPrincipal(ctx, principalReq)
	if err != nil {
		return nil, errors.Wrap(err, "cannot get permissions for principal")
	}
	return perms, nil
}
// GetPermissionsInTransaction returns the permissions on req.Resource for the
// user reported by txnCtx.WhoAmI, by delegating to
// getPermissionsForPrincipalInTransaction with the same transaction context.
func (a *apiServer) GetPermissionsInTransaction(txnCtx *txncontext.TransactionContext, req *auth.GetPermissionsRequest) (*auth.GetPermissionsResponse, error) {
	me, err := txnCtx.WhoAmI()
	if err != nil {
		return nil, err
	}
	principalReq := &auth.GetPermissionsForPrincipalRequest{
		Principal: me.Username,
		Resource:  req.Resource,
	}
	return a.getPermissionsForPrincipalInTransaction(txnCtx, principalReq)
}
// getPermissionsForPrincipalInTransaction evaluates every permission enumerated
// in auth.Permission_name for req.Principal against req.Resource within txnCtx,
// and returns the roles and satisfied permissions filtered by the resource's
// type. Evaluation errors are returned unwrapped.
func (a *apiServer) getPermissionsForPrincipalInTransaction(txnCtx *txncontext.TransactionContext, req *auth.GetPermissionsForPrincipalRequest) (*auth.GetPermissionsResponse, error) {
	// Build the "all permissions" set to evaluate against.
	everyPermission := make(map[auth.Permission]bool)
	for code := range auth.Permission_name {
		everyPermission[auth.Permission(code)] = true
	}

	evaluated, err := a.evaluateRoleBindingInTransaction(txnCtx, req.Principal, req.Resource, everyPermission)
	if err != nil {
		return nil, err
	}

	out := &auth.GetPermissionsResponse{}
	out.Roles = evaluated.rolesForResourceType(req.Resource.Type)
	out.Permissions = evaluated.satisfiedForResourceType(req.Resource.Type)
	return out, nil
}
// WhoAmI implements the protobuf auth.WhoAmI RPC.
//
// It first requires a.isActive to succeed, then looks up the authenticated
// caller for ctx and reports that caller's subject and expiration.
func (a *apiServer) WhoAmI(ctx context.Context, req *auth.WhoAmIRequest) (resp *auth.WhoAmIResponse, retErr error) {
	if activeErr := a.isActive(ctx); activeErr != nil {
		return nil, activeErr
	}

	caller, err := a.getAuthenticatedUser(ctx)
	if err != nil {
		return nil, err
	}

	identity := &auth.WhoAmIResponse{
		Username:   caller.Subject,
		Expiration: caller.Expiration,
	}
	return identity, nil
}
// DeleteRoleBindingInTransaction removes the role binding stored for resource.
// Other services call it when they delete the underlying resource.
//
// No authorization is performed here; the caller is responsible for having
// verified that the user may delete the resource. This is an in-process helper,
// not an RPC. Requests targeting the cluster resource are rejected.
func (a *apiServer) DeleteRoleBindingInTransaction(txnCtx *txncontext.TransactionContext, resource *auth.Resource) error {
	// Guards are checked in order: the activity check first, then the
	// cluster-resource rejection.
	switch err := a.isActiveInTransaction(txnCtx); {
	case err != nil:
		return err
	case resource.Type == auth.ResourceType_CLUSTER:
		return errors.Errorf("cannot delete cluster role binding")
	}

	bindingKey := authdb.ResourceKey(resource)
	if err := a.roleBindings.ReadWrite(txnCtx.SqlTx).Delete(bindingKey); err != nil {
		return errors.EnsureStack(err)
	}
	return nil
}
// rolesFromRoleSlice builds an *auth.Roles set from a list of role names.
//
// Each name is checked with getRole, and the first failure is returned with a
// nil result. An empty (or nil) slice yields (nil, nil) rather than an empty
// set.
func rolesFromRoleSlice(rs []string) (*auth.Roles, error) {
	if len(rs) == 0 {
		return nil, nil
	}

	granted := make(map[string]bool)
	for _, name := range rs {
		if _, err := getRole(name); err != nil {
			// Discard the partially built set; callers only see the error.
			return nil, err
		}
		granted[name] = true
	}
	return &auth.Roles{Roles: granted}, nil
}
// CreateRoleBindingInTransaction is an internal-only API that creates the role
// binding record for a new resource. It is not an RPC and performs no
// authorization of its own; the caller must already have established that the
// user may create the resource.
//
// When roleSlice is non-empty, principal must be in canonical form
// (`<type>:<principal>`) and is recorded with those roles. When roleSlice is
// empty, a binding with no entries is created and principal is not inspected.
func (a *apiServer) CreateRoleBindingInTransaction(txnCtx *txncontext.TransactionContext, principal string, roleSlice []string, resource *auth.Resource) error {
	entries := make(map[string]*auth.Roles)
	if len(roleSlice) > 0 {
		// Validate the subject before validating the role names.
		if err := a.checkCanonicalSubject(principal); err != nil {
			return err
		}
		granted, err := rolesFromRoleSlice(roleSlice)
		if err != nil {
			return err
		}
		entries[principal] = granted
	}
	newBinding := &auth.RoleBinding{Entries: entries}

	// Create (rather than an upsert) is used so that an already-existing role
	// binding surfaces as an error.
	resourceKey := authdb.ResourceKey(resource)
	if err := a.roleBindings.ReadWrite(txnCtx.SqlTx).Create(resourceKey, newBinding); err != nil {
		return errors.EnsureStack(err)
	}
	return nil
}
// AddPipelineReaderToRepoInTransaction gives a pipeline access to read data
// from the specified source repo by binding the pipeline principal to the
// repoReader role on that repo.
//
// This is distinct from ModifyRoleBinding because AddPipelineReader is a less
// expansive permission that is included in the repoReader role, versus being
// able to modify all role bindings which is part of repoOwner. This method is
// for internal use and is not exposed as an RPC.
func (a *apiServer) AddPipelineReaderToRepoInTransaction(txnCtx *txncontext.TransactionContext, sourceRepo *pfs.Repo, pipeline *pps.Pipeline) error {
	// The permission check is made against the user-type repo that shares the
	// source repo's project and name.
	userRepo := &pfs.Repo{Project: sourceRepo.Project, Name: sourceRepo.Name, Type: pfs.UserRepoType}
	if err := a.CheckRepoIsAuthorizedInTransaction(txnCtx, userRepo, auth.Permission_REPO_ADD_PIPELINE_READER); err != nil {
		return err
	}

	target := sourceRepo.AuthResource()
	pipelinePrincipal := auth.PipelinePrefix + pipeline.String()
	readerRoles := []string{auth.RepoReaderRole}
	return a.setUserRoleBindingInTransaction(txnCtx, target, pipelinePrincipal, readerRoles)
}
// AddPipelineWriterToSourceRepoInTransaction gives a pipeline access to write data to the specified source repo.
// The only time a pipeline needs write permission for a source repo is in the case of Cron inputs.
// This is distinct from ModifyRoleBinding because AddPipelineWriter is a less expansive permission
// that is included in the repoWriter role, versus being able to modify all role bindings which is
// part of repoOwner. This method is for internal use and is not exposed as an RPC.
func (a *apiServer) AddPipelineWriterToSourceRepoInTransaction(txnCtx *txncontext.TransactionContext, sourceRepo *pfs.Repo, pipeline *pps.Pipeline) error {
// Check that the user is allowed to add a pipeline to write to the source repo.
r := &pfs.Repo{
Project: sourceRepo.Project,
Name: sourceRepo.Name,
Type: pfs.UserRepoType,
}
if err := a.CheckRepoIsAuthorizedInTransaction(txnCtx, r, auth.Permission_REPO_ADD_PIPELINE_WRITER); err != nil {
return err
}