Skip to content

Commit

Permalink
authorize: add databroker server and record version to result, force …
Browse files Browse the repository at this point in the history
…sync via polling (#2024)

* authorize: add databroker server and record version to result, force sync via polling

* wrap inmem store to take read lock when grabbing databroker versions

* address code review comments

* reset max to 0
  • Loading branch information
calebdoxsey committed Mar 31, 2021
1 parent 8f97b0d commit d7ab817
Show file tree
Hide file tree
Showing 17 changed files with 464 additions and 359 deletions.
2 changes: 1 addition & 1 deletion authorize/check_response_test.go
Expand Up @@ -39,7 +39,7 @@ func TestAuthorize_okResponse(t *testing.T) {
encoder, _ := jws.NewHS256Signer([]byte{0, 0, 0, 0})
a.state.Load().encoder = encoder
a.currentOptions.Store(opt)
a.store = evaluator.NewStoreFromProtos(
a.store = evaluator.NewStoreFromProtos(0,
&session.Session{
Id: "SESSION_ID",
UserId: "USER_ID",
Expand Down
6 changes: 3 additions & 3 deletions authorize/evaluator/custom_test.go
Expand Up @@ -14,7 +14,7 @@ func TestCustomEvaluator(t *testing.T) {

store := NewStore()
t.Run("bool deny", func(t *testing.T) {
ce := NewCustomEvaluator(store.opaStore)
ce := NewCustomEvaluator(store)
res, err := ce.Evaluate(ctx, &CustomEvaluatorRequest{
RegoPolicy: `
package pomerium.custom_policy
Expand All @@ -29,7 +29,7 @@ func TestCustomEvaluator(t *testing.T) {
assert.Empty(t, res.Reason)
})
t.Run("set deny", func(t *testing.T) {
ce := NewCustomEvaluator(store.opaStore)
ce := NewCustomEvaluator(store)
res, err := ce.Evaluate(ctx, &CustomEvaluatorRequest{
RegoPolicy: `
package pomerium.custom_policy
Expand All @@ -44,7 +44,7 @@ func TestCustomEvaluator(t *testing.T) {
assert.Equal(t, "test", res.Reason)
})
t.Run("missing package", func(t *testing.T) {
ce := NewCustomEvaluator(store.opaStore)
ce := NewCustomEvaluator(store)
res, err := ce.Evaluate(ctx, &CustomEvaluatorRequest{
RegoPolicy: `allow = true`,
})
Expand Down
100 changes: 5 additions & 95 deletions authorize/evaluator/evaluator.go
Expand Up @@ -7,7 +7,6 @@ import (
"encoding/base64"
"fmt"
"net/http"
"strconv"

"github.com/open-policy-agent/opa/rego"
"gopkg.in/square/go-jose.v2"
Expand All @@ -29,7 +28,7 @@ type Evaluator struct {
// New creates a new Evaluator.
func New(options *config.Options, store *Store) (*Evaluator, error) {
e := &Evaluator{
custom: NewCustomEvaluator(store.opaStore),
custom: NewCustomEvaluator(store),
policies: options.GetAllPolicies(),
store: store,
}
Expand All @@ -55,7 +54,7 @@ func New(options *config.Options, store *Store) (*Evaluator, error) {
store.UpdateSigningKey(jwk)

e.rego = rego.New(
rego.Store(store.opaStore),
rego.Store(store),
rego.Module("pomerium.authz", string(authzPolicy)),
rego.Query("result = data.pomerium.authz"),
getGoogleCloudServerlessHeadersRegoOption,
Expand Down Expand Up @@ -91,6 +90,9 @@ func (e *Evaluator) Evaluate(ctx context.Context, req *Request) (*Result, error)
MatchingPolicy: getMatchingPolicy(res[0].Bindings.WithoutWildcards(), e.policies),
Headers: getHeadersVar(res[0].Bindings.WithoutWildcards()),
}
evalResult.DataBrokerServerVersion, evalResult.DataBrokerRecordVersion = getDataBrokerVersions(
res[0].Bindings,
)

allow := getAllowVar(res[0].Bindings.WithoutWildcards())
// evaluate any custom policies
Expand Down Expand Up @@ -181,95 +183,3 @@ func (e *Evaluator) newInput(req *Request, isValidClientCertificate bool) *input
i.IsValidClientCertificate = isValidClientCertificate
return i
}

// Result is the result of evaluation.
type Result struct {
Status int
Message string
Headers map[string]string
MatchingPolicy *config.Policy
}

func getMatchingPolicy(vars rego.Vars, policies []config.Policy) *config.Policy {
result, ok := vars["result"].(map[string]interface{})
if !ok {
return nil
}

idx, err := strconv.Atoi(fmt.Sprint(result["route_policy_idx"]))
if err != nil {
return nil
}

if idx >= len(policies) {
return nil
}

return &policies[idx]
}

func getAllowVar(vars rego.Vars) bool {
result, ok := vars["result"].(map[string]interface{})
if !ok {
return false
}

allow, ok := result["allow"].(bool)
if !ok {
return false
}
return allow
}

func getDenyVar(vars rego.Vars) []Result {
result, ok := vars["result"].(map[string]interface{})
if !ok {
return nil
}

denials, ok := result["deny"].([]interface{})
if !ok {
return nil
}

results := make([]Result, 0, len(denials))
for _, denial := range denials {
denial, ok := denial.([]interface{})
if !ok || len(denial) != 2 {
continue
}

status, err := strconv.Atoi(fmt.Sprint(denial[0]))
if err != nil {
log.Error().Err(err).Msg("invalid type in deny")
continue
}
msg := fmt.Sprint(denial[1])

results = append(results, Result{
Status: status,
Message: msg,
})
}
return results
}

func getHeadersVar(vars rego.Vars) map[string]string {
headers := make(map[string]string)

result, ok := vars["result"].(map[string]interface{})
if !ok {
return headers
}

m, ok := result["identity_headers"].(map[string]interface{})
if !ok {
return headers
}

for k, v := range m {
headers[k] = fmt.Sprint(v)
}

return headers
}
16 changes: 8 additions & 8 deletions authorize/evaluator/evaluator_test.go
Expand Up @@ -25,7 +25,7 @@ import (
func TestJSONMarshal(t *testing.T) {
opt := config.NewDefaultOptions()
opt.AuthenticateURLString = "https://authenticate.example.com"
e, err := New(opt, NewStoreFromProtos(
e, err := New(opt, NewStoreFromProtos(0,
&session.Session{
UserId: "user1",
},
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestEvaluator_Evaluate(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

store := NewStoreFromProtos()
store := NewStoreFromProtos(0)
data, _ := ptypes.MarshalAny(&session.Session{
Version: "1",
Id: sessionID,
Expand All @@ -116,7 +116,7 @@ func TestEvaluator_Evaluate(t *testing.T) {
RefreshToken: "REFRESH TOKEN",
},
})
store.UpdateRecord(&databroker.Record{
store.UpdateRecord(0, &databroker.Record{
Version: 1,
Type: "type.googleapis.com/session.Session",
Id: sessionID,
Expand All @@ -127,7 +127,7 @@ func TestEvaluator_Evaluate(t *testing.T) {
Id: userID,
Email: "foo@example.com",
})
store.UpdateRecord(&databroker.Record{
store.UpdateRecord(0, &databroker.Record{
Version: 1,
Type: "type.googleapis.com/user.User",
Id: userID,
Expand Down Expand Up @@ -189,7 +189,7 @@ func BenchmarkEvaluator_Evaluate(b *testing.B) {
RefreshToken: "REFRESH TOKEN",
},
})
store.UpdateRecord(&databroker.Record{
store.UpdateRecord(0, &databroker.Record{
Version: uint64(i),
Type: "type.googleapis.com/session.Session",
Id: sessionID,
Expand All @@ -199,7 +199,7 @@ func BenchmarkEvaluator_Evaluate(b *testing.B) {
Version: fmt.Sprint(i),
Id: userID,
})
store.UpdateRecord(&databroker.Record{
store.UpdateRecord(0, &databroker.Record{
Version: uint64(i),
Type: "type.googleapis.com/user.User",
Id: userID,
Expand All @@ -211,7 +211,7 @@ func BenchmarkEvaluator_Evaluate(b *testing.B) {
Id: userID,
GroupIds: []string{"1", "2", "3", "4"},
})
store.UpdateRecord(&databroker.Record{
store.UpdateRecord(0, &databroker.Record{
Version: uint64(i),
Type: data.TypeUrl,
Id: userID,
Expand All @@ -222,7 +222,7 @@ func BenchmarkEvaluator_Evaluate(b *testing.B) {
Version: fmt.Sprint(i),
Id: fmt.Sprint(i),
})
store.UpdateRecord(&databroker.Record{
store.UpdateRecord(0, &databroker.Record{
Version: uint64(i),
Type: data.TypeUrl,
Id: fmt.Sprint(i),
Expand Down
4 changes: 4 additions & 0 deletions authorize/evaluator/opa/policy/authz.rego
Expand Up @@ -5,6 +5,10 @@ default allow = false
# 5 minutes from now in seconds
five_minutes := (time.now_ns() / 1e9) + (60 * 5)

# databroker versions to know which version of the data was evaluated
databroker_server_version := data.databroker_server_version
databroker_record_version := data.databroker_record_version

route_policy_idx := first_allowed_route_policy_idx(input.http.url)

route_policy := data.route_policies[route_policy_idx]
Expand Down
14 changes: 12 additions & 2 deletions authorize/evaluator/opa_test.go
Expand Up @@ -3,6 +3,7 @@ package evaluator
import (
"context"
"encoding/json"
"math"
"testing"
"time"

Expand All @@ -11,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
"gopkg.in/square/go-jose.v2"
"gopkg.in/square/go-jose.v2/jwt"

Expand All @@ -37,13 +39,13 @@ func TestOPA(t *testing.T) {
eval := func(t *testing.T, policies []config.Policy, data []proto.Message, req *Request, isValidClientCertificate bool) rego.Result {
authzPolicy, err := readPolicy()
require.NoError(t, err)
store := NewStoreFromProtos(data...)
store := NewStoreFromProtos(math.MaxUint64, data...)
store.UpdateIssuer("authenticate.example.com")
store.UpdateJWTClaimHeaders(config.NewJWTClaimHeaders("email", "groups", "user"))
store.UpdateRoutePolicies(policies)
store.UpdateSigningKey(privateJWK)
r := rego.New(
rego.Store(store.opaStore),
rego.Store(store),
rego.Module("pomerium.authz", string(authzPolicy)),
rego.Query("result = data.pomerium.authz"),
getGoogleCloudServerlessHeadersRegoOption,
Expand Down Expand Up @@ -646,4 +648,12 @@ func TestOPA(t *testing.T) {
}, true)
assert.True(t, res.Bindings["result"].(M)["allow"].(bool))
})
t.Run("databroker versions", func(t *testing.T) {
res := eval(t, nil, []proto.Message{
wrapperspb.String("test"),
}, &Request{}, false)
serverVersion, recordVersion := getDataBrokerVersions(res.Bindings)
assert.Equal(t, uint64(math.MaxUint64), serverVersion)
assert.NotEqual(t, uint64(0), recordVersion) // random
})
}

0 comments on commit d7ab817

Please sign in to comment.