/
databroker.go
112 lines (96 loc) · 2.63 KB
/
databroker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package authorize
import (
	"context"
	"fmt"

	"github.com/pomerium/pomerium/internal/telemetry/trace"
	"github.com/pomerium/pomerium/pkg/grpc/databroker"
	"github.com/pomerium/pomerium/pkg/grpc/session"
	"github.com/pomerium/pomerium/pkg/grpc/user"
	"github.com/pomerium/pomerium/pkg/grpcutil"
	"github.com/pomerium/pomerium/pkg/storage"
)
// sessionOrServiceAccount is the common surface this file needs from the
// records it loads as either a session or a service account: the value must
// be able to validate itself and report the ID of the user it belongs to.
type sessionOrServiceAccount interface {
	// Validate returns a non-nil error when the value must not be used.
	Validate() error
	// GetUserId returns the ID of the associated user.
	GetUserId() string
}
// getDataBrokerRecord queries the querier returned by storage.GetQuerier(ctx)
// for at most one record of recordType, filtered with
// SetFilterByIDOrIndex(recordID).
//
// If the first lookup yields a record whose version is below
// lowestRecordVersion, the querier's cache for this request is invalidated
// and the query is issued once more; the result of that second query is
// returned as-is (its version is not checked again).
//
// It returns storage.ErrNotFound when a query succeeds but yields no records,
// and otherwise propagates any error from the querier.
func getDataBrokerRecord(
	ctx context.Context,
	recordType string,
	recordID string,
	lowestRecordVersion uint64,
) (*databroker.Record, error) {
	querier := storage.GetQuerier(ctx)

	query := &databroker.QueryRequest{
		Type:  recordType,
		Limit: 1,
	}
	query.SetFilterByIDOrIndex(recordID)

	// lookup runs the query and returns the first record, if any.
	lookup := func() (*databroker.Record, error) {
		result, err := querier.Query(ctx, query)
		if err != nil {
			return nil, err
		}
		records := result.GetRecords()
		if len(records) == 0 {
			return nil, storage.ErrNotFound
		}
		return records[0], nil
	}

	record, err := lookup()
	if err != nil {
		return nil, err
	}

	// the record is at least as new as the lowest version we'll accept
	if record.GetVersion() >= lowestRecordVersion {
		return record, nil
	}

	// the record is older than required: drop the cached result and retry
	// with an up to date cache
	querier.InvalidateCache(ctx, query)
	return lookup()
}
// getDataBrokerSessionOrServiceAccount loads the record identified by
// sessionID, first as a session.Session and, if that lookup reports
// not-found, as a user.ServiceAccount. dataBrokerRecordVersion is passed to
// getDataBrokerRecord as the lowest record version to accept.
//
// The record's data is unmarshalled and validated, and the access is
// reported to a.accessTracker according to the concrete type that was
// loaded.
//
// An error is returned if the lookup fails, the data cannot be unmarshalled,
// the unmarshalled message does not implement sessionOrServiceAccount, or
// Validate rejects it.
func (a *Authorize) getDataBrokerSessionOrServiceAccount(
	ctx context.Context,
	sessionID string,
	dataBrokerRecordVersion uint64,
) (s sessionOrServiceAccount, err error) {
	ctx, span := trace.StartSpan(ctx, "authorize.getDataBrokerSessionOrServiceAccount")
	defer span.End()

	record, err := getDataBrokerRecord(ctx, grpcutil.GetTypeURL(new(session.Session)), sessionID, dataBrokerRecordVersion)
	if storage.IsNotFound(err) {
		// no session with this ID: fall back to a service account lookup
		record, err = getDataBrokerRecord(ctx, grpcutil.GetTypeURL(new(user.ServiceAccount)), sessionID, dataBrokerRecordVersion)
	}
	if err != nil {
		return nil, err
	}

	msg, err := record.GetData().UnmarshalNew()
	if err != nil {
		return nil, err
	}

	// Use the comma-ok form: a record whose payload does not implement the
	// interface must produce an error, not a panic on the request path.
	s, ok := msg.(sessionOrServiceAccount)
	if !ok {
		return nil, fmt.Errorf("authorize: unexpected data broker record type %T", msg)
	}
	if err := s.Validate(); err != nil {
		return nil, err
	}

	switch s.(type) {
	case *session.Session:
		a.accessTracker.TrackSessionAccess(sessionID)
	case *user.ServiceAccount:
		a.accessTracker.TrackServiceAccountAccess(sessionID)
	}

	return s, nil
}
// getDataBrokerUser loads the user.User record identified by userID via
// getDataBrokerRecord and returns the unmarshalled user. Lookup and
// unmarshalling errors are returned unchanged.
func (a *Authorize) getDataBrokerUser(
	ctx context.Context,
	userID string,
) (*user.User, error) {
	ctx, span := trace.StartSpan(ctx, "authorize.getDataBrokerUser")
	defer span.End()

	// A lowest version of 0 never triggers the cache-invalidating retry in
	// getDataBrokerRecord, since no uint64 version is below 0.
	userType := grpcutil.GetTypeURL(new(user.User))
	record, err := getDataBrokerRecord(ctx, userType, userID, 0)
	if err != nil {
		return nil, err
	}

	result := new(user.User)
	if err := record.GetData().UnmarshalTo(result); err != nil {
		return nil, err
	}
	return result, nil
}