Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(backend): Adding support for HTTP OTLP server #2412

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/collector/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@ services:
volumes:
- ./collector.config.yaml:/otel-local-config.yaml
environment:
- TRACETEST_ENDPOINT=tracetest:21321
- TRACETEST_ENDPOINT=tracetest:4317
23 changes: 16 additions & 7 deletions server/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,7 @@ func (app *App) Start(opts ...appOption) error {
return err
}

otlpServer := otlp.NewServer(":21321", testDB)
go otlpServer.Start()
app.registerStopFn(func() {
fmt.Println("stopping otlp server")
otlpServer.Stop()
})

registerOtlpServer(app, testDB)
provisioner := provisioning.New()

router, mappers := controller(app.cfg, testDB, tracer, rf, triggerRegistry)
Expand Down Expand Up @@ -287,7 +281,22 @@ func registerSPAHandler(router *mux.Router, cfg httpServerConfig, analyticsEnabl
Env,
),
)
}

func registerOtlpServer(app *App, testDB model.Repository) {
xoscar marked this conversation as resolved.
Show resolved Hide resolved
exporter := otlp.NewExporter(testDB)
grpcOtlpServer := otlp.NewGrpcServer(":4317", exporter)
httpOtlpServer := otlp.NewHttpServer(":4318", exporter)
go grpcOtlpServer.Start()
go httpOtlpServer.Start()

fmt.Println("OTLP server started on :4317 (grpc) and :4318 (http)")

app.registerStopFn(func() {
fmt.Println("stopping otlp server")
grpcOtlpServer.Stop()
httpOtlpServer.Stop()
})
}

func registerConfigResource(configRepo *configresource.Repository, router *mux.Router, db *sql.DB, provisioner *provisioning.Provisioner) {
Expand Down
50 changes: 17 additions & 33 deletions server/otlp/server.go → server/otlp/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,33 @@ package otlp
import (
"context"
"fmt"
"net"
"strings"

"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/traces"
"go.opentelemetry.io/otel/trace"
pb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
"google.golang.org/grpc"
)

type Server struct {
pb.UnimplementedTraceServiceServer

addr string
db model.Repository

gServer *grpc.Server
type IExporter interface {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We usually don't prefix interfaces with I in go. Also it looks like this interface is not really required. You can just reference the actual struct. And given that this is only used internally within this package, you can even unexport it (a.k.a lowercasing the first letter)

Ingest(ctx context.Context, request *pb.ExportTraceServiceRequest) (*pb.ExportTraceServiceResponse, error)
}

func NewServer(addr string, db model.Repository) *Server {
return &Server{
addr: addr,
db: db,
}
type Exporter struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit weird that an Exporter only exposes an Ingest method. I would rename either the struct or the method to look a bit more consistent

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, you can unexport this struct, since it's only directly referenced within the package

db model.Repository
}

func (s *Server) Start() error {
s.gServer = grpc.NewServer()
listener, err := net.Listen("tcp", s.addr)
if err != nil {
return fmt.Errorf("cannot listen on address %s: %w", s.addr, err)
}
pb.RegisterTraceServiceServer(s.gServer, s)
return s.gServer.Serve(listener)
}
var _ IExporter = Exporter{}

func (s *Server) Stop() {
s.gServer.Stop()
func NewExporter(db model.Repository) Exporter {
return Exporter{
db: db,
}
}

func (s Server) Export(ctx context.Context, request *pb.ExportTraceServiceRequest) (*pb.ExportTraceServiceResponse, error) {
ds, err := s.db.DefaultDataStore(ctx)
func (e Exporter) Ingest(ctx context.Context, request *pb.ExportTraceServiceRequest) (*pb.ExportTraceServiceResponse, error) {
ds, err := e.db.DefaultDataStore(ctx)

if err != nil || !ds.IsOTLPBasedProvider() {
fmt.Println("OTLP server is not enabled. Ignoring request")
Expand All @@ -56,10 +40,10 @@ func (s Server) Export(ctx context.Context, request *pb.ExportTraceServiceReques
return &pb.ExportTraceServiceResponse{}, nil
}

spansByTrace := s.getSpansByTrace(request)
spansByTrace := e.getSpansByTrace(request)

for traceID, spans := range spansByTrace {
s.saveSpansIntoTest(ctx, traceID, spans)
e.saveSpansIntoTest(ctx, traceID, spans)
}

return &pb.ExportTraceServiceResponse{
Expand All @@ -69,7 +53,7 @@ func (s Server) Export(ctx context.Context, request *pb.ExportTraceServiceReques
}, nil
}

func (s Server) getSpansByTrace(request *pb.ExportTraceServiceRequest) map[trace.TraceID][]model.Span {
func (e Exporter) getSpansByTrace(request *pb.ExportTraceServiceRequest) map[trace.TraceID][]model.Span {
otelSpans := make([]*v1.Span, 0)
for _, resourceSpan := range request.ResourceSpans {
for _, spans := range resourceSpan.ScopeSpans {
Expand All @@ -95,8 +79,8 @@ func (s Server) getSpansByTrace(request *pb.ExportTraceServiceRequest) map[trace
return spansByTrace
}

func (s Server) saveSpansIntoTest(ctx context.Context, traceID trace.TraceID, spans []model.Span) error {
run, err := s.db.GetRunByTraceID(ctx, traceID)
func (e Exporter) saveSpansIntoTest(ctx context.Context, traceID trace.TraceID, spans []model.Span) error {
run, err := e.db.GetRunByTraceID(ctx, traceID)
if err != nil && strings.Contains(err.Error(), "record not found") {
// span is not part of any known test run. So it will be ignored
return nil
Expand Down Expand Up @@ -128,7 +112,7 @@ func (s Server) saveSpansIntoTest(ctx context.Context, traceID trace.TraceID, sp

run.Trace = &newTrace

err = s.db.UpdateRun(ctx, run)
err = e.db.UpdateRun(ctx, run)
if err != nil {
return fmt.Errorf("could not update run: %w", err)
}
Expand Down
44 changes: 44 additions & 0 deletions server/otlp/grpc_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package otlp

import (
"context"
"fmt"
"net"

pb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"google.golang.org/grpc"
)

type GrpcServer struct {
xoscar marked this conversation as resolved.
Show resolved Hide resolved
pb.UnimplementedTraceServiceServer

addr string
exporter IExporter

gServer *grpc.Server
}

func NewGrpcServer(addr string, exporter IExporter) *GrpcServer {
return &GrpcServer{
addr: addr,
exporter: exporter,
}
}

func (s *GrpcServer) Start() error {
s.gServer = grpc.NewServer()
listener, err := net.Listen("tcp", s.addr)
if err != nil {
return fmt.Errorf("cannot listen on address %s: %w", s.addr, err)
}
pb.RegisterTraceServiceServer(s.gServer, s)
return s.gServer.Serve(listener)
}

func (s *GrpcServer) Stop() {
s.gServer.Stop()
}

func (s GrpcServer) Export(ctx context.Context, request *pb.ExportTraceServiceRequest) (*pb.ExportTraceServiceResponse, error) {
return s.exporter.Ingest(ctx, request)
}