Skip to content

Commit

Permalink
runtime/logging: only suppress payloads for handlers that compress re…
Browse files Browse the repository at this point in the history
…sponses (open-policy-agent#4502)

* runtime/logging: only suppress payloads for handlers that compress responses

To get compressed responses, two things need to be true:

1. The client must accept compressed responses
2. The handler must reply with a compressed response

For general API requests, (1) holds most of the time. (2) is only true
for the metrics endpoint at them moment, since the 3rd party library we
use for serving the prometheus endpoint will do compression.

* runtime/logging: remove dead code

The http.Hijack stuff was related to a watch feature removed in
open-policy-agent@186ef99

dropInputParam was only used by its tests.

Signed-off-by: Stephan Renatus <stephan.renatus@gmail.com>
  • Loading branch information
srenatus authored and rokkiter committed Apr 18, 2022
1 parent c200234 commit db7732c
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 107 deletions.
62 changes: 14 additions & 48 deletions runtime/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,16 @@
package runtime

import (
"bufio"
"bytes"
"context"
"errors"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"strings"
"sync/atomic"
"time"

"github.com/open-policy-agent/opa/logging"
"github.com/open-policy-agent/opa/server/types"
"github.com/open-policy-agent/opa/topdown/print"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -146,9 +141,16 @@ func (h *LoggingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

if h.loggingEnabled(logging.Debug) {
if gzipAccepted(r.Header) || isPprofEndpoint(r) {
fields["resp_body"] = "[Payload Compressed]"
} else {
switch {
case isPprofEndpoint(r):
// pprof always sends binary data (protobuf)
fields["resp_body"] = "[binary payload]"

case gzipAccepted(r.Header) && isMetricsEndpoint(r):
// metrics endpoint does so when the client accepts it (e.g. prometheus)
fields["resp_body"] = "[compressed payload]"

default:
fields["resp_body"] = recorder.buf.String()
}
}
Expand All @@ -173,6 +175,10 @@ func isPprofEndpoint(req *http.Request) bool {
return strings.HasPrefix(req.URL.Path, "/debug/pprof/")
}

func isMetricsEndpoint(req *http.Request) bool {
return strings.HasPrefix(req.URL.Path, "/metrics")
}

type recorder struct {
logger logging.Logger
inner http.ResponseWriter
Expand Down Expand Up @@ -215,46 +221,6 @@ func (r *recorder) WriteHeader(s int) {
r.inner.WriteHeader(s)
}

func (r *recorder) Hijack() (net.Conn, *bufio.ReadWriter, error) {
h, ok := r.inner.(http.Hijacker)
if !ok {
return nil, nil, errors.New("response writer is not a http.Hijacker")
}

c, rw, err := h.Hijack()
if err != nil {
return nil, nil, err
}

fields := map[string]interface{}{
"client_addr": r.req.RemoteAddr,
"req_id": r.id,
"req_method": r.req.Method,
"req_path": r.req.URL.EscapedPath(),
}

queries := r.req.URL.Query()[types.ParamQueryV1]
if len(queries) > 0 {
fields["req_query"] = queries[len(queries)-1]
}
r.logger.WithFields(fields).Info("Started watch.")

return c, rw, nil
}

func dropInputParam(u *url.URL) string {
cpy := url.Values{}
for k, v := range u.Query() {
if k != types.ParamInputV1 {
cpy[k] = v
}
}
if len(cpy) == 0 {
return u.Path
}
return u.Path + "?" + cpy.Encode()
}

func readBody(r io.ReadCloser) ([]byte, io.ReadCloser, error) {
if r == http.NoBody {
return nil, r, nil
Expand Down
182 changes: 123 additions & 59 deletions runtime/logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,103 +6,167 @@
package runtime

import (
"fmt"
"context"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
)

func TestDropInputParam(t *testing.T) {
"github.com/open-policy-agent/opa/logging"
"github.com/open-policy-agent/opa/logging/test"
)

// Without other params.
abc := `a.b.c:{"foo":[1,2,3,4]}`
abcEncoded := url.QueryEscape(abc)
func TestValidateGzipHeader(t *testing.T) {

uri, err := url.ParseRequestURI(fmt.Sprintf(`http://localhost:8181/v1/data/foo/bar?input=%v`, abcEncoded))
if err != nil {
panic(err)
httpHeader := http.Header{}
httpHeader.Add("Accept", "*/*")
if result, expected := gzipAccepted(httpHeader), false; result != expected {
t.Errorf("Expected %v but got: %v", expected, result)
}

result := dropInputParam(uri)
expected := "/v1/data/foo/bar"

if result != expected {
httpHeader.Add("Accept-Encoding", "gzip")
if result, expected := gzipAccepted(httpHeader), true; result != expected {
t.Errorf("Expected %v but got: %v", expected, result)
}

// With other params.
def := `d.e.f:{"bar":{"baz":null}}`
defEncoded := url.QueryEscape(def)

uri, err = url.ParseRequestURI(fmt.Sprintf(`http://localhost:8181/v1/data/foo/bar?input=%v&pretty=true&depth=1&input=%v`, abcEncoded, defEncoded))
if err != nil {
panic(err)
httpHeader.Set("Accept-Encoding", "gzip, deflate, br")
if result, expected := gzipAccepted(httpHeader), true; result != expected {
t.Errorf("Expected %v but got: %v", expected, result)
}

result = dropInputParam(uri)
expected = "/v1/data/foo/bar?depth=1&pretty=true"

if result != expected {
httpHeader.Set("Accept-Encoding", "br;q=1.0, gzip;q=0.8, *;q=0.1")
if result, expected := gzipAccepted(httpHeader), true; result != expected {
t.Errorf("Expected %v but got: %v", expected, result)
}

}

func TestValidateGzipHeader(t *testing.T) {

httpHeader := http.Header{}
func TestValidatePprofUrl(t *testing.T) {

httpHeader.Add("Accept", "*/*")
result := gzipAccepted(httpHeader)
expected := false
req := http.Request{}

if result != expected {
req.URL = &url.URL{Path: "/metrics"}
if result, expected := isPprofEndpoint(&req), false; result != expected {
t.Errorf("Expected %v but got: %v", expected, result)
}

httpHeader.Add("Accept-Encoding", "gzip")

result = gzipAccepted(httpHeader)
expected = true

if result != expected {
req.URL = &url.URL{Path: "/debug/pprof/"}
if result, expected := isPprofEndpoint(&req), true; result != expected {
t.Errorf("Expected %v but got: %v", expected, result)
}
}

httpHeader.Set("Accept-Encoding", "gzip, deflate, br")
result = gzipAccepted(httpHeader)
expected = true
func TestValidateMetricsUrl(t *testing.T) {

if result != expected {
req := http.Request{}

req.URL = &url.URL{Path: "/metrics"}
if result, expected := isMetricsEndpoint(&req), true; result != expected {
t.Errorf("Expected %v but got: %v", expected, result)
}

httpHeader.Set("Accept-Encoding", "br;q=1.0, gzip;q=0.8, *;q=0.1")
result = gzipAccepted(httpHeader)
expected = true

if result != expected {
req.URL = &url.URL{Path: "/debug/pprof/"}
if result, expected := isMetricsEndpoint(&req), false; result != expected {
t.Errorf("Expected %v but got: %v", expected, result)
}
}

func TestValidatePprofUrl(t *testing.T) {
func TestRequestLogging(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

req := http.Request{}
logger := test.New()
logger.SetLevel(logging.Debug)

req.URL = &url.URL{Path: "/metrics"}
result := isPprofEndpoint(&req)
expected := false
shutdownSeconds := 1
params := NewParams()
params.Addrs = &[]string{":0"}
params.Logger = logger
params.PprofEnabled = true
params.GracefulShutdownPeriod = shutdownSeconds // arbitrary, must be non-zero

if result != expected {
t.Errorf("Expected %v but got: %v", expected, result)
rt, err := NewRuntime(ctx, params)
if err != nil {
t.Fatal(err)
}

req.URL = &url.URL{Path: "/debug/pprof/"}
result = isPprofEndpoint(&req)
expected = true
initChannel := rt.Manager.ServerInitializedChannel()
go func() {
if err := rt.Serve(ctx); err != nil {
t.Error(err)
}
}()
<-initChannel

tests := []struct {
path string
acceptEncoding string
expected string
}{
{
"/metrics", "gzip", "[compressed payload]",
},
{
"/metrics", "*/*", "HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.", // rest omitted
},
{ // accept-encoding does not matter for "our" handlers -- they don't compress
"/v1/data", "gzip", "{\"result\":{}}",
},
{ // accept-encoding does not matter for pprof: it's always protobuf
"/debug/pprof/cmdline", "*/*", "[binary payload]",
},
}

if result != expected {
t.Errorf("Expected %v but got: %v", expected, result)
// execute all the requests
for _, tc := range tests {
rec := httptest.NewRecorder()
req, err := http.NewRequest("GET", tc.path, nil)
if err != nil {
t.Fatal(err)
}
req.Header.Set("Accept-Encoding", tc.acceptEncoding)
rt.server.Handler.ServeHTTP(rec, req)
if exp, act := http.StatusOK, rec.Result().StatusCode; exp != act {
t.Errorf("GET %s: expected HTTP %d, got %d", tc.path, exp, act)
}
}

cancel()

// check the logs
ents := logger.Entries()
for j, tc := range tests {
i := uint64(j + 1)
found := false
for _, ent := range entriesForReq(ents, i) {
if ent.Message == "Sent response." {
act := ent.Fields["resp_body"].(string)
if !strings.Contains(act, tc.expected) {
t.Errorf("expected %q in resp_body field, got %q", tc.expected, act)
}
found = true
}
}
if !found {
t.Errorf("Expected \"Sent response.\" log for request %d (path %s)", j, tc.path)
}

}
if t.Failed() {
t.Logf("logs: %v", ents)
}
}

func entriesForReq(ents []test.LogEntry, n uint64) []test.LogEntry {
var ret []test.LogEntry
for _, e := range ents {
if r, ok := e.Fields["req_id"]; ok {
if i, ok := r.(uint64); ok {
if i == n {
ret = append(ret, e)
}
}
}
}
return ret
}

0 comments on commit db7732c

Please sign in to comment.