Skip to content

Commit

Permalink
WIP: Pass mutex to built-in functions via BuiltinContext
Browse files Browse the repository at this point in the history
Signed-off-by: Ashutosh Narkar <anarkar4387@gmail.com>
  • Loading branch information
ashutosh-narkar committed Jun 9, 2023
1 parent 310b2c6 commit 5d514f8
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 0 deletions.
2 changes: 2 additions & 0 deletions topdown/builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"io"
"math/rand"
"sync"

"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/metrics"
Expand Down Expand Up @@ -54,6 +55,7 @@ type (
DistributedTracingOpts tracing.Options // options to be used by distributed tracing.
rand *rand.Rand // randomization source for non-security-sensitive operations
Capabilities *ast.Capabilities
Mtx *sync.Mutex
}

// BuiltinFunc defines an interface for implementing built-in functions.
Expand Down
3 changes: 3 additions & 0 deletions topdown/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sort"
"strconv"
"strings"
"sync"

"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/metrics"
Expand Down Expand Up @@ -98,6 +99,7 @@ type eval struct {
tracingOpts tracing.Options
findOne bool
strictObjects bool
mtx *sync.Mutex
}

func (e *eval) Run(iter evalIterator) error {
Expand Down Expand Up @@ -811,6 +813,7 @@ func (e *eval) evalCall(terms []*ast.Term, iter unifyIterator) error {
PrintHook: e.printHook,
DistributedTracingOpts: e.tracingOpts,
Capabilities: capabilities,
Mtx: e.mtx,
}

eval := evalBuiltin{
Expand Down
9 changes: 9 additions & 0 deletions topdown/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,15 @@ func builtinHTTPSend(bctx BuiltinContext, operands []*ast.Term, iter func(*ast.T
return handleBuiltinErr(ast.HTTPSend.Name, bctx.Location, err)
}

if bctx.Mtx != nil {
bctx.Mtx.Lock()
}
result, err := getHTTPResponse(bctx, req)

if bctx.Mtx != nil {
bctx.Mtx.Unlock()
}

if err != nil {
if raiseError {
return handleHTTPSendErr(bctx, err)
Expand Down Expand Up @@ -807,6 +815,7 @@ func (c *interQueryCache) checkHTTPSendInterQueryCache() (ast.Value, error) {
if !found {
return nil, nil
}

c.bctx.Metrics.Counter(httpSendInterQueryCacheHits).Incr()
var cachedRespData *interQueryCacheData

Expand Down
52 changes: 52 additions & 0 deletions topdown/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -2830,6 +2831,57 @@ func TestHTTPSendCacheDefaultStatusCodesInterQueryCache(t *testing.T) {
})
}

func TestHTTPSendInterQueryCacheConcurrentRequests(t *testing.T) {

// run test server
var requests []*http.Request
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requests = append(requests, r)
headers := w.Header()
headers["Cache-Control"] = []string{"max-age=290304000, public"}
w.WriteHeader(http.StatusOK)
}))

defer ts.Close()

// add an inter-query cache
config, _ := iCache.ParseCachingConfig(nil)
interQueryCache := iCache.NewInterQueryCache(config)

m := metrics.New()
var mtx sync.Mutex

done := make(chan struct{})
go func() {
q := NewQuery(ast.MustParseBody(fmt.Sprintf(`http.send({"method": "get", "url": %q, "cache": true})`, ts.URL))).
WithMetrics(m).WithInterQueryBuiltinCache(interQueryCache).WithMutex(&mtx)

_, err := q.Run(context.Background())
if err != nil {
t.Error(err)
}
done <- struct{}{}
}()

q := NewQuery(ast.MustParseBody(fmt.Sprintf(`http.send({"method": "get", "url": %q, "cache": true})`, ts.URL))).
WithMetrics(m).WithInterQueryBuiltinCache(interQueryCache).WithMutex(&mtx)

_, err := q.Run(context.Background())
if err != nil {
t.Fatal(err)
}
<-done

expectedReqCount := 1
if len(requests) != expectedReqCount {
t.Fatalf("expected to get %d requests, got %d", expectedReqCount, len(requests))
}

if exp, act := uint64(1), m.Counter(httpSendInterQueryCacheHits).Value(); exp != act {
t.Fatalf("expected %d cache hits, got %d", exp, act)
}
}

type onlyOnceInterQueryCache struct {
value *interQueryCacheData
counter int
Expand Down
10 changes: 10 additions & 0 deletions topdown/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/rand"
"io"
"sort"
"sync"
"time"

"github.com/open-policy-agent/opa/ast"
Expand Down Expand Up @@ -58,6 +59,7 @@ type Query struct {
strictObjects bool
printHook print.Hook
tracingOpts tracing.Options
mtx *sync.Mutex
}

// Builtin represents a built-in function that queries can call.
Expand Down Expand Up @@ -291,6 +293,12 @@ func (q *Query) WithStrictObjects(yes bool) *Query {
return q
}

// WithMutex sets the mutual exclusion lock that built-in functions can utilize.
func (q *Query) WithMutex(mtx *sync.Mutex) *Query {
q.mtx = mtx
return q
}

// PartialRun executes partial evaluation on the query with respect to unknown
// values. Partial evaluation attempts to evaluate as much of the query as
// possible without requiring values for the unknowns set on the query. The
Expand Down Expand Up @@ -357,6 +365,7 @@ func (q *Query) PartialRun(ctx context.Context) (partials []ast.Body, support []
builtinErrors: &builtinErrors{},
printHook: q.printHook,
strictObjects: q.strictObjects,
mtx: q.mtx,
}

if len(q.disableInlining) > 0 {
Expand Down Expand Up @@ -526,6 +535,7 @@ func (q *Query) Iter(ctx context.Context, iter func(QueryResult) error) error {
printHook: q.printHook,
tracingOpts: q.tracingOpts,
strictObjects: q.strictObjects,
mtx: q.mtx,
}
e.caller = e
q.metrics.Timer(metrics.RegoQueryEval).Start()
Expand Down

0 comments on commit 5d514f8

Please sign in to comment.