Skip to content

Commit

Permalink
feat(agent): in-memory datastore to support otlp datastores (#3176)
Browse files Browse the repository at this point in the history
* feat(agent): in-memory datastore to support otlp datastores

* fix comment
  • Loading branch information
mathnogueira authored Sep 21, 2023
1 parent 34529cd commit 4349562
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 8 deletions.
5 changes: 4 additions & 1 deletion agent/initialization/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/kubeshop/tracetest/agent/config"
"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/workers"
"github.com/kubeshop/tracetest/agent/workers/poller"
"go.opentelemetry.io/otel/trace"
)

Expand All @@ -22,7 +23,9 @@ func NewClient(ctx context.Context, config config.Config, traceCache collector.T
}

triggerWorker := workers.NewTriggerWorker(client, workers.WithTraceCache(traceCache))
pollingWorker := workers.NewPollerWorker(client)
pollingWorker := workers.NewPollerWorker(client, workers.WithInMemoryDatastore(
poller.NewInMemoryDatastore(traceCache),
))
dataStoreTestConnectionWorker := workers.NewTestConnectionWorker(client)

client.OnDataStoreTestConnectionRequest(dataStoreTestConnectionWorker.Test)
Expand Down
33 changes: 27 additions & 6 deletions agent/workers/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,43 @@ import (
"github.com/fluidtruck/deepcopy"
"github.com/kubeshop/tracetest/agent/client"
"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/workers/datastores/connection"
"github.com/kubeshop/tracetest/server/datastore"
"github.com/kubeshop/tracetest/server/tracedb"
"github.com/kubeshop/tracetest/server/tracedb/connection"
"github.com/kubeshop/tracetest/server/traces"
"go.opentelemetry.io/otel/trace"
)

type PollerWorker struct {
client *client.Client
tracer trace.Tracer
sentSpanIDs *gocache.Cache[string, bool]
client *client.Client
tracer trace.Tracer
sentSpanIDs *gocache.Cache[string, bool]
inmemoryDatastore tracedb.TraceDB
}

func NewPollerWorker(client *client.Client) *PollerWorker {
type PollerOption func(*PollerWorker)

func WithInMemoryDatastore(datastore tracedb.TraceDB) PollerOption {
return func(pw *PollerWorker) {
pw.inmemoryDatastore = datastore
}
}

func NewPollerWorker(client *client.Client, opts ...PollerOption) *PollerWorker {
// TODO: use a real tracer
tracer := trace.NewNoopTracerProvider().Tracer("noop")

return &PollerWorker{
pollerWorker := &PollerWorker{
client: client,
tracer: tracer,
sentSpanIDs: gocache.New[string, bool](),
}

for _, opt := range opts {
opt(pollerWorker)
}

return pollerWorker
}

func (w *PollerWorker) Poll(ctx context.Context, request *proto.PollingRequest) error {
Expand All @@ -52,6 +67,10 @@ func (w *PollerWorker) Poll(ctx context.Context, request *proto.PollingRequest)
return err
}

if datastoreConfig.IsOTLPBasedProvider() && w.inmemoryDatastore != nil {
ds = w.inmemoryDatastore
}

pollingResponse := &proto.PollingResponse{
RequestID: request.RequestID,
TestID: request.TestID,
Expand Down Expand Up @@ -114,10 +133,12 @@ func convertProtoToDataStore(r *proto.DataStore) (*datastore.DataStore, error) {
if r.Tempo != nil {
ds.Values.Tempo = &datastore.MultiChannelClientConfig{}
if r.Tempo.Grpc != nil {
ds.Values.Tempo.Type = datastore.MultiChannelClientTypeGRPC
ds.Values.Tempo.Grpc = &datastore.GRPCClientSettings{}
deepcopy.DeepCopy(r.Tempo.Grpc, &ds.Values.Tempo.Grpc)
}
if r.Tempo.Http != nil {
ds.Values.Tempo.Type = datastore.MultiChannelClientTypeHTTP
ds.Values.Tempo.Http = &datastore.HttpClientConfig{}
deepcopy.DeepCopy(r.Tempo.Http, &ds.Values.Tempo.Http)
}
Expand Down
60 changes: 60 additions & 0 deletions agent/workers/poller/inmemory_datastore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package poller

import (
"context"

"github.com/kubeshop/tracetest/agent/collector"
"github.com/kubeshop/tracetest/agent/workers/datastores/connection"
"github.com/kubeshop/tracetest/server/pkg/id"
"github.com/kubeshop/tracetest/server/tracedb"
"github.com/kubeshop/tracetest/server/traces"
"go.opentelemetry.io/otel/trace"
)

func NewInMemoryDatastore(cache collector.TraceCache) tracedb.TraceDB {
return &inmemoryDatastore{cache}
}

type inmemoryDatastore struct {
cache collector.TraceCache
}

// Close implements tracedb.TraceDB.
func (d *inmemoryDatastore) Close() error {
return nil
}

// Connect implements tracedb.TraceDB.
func (d *inmemoryDatastore) Connect(ctx context.Context) error {
return nil
}

// GetEndpoints implements tracedb.TraceDB.
func (d *inmemoryDatastore) GetEndpoints() string {
return ""
}

// GetTraceByID implements tracedb.TraceDB.
func (d *inmemoryDatastore) GetTraceByID(ctx context.Context, traceID string) (traces.Trace, error) {
spans, found := d.cache.Get(traceID)
if !found || len(spans) == 0 {
return traces.Trace{}, connection.ErrTraceNotFound
}

return traces.FromSpanList(spans), nil
}

// GetTraceID implements tracedb.TraceDB.
func (d *inmemoryDatastore) GetTraceID() trace.TraceID {
return id.NewRandGenerator().TraceID()
}

// Ready implements tracedb.TraceDB.
func (d *inmemoryDatastore) Ready() bool {
return true
}

// ShouldRetry implements tracedb.TraceDB.
func (d *inmemoryDatastore) ShouldRetry() bool {
return true
}
66 changes: 65 additions & 1 deletion agent/workers/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/kubeshop/tracetest/agent/client"
"github.com/kubeshop/tracetest/agent/client/mocks"
"github.com/kubeshop/tracetest/agent/collector"
"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/workers"
"github.com/kubeshop/tracetest/agent/workers/poller"
"github.com/kubeshop/tracetest/server/pkg/id"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
)

func TestPollerWorker(t *testing.T) {
t.Skip("this test needs rework")
ctx := context.Background()
controlPlane := mocks.NewGrpcServer()

Expand Down Expand Up @@ -113,3 +116,64 @@ func createTempoFakeApi() *httptest.Server {
w.WriteHeader(http.StatusOK)
}))
}

func TestPollerWorkerWithInmemoryDatastore(t *testing.T) {
ctx := context.Background()
controlPlane := mocks.NewGrpcServer()

client, err := client.Connect(ctx, controlPlane.Addr())
require.NoError(t, err)

cache := collector.NewTraceCache()

pollerWorker := workers.NewPollerWorker(client, workers.WithInMemoryDatastore(
poller.NewInMemoryDatastore(cache),
))

client.OnPollingRequest(func(ctx context.Context, pr *proto.PollingRequest) error {
return pollerWorker.Poll(ctx, pr)
})

err = client.Start(ctx)
require.NoError(t, err)

traceID := id.NewRandGenerator().TraceID()
pollingRequest := proto.PollingRequest{
TestID: "test",
RunID: 1,
TraceID: traceID.String(),
Datastore: &proto.DataStore{
Type: "datadog",
},
}

controlPlane.SendPollingRequest(&pollingRequest)

time.Sleep(1 * time.Second)

// expect traces to not be sent to endpoint
pollingResponse := controlPlane.GetLastPollingResponse()
require.NotNil(t, pollingResponse, "agent did not send polling response back to server")

assert.False(t, pollingResponse.TraceFound)
assert.Len(t, pollingResponse.Spans, 0)

span1ID := id.NewRandGenerator().SpanID()
span2ID := id.NewRandGenerator().SpanID()

cache.Set(traceID.String(), []*v1.Span{
{Name: "span 1", ParentSpanId: nil, SpanId: span1ID[:], TraceId: traceID[:]},
{Name: "span 2", ParentSpanId: span1ID[:], SpanId: span2ID[:], TraceId: traceID[:]},
})

controlPlane.SendPollingRequest(&pollingRequest)

time.Sleep(1 * time.Second)

// expect traces to be sent to endpoint
pollingResponse = controlPlane.GetLastPollingResponse()
require.NotNil(t, pollingResponse, "agent did not send polling response back to server")

assert.True(t, pollingResponse.TraceFound)
assert.Len(t, pollingResponse.Spans, 2)
}

0 comments on commit 4349562

Please sign in to comment.