Skip to content

Commit

Permalink
feature(backend): Adding support for HTTP OTLP server (#2412)
Browse files Browse the repository at this point in the history
* creating http otlp server

* cleanup changes

* cleanup changes

* fixing example

* enabling JSON request body for the OTLP HTTP endpoint

* PR comments

* PR comments

* test

* manually installing dependencies
  • Loading branch information
xoscar committed Apr 24, 2023
1 parent d6b72fd commit d1f0b31
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 44 deletions.
2 changes: 1 addition & 1 deletion examples/collector/docker-compose.yml
Expand Up @@ -49,4 +49,4 @@ services:
volumes:
- ./collector.config.yaml:/otel-local-config.yaml
environment:
- TRACETEST_ENDPOINT=tracetest:21321
- TRACETEST_ENDPOINT=tracetest:4317
23 changes: 16 additions & 7 deletions server/app/app.go
Expand Up @@ -225,13 +225,7 @@ func (app *App) Start(opts ...appOption) error {
return err
}

otlpServer := otlp.NewServer(":21321", testDB)
go otlpServer.Start()
app.registerStopFn(func() {
fmt.Println("stopping otlp server")
otlpServer.Stop()
})

registerOtlpServer(app, testDB)
provisioner := provisioning.New()

router, mappers := controller(app.cfg, testDB, tracer, rf, triggerRegistry)
Expand Down Expand Up @@ -287,7 +281,22 @@ func registerSPAHandler(router *mux.Router, cfg httpServerConfig, analyticsEnabl
Env,
),
)
}

func registerOtlpServer(app *App, testDB model.Repository) {
ingester := otlp.NewIngester(testDB)
grpcOtlpServer := otlp.NewGrpcServer(":4317", ingester)
httpOtlpServer := otlp.NewHttpServer(":4318", ingester)
go grpcOtlpServer.Start()
go httpOtlpServer.Start()

fmt.Println("OTLP server started on :4317 (grpc) and :4318 (http)")

app.registerStopFn(func() {
fmt.Println("stopping otlp server")
grpcOtlpServer.Stop()
httpOtlpServer.Stop()
})
}

func registerConfigResource(configRepo *configresource.Repository, router *mux.Router, db *sql.DB, provisioner *provisioning.Provisioner) {
Expand Down
3 changes: 2 additions & 1 deletion server/go.mod
Expand Up @@ -37,6 +37,7 @@ require (
github.com/stretchr/testify v1.8.1
github.com/teris-io/shortid v0.0.0-20220617161101-71ec9f2aa569
go.opentelemetry.io/collector v0.44.0
go.opentelemetry.io/collector/pdata v0.49.0
go.opentelemetry.io/collector/semconv v0.71.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.28.0
go.opentelemetry.io/contrib/propagators/aws v1.5.0
Expand All @@ -50,6 +51,7 @@ require (
go.opentelemetry.io/proto/otlp v0.19.0
golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15
google.golang.org/grpc v1.52.0
google.golang.org/protobuf v1.28.1
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
gotest.tools/v3 v3.1.0
Expand Down Expand Up @@ -107,7 +109,6 @@ require (
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)

Expand Down
2 changes: 2 additions & 0 deletions server/go.sum
Expand Up @@ -1686,6 +1686,8 @@ go.opentelemetry.io/collector v0.44.0 h1:S/uoPVGNhv0MT/UbEmb7AffehAUTfVhH8YS2zQl
go.opentelemetry.io/collector v0.44.0/go.mod h1:JJbkoXqnUGAlxmPtbFzEVSVMaKIKAlzdbz8HE8Yo7Ew=
go.opentelemetry.io/collector/model v0.44.0 h1:I+M6X2NANYChOGYrpbxCoEYJah3eHdMvumKjothIAtA=
go.opentelemetry.io/collector/model v0.44.0/go.mod h1:4jo1R8uBDspLCxUGhQ0k3v/EFXFbW7s0AIy3LuGLbcU=
go.opentelemetry.io/collector/pdata v0.49.0 h1:aYj5rOlRC0x7lGXbc185LMsMMoY/pjOTXr5s1O2SzXs=
go.opentelemetry.io/collector/pdata v0.49.0/go.mod h1:YwmKuiFhNgtmhRdpi8Q8FAWPa0AwJTCSlssSsAtuRcY=
go.opentelemetry.io/collector/semconv v0.71.0 h1:g2bMdtciW2BmKximUxaF0L962U/EIlH9ms3iOGblCKE=
go.opentelemetry.io/collector/semconv v0.71.0/go.mod h1:UAp+qAMqEXOD0eEBmWJ3IJ5+LkF7zVTgmfufwpHmL8w=
go.opentelemetry.io/contrib v0.20.0/go.mod h1:G/EtFaa6qaN7+LxqfIAT3GiZa7Wv5DTBUzl5H4LY0Kc=
Expand Down
44 changes: 44 additions & 0 deletions server/otlp/grpc_server.go
@@ -0,0 +1,44 @@
package otlp

import (
"context"
"fmt"
"net"

pb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"google.golang.org/grpc"
)

type grpcServer struct {
pb.UnimplementedTraceServiceServer

addr string
ingester ingester

gServer *grpc.Server
}

func NewGrpcServer(addr string, ingester ingester) *grpcServer {
return &grpcServer{
addr: addr,
ingester: ingester,
}
}

func (s *grpcServer) Start() error {
s.gServer = grpc.NewServer()
listener, err := net.Listen("tcp", s.addr)
if err != nil {
return fmt.Errorf("cannot listen on address %s: %w", s.addr, err)
}
pb.RegisterTraceServiceServer(s.gServer, s)
return s.gServer.Serve(listener)
}

func (s *grpcServer) Stop() {
s.gServer.Stop()
}

func (s grpcServer) Export(ctx context.Context, request *pb.ExportTraceServiceRequest) (*pb.ExportTraceServiceResponse, error) {
return s.ingester.Ingest(ctx, request)
}
205 changes: 205 additions & 0 deletions server/otlp/http_server.go
@@ -0,0 +1,205 @@
package otlp

import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"strings"

"github.com/gorilla/handlers"
"github.com/gorilla/mux"

"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
pb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)

const (
protoBufContentType = "application/x-protobuf"
jsonContentType = "application/json"
)

type httpServer struct {
addr string
ingester ingester

hServer *http.Server
}

func NewHttpServer(addr string, ingester ingester) *httpServer {
return &httpServer{
addr: addr,
ingester: ingester,
}
}

func (s *httpServer) Start() error {
r := mux.NewRouter()
r.HandleFunc("/v1/traces", s.Export).Methods("POST")

s.hServer = &http.Server{
Addr: s.addr,
Handler: handlers.CompressHandler(decompressBodyHandler(handlers.ContentTypeHandler(r, protoBufContentType, jsonContentType))),
}
listener, err := net.Listen("tcp", s.addr)
if err != nil {
return fmt.Errorf("cannot listen on address %s: %w", s.addr, err)
}

return s.hServer.Serve(listener)
}

func (s *httpServer) Stop() {
s.hServer.Close()
}

func (s httpServer) Export(w http.ResponseWriter, r *http.Request) {
contentType := r.Header.Get("content-type")
response := newHttpResponse(w, contentType)

request, err := s.parseBody(r.Body, contentType)
if err != nil {
response.sendError(http.StatusUnprocessableEntity, status.Errorf(codes.InvalidArgument, "Could not parse request body %s", err.Error()))
return
}

result, err := s.ingester.Ingest(r.Context(), request)
if err != nil {
response.sendError(http.StatusInternalServerError, status.Errorf(codes.InvalidArgument, "Error when ingesting spans %s", err.Error()))
return
}

response.send(http.StatusOK, result)
}

func (s httpServer) parseProtoBuf(body []byte) (*pb.ExportTraceServiceRequest, error) {
request := pb.ExportTraceServiceRequest{}

err := proto.Unmarshal(body, &request)
if err != nil {
return nil, err
}

return &request, nil
}

func (s httpServer) parseJson(body []byte) (*pb.ExportTraceServiceRequest, error) {
exportRequest := ptraceotlp.NewRequest()

err := exportRequest.UnmarshalJSON(body)
if err != nil {
return nil, err
}

protoBody, err := exportRequest.MarshalProto()
if err != nil {
return nil, err
}

return s.parseProtoBuf(protoBody)
}

func (s httpServer) parseBody(reqBody io.ReadCloser, contentType string) (*pb.ExportTraceServiceRequest, error) {
var body []byte
if b, err := io.ReadAll(reqBody); err == nil {
body = b
} else {
return nil, err
}

if len(body) == 0 {
return nil, fmt.Errorf("empty body")
}

if contentType == protoBufContentType {
return s.parseProtoBuf(body)
}

return s.parseJson(body)
}

type httpResponse struct {
w http.ResponseWriter
contentType string
}

func newHttpResponse(w http.ResponseWriter, contentType string) httpResponse {
return httpResponse{
w: w,
contentType: contentType,
}
}

func (r httpResponse) send(statusCode int, message proto.Message) error {
body, err := r.paseResponseBody(message)
if err != nil {
fmt.Println("Could not attach body to response", err.Error())
return err
}

r.w.WriteHeader(statusCode)
r.w.Write(body)

return nil
}

func (r httpResponse) sendError(code int, err error) {
rpcError, _ := status.FromError(err)

r.send(code, rpcError.Proto())
}

func (r httpResponse) paseResponseBody(data proto.Message) ([]byte, error) {
if r.contentType == protoBufContentType {
return proto.Marshal(data)
}

return json.Marshal(data)
}

func decompressBodyHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.Contains(r.Header.Get("content-encoding"), "gzip") {
compressedBody, err := decompressBody(r.Body)
if err != nil {
response := newHttpResponse(w, r.Header.Get("content-type"))
response.sendError(http.StatusUnprocessableEntity, status.Errorf(codes.InvalidArgument, "Could not parse request body %s", err.Error()))
return
}

r.Body = compressedBody
r.Header.Set("accept-encoding", "gzip")
}

h.ServeHTTP(w, r)
})
}

func decompressBody(reqBody io.ReadCloser) (io.ReadCloser, error) {
var body []byte
if b, err := io.ReadAll(reqBody); err == nil {
body = b
} else {
return nil, err
}

reader := bytes.NewReader(body)
gzReader, err := gzip.NewReader(reader)
if err != nil {
return nil, err
}

output, err := ioutil.ReadAll(gzReader)
if err != nil {
return nil, err
}

return io.NopCloser(bytes.NewReader(output)), nil
}

0 comments on commit d1f0b31

Please sign in to comment.