Skip to content

Commit

Permalink
httptransport: update notification endpoints
Browse files Browse the repository at this point in the history
This updates the notification endpoints to be more in-line with the
new all-in-one, interally instrumented handler style. It also includes
a change for making the instrumentation code less verbose.

Closes: #1523
Signed-off-by: Hank Donnay <hdonnay@redhat.com>
  • Loading branch information
hdonnay committed Jul 22, 2022
1 parent fa49078 commit 5111952
Show file tree
Hide file tree
Showing 9 changed files with 478 additions and 373 deletions.
5 changes: 0 additions & 5 deletions httptransport/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ import (
"github.com/quay/claircore"
)

const (
metricNamespace = `clair`
metricSubsystem = `http`
)

// GetDigest removes the last path element and parses it as a digest.
func getDigest(_ http.ResponseWriter, r *http.Request) (d claircore.Digest, err error) {
dStr := path.Base(r.URL.Path)
Expand Down
2 changes: 1 addition & 1 deletion httptransport/indexer_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (h *IndexerV1) affectedManifests(w http.ResponseWriter, r *http.Request) {
}

func init() {
indexerv1wrapper.init()
indexerv1wrapper.init("indexerv1")
}

var indexerv1wrapper = &wrapper{
Expand Down
62 changes: 61 additions & 1 deletion httptransport/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import (
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

const (
metricNamespace = `clair`
metricSubsystem = `http`
)

type wrapper struct {
RequestCount *prometheus.CounterVec
RequestSize *prometheus.HistogramVec
Expand All @@ -16,7 +21,62 @@ type wrapper struct {
InFlight *prometheus.GaugeVec
}

func (m *wrapper) init() {
func (m *wrapper) init(name string) {
if m.RequestCount == nil {
m.RequestCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricNamespace,
Subsystem: metricSubsystem,
Name: name + "_request_total",
Help: "A total count of http requests for the given path",
},
[]string{"handler", "code", "method"},
)
}
if m.RequestSize == nil {
m.RequestSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricNamespace,
Subsystem: metricSubsystem,
Name: name + "_request_size_bytes",
Help: "Distribution of request sizes for the given path",
},
[]string{"handler", "code", "method"},
)
}
if m.ResponseSize == nil {
m.ResponseSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricNamespace,
Subsystem: metricSubsystem,
Name: name + "_response_size_bytes",
Help: "Distribution of response sizes for the given path",
}, []string{"handler", "code", "method"},
)
}
if m.RequestDuration == nil {
m.RequestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricNamespace,
Subsystem: metricSubsystem,
Name: name + "_request_duration_seconds",
Help: "Distribution of request durations for the given path",
// These are roughly exponential from 0.5 to 300 seconds
Buckets: []float64{0.5, 0.7, 1.1, 1.7, 2.7, 4.2, 6.5, 10, 15, 23, 36, 54, 83, 128, 196, 300},
}, []string{"handler", "code", "method"},
)
}
if m.InFlight == nil {
m.InFlight = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: metricNamespace,
Subsystem: metricSubsystem,
Name: name + "_in_flight_requests",
Help: "Gauge of requests in flight",
},
[]string{"handler"},
)
}
prometheus.MustRegister(m.RequestCount, m.RequestSize, m.ResponseSize, m.RequestDuration, m.InFlight)
}

Expand Down
2 changes: 1 addition & 1 deletion httptransport/matcher_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (h *MatcherV1) updateOperationHandlerDelete(w http.ResponseWriter, r *http.
}

func init() {
matcherv1wrapper.init()
matcherv1wrapper.init("matcherv1")
}

var matcherv1wrapper = &wrapper{
Expand Down
178 changes: 178 additions & 0 deletions httptransport/notification_v1.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package httptransport

import (
"context"
"errors"
"net/http"
"path"
"path/filepath"
"strconv"
"time"

"github.com/google/uuid"
"github.com/ldelossa/responserecorder"
"github.com/quay/zlog"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

"github.com/quay/clair/v4/internal/codec"
"github.com/quay/clair/v4/notifier"
)

const defaultPageSize = 500

type notificationResponse struct {
Page notifier.Page `json:"page"`
Notifications []notifier.Notification `json:"notifications"`
}

// NotificationV1 is a Notification endpoint.
type NotificationV1 struct {
inner http.Handler
serv notifier.Service
}

var _ http.Handler = (*NotificationV1)(nil)

// NewNotificationV1 returns an http.Handler serving the Notification V1 API rooted at
// "prefix".
func NewNotificationV1(_ context.Context, prefix string, srv notifier.Service, topt otelhttp.Option) (*NotificationV1, error) {
prefix = path.Join("/", prefix) // Ensure the prefix is rooted and cleaned.
m := http.NewServeMux()
h := NotificationV1{
inner: otelhttp.NewHandler(
m,
"notificationv1",
otelhttp.WithMessageEvents(otelhttp.ReadEvents, otelhttp.WriteEvents),
topt,
),
serv: srv,
}
p := path.Join(prefix, "notification") + "/"
m.Handle(p, notificationv1wrapper.wrapFunc(path.Join(p, ":id"), h.serveHTTP))
return &h, nil
}

// ServeHTTP implements http.Handler.
func (h *NotificationV1) ServeHTTP(w http.ResponseWriter, r *http.Request) {
start := time.Now()
wr := responserecorder.NewResponseRecorder(w)
defer func() {
if f, ok := wr.(http.Flusher); ok {
f.Flush()
}
zlog.Info(r.Context()).
Str("remote_addr", r.RemoteAddr).
Str("method", r.Method).
Str("request_uri", r.RequestURI).
Int("status", wr.StatusCode()).
Dur("duration", time.Since(start)).
Msg("handled HTTP request")
}()
h.inner.ServeHTTP(wr, r)
}

func (h *NotificationV1) serveHTTP(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
h.get(w, r)
case http.MethodDelete:
h.delete(w, r)
default:
apiError(w, http.StatusMethodNotAllowed, "endpoint only allows GET or DELETE")
}
}

func (h *NotificationV1) delete(w http.ResponseWriter, r *http.Request) {
ctx := zlog.ContextWithValues(r.Context(), "component", "httptransport/NotificationV1.delete")
path := r.URL.Path
id := filepath.Base(path)
notificationID, err := uuid.Parse(id)
if err != nil {
zlog.Warn(ctx).Err(err).Msg("could not parse notification id")
apiError(w, http.StatusBadRequest, "could not parse notification id: %v", err)
return
}

err = h.serv.DeleteNotifications(ctx, notificationID)
if err != nil {
zlog.Warn(ctx).Err(err).Msg("could not delete notification")
apiError(w, http.StatusInternalServerError, "could not delete notification: %v", err)
}
}

// Get will return paginated notifications to the caller.
func (h *NotificationV1) get(w http.ResponseWriter, r *http.Request) {
ctx := zlog.ContextWithValues(r.Context(), "component", "httptransport/NotificationV1.get")
path := r.URL.Path
id := filepath.Base(path)
notificationID, err := uuid.Parse(id)
if err != nil {
zlog.Warn(ctx).Err(err).Msg("could not parse notification id")
apiError(w, http.StatusBadRequest, "could not parse notification id: %v", err)
return
}

// optional page_size parameter
var pageSize int
if param := r.URL.Query().Get("page_size"); param != "" {
p, err := strconv.ParseInt(param, 10, 64)
if err != nil {
apiError(w, http.StatusBadRequest, "could not parse %q query param into integer", "page_size")
return
}
pageSize = int(p)
}
if pageSize == 0 {
pageSize = defaultPageSize
}

// optional page parameter
var next *uuid.UUID
if param := r.URL.Query().Get("next"); param != "" {
n, err := uuid.Parse(param)
if err != nil {
apiError(w, http.StatusBadRequest, "could not parse %q query param into integer", "next")
return
}
if n != uuid.Nil {
next = &n
}
}

allow := []string{"application/vnd.clair.notification.v1+json", "application/json"}
switch err := pickContentType(w, r, allow); {
case errors.Is(err, nil): // OK
case errors.Is(err, ErrMediaType):
apiError(w, http.StatusUnsupportedMediaType, "unable to negotiate common media type for %v", allow)
return
default:
apiError(w, http.StatusBadRequest, "malformed request: %v", err)
return
}

inP := &notifier.Page{
Size: pageSize,
Next: next,
}
notifications, outP, err := h.serv.Notifications(ctx, notificationID, inP)
if err != nil {
apiError(w, http.StatusInternalServerError, "failed to retrieve notifications: %v", err)
return
}

response := notificationResponse{
Page: outP,
Notifications: notifications,
}

defer writerError(w, &err)()
enc := codec.GetEncoder(w)
defer codec.PutEncoder(enc)
err = enc.Encode(&response)
}

func init() {
notificationv1wrapper.init("notificationv1")
}

var notificationv1wrapper wrapper
Loading

0 comments on commit 5111952

Please sign in to comment.