Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add metrics for OIDC #8015

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
6 changes: 4 additions & 2 deletions cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"log"

"knative.dev/eventing/pkg/broker/filter"

"github.com/google/uuid"
"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
Expand All @@ -41,7 +43,6 @@ import (
"knative.dev/eventing/cmd/broker"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/broker/filter"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
Expand Down Expand Up @@ -147,11 +148,12 @@ func main() {
}

reporter := filter.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String()))
authReporter := filter.NewAuthStatsReporterAdapter(reporter)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating an interface to avoid import cycle.

Before this, it is stuck with a import cycle between auth and filter packages. The auth code needs filter's stats reporting, but filter is already using auth stuff. To fix this I created a simple interface in auth that just covers the stats we need. Then making an adapter in filter to bridge the gap. Is this a good approach to solve this problem?


oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
// We are running both the receiver (takes messages in from the Broker) and the dispatcher (send
// the messages to the triggers' subscribers) in this binary.
oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx)
oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx, auth.WithStatsReporter(authReporter))
trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())
handler, err = filter.NewHandler(logger, oidcTokenVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), reporter, trustBundleConfigMapInformer, ctxFunc)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/jobsink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

audience := auth.GetAudienceDirect(sinksv.SchemeGroupVersion.WithKind("JobSink"), ref.Namespace, ref.Name)

