diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index 167a0e888d6..bb1c2ac55f3 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -21,6 +21,8 @@ import ( "fmt" "log" + "knative.dev/eventing/pkg/broker/filter" + "github.com/google/uuid" "github.com/kelseyhightower/envconfig" "go.uber.org/zap" @@ -41,7 +43,6 @@ import ( "knative.dev/eventing/cmd/broker" "knative.dev/eventing/pkg/apis/feature" "knative.dev/eventing/pkg/auth" - "knative.dev/eventing/pkg/broker/filter" eventingclient "knative.dev/eventing/pkg/client/injection/client" brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker" triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger" @@ -147,11 +148,12 @@ func main() { } reporter := filter.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String())) + authReporter := filter.NewAuthStatsReporterAdapter(reporter) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) // We are running both the receiver (takes messages in from the Broker) and the dispatcher (send // the messages to the triggers' subscribers) in this binary. - oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx) + oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx, auth.WithStatsReporter(authReporter)) trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace()) handler, err = filter.NewHandler(logger, oidcTokenVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), reporter, trustBundleConfigMapInformer, ctxFunc) if err != nil { diff --git a/cmd/jobsink/main.go b/cmd/jobsink/main.go index 0f8c99646b4..74ab44b8a5d 100644 --- a/cmd/jobsink/main.go +++ b/cmd/jobsink/main.go @@ -201,7 +201,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { audience := auth.GetAudienceDirect(sinksv.SchemeGroupVersion.WithKind("JobSink"), ref.Namespace, ref.Name) - err := h.oidcTokenVerifier.VerifyJWTFromRequest(ctx, r, &audience, w) + err := h.oidcTokenVerifier.VerifyJWTFromRequest(ctx, r, &audience, w, nil) if err != nil { logger.Warn("Error when validating the JWT token in the request", zap.Error(err)) return @@ -382,7 +382,7 @@ func (h *Handler) handleGet(ctx context.Context, w http.ResponseWriter, r *http. audience := auth.GetAudienceDirect(sinksv.SchemeGroupVersion.WithKind("JobSink"), ref.Namespace, ref.Name) - err := h.oidcTokenVerifier.VerifyJWTFromRequest(ctx, r, &audience, w) + err := h.oidcTokenVerifier.VerifyJWTFromRequest(ctx, r, &audience, w, nil) if err != nil { logger.Warn("Error when validating the JWT token in the request", zap.Error(err)) return diff --git a/pkg/auth/token_verifier.go b/pkg/auth/token_verifier.go index 5571b67f2b1..9276a3d3356 100644 --- a/pkg/auth/token_verifier.go +++ b/pkg/auth/token_verifier.go @@ -19,6 +19,7 @@ package auth import ( "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -36,10 +37,16 @@ const ( kubernetesOIDCDiscoveryBaseURL = "https://kubernetes.default.svc" ) +var ( + ErrNoJWTTokenFound = errors.New("no JWT token found in request") + ErrInvalidJWTToken = errors.New("invalid JWT token") +) + type OIDCTokenVerifier struct { - logger *zap.SugaredLogger - restConfig *rest.Config - provider *oidc.Provider + logger *zap.SugaredLogger + restConfig *rest.Config + provider *oidc.Provider + statsReporter AuthStatsReporter } type IDToken struct { @@ -51,7 +58,31 @@ type IDToken struct { AccessTokenHash string } -func NewOIDCTokenVerifier(ctx context.Context) *OIDCTokenVerifier { +type AuthStatsReporter interface { + ReportUnauthenticatedRequest(args *ReportArgs) + ReportInvalidTokenRequest(args *ReportArgs) +} + +type ReportArgs struct { + Ns string + Trigger string + Broker string + Channel string + FilterType string + RequestType string + RequestScheme string +} + +// TokenVeridierOption enables further configuration of a TokenVerifier. +type TokenVerifierOption func(*OIDCTokenVerifier) + +func WithStatsReporter(reporter AuthStatsReporter) TokenVerifierOption { + return func(t *OIDCTokenVerifier) { + t.statsReporter = reporter + } +} + +func NewOIDCTokenVerifier(ctx context.Context, o ...TokenVerifierOption) *OIDCTokenVerifier { tokenHandler := &OIDCTokenVerifier{ logger: logging.FromContext(ctx).With("component", "oidc-token-handler"), restConfig: injection.GetConfig(ctx), @@ -61,6 +92,10 @@ func NewOIDCTokenVerifier(ctx context.Context) *OIDCTokenVerifier { tokenHandler.logger.Error(fmt.Sprintf("could not initialize provider. You can ignore this message, when the %s feature is disabled", feature.OIDCAuthentication), zap.Error(err)) } + for _, opt := range o { + opt(tokenHandler) + } + return tokenHandler } @@ -152,10 +187,16 @@ func (c *OIDCTokenVerifier) getKubernetesOIDCDiscovery() (*openIDMetadata, error } // VerifyJWTFromRequest will verify the incoming request contains the correct JWT token -func (tokenVerifier *OIDCTokenVerifier) VerifyJWTFromRequest(ctx context.Context, r *http.Request, audience *string, response http.ResponseWriter) error { +func (tokenVerifier *OIDCTokenVerifier) VerifyJWTFromRequest(ctx context.Context, r *http.Request, audience *string, response http.ResponseWriter, reportArgs *ReportArgs) error { token := GetJWTFromHeader(r.Header) + if token == "" { response.WriteHeader(http.StatusUnauthorized) + + if tokenVerifier.statsReporter != nil { + tokenVerifier.statsReporter.ReportUnauthenticatedRequest(reportArgs) + } + return fmt.Errorf("no JWT token found in request") } @@ -166,6 +207,10 @@ func (tokenVerifier *OIDCTokenVerifier) VerifyJWTFromRequest(ctx context.Context if _, err := tokenVerifier.VerifyJWT(ctx, token, *audience); err != nil { response.WriteHeader(http.StatusUnauthorized) + + if tokenVerifier.statsReporter != nil { + tokenVerifier.statsReporter.ReportInvalidTokenRequest(reportArgs) + } return fmt.Errorf("failed to verify JWT: %w", err) } diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 5276b6f0a5e..6bd49a117e7 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -26,6 +26,8 @@ import ( "net/http" "time" + "knative.dev/eventing/pkg/auth" + opencensusclient "github.com/cloudevents/sdk-go/observability/opencensus/v2/client" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/binding" @@ -40,7 +42,6 @@ import ( "knative.dev/pkg/logging" "knative.dev/eventing/pkg/apis" - "knative.dev/eventing/pkg/auth" "knative.dev/eventing/pkg/eventingtls" "knative.dev/eventing/pkg/utils" @@ -152,6 +153,42 @@ func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcT }, nil } +type authStatsReporterAdapter struct { + reporter StatsReporter +} + +func (a *authStatsReporterAdapter) ReportUnauthenticatedRequest(args *auth.ReportArgs) { + err := a.reporter.ReportUnauthenticatedRequest(&ReportArgs{ + ns: args.Ns, + trigger: args.Trigger, + broker: args.Broker, + filterType: args.FilterType, + requestType: args.RequestType, + requestScheme: args.RequestScheme, + }) + if err != nil { + return + } +} + +func (a *authStatsReporterAdapter) ReportInvalidTokenRequest(args *auth.ReportArgs) { + err := a.reporter.ReportInvalidTokenRequest(&ReportArgs{ + ns: args.Ns, + trigger: args.Trigger, + broker: args.Broker, + filterType: args.FilterType, + requestType: args.RequestType, + requestScheme: args.RequestScheme, + }) + if err != nil { + return + } +} + +func NewAuthStatsReporterAdapter(reporter StatsReporter) auth.AuthStatsReporter { + return &authStatsReporterAdapter{reporter: reporter} +} + func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { ctx := h.withContext(request.Context()) @@ -204,7 +241,15 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { audience := FilterAudience - err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, &audience, writer) + reportArgs := &auth.ReportArgs{ + Ns: trigger.Namespace, + Trigger: trigger.Name, + Broker: trigger.Spec.Broker, + RequestType: "filter", + } + + err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, &audience, writer, reportArgs) + if err != nil { h.logger.Warn("Error when validating the JWT token in the request", zap.Error(err)) return diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index 1e94b43a568..0ed399b3273 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -88,15 +88,17 @@ func TestReceiver(t *testing.T) { additionalReplyHeaders http.Header // expectations - expectedResponseEvent *cloudevents.Event - expectedResponse *http.Response - expectedDispatch bool - expectedStatus int - expectedHeaders http.Header - expectedEventCount bool - expectedEventDispatchTime bool - expectedEventProcessingTime bool - expectedResponseHeaders http.Header + expectedResponseEvent *cloudevents.Event + expectedResponse *http.Response + expectedDispatch bool + expectedStatus int + expectedHeaders http.Header + expectedEventCount bool + expectedUnauthenticatedCount bool + expectedInvalidTokenCount bool + expectedEventDispatchTime bool + expectedEventProcessingTime bool + expectedResponseHeaders http.Header }{ "Not POST": { request: httptest.NewRequest(http.MethodGet, validPath, nil), @@ -517,6 +519,12 @@ func TestReceiver(t *testing.T) { if tc.expectedEventCount != reporter.eventCountReported { t.Errorf("Incorrect event count reported metric. Expected %v, Actual %v", tc.expectedEventCount, reporter.eventCountReported) } + if tc.expectedUnauthenticatedCount != reporter.unauthenticatedCountReported { + t.Errorf("Incorrect unauthenticated count reported metric. Expected %v, Actual %v", tc.expectedUnauthenticatedCount, reporter.unauthenticatedCountReported) + } + if tc.expectedInvalidTokenCount != reporter.invalidTokenCountReported { + t.Errorf("Incorrect invalid token count reported metric. Expected %v, Actual %v", tc.expectedInvalidTokenCount, reporter.invalidTokenCountReported) + } if tc.expectedEventDispatchTime != reporter.eventDispatchTimeReported { t.Errorf("Incorrect event dispatch time reported metric. Expected %v, Actual %v", tc.expectedEventDispatchTime, reporter.eventDispatchTimeReported) } @@ -742,9 +750,11 @@ func (r *responseWriterWithInvocationsCheck) WriteHeader(statusCode int) { } type mockReporter struct { - eventCountReported bool - eventDispatchTimeReported bool - eventProcessingTimeReported bool + eventCountReported bool + unauthenticatedCountReported bool + invalidTokenCountReported bool + eventDispatchTimeReported bool + eventProcessingTimeReported bool } func (r *mockReporter) ReportEventCount(args *ReportArgs, responseCode int) error { @@ -752,6 +762,16 @@ func (r *mockReporter) ReportEventCount(args *ReportArgs, responseCode int) erro return nil } +func (r *mockReporter) ReportUnauthenticatedRequest(args *ReportArgs) error { + r.unauthenticatedCountReported = true + return nil +} + +func (r *mockReporter) ReportInvalidTokenRequest(args *ReportArgs) error { + r.invalidTokenCountReported = true + return nil +} + func (r *mockReporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error { r.eventDispatchTimeReported = true return nil diff --git a/pkg/broker/filter/stats_reporter.go b/pkg/broker/filter/stats_reporter.go index c2df9b3c9b5..2352aae9410 100644 --- a/pkg/broker/filter/stats_reporter.go +++ b/pkg/broker/filter/stats_reporter.go @@ -46,6 +46,20 @@ var ( stats.UnitDimensionless, ) + // unauthenticatedRequestsM records the number of unauthenticated requests. (Without the JWT token in the header) + unauthenticatedRequestsM = stats.Int64( + "unauthenticated_requests", + "Number of unauthenticated requests (No JWT token found in the header)", + stats.UnitDimensionless, + ) + + // invalidTokenRequestsM records the number of requests with invalid tokens. + invalidTokenRequestsM = stats.Int64( + "invalid_token_requests", + "Number of requests with invalid tokens", + stats.UnitDimensionless, + ) + // dispatchTimeInMsecM records the time spent dispatching an event to // a Trigger subscriber, in milliseconds. dispatchTimeInMsecM = stats.Float64( @@ -90,6 +104,8 @@ func init() { // StatsReporter defines the interface for sending filter metrics. type StatsReporter interface { ReportEventCount(args *ReportArgs, responseCode int) error + ReportUnauthenticatedRequest(args *ReportArgs) error + ReportInvalidTokenRequest(args *ReportArgs) error ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error ReportEventProcessingTime(args *ReportArgs, d time.Duration) error } @@ -120,6 +136,18 @@ func register() { Aggregation: view.Count(), TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey}, }, + &view.View{ + Description: unauthenticatedRequestsM.Description(), + Measure: unauthenticatedRequestsM, + Aggregation: view.Count(), + TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, broker.UniqueTagKey, broker.ContainerTagKey}, + }, + &view.View{ + Description: invalidTokenRequestsM.Description(), + Measure: invalidTokenRequestsM, + Aggregation: view.Count(), + TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, broker.UniqueTagKey, broker.ContainerTagKey}, + }, &view.View{ Description: dispatchTimeInMsecM.Description(), Measure: dispatchTimeInMsecM, @@ -150,6 +178,26 @@ func (r *reporter) ReportEventCount(args *ReportArgs, responseCode int) error { return nil } +// ReportUnauthenticatedRequest captures unauthenticated requests. (The requests that do not have JWT token in the header) +func (r *reporter) ReportUnauthenticatedRequest(args *ReportArgs) error { + ctx, err := r.generateTag(args) + if err != nil { + return err + } + metrics.Record(ctx, unauthenticatedRequestsM.M(1)) + return nil +} + +// ReportInvalidTokenRequest captures requests with invalid tokens. +func (r *reporter) ReportInvalidTokenRequest(args *ReportArgs) error { + ctx, err := r.generateTag(args) + if err != nil { + return err + } + metrics.Record(ctx, invalidTokenRequestsM.M(1)) + return nil +} + // ReportEventDispatchTime captures dispatch times. func (r *reporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error { ctx, err := r.generateTag(args, diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index 6219cb92537..df4da12f574 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -234,7 +234,13 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { if features.IsOIDCAuthentication() { h.Logger.Debug("OIDC authentication is enabled") - err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, broker.Status.Address.Audience, writer) + reportArgs := &auth.ReportArgs{ + Ns: broker.Namespace, + Broker: broker.Name, + RequestType: "broker_ingress", + } + + err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, broker.Status.Address.Audience, writer, reportArgs) if err != nil { h.Logger.Warn("Error when validating the JWT token in the request", zap.Error(err)) return diff --git a/pkg/channel/event_receiver.go b/pkg/channel/event_receiver.go index ffb441cbcce..a670edac94b 100644 --- a/pkg/channel/event_receiver.go +++ b/pkg/channel/event_receiver.go @@ -256,7 +256,14 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth features := feature.FromContext(ctx) if features.IsOIDCAuthentication() { r.logger.Debug("OIDC authentication is enabled") - err = r.tokenVerifier.VerifyJWTFromRequest(ctx, request, &r.audience, response) + + reportArgs := &auth.ReportArgs{ + Ns: channel.Namespace, + Channel: channel.Name, + RequestType: "channel", + } + + err = r.tokenVerifier.VerifyJWTFromRequest(ctx, request, &r.audience, response, reportArgs) if err != nil { r.logger.Warn("Error when validating the JWT token in the request", zap.Error(err)) return