From d1f0b312131c63aa7b6a07645736076cc37290c3 Mon Sep 17 00:00:00 2001 From: Oscar Reyes Date: Mon, 24 Apr 2023 10:16:53 -0600 Subject: [PATCH] feature(backend): Adding support for HTTP OTLP server (#2412) * creating http otlp server * cleanup changes * cleanup changes * fixing example * enabling JSON request body for the OTLP HTTP endpoint * PR comments * PR comments * test * manually installing dependencies --- examples/collector/docker-compose.yml | 2 +- server/app/app.go | 23 ++- server/go.mod | 3 +- server/go.sum | 2 + server/otlp/grpc_server.go | 44 ++++++ server/otlp/http_server.go | 205 +++++++++++++++++++++++++ server/otlp/{server.go => ingester.go} | 48 ++---- 7 files changed, 283 insertions(+), 44 deletions(-) create mode 100644 server/otlp/grpc_server.go create mode 100644 server/otlp/http_server.go rename server/otlp/{server.go => ingester.go} (69%) diff --git a/examples/collector/docker-compose.yml b/examples/collector/docker-compose.yml index c209ddff3f..b775e9a039 100644 --- a/examples/collector/docker-compose.yml +++ b/examples/collector/docker-compose.yml @@ -49,4 +49,4 @@ services: volumes: - ./collector.config.yaml:/otel-local-config.yaml environment: - - TRACETEST_ENDPOINT=tracetest:21321 + - TRACETEST_ENDPOINT=tracetest:4317 diff --git a/server/app/app.go b/server/app/app.go index 6853376eb8..8efc50c076 100644 --- a/server/app/app.go +++ b/server/app/app.go @@ -225,13 +225,7 @@ func (app *App) Start(opts ...appOption) error { return err } - otlpServer := otlp.NewServer(":21321", testDB) - go otlpServer.Start() - app.registerStopFn(func() { - fmt.Println("stopping otlp server") - otlpServer.Stop() - }) - + registerOtlpServer(app, testDB) provisioner := provisioning.New() router, mappers := controller(app.cfg, testDB, tracer, rf, triggerRegistry) @@ -287,7 +281,22 @@ func registerSPAHandler(router *mux.Router, cfg httpServerConfig, analyticsEnabl Env, ), ) +} +func registerOtlpServer(app *App, testDB model.Repository) { + ingester := otlp.NewIngester(testDB) + grpcOtlpServer := otlp.NewGrpcServer(":4317", ingester) + httpOtlpServer := otlp.NewHttpServer(":4318", ingester) + go grpcOtlpServer.Start() + go httpOtlpServer.Start() + + fmt.Println("OTLP server started on :4317 (grpc) and :4318 (http)") + + app.registerStopFn(func() { + fmt.Println("stopping otlp server") + grpcOtlpServer.Stop() + httpOtlpServer.Stop() + }) } func registerConfigResource(configRepo *configresource.Repository, router *mux.Router, db *sql.DB, provisioner *provisioning.Provisioner) { diff --git a/server/go.mod b/server/go.mod index 4ca5baca34..62cff7dd2f 100644 --- a/server/go.mod +++ b/server/go.mod @@ -37,6 +37,7 @@ require ( github.com/stretchr/testify v1.8.1 github.com/teris-io/shortid v0.0.0-20220617161101-71ec9f2aa569 go.opentelemetry.io/collector v0.44.0 + go.opentelemetry.io/collector/pdata v0.49.0 go.opentelemetry.io/collector/semconv v0.71.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.28.0 go.opentelemetry.io/contrib/propagators/aws v1.5.0 @@ -50,6 +51,7 @@ require ( go.opentelemetry.io/proto/otlp v0.19.0 golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15 google.golang.org/grpc v1.52.0 + google.golang.org/protobuf v1.28.1 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 gotest.tools/v3 v3.1.0 @@ -107,7 +109,6 @@ require ( golang.org/x/sys v0.5.0 // indirect golang.org/x/text v0.7.0 // indirect google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect - google.golang.org/protobuf v1.28.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) diff --git a/server/go.sum b/server/go.sum index e741805213..b1b2c103c8 100644 --- a/server/go.sum +++ b/server/go.sum @@ -1686,6 +1686,8 @@ go.opentelemetry.io/collector v0.44.0 h1:S/uoPVGNhv0MT/UbEmb7AffehAUTfVhH8YS2zQl go.opentelemetry.io/collector v0.44.0/go.mod h1:JJbkoXqnUGAlxmPtbFzEVSVMaKIKAlzdbz8HE8Yo7Ew= go.opentelemetry.io/collector/model v0.44.0 h1:I+M6X2NANYChOGYrpbxCoEYJah3eHdMvumKjothIAtA= go.opentelemetry.io/collector/model v0.44.0/go.mod h1:4jo1R8uBDspLCxUGhQ0k3v/EFXFbW7s0AIy3LuGLbcU= +go.opentelemetry.io/collector/pdata v0.49.0 h1:aYj5rOlRC0x7lGXbc185LMsMMoY/pjOTXr5s1O2SzXs= +go.opentelemetry.io/collector/pdata v0.49.0/go.mod h1:YwmKuiFhNgtmhRdpi8Q8FAWPa0AwJTCSlssSsAtuRcY= go.opentelemetry.io/collector/semconv v0.71.0 h1:g2bMdtciW2BmKximUxaF0L962U/EIlH9ms3iOGblCKE= go.opentelemetry.io/collector/semconv v0.71.0/go.mod h1:UAp+qAMqEXOD0eEBmWJ3IJ5+LkF7zVTgmfufwpHmL8w= go.opentelemetry.io/contrib v0.20.0/go.mod h1:G/EtFaa6qaN7+LxqfIAT3GiZa7Wv5DTBUzl5H4LY0Kc= diff --git a/server/otlp/grpc_server.go b/server/otlp/grpc_server.go new file mode 100644 index 0000000000..cb5dff28f0 --- /dev/null +++ b/server/otlp/grpc_server.go @@ -0,0 +1,44 @@ +package otlp + +import ( + "context" + "fmt" + "net" + + pb "go.opentelemetry.io/proto/otlp/collector/trace/v1" + "google.golang.org/grpc" +) + +type grpcServer struct { + pb.UnimplementedTraceServiceServer + + addr string + ingester ingester + + gServer *grpc.Server +} + +func NewGrpcServer(addr string, ingester ingester) *grpcServer { + return &grpcServer{ + addr: addr, + ingester: ingester, + } +} + +func (s *grpcServer) Start() error { + s.gServer = grpc.NewServer() + listener, err := net.Listen("tcp", s.addr) + if err != nil { + return fmt.Errorf("cannot listen on address %s: %w", s.addr, err) + } + pb.RegisterTraceServiceServer(s.gServer, s) + return s.gServer.Serve(listener) +} + +func (s *grpcServer) Stop() { + s.gServer.Stop() +} + +func (s grpcServer) Export(ctx context.Context, request *pb.ExportTraceServiceRequest) (*pb.ExportTraceServiceResponse, error) { + return s.ingester.Ingest(ctx, request) +} diff --git a/server/otlp/http_server.go b/server/otlp/http_server.go new file mode 100644 index 0000000000..5d113d6ed8 --- /dev/null +++ b/server/otlp/http_server.go @@ -0,0 +1,205 @@ +package otlp + +import ( + "bytes" + "compress/gzip" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "strings" + + "github.com/gorilla/handlers" + "github.com/gorilla/mux" + + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + pb "go.opentelemetry.io/proto/otlp/collector/trace/v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" +) + +const ( + protoBufContentType = "application/x-protobuf" + jsonContentType = "application/json" +) + +type httpServer struct { + addr string + ingester ingester + + hServer *http.Server +} + +func NewHttpServer(addr string, ingester ingester) *httpServer { + return &httpServer{ + addr: addr, + ingester: ingester, + } +} + +func (s *httpServer) Start() error { + r := mux.NewRouter() + r.HandleFunc("/v1/traces", s.Export).Methods("POST") + + s.hServer = &http.Server{ + Addr: s.addr, + Handler: handlers.CompressHandler(decompressBodyHandler(handlers.ContentTypeHandler(r, protoBufContentType, jsonContentType))), + } + listener, err := net.Listen("tcp", s.addr) + if err != nil { + return fmt.Errorf("cannot listen on address %s: %w", s.addr, err) + } + + return s.hServer.Serve(listener) +} + +func (s *httpServer) Stop() { + s.hServer.Close() +} + +func (s httpServer) Export(w http.ResponseWriter, r *http.Request) { + contentType := r.Header.Get("content-type") + response := newHttpResponse(w, contentType) + + request, err := s.parseBody(r.Body, contentType) + if err != nil { + response.sendError(http.StatusUnprocessableEntity, status.Errorf(codes.InvalidArgument, "Could not parse request body %s", err.Error())) + return + } + + result, err := s.ingester.Ingest(r.Context(), request) + if err != nil { + response.sendError(http.StatusInternalServerError, status.Errorf(codes.InvalidArgument, "Error when ingesting spans %s", err.Error())) + return + } + + response.send(http.StatusOK, result) +} + +func (s httpServer) parseProtoBuf(body []byte) (*pb.ExportTraceServiceRequest, error) { + request := pb.ExportTraceServiceRequest{} + + err := proto.Unmarshal(body, &request) + if err != nil { + return nil, err + } + + return &request, nil +} + +func (s httpServer) parseJson(body []byte) (*pb.ExportTraceServiceRequest, error) { + exportRequest := ptraceotlp.NewRequest() + + err := exportRequest.UnmarshalJSON(body) + if err != nil { + return nil, err + } + + protoBody, err := exportRequest.MarshalProto() + if err != nil { + return nil, err + } + + return s.parseProtoBuf(protoBody) +} + +func (s httpServer) parseBody(reqBody io.ReadCloser, contentType string) (*pb.ExportTraceServiceRequest, error) { + var body []byte + if b, err := io.ReadAll(reqBody); err == nil { + body = b + } else { + return nil, err + } + + if len(body) == 0 { + return nil, fmt.Errorf("empty body") + } + + if contentType == protoBufContentType { + return s.parseProtoBuf(body) + } + + return s.parseJson(body) +} + +type httpResponse struct { + w http.ResponseWriter + contentType string +} + +func newHttpResponse(w http.ResponseWriter, contentType string) httpResponse { + return httpResponse{ + w: w, + contentType: contentType, + } +} + +func (r httpResponse) send(statusCode int, message proto.Message) error { + body, err := r.paseResponseBody(message) + if err != nil { + fmt.Println("Could not attach body to response", err.Error()) + return err + } + + r.w.WriteHeader(statusCode) + r.w.Write(body) + + return nil +} + +func (r httpResponse) sendError(code int, err error) { + rpcError, _ := status.FromError(err) + + r.send(code, rpcError.Proto()) +} + +func (r httpResponse) paseResponseBody(data proto.Message) ([]byte, error) { + if r.contentType == protoBufContentType { + return proto.Marshal(data) + } + + return json.Marshal(data) +} + +func decompressBodyHandler(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.Contains(r.Header.Get("content-encoding"), "gzip") { + compressedBody, err := decompressBody(r.Body) + if err != nil { + response := newHttpResponse(w, r.Header.Get("content-type")) + response.sendError(http.StatusUnprocessableEntity, status.Errorf(codes.InvalidArgument, "Could not parse request body %s", err.Error())) + return + } + + r.Body = compressedBody + r.Header.Set("accept-encoding", "gzip") + } + + h.ServeHTTP(w, r) + }) +} + +func decompressBody(reqBody io.ReadCloser) (io.ReadCloser, error) { + var body []byte + if b, err := io.ReadAll(reqBody); err == nil { + body = b + } else { + return nil, err + } + + reader := bytes.NewReader(body) + gzReader, err := gzip.NewReader(reader) + if err != nil { + return nil, err + } + + output, err := ioutil.ReadAll(gzReader) + if err != nil { + return nil, err + } + + return io.NopCloser(bytes.NewReader(output)), nil +} diff --git a/server/otlp/server.go b/server/otlp/ingester.go similarity index 69% rename from server/otlp/server.go rename to server/otlp/ingester.go index 35e4353407..1ef3417915 100644 --- a/server/otlp/server.go +++ b/server/otlp/ingester.go @@ -3,7 +3,6 @@ package otlp import ( "context" "fmt" - "net" "strings" "github.com/kubeshop/tracetest/server/model" @@ -11,41 +10,20 @@ import ( "go.opentelemetry.io/otel/trace" pb "go.opentelemetry.io/proto/otlp/collector/trace/v1" v1 "go.opentelemetry.io/proto/otlp/trace/v1" - "google.golang.org/grpc" ) -type Server struct { - pb.UnimplementedTraceServiceServer - - addr string - db model.Repository - - gServer *grpc.Server -} - -func NewServer(addr string, db model.Repository) *Server { - return &Server{ - addr: addr, - db: db, - } +type ingester struct { + db model.Repository } -func (s *Server) Start() error { - s.gServer = grpc.NewServer() - listener, err := net.Listen("tcp", s.addr) - if err != nil { - return fmt.Errorf("cannot listen on address %s: %w", s.addr, err) +func NewIngester(db model.Repository) ingester { + return ingester{ + db: db, } - pb.RegisterTraceServiceServer(s.gServer, s) - return s.gServer.Serve(listener) -} - -func (s *Server) Stop() { - s.gServer.Stop() } -func (s Server) Export(ctx context.Context, request *pb.ExportTraceServiceRequest) (*pb.ExportTraceServiceResponse, error) { - ds, err := s.db.DefaultDataStore(ctx) +func (i ingester) Ingest(ctx context.Context, request *pb.ExportTraceServiceRequest) (*pb.ExportTraceServiceResponse, error) { + ds, err := i.db.DefaultDataStore(ctx) if err != nil || !ds.IsOTLPBasedProvider() { fmt.Println("OTLP server is not enabled. Ignoring request") @@ -56,10 +34,10 @@ func (s Server) Export(ctx context.Context, request *pb.ExportTraceServiceReques return &pb.ExportTraceServiceResponse{}, nil } - spansByTrace := s.getSpansByTrace(request) + spansByTrace := i.getSpansByTrace(request) for traceID, spans := range spansByTrace { - s.saveSpansIntoTest(ctx, traceID, spans) + i.saveSpansIntoTest(ctx, traceID, spans) } return &pb.ExportTraceServiceResponse{ @@ -69,7 +47,7 @@ func (s Server) Export(ctx context.Context, request *pb.ExportTraceServiceReques }, nil } -func (s Server) getSpansByTrace(request *pb.ExportTraceServiceRequest) map[trace.TraceID][]model.Span { +func (i ingester) getSpansByTrace(request *pb.ExportTraceServiceRequest) map[trace.TraceID][]model.Span { otelSpans := make([]*v1.Span, 0) for _, resourceSpan := range request.ResourceSpans { for _, spans := range resourceSpan.ScopeSpans { @@ -95,8 +73,8 @@ func (s Server) getSpansByTrace(request *pb.ExportTraceServiceRequest) map[trace return spansByTrace } -func (s Server) saveSpansIntoTest(ctx context.Context, traceID trace.TraceID, spans []model.Span) error { - run, err := s.db.GetRunByTraceID(ctx, traceID) +func (e ingester) saveSpansIntoTest(ctx context.Context, traceID trace.TraceID, spans []model.Span) error { + run, err := e.db.GetRunByTraceID(ctx, traceID) if err != nil && strings.Contains(err.Error(), "record not found") { // span is not part of any known test run. So it will be ignored return nil @@ -128,7 +106,7 @@ func (s Server) saveSpansIntoTest(ctx context.Context, traceID trace.TraceID, sp run.Trace = &newTrace - err = s.db.UpdateRun(ctx, run) + err = e.db.UpdateRun(ctx, run) if err != nil { return fmt.Errorf("could not update run: %w", err) }