Skip to content

Commit

Permalink
refactor: use http framework for features
Browse files Browse the repository at this point in the history
  • Loading branch information
turip committed May 15, 2024
1 parent e857d30 commit 4a8a032
Show file tree
Hide file tree
Showing 31 changed files with 511 additions and 235 deletions.
3 changes: 2 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/openmeterio/openmeter/internal/ingest/kafkaingest/serializer"
"github.com/openmeterio/openmeter/internal/meter"
"github.com/openmeterio/openmeter/internal/namespace"
"github.com/openmeterio/openmeter/internal/namespace/namespacedriver"
"github.com/openmeterio/openmeter/internal/server"
"github.com/openmeterio/openmeter/internal/server/authenticator"
"github.com/openmeterio/openmeter/internal/server/router"
Expand Down Expand Up @@ -270,7 +271,7 @@ func main() {
}
ingestHandler := ingestdriver.NewIngestEventsHandler(
ingestService.IngestEvents,
ingestdriver.StaticNamespaceDecoder(namespaceManager.GetDefaultNamespace()),
namespacedriver.StaticNamespaceDecoder(namespaceManager.GetDefaultNamespace()),
nil,
errorsx.NewContextHandler(errorsx.NewAppHandler(errorsx.NewSlogHandler(logger))),
)
Expand Down
183 changes: 183 additions & 0 deletions internal/credit/creditdriver/http_transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package creditdriver

import (
"context"
"fmt"
"net/http"

"github.com/openmeterio/openmeter/api"
"github.com/openmeterio/openmeter/internal/credit"
"github.com/openmeterio/openmeter/internal/meter"
"github.com/openmeterio/openmeter/internal/namespace/namespacedriver"
"github.com/openmeterio/openmeter/pkg/framework/commonhttp"
"github.com/openmeterio/openmeter/pkg/framework/transport/httptransport"
"github.com/openmeterio/openmeter/pkg/models"
)

type Handler interface {
GetFeature(ctx context.Context, w http.ResponseWriter, r *http.Request, featureID api.FeatureID)
ListFeatures(ctx context.Context, w http.ResponseWriter, r *http.Request)
CreateFeature(ctx context.Context, w http.ResponseWriter, r *http.Request)
DeleteFeature(ctx context.Context, w http.ResponseWriter, r *http.Request, featureID api.FeatureID)
}

func NewHandler(
creditConnector credit.Connector,
meterRepository meter.Repository,
namespaceDecoder namespacedriver.NamespaceDecoder,
options ...httptransport.HandlerOption,
) Handler {
return &handler{
CreditConnector: creditConnector,
MeterRepository: meterRepository,
NamespaceDecoder: namespaceDecoder,
Options: options,
}
}

type handler struct {
CreditConnector credit.Connector
MeterRepository meter.Repository
NamespaceDecoder namespacedriver.NamespaceDecoder
Options []httptransport.HandlerOption
}

var _ Handler = (*handler)(nil)

type featureIDWithNamespace *namespacedriver.Wrapped[api.FeatureID]

func (h *handler) GetFeature(ctx context.Context, w http.ResponseWriter, r *http.Request, featureID api.FeatureID) {
httptransport.NewHandler[featureIDWithNamespace, credit.Feature](
func(ctx context.Context, r *http.Request) (featureIDWithNamespace, error) {
return namespacedriver.Wrap(ctx, featureID, h.NamespaceDecoder)
},
func(ctx context.Context, request featureIDWithNamespace) (credit.Feature, error) {
return h.CreditConnector.GetFeature(ctx, request.Namespace, request.Request)
},
commonhttp.JSONResponseEncoder,
httptransport.AppendOptions(
h.Options,
httptransport.WithErrorEncoder(func(ctx context.Context, err error, w http.ResponseWriter) bool {
if _, ok := err.(*credit.FeatureNotFoundError); ok {
models.NewStatusProblem(ctx, err, http.StatusNotFound).Respond(w)
return true
}
return false
}),
httptransport.WithOperationName("getFeature"),
)...,
).ServeHTTP(w, r)

}