err := h.oidcTokenVerifier.VerifyJWTFromRequest(ctx, r, &audience, w)
err := h.oidcTokenVerifier.VerifyJWTFromRequest(ctx, r, &audience, w, nil)
if err != nil {
logger.Warn("Error when validating the JWT token in the request", zap.Error(err))
return
Expand Down Expand Up @@ -382,7 +382,7 @@ func (h *Handler) handleGet(ctx context.Context, w http.ResponseWriter, r *http.

audience := auth.GetAudienceDirect(sinksv.SchemeGroupVersion.WithKind("JobSink"), ref.Namespace, ref.Name)

err := h.oidcTokenVerifier.VerifyJWTFromRequest(ctx, r, &audience, w)
err := h.oidcTokenVerifier.VerifyJWTFromRequest(ctx, r, &audience, w, nil)
if err != nil {
logger.Warn("Error when validating the JWT token in the request", zap.Error(err))
return
Expand Down
55 changes: 50 additions & 5 deletions pkg/auth/token_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package auth
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -36,10 +37,16 @@ const (
kubernetesOIDCDiscoveryBaseURL = "https://kubernetes.default.svc"
)

var (
ErrNoJWTTokenFound = errors.New("no JWT token found in request")
ErrInvalidJWTToken = errors.New("invalid JWT token")
)

type OIDCTokenVerifier struct {
logger *zap.SugaredLogger
restConfig *rest.Config
provider *oidc.Provider
logger *zap.SugaredLogger
restConfig *rest.Config
provider *oidc.Provider
statsReporter AuthStatsReporter
}

type IDToken struct {
Expand All @@ -51,7 +58,31 @@ type IDToken struct {
AccessTokenHash string
}

func NewOIDCTokenVerifier(ctx context.Context) *OIDCTokenVerifier {
type AuthStatsReporter interface {
ReportUnauthenticatedRequest(args *ReportArgs)
ReportInvalidTokenRequest(args *ReportArgs)
}

type ReportArgs struct {
Ns string
Trigger string
Broker string
Channel string
FilterType string
RequestType string
RequestScheme string
}

// TokenVeridierOption enables further configuration of a TokenVerifier.
type TokenVerifierOption func(*OIDCTokenVerifier)

func WithStatsReporter(reporter AuthStatsReporter) TokenVerifierOption {
return func(t *OIDCTokenVerifier) {
t.statsReporter = reporter
}
}

func NewOIDCTokenVerifier(ctx context.Context, o ...TokenVerifierOption) *OIDCTokenVerifier {
tokenHandler := &OIDCTokenVerifier{
logger: logging.FromContext(ctx).With("component", "oidc-token-handler"),
restConfig: injection.GetConfig(ctx),
Expand All @@ -61,6 +92,10 @@ func NewOIDCTokenVerifier(ctx context.Context) *OIDCTokenVerifier {
tokenHandler.logger.Error(fmt.Sprintf("could not initialize provider. You can ignore this message, when the %s feature is disabled", feature.OIDCAuthentication), zap.Error(err))
}

for _, opt := range o {
opt(tokenHandler)
}

return tokenHandler
}

Expand Down Expand Up @@ -152,10 +187,16 @@ func (c *OIDCTokenVerifier) getKubernetesOIDCDiscovery() (*openIDMetadata, error
}

// VerifyJWTFromRequest will verify the incoming request contains the correct JWT token
func (tokenVerifier *OIDCTokenVerifier) VerifyJWTFromRequest(ctx context.Context, r *http.Request, audience *string, response http.ResponseWriter) error {
func (tokenVerifier *OIDCTokenVerifier) VerifyJWTFromRequest(ctx context.Context, r *http.Request, audience *string, response http.ResponseWriter, reportArgs *ReportArgs) error {
token := GetJWTFromHeader(r.Header)

if token == "" {
response.WriteHeader(http.StatusUnauthorized)

if tokenVerifier.statsReporter != nil {
tokenVerifier.statsReporter.ReportUnauthenticatedRequest(reportArgs)
}

return fmt.Errorf("no JWT token found in request")
}

Expand All @@ -166,6 +207,10 @@ func (tokenVerifier *OIDCTokenVerifier) VerifyJWTFromRequest(ctx context.Context

if _, err := tokenVerifier.VerifyJWT(ctx, token, *audience); err != nil {
response.WriteHeader(http.StatusUnauthorized)

if tokenVerifier.statsReporter != nil {
tokenVerifier.statsReporter.ReportInvalidTokenRequest(reportArgs)
}
return fmt.Errorf("failed to verify JWT: %w", err)
}

Expand Down
49 changes: 47 additions & 2 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"net/http"
"time"

"knative.dev/eventing/pkg/auth"

opencensusclient "github.com/cloudevents/sdk-go/observability/opencensus/v2/client"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
Expand All @@ -40,7 +42,6 @@ import (
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/apis"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/eventing/pkg/utils"

Expand Down Expand Up @@ -152,6 +153,42 @@ func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcT
}, nil
}

type authStatsReporterAdapter struct {
reporter StatsReporter
}

func (a *authStatsReporterAdapter) ReportUnauthenticatedRequest(args *auth.ReportArgs) {
err := a.reporter.ReportUnauthenticatedRequest(&ReportArgs{
ns: args.Ns,
trigger: args.Trigger,
broker: args.Broker,
filterType: args.FilterType,
requestType: args.RequestType,
requestScheme: args.RequestScheme,
})
if err != nil {
return
}
}

func (a *authStatsReporterAdapter) ReportInvalidTokenRequest(args *auth.ReportArgs) {
err := a.reporter.ReportInvalidTokenRequest(&ReportArgs{
ns: args.Ns,
trigger: args.Trigger,
broker: args.Broker,
filterType: args.FilterType,
requestType: args.RequestType,
requestScheme: args.RequestScheme,
})
if err != nil {
return
}
}

func NewAuthStatsReporterAdapter(reporter StatsReporter) auth.AuthStatsReporter {
return &authStatsReporterAdapter{reporter: reporter}
}

func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
ctx := h.withContext(request.Context())

Expand Down Expand Up @@ -204,7 +241,15 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {

audience := FilterAudience

err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, &audience, writer)
reportArgs := &auth.ReportArgs{
Ns: trigger.Namespace,
Trigger: trigger.Name,
Broker: trigger.Spec.Broker,
RequestType: "filter",
}

err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, &audience, writer, reportArgs)

if err != nil {
h.logger.Warn("Error when validating the JWT token in the request", zap.Error(err))
return
Expand Down
44 changes: 32 additions & 12 deletions pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,17 @@ func TestReceiver(t *testing.T) {
additionalReplyHeaders http.Header

// expectations
expectedResponseEvent *cloudevents.Event
expectedResponse *http.Response
expectedDispatch bool
expectedStatus int
expectedHeaders http.Header
expectedEventCount bool
expectedEventDispatchTime bool
expectedEventProcessingTime bool
expectedResponseHeaders http.Header
expectedResponseEvent *cloudevents.Event
expectedResponse *http.Response
expectedDispatch bool
expectedStatus int
expectedHeaders http.Header
expectedEventCount bool
expectedUnauthenticatedCount bool
expectedInvalidTokenCount bool
expectedEventDispatchTime bool
expectedEventProcessingTime bool
expectedResponseHeaders http.Header
}{
"Not POST": {
request: httptest.NewRequest(http.MethodGet, validPath, nil),
Expand Down Expand Up @@ -517,6 +519,12 @@ func TestReceiver(t *testing.T) {
if tc.expectedEventCount != reporter.eventCountReported {
t.Errorf("Incorrect event count reported metric. Expected %v, Actual %v", tc.expectedEventCount, reporter.eventCountReported)
}
if tc.expectedUnauthenticatedCount != reporter.unauthenticatedCountReported {
t.Errorf("Incorrect unauthenticated count reported metric. Expected %v, Actual %v", tc.expectedUnauthenticatedCount, reporter.unauthenticatedCountReported)
}
if tc.expectedInvalidTokenCount != reporter.invalidTokenCountReported {
t.Errorf("Incorrect invalid token count reported metric. Expected %v, Actual %v", tc.expectedInvalidTokenCount, reporter.invalidTokenCountReported)
}
if tc.expectedEventDispatchTime != reporter.eventDispatchTimeReported {
t.Errorf("Incorrect event dispatch time reported metric. Expected %v, Actual %v", tc.expectedEventDispatchTime, reporter.eventDispatchTimeReported)
}
Expand Down Expand Up @@ -742,16 +750,28 @@ func (r *responseWriterWithInvocationsCheck) WriteHeader(statusCode int) {
}

type mockReporter struct {
eventCountReported bool
eventDispatchTimeReported bool
eventProcessingTimeReported bool
eventCountReported bool
unauthenticatedCountReported bool
invalidTokenCountReported bool
eventDispatchTimeReported bool
eventProcessingTimeReported bool
}

func (r *mockReporter) ReportEventCount(args *ReportArgs, responseCode int) error {
r.eventCountReported = true
return nil
}

func (r *mockReporter) ReportUnauthenticatedRequest(args *ReportArgs) error {
r.unauthenticatedCountReported = true
return nil
}

func (r *mockReporter) ReportInvalidTokenRequest(args *ReportArgs) error {
r.invalidTokenCountReported = true
return nil
}

func (r *mockReporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error {
r.eventDispatchTimeReported = true
return nil
Expand Down
48 changes: 48 additions & 0 deletions pkg/broker/filter/stats_reporter.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this is not only relevant to broker-filters anymore, should we move this StatsReporter to a dedicated package and make it more generic? E.g. pkg/metrics?
(This could also help to remove the import cycle.)

Then you could also change the ReportArgs to an interface (which has the method generateTag()) to have component specific args. E.g. from a Trigger (which would to the same as the current reporter.generateTag(), or then for a Channel, etc.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So for example:

type ReportArgs interface { //TODO: find a better name
    generateTag(tags ...tag.Mutator) (context.Context, error)
}

and then for example for the broker/trigger:

type BrokerArgs struct {
	ns            string
	trigger       string
	broker        string
	filterType    string
	requestType   string
	requestScheme string
}

func (args *BrokerArgs) generateTag(tags ...tag.Mutator) (context.Context, error) {
	ctx := metricskey.WithResource(emptyContext, resource.Resource{
		Type: eventingmetrics.ResourceTypeKnativeTrigger,
		Labels: map[string]string{
			eventingmetrics.LabelNamespaceName: args.ns,
			eventingmetrics.LabelBrokerName:    args.broker,
			eventingmetrics.LabelTriggerName:   args.trigger,
		},
	})
	// Note that filterType and filterSource can be empty strings, so they need a special treatment.
	ctx, err := tag.New(
		ctx,
		append(tags,
			tag.Insert(triggerFilterTypeKey, valueOrAny(args.filterType)),
			tag.Insert(triggerFilterRequestTypeKey, args.requestType),
			tag.Insert(triggerFilterRequestSchemeKey, args.requestScheme),
		)...)
	return ctx, err
}

And then we call this e.g. in ReportEventCount():

func (r *reporter) ReportEventCount(args ReportArgs, responseCode int) error {
	ctx, err := args.generateTag(
		tag.Insert(responseCodeKey, strconv.Itoa(responseCode)),
		tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode)))
	if err != nil {
		return err
	}
	metrics.Record(ctx, eventCountM.M(1))
	return nil
}

and use it

reportArgs := &BrokerArgs{
	Ns:          broker.Namespace,
	Broker:      broker.Name,
	RequestType: "broker_ingress",
}

err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, broker.Status.Address.Audience, writer, reportArgs)

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ var (
stats.UnitDimensionless,
)

// unauthenticatedRequestsM records the number of unauthenticated requests. (Without the JWT token in the header)
unauthenticatedRequestsM = stats.Int64(
"unauthenticated_requests",
"Number of unauthenticated requests (No JWT token found in the header)",
stats.UnitDimensionless,
)

// invalidTokenRequestsM records the number of requests with invalid tokens.
invalidTokenRequestsM = stats.Int64(
"invalid_token_requests",
"Number of requests with invalid tokens",
stats.UnitDimensionless,
)

// dispatchTimeInMsecM records the time spent dispatching an event to
// a Trigger subscriber, in milliseconds.
dispatchTimeInMsecM = stats.Float64(
Expand Down Expand Up @@ -90,6 +104,8 @@ func init() {
// StatsReporter defines the interface for sending filter metrics.
type StatsReporter interface {
ReportEventCount(args *ReportArgs, responseCode int) error
ReportUnauthenticatedRequest(args *ReportArgs) error
ReportInvalidTokenRequest(args *ReportArgs) error
ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error
ReportEventProcessingTime(args *ReportArgs, d time.Duration) error
}
Expand Down Expand Up @@ -120,6 +136,18 @@ func register() {
Aggregation: view.Count(),
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: unauthenticatedRequestsM.Description(),
Measure: unauthenticatedRequestsM,
Aggregation: view.Count(),
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: invalidTokenRequestsM.Description(),
Measure: invalidTokenRequestsM,
Aggregation: view.Count(),
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: dispatchTimeInMsecM.Description(),
Measure: dispatchTimeInMsecM,
Expand Down Expand Up @@ -150,6 +178,26 @@ func (r *reporter) ReportEventCount(args *ReportArgs, responseCode int) error {
return nil
}

// ReportUnauthenticatedRequest captures unauthenticated requests. (The requests that do not have JWT token in the header)
func (r *reporter) ReportUnauthenticatedRequest(args *ReportArgs) error {
ctx, err := r.generateTag(args)
if err != nil {
return err
}
metrics.Record(ctx, unauthenticatedRequestsM.M(1))
return nil
}

// ReportInvalidTokenRequest captures requests with invalid tokens.
func (r *reporter) ReportInvalidTokenRequest(args *ReportArgs) error {
ctx, err := r.generateTag(args)
if err != nil {
return err
}
metrics.Record(ctx, invalidTokenRequestsM.M(1))
return nil
}

// ReportEventDispatchTime captures dispatch times.
func (r *reporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error {
ctx, err := r.generateTag(args,
Expand Down
8 changes: 7 additions & 1 deletion pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,13 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
if features.IsOIDCAuthentication() {
h.Logger.Debug("OIDC authentication is enabled")

err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, broker.Status.Address.Audience, writer)
reportArgs := &auth.ReportArgs{
Ns: broker.Namespace,
Broker: broker.Name,
RequestType: "broker_ingress",
}

err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, broker.Status.Address.Audience, writer, reportArgs)
if err != nil {
h.Logger.Warn("Error when validating the JWT token in the request", zap.Error(err))
return
Expand Down
Loading
Loading