Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(backend): Adding support for HTTP OTLP server #2412

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/collector/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@ services:
volumes:
- ./collector.config.yaml:/otel-local-config.yaml
environment:
- TRACETEST_ENDPOINT=tracetest:21321
- TRACETEST_ENDPOINT=tracetest:4317
23 changes: 16 additions & 7 deletions server/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,7 @@ func (app *App) Start(opts ...appOption) error {
return err
}

otlpServer := otlp.NewServer(":21321", testDB)
go otlpServer.Start()
app.registerStopFn(func() {
fmt.Println("stopping otlp server")
otlpServer.Stop()
})

registerOtlpServer(app, testDB)
provisioner := provisioning.New()

router, mappers := controller(app.cfg, testDB, tracer, rf, triggerRegistry)
Expand Down Expand Up @@ -287,7 +281,22 @@ func registerSPAHandler(router *mux.Router, cfg httpServerConfig, analyticsEnabl
Env,
),
)
}

func registerOtlpServer(app *App, testDB model.Repository) {
xoscar marked this conversation as resolved.
Show resolved Hide resolved
ingester := otlp.NewIngester(testDB)
grpcOtlpServer := otlp.NewGrpcServer(":4317", ingester)
httpOtlpServer := otlp.NewHttpServer(":4318", ingester)
go grpcOtlpServer.Start()
go httpOtlpServer.Start()

fmt.Println("OTLP server started on :4317 (grpc) and :4318 (http)")

app.registerStopFn(func() {
fmt.Println("stopping otlp server")
grpcOtlpServer.Stop()
httpOtlpServer.Stop()
})
}

func registerConfigResource(configRepo *configresource.Repository, router *mux.Router, db *sql.DB, provisioner *provisioning.Provisioner) {
Expand Down
3 changes: 2 additions & 1 deletion server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/stretchr/testify v1.8.1
github.com/teris-io/shortid v0.0.0-20220617161101-71ec9f2aa569
go.opentelemetry.io/collector v0.44.0
go.opentelemetry.io/collector/pdata v0.49.0
go.opentelemetry.io/collector/semconv v0.71.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.28.0
go.opentelemetry.io/contrib/propagators/aws v1.5.0
Expand All @@ -50,6 +51,7 @@ require (
go.opentelemetry.io/proto/otlp v0.19.0
golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15
google.golang.org/grpc v1.52.0
google.golang.org/protobuf v1.28.1
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
gotest.tools/v3 v3.1.0
Expand Down Expand Up @@ -107,7 +109,6 @@ require (
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)

Expand Down
2 changes: 2 additions & 0 deletions server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1686,6 +1686,8 @@ go.opentelemetry.io/collector v0.44.0 h1:S/uoPVGNhv0MT/UbEmb7AffehAUTfVhH8YS2zQl
go.opentelemetry.io/collector v0.44.0/go.mod h1:JJbkoXqnUGAlxmPtbFzEVSVMaKIKAlzdbz8HE8Yo7Ew=
go.opentelemetry.io/collector/model v0.44.0 h1:I+M6X2NANYChOGYrpbxCoEYJah3eHdMvumKjothIAtA=
go.opentelemetry.io/collector/model v0.44.0/go.mod h1:4jo1R8uBDspLCxUGhQ0k3v/EFXFbW7s0AIy3LuGLbcU=
go.opentelemetry.io/collector/pdata v0.49.0 h1:aYj5rOlRC0x7lGXbc185LMsMMoY/pjOTXr5s1O2SzXs=
go.opentelemetry.io/collector/pdata v0.49.0/go.mod h1:YwmKuiFhNgtmhRdpi8Q8FAWPa0AwJTCSlssSsAtuRcY=
go.opentelemetry.io/collector/semconv v0.71.0 h1:g2bMdtciW2BmKximUxaF0L962U/EIlH9ms3iOGblCKE=
go.opentelemetry.io/collector/semconv v0.71.0/go.mod h1:UAp+qAMqEXOD0eEBmWJ3IJ5+LkF7zVTgmfufwpHmL8w=
go.opentelemetry.io/contrib v0.20.0/go.mod h1:G/EtFaa6qaN7+LxqfIAT3GiZa7Wv5DTBUzl5H4LY0Kc=
Expand Down
44 changes: 44 additions & 0 deletions server/otlp/grpc_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package otlp

import (
"context"
"fmt"
"net"

pb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"google.golang.org/grpc"
)

type grpcServer struct {
pb.UnimplementedTraceServiceServer

addr string
ingester ingester

gServer *grpc.Server
}

func NewGrpcServer(addr string, ingester ingester) *grpcServer {
return &grpcServer{
addr: addr,
ingester: ingester,
}
}

func (s *grpcServer) Start() error {
s.gServer = grpc.NewServer()
listener, err := net.Listen("tcp", s.addr)
if err != nil {
return fmt.Errorf("cannot listen on address %s: %w", s.addr, err)
}
pb.RegisterTraceServiceServer(s.gServer, s)
return s.gServer.Serve(listener)
}

func (s *grpcServer) Stop() {
s.gServer.Stop()
}

func (s grpcServer) Export(ctx context.Context, request *pb.ExportTraceServiceRequest) (*pb.ExportTraceServiceResponse, error) {
return s.ingester.Ingest(ctx, request)
}
205 changes: 205 additions & 0 deletions server/otlp/http_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package otlp

import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"strings"

"github.com/gorilla/handlers"
"github.com/gorilla/mux"

"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
pb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)

const (
protoBufContentType = "application/x-protobuf"
jsonContentType = "application/json"
)

type httpServer struct {
addr string
ingester ingester

hServer *http.Server
}

func NewHttpServer(addr string, ingester ingester) *httpServer {
return &httpServer{
addr: addr,
ingester: ingester,
}
}

func (s *httpServer) Start() error {
r := mux.NewRouter()
r.HandleFunc("/v1/traces", s.Export).Methods("POST")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

niceee


s.hServer = &http.Server{
Addr: s.addr,
Handler: handlers.CompressHandler(decompressBodyHandler(handlers.ContentTypeHandler(r, protoBufContentType, jsonContentType))),
schoren marked this conversation as resolved.
Show resolved Hide resolved
}
listener, err := net.Listen("tcp", s.addr)
if err != nil {
return fmt.Errorf("cannot listen on address %s: %w", s.addr, err)
}

return s.hServer.Serve(listener)
}

func (s *httpServer) Stop() {
s.hServer.Close()
}

func (s httpServer) Export(w http.ResponseWriter, r *http.Request) {
contentType := r.Header.Get("content-type")
response := newHttpResponse(w, contentType)

request, err := s.parseBody(r.Body, contentType)
if err != nil {
response.sendError(http.StatusUnprocessableEntity, status.Errorf(codes.InvalidArgument, "Could not parse request body %s", err.Error()))
return
}

result, err := s.ingester.Ingest(r.Context(), request)
if err != nil {
response.sendError(http.StatusInternalServerError, status.Errorf(codes.InvalidArgument, "Error when ingesting spans %s", err.Error()))
return
}

response.send(http.StatusOK, result)
}

func (s httpServer) parseProtoBuf(body []byte) (*pb.ExportTraceServiceRequest, error) {
request := pb.ExportTraceServiceRequest{}

err := proto.Unmarshal(body, &request)
if err != nil {
return nil, err
}

return &request, nil
}

func (s httpServer) parseJson(body []byte) (*pb.ExportTraceServiceRequest, error) {
exportRequest := ptraceotlp.NewRequest()

err := exportRequest.UnmarshalJSON(body)
if err != nil {
return nil, err
}

protoBody, err := exportRequest.MarshalProto()
if err != nil {
return nil, err
}

return s.parseProtoBuf(protoBody)
}

func (s httpServer) parseBody(reqBody io.ReadCloser, contentType string) (*pb.ExportTraceServiceRequest, error) {
var body []byte
if b, err := io.ReadAll(reqBody); err == nil {
body = b
} else {
return nil, err
}

if len(body) == 0 {
return nil, fmt.Errorf("empty body")
}

if contentType == protoBufContentType {
return s.parseProtoBuf(body)
}

return s.parseJson(body)
}

type httpResponse struct {
w http.ResponseWriter
contentType string
}

func newHttpResponse(w http.ResponseWriter, contentType string) httpResponse {
return httpResponse{
w: w,
contentType: contentType,
}
}

func (r httpResponse) send(statusCode int, message proto.Message) error {
body, err := r.paseResponseBody(message)
if err != nil {
fmt.Println("Could not attach body to response", err.Error())
return err
}

r.w.WriteHeader(statusCode)
r.w.Write(body)

return nil
}

func (r httpResponse) sendError(code int, err error) {
rpcError, _ := status.FromError(err)

r.send(code, rpcError.Proto())
}

func (r httpResponse) paseResponseBody(data proto.Message) ([]byte, error) {
if r.contentType == protoBufContentType {
return proto.Marshal(data)
}

return json.Marshal(data)
}

func decompressBodyHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.Contains(r.Header.Get("content-encoding"), "gzip") {
compressedBody, err := decompressBody(r.Body)
if err != nil {
response := newHttpResponse(w, r.Header.Get("content-type"))
response.sendError(http.StatusUnprocessableEntity, status.Errorf(codes.InvalidArgument, "Could not parse request body %s", err.Error()))
return
}

r.Body = compressedBody
r.Header.Set("accept-encoding", "gzip")
}

h.ServeHTTP(w, r)
})
}

func decompressBody(reqBody io.ReadCloser) (io.ReadCloser, error) {
var body []byte
if b, err := io.ReadAll(reqBody); err == nil {
body = b
} else {
return nil, err
}

reader := bytes.NewReader(body)
gzReader, err := gzip.NewReader(reader)
if err != nil {
return nil, err
}

output, err := ioutil.ReadAll(gzReader)
if err != nil {
return nil, err
}

return io.NopCloser(bytes.NewReader(output)), nil
}