type featureWithNamespace *namespacedriver.Wrapped[credit.Feature]

func (h *handler) CreateFeature(ctx context.Context, w http.ResponseWriter, r *http.Request) {
httptransport.NewHandler[featureWithNamespace, credit.Feature](
func(ctx context.Context, r *http.Request) (featureWithNamespace, error) {
featureIn := api.CreateFeatureJSONRequestBody{}
if err := commonhttp.JSONRequestBodyDecoder(r, &featureIn); err != nil {
return nil, err
}

featureWithNS, err := namespacedriver.Wrap(ctx, featureIn, h.NamespaceDecoder)
if err != nil {
return nil, err
}

meter, err := h.MeterRepository.GetMeterByIDOrSlug(ctx, featureWithNS.Namespace, featureIn.MeterSlug)
if err != nil {
if _, ok := err.(*models.MeterNotFoundError); ok {
return nil, commonhttp.NewHTTPError(
http.StatusBadRequest,
fmt.Errorf("meter not found: %s", featureIn.MeterSlug),
)
}

return nil, err
}

if err := validateMeterAggregation(meter); err != nil {
return nil, commonhttp.NewHTTPError(http.StatusBadRequest, err)
}
return featureWithNS, nil
},
func(ctx context.Context, in featureWithNamespace) (credit.Feature, error) {
// Let's make sure we are not allowing the ID to be specified externally
in.Request.ID = nil

return h.CreditConnector.CreateFeature(ctx, in.Namespace, in.Request)
},
commonhttp.JSONResponseEncoderWithStatus[credit.Feature](http.StatusCreated),
httptransport.AppendOptions(
h.Options,
httptransport.WithOperationName("createFeature"),
)...,
).ServeHTTP(w, r)
}

func validateMeterAggregation(meter models.Meter) error {
switch meter.Aggregation {
case models.MeterAggregationCount, models.MeterAggregationUniqueCount, models.MeterAggregationSum:
return nil
}

return fmt.Errorf("meter %s's aggregation is %s but features can only be created for SUM, COUNT, UNIQUE_COUNT MeterRepository",
meter.Slug,
meter.Aggregation,
)
}

type featureListWithNamespace *namespacedriver.Wrapped[credit.ListFeaturesParams]

func (h *handler) ListFeatures(ctx context.Context, w http.ResponseWriter, r *http.Request) {
httptransport.NewHandler[featureListWithNamespace, []credit.Feature](
func(ctx context.Context, r *http.Request) (featureListWithNamespace, error) {
// TODO: add get arguments (limit, offset, archived)
return namespacedriver.Wrap(ctx, credit.ListFeaturesParams{}, h.NamespaceDecoder)
},
func(ctx context.Context, request featureListWithNamespace) ([]credit.Feature, error) {
return h.CreditConnector.ListFeatures(ctx, request.Namespace, request.Request)
},
commonhttp.JSONResponseEncoder,
httptransport.AppendOptions(
h.Options,
httptransport.WithOperationName("listFeatures"),
)...,
).ServeHTTP(w, r)
}

func (h *handler) DeleteFeature(ctx context.Context, w http.ResponseWriter, r *http.Request, featureID api.FeatureID) {
httptransport.NewHandler[featureIDWithNamespace, any](
func(ctx context.Context, r *http.Request) (featureIDWithNamespace, error) {
featureIDWithNs, err := namespacedriver.Wrap(ctx, featureID, h.NamespaceDecoder)
if err != nil {
return nil, err
}

if _, err := h.CreditConnector.GetFeature(ctx, featureIDWithNs.Namespace, featureID); err != nil {
return nil, err
}
return featureIDWithNs, nil
},
func(ctx context.Context, request featureIDWithNamespace) (any, error) {
return nil,
h.CreditConnector.DeleteFeature(ctx, request.Namespace, request.Request)
},
func(ctx context.Context, w http.ResponseWriter, response any) error {
w.WriteHeader(http.StatusNoContent)
return nil
},
httptransport.AppendOptions(
h.Options,
httptransport.WithOperationName("deleteFeature"),
httptransport.WithErrorEncoder(func(ctx context.Context, err error, w http.ResponseWriter) bool {
if _, ok := err.(*credit.FeatureNotFoundError); ok {
models.NewStatusProblem(ctx, err, http.StatusNotFound).Respond(w)
return true
}
return false
}),
)...,
).ServeHTTP(w, r)
}
8 changes: 4 additions & 4 deletions internal/ingest/httpingest/httpingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, namespace str
err, handled = h.processBatchRequest(ctx, w, r, namespace)
default:
// this should never happen
models.NewStatusProblem(ctx, errors.New("invalid content type: "+contentType), http.StatusBadRequest).Respond(w, r)
models.NewStatusProblem(ctx, errors.New("invalid content type: "+contentType), http.StatusBadRequest).Respond(w)
handled = true
}

Expand All @@ -71,7 +71,7 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, namespace str

if err != nil {
h.config.ErrorHandler.HandleContext(ctx, err)
models.NewStatusProblem(ctx, err, http.StatusInternalServerError).Respond(w, r)
models.NewStatusProblem(ctx, err, http.StatusInternalServerError).Respond(w)

return
}
Expand All @@ -84,7 +84,7 @@ func (h Handler) processBatchRequest(ctx context.Context, w http.ResponseWriter,

err := json.NewDecoder(r.Body).Decode(&events)
if err != nil {
models.NewStatusProblem(ctx, fmt.Errorf("parsing event: %w", err), http.StatusBadRequest).Respond(w, r)
models.NewStatusProblem(ctx, fmt.Errorf("parsing event: %w", err), http.StatusBadRequest).Respond(w)

return nil, true
}
Expand All @@ -104,7 +104,7 @@ func (h Handler) processSingleRequest(ctx context.Context, w http.ResponseWriter

err := json.NewDecoder(r.Body).Decode(&event)
if err != nil {
models.NewStatusProblem(ctx, fmt.Errorf("parsing event: %w", err), http.StatusBadRequest).Respond(w, r)
models.NewStatusProblem(ctx, fmt.Errorf("parsing event: %w", err), http.StatusBadRequest).Respond(w)

return nil, true
}
Expand Down
28 changes: 9 additions & 19 deletions internal/ingest/ingestdriver/http_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/openmeterio/openmeter/api"
"github.com/openmeterio/openmeter/internal/ingest"
"github.com/openmeterio/openmeter/internal/namespace/namespacedriver"
"github.com/openmeterio/openmeter/pkg/framework/operation"
"github.com/openmeterio/openmeter/pkg/framework/transport/httptransport"
"github.com/openmeterio/openmeter/pkg/models"
Expand All @@ -18,35 +19,24 @@ import (
// NewIngestEventsHandler returns a new HTTP handler that wraps the given [operation.Operation].
func NewIngestEventsHandler(
op operation.Operation[ingest.IngestEventsRequest, bool],
namespaceDecoder NamespaceDecoder,
namespaceDecoder namespacedriver.NamespaceDecoder,
commonErrorEncoder httptransport.ErrorEncoder,
errorHandler httptransport.ErrorHandler,
) http.Handler {
return httptransport.NewHandler(
op,
(ingestEventsRequestDecoder{
NamespaceDecoder: namespaceDecoder,
}).decode,
op,
encodeIngestEventsResponse,
(ingestEventsErrorEncoder{
httptransport.WithErrorEncoder((ingestEventsErrorEncoder{
CommonErrorEncoder: commonErrorEncoder,
}).encode,
}).encode),
httptransport.WithErrorHandler(errorHandler),
httptransport.WithOperationName("ingestEvents"),
)
}

// NamespaceDecoder gets the namespace from the request.
type NamespaceDecoder interface {
GetNamespace(ctx context.Context) (string, bool)
}

type StaticNamespaceDecoder string

func (d StaticNamespaceDecoder) GetNamespace(ctx context.Context) (string, bool) {
return string(d), true
}

type ErrorInvalidContentType struct {
ContentType string
}
Expand Down Expand Up @@ -82,7 +72,7 @@ func (e ErrorInvalidEvent) Message() string {
}

type ingestEventsRequestDecoder struct {
NamespaceDecoder NamespaceDecoder
NamespaceDecoder namespacedriver.NamespaceDecoder
}

func (d ingestEventsRequestDecoder) decode(ctx context.Context, r *http.Request) (ingest.IngestEventsRequest, error) {
Expand Down Expand Up @@ -139,13 +129,13 @@ type ingestEventsErrorEncoder struct {

func (e ingestEventsErrorEncoder) encode(ctx context.Context, err error, w http.ResponseWriter) bool {
if e := (ErrorInvalidContentType{}); errors.As(err, &e) {
models.NewStatusProblem(ctx, e, http.StatusBadRequest).Respond(w, nil)
models.NewStatusProblem(ctx, e, http.StatusBadRequest).Respond(w)

return true
}

if e := (ErrorInvalidEvent{}); errors.As(err, &e) {
models.NewStatusProblem(ctx, e, http.StatusBadRequest).Respond(w, nil)
models.NewStatusProblem(ctx, e, http.StatusBadRequest).Respond(w)

return true
}
Expand All @@ -154,7 +144,7 @@ func (e ingestEventsErrorEncoder) encode(ctx context.Context, err error, w http.
return e.CommonErrorEncoder(ctx, err, w)
}

models.NewStatusProblem(ctx, errors.New("something went wrong"), http.StatusInternalServerError).Respond(w, nil)
models.NewStatusProblem(ctx, errors.New("something went wrong"), http.StatusInternalServerError).Respond(w)

return false
}
7 changes: 4 additions & 3 deletions internal/ingest/ingestdriver/http_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/openmeterio/openmeter/internal/ingest"
"github.com/openmeterio/openmeter/internal/ingest/ingestdriver"
"github.com/openmeterio/openmeter/internal/namespace/namespacedriver"
"github.com/openmeterio/openmeter/pkg/errorsx"
)

Expand All @@ -30,7 +31,7 @@ func TestIngestEvents(t *testing.T) {

handler := ingestdriver.NewIngestEventsHandler(
service.IngestEvents,
ingestdriver.StaticNamespaceDecoder("test"),
namespacedriver.StaticNamespaceDecoder("test"),
nil,
errorsx.NewContextHandler(errorsx.NopHandler{}),
)
Expand Down Expand Up @@ -78,7 +79,7 @@ func TestIngestEvents_InvalidEvent(t *testing.T) {

handler := ingestdriver.NewIngestEventsHandler(
service.IngestEvents,
ingestdriver.StaticNamespaceDecoder("test"),
namespacedriver.StaticNamespaceDecoder("test"),
nil,
errorsx.NewContextHandler(errorsx.NopHandler{}),
)
Expand All @@ -102,7 +103,7 @@ func TestBatchHandler(t *testing.T) {

handler := ingestdriver.NewIngestEventsHandler(
service.IngestEvents,
ingestdriver.StaticNamespaceDecoder("test"),
namespacedriver.StaticNamespaceDecoder("test"),
nil,
errorsx.NewContextHandler(errorsx.NopHandler{}),
)
Expand Down
14 changes: 14 additions & 0 deletions internal/namespace/namespacedriver/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package namespacedriver

import "context"

// NamespaceDecoder gets the namespace from the request.
type NamespaceDecoder interface {
GetNamespace(ctx context.Context) (string, bool)
}

type StaticNamespaceDecoder string

func (d StaticNamespaceDecoder) GetNamespace(ctx context.Context) (string, bool) {
return string(d), true
}
26 changes: 26 additions & 0 deletions internal/namespace/namespacedriver/wrap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package namespacedriver

import (
"context"
"errors"
)

// TODO: move to the rigt place

type Wrapped[T any] struct {
Request T
Namespace string
}

func Wrap[T any](ctx context.Context, request T, resolver NamespaceDecoder) (*Wrapped[T], error) {
ns, found := resolver.GetNamespace(ctx)
// TODO: return error instead?
if !found {
return nil, errors.New("TODO")
}

return &Wrapped[T]{
Request: request,
Namespace: ns,
}, nil
}
Loading

0 comments on commit 4a8a032

Please sign in to comment.