Skip to content

Commit

Permalink
remove cache and close datastore after poll
Browse files Browse the repository at this point in the history
  • Loading branch information
mathnogueira committed May 2, 2024
1 parent 44e081f commit d49f603
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 35 deletions.
35 changes: 0 additions & 35 deletions agent/tracedb/jaegerdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"io"
"strings"
"sync"

pb "github.com/kubeshop/tracetest/agent/internal/proto-gen-go/api_v3"
"github.com/kubeshop/tracetest/agent/tracedb/connection"
Expand All @@ -23,43 +22,12 @@ func jaegerDefaultPorts() []string {
return []string{"16685"}
}

var (
connectionPool map[string]TraceDB = make(map[string]TraceDB)
connectionPoolMutex sync.Mutex
)

func getConnectionFromCache(grpcConfig *datastore.GRPCClientSettings) (TraceDB, bool) {
connectionPoolMutex.Lock()
defer connectionPoolMutex.Unlock()

connection, ok := connectionPool[grpcConfig.Endpoint]
return connection, ok
}

func saveConnectionToCache(grpcConfig *datastore.GRPCClientSettings, traceDB TraceDB) {
connectionPoolMutex.Lock()
defer connectionPoolMutex.Unlock()

connectionPool[grpcConfig.Endpoint] = traceDB
}

func invalidateConnectionCache(endpoint string) {
connectionPoolMutex.Lock()
defer connectionPoolMutex.Unlock()

delete(connectionPool, endpoint)
}

type jaegerTraceDB struct {
realTraceDB
dataSource datasource.DataSource
}

func newJaegerDB(grpcConfig *datastore.GRPCClientSettings) (TraceDB, error) {
if connection, ok := getConnectionFromCache(grpcConfig); ok {
return connection, nil
}

baseConfig := &datastore.MultiChannelClientConfig{
Type: datastore.MultiChannelClientTypeGRPC,
Grpc: grpcConfig,
Expand All @@ -73,8 +41,6 @@ func newJaegerDB(grpcConfig *datastore.GRPCClientSettings) (TraceDB, error) {
dataSource: dataSource,
}

saveConnectionToCache(grpcConfig, traceDB)

return traceDB, nil
}

Expand Down Expand Up @@ -114,7 +80,6 @@ func (jtd *jaegerTraceDB) Ready() bool {
}

func (jtd *jaegerTraceDB) Close() error {
invalidateConnectionCache(jtd.dataSource.Endpoint())
return jtd.dataSource.Close()
}

Expand Down
1 change: 1 addition & 0 deletions agent/workers/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func (w *PollerWorker) poll(ctx context.Context, request *proto.PollingRequest)

dsFactory := tracedb.Factory(nil)
ds, err := dsFactory(*datastoreConfig)
defer ds.Close()
if err != nil {
w.logger.Error("Invalid datastore", zap.Error(err))
log.Printf("Invalid datastore: %s", err.Error())
Expand Down

0 comments on commit d49f603

Please sign in to comment.