Skip to content

Commit

Permalink
fix(server): fix issues when running tests (#3336)
Browse files Browse the repository at this point in the history
  • Loading branch information
schoren committed Nov 6, 2023
1 parent 6bb9e63 commit e4dd4a0
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 122 deletions.
3 changes: 1 addition & 2 deletions go.mod
Expand Up @@ -45,7 +45,6 @@ require (
github.com/ohler55/ojg v1.14.4
github.com/opensearch-project/opensearch-go v1.1.0
github.com/orlangure/gnomock v0.20.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1
github.com/prometheus/prometheus v1.8.2-0.20211217191541-41f1a8125e66
github.com/pterm/pterm v0.12.69
Expand All @@ -55,6 +54,7 @@ require (
github.com/spf13/viper v1.15.0
github.com/stretchr/testify v1.8.4
github.com/teris-io/shortid v0.0.0-20220617161101-71ec9f2aa569
gitlab.com/metakeule/fmtdate v1.2.2
go.opencensus.io v0.24.0
go.opentelemetry.io/collector/component v0.80.0
go.opentelemetry.io/collector/config/configcompression v0.80.0
Expand Down Expand Up @@ -173,7 +173,6 @@ require (
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
gitlab.com/metakeule/fmtdate v1.2.2 // indirect
go.opentelemetry.io/collector v0.80.0 // indirect
go.opentelemetry.io/collector/config/configauth v0.80.0 // indirect
go.opentelemetry.io/collector/config/confignet v0.80.0 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Expand Up @@ -1519,8 +1519,6 @@ github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnh
github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
Expand Down
2 changes: 1 addition & 1 deletion server/app/app.go
Expand Up @@ -228,7 +228,7 @@ func (app *App) Start(opts ...appOption) error {
variableSetRepo := variableset.NewRepository(db)
linterRepo := analyzer.NewRepository(db)
testRepo := test.NewRepository(db)
runRepo := test.NewRunRepository(db, test.NewCache(instanceID, test.WithMetricMeter(meter)))
runRepo := test.NewRunRepository(db)
testRunnerRepo := testrunner.NewRepository(db)
tracesRepo := traces.NewTraceRepository(db)

Expand Down
2 changes: 1 addition & 1 deletion server/executor/test_suite_runner_test.go
Expand Up @@ -116,7 +116,7 @@ func runTestSuiteRunnerTest(t *testing.T, withErrors bool, assert func(t *testin

subscriptionManager := subscription.NewManager()
testRepo := test.NewRepository(rawDB)
runRepo := test.NewRunRepository(rawDB, test.NewCache("test"))
runRepo := test.NewRunRepository(rawDB)

testRunner := &fakeTestRunner{
runRepo,
Expand Down
23 changes: 16 additions & 7 deletions server/http/mappings/tests.go
Expand Up @@ -460,13 +460,22 @@ func (m Model) Trigger(in openapi.Trigger) trigger.Trigger {

func (m Model) TriggerResult(in openapi.TriggerResult) trigger.TriggerResult {

return trigger.TriggerResult{
Type: trigger.TriggerType(in.Type),
HTTP: m.HTTPResponse(in.TriggerResult.Http),
GRPC: m.GRPCResponse(in.TriggerResult.Grpc),
TraceID: m.TraceIDResponse(in.TriggerResult.Traceid),
Error: m.TriggerError(in.TriggerResult.Error),
}
tr := trigger.TriggerResult{
Type: trigger.TriggerType(in.Type),
Error: m.TriggerError(in.TriggerResult.Error),
}
switch in.Type {
case "http":
tr.HTTP = m.HTTPResponse(in.TriggerResult.Http)
case "grpc":
tr.GRPC = m.GRPCResponse(in.TriggerResult.Grpc)
case "traceid":
tr.TraceID = m.TraceIDResponse(in.TriggerResult.Traceid)
case "kafka":
tr.Kafka = m.KafkaResponse(in.TriggerResult.Kafka)
}

return tr
}

func (m Model) TriggerError(in openapi.TriggerError) *trigger.TriggerError {
Expand Down
104 changes: 5 additions & 99 deletions server/test/run_repository.go
Expand Up @@ -5,18 +5,13 @@ import (
"database/sql"
"encoding/json"
"fmt"
"log"
"strconv"
"time"

"github.com/kubeshop/tracetest/server/executor/testrunner"
"github.com/kubeshop/tracetest/server/pkg/id"
"github.com/kubeshop/tracetest/server/pkg/sqlutil"
"github.com/kubeshop/tracetest/server/variableset"
"github.com/patrickmn/go-cache"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/trace"
)

Expand All @@ -33,94 +28,13 @@ type RunRepository interface {
GetTestSuiteRunSteps(_ context.Context, _ id.ID, runID int) ([]Run, error)
}

type Cache struct {
cache *cache.Cache
instanceID string
cacheLatencyHistogram metric.Int64Histogram
}

type CacheConfig struct {
meter metric.Meter
}

type CacheOption func(*CacheConfig)

func WithMetricMeter(meter metric.Meter) CacheOption {
return func(c *CacheConfig) {
c.meter = meter
}
}

func NewCache(instanceID string, opts ...CacheOption) *Cache {
cacheConfig := &CacheConfig{
meter: noop.NewMeterProvider().Meter("noop"),
}

for _, opt := range opts {
opt(cacheConfig)
}

cacheLatencyHistogram, _ := cacheConfig.meter.Int64Histogram("tracetest.cache.latency")

cache := &Cache{
cache: cache.New(5*time.Minute, 10*time.Minute),
instanceID: instanceID,
cacheLatencyHistogram: cacheLatencyHistogram,
}

return cache
}

func (c *Cache) thisInstanceCacheKey(run Run) string {
return fmt.Sprintf("%s-%s-%d", c.instanceID, run.TestID, run.ID)
}

func (c *Cache) lastInstanceCacheKey(ctx context.Context, run Run) string {
instanceID := c.instanceID
if instanceID := ctx.Value("LastInstanceID"); instanceID != nil {
instanceID = instanceID.(string)
}

return fmt.Sprintf("%s-%s-%d", instanceID, run.TestID, run.ID)
}

func (c *Cache) Set(_ context.Context, run Run) error {
log.Printf("testRunCache Update %s %d", run.TestID, run.ID)
c.cache.Set(c.thisInstanceCacheKey(run), run, cache.DefaultExpiration)
return nil
}

func (c *Cache) Get(ctx context.Context, testID id.ID, runID int) (Run, bool) {
begin := time.Now()
key := c.lastInstanceCacheKey(ctx, Run{ID: runID, TestID: testID})
cached, found := c.cache.Get(key)

duration := time.Since(begin)

attributeSet := attribute.NewSet(
attribute.String("object_type", "test_run"),
attribute.String("key", key),
attribute.Bool("hit", found),
)

c.cacheLatencyHistogram.Record(ctx, duration.Milliseconds(), metric.WithAttributeSet(attributeSet))

if !found {
return Run{}, false
}

return cached.(Run), true
}

type runRepository struct {
db *sql.DB
cache *Cache
db *sql.DB
}

func NewRunRepository(db *sql.DB, cache *Cache) RunRepository {
func NewRunRepository(db *sql.DB) RunRepository {
return &runRepository{
db: db,
cache: cache,
db: db,
}
}

Expand Down Expand Up @@ -417,8 +331,6 @@ func (r *runRepository) UpdateRun(ctx context.Context, run Run) error {
return fmt.Errorf("sql exec: %w", err)
}

r.cache.Set(ctx, run)

return nil
}

Expand Down Expand Up @@ -501,19 +413,13 @@ var (
)

func (r *runRepository) GetRun(ctx context.Context, testID id.ID, runID int) (Run, error) {
cached, found := r.cache.Get(ctx, testID, runID)
if found {
return cached, nil
}

query, params := sqlutil.TenantWithPrefix(ctx, selectRunQuery+" WHERE id = $1 AND test_id = $2", "test_runs.", runID, testID)

run, err := readRunRow(r.db.QueryRowContext(ctx, query, params...))
if err != nil {
return Run{}, fmt.Errorf("cannot read row: %w", err)
}

r.cache.Set(ctx, run)
return run, nil
}

Expand Down Expand Up @@ -665,7 +571,7 @@ func readRunRow(row scanner) (Run, error) {
return Run{}, fmt.Errorf("cannot parse Results: %w", err)
}

if jsonTrace != nil {
if jsonTrace != nil && string(jsonTrace) != "null" {
err = json.Unmarshal(jsonTrace, &r.Trace)
if err != nil {
return Run{}, fmt.Errorf("cannot parse Trace: %w", err)
Expand Down Expand Up @@ -749,7 +655,7 @@ func (r *runRepository) GetTestSuiteRunSteps(ctx context.Context, id id.ID, runI
WHERE test_suite_run_steps.test_suite_run_id = $1 AND test_suite_run_steps.test_suite_run_test_suite_id = $2
`
query, params := sqlutil.TenantWithPrefix(ctx, query, "test_runs.", strconv.Itoa(runID), id)
query += ` ORDER BY test_runs.completed_at ASC`
query += ` ORDER BY test_runs.created_at ASC`

stmt, err := r.db.Prepare(query)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions server/test/test_repository_test.go
Expand Up @@ -88,7 +88,7 @@ func registerManagerFn(router *mux.Router, db *sql.DB) resourcemanager.Manager {
func getScenarioPreparation(sample, secondSample, thirdSample test.Test) func(t *testing.T, op rmtest.Operation, manager resourcemanager.Manager) {
return func(t *testing.T, op rmtest.Operation, manager resourcemanager.Manager) {
testRepo := manager.Handler().(test.Repository)
testRunRepo := test.NewRunRepository(testRepo.DB(), test.NewCache("test"))
testRunRepo := test.NewRunRepository(testRepo.DB())

switch op {
case rmtest.OperationGetSuccess,
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestIfDeleteTestsCascadeDeletes(t *testing.T) {
defer db.Close()

testRepository := test.NewRepository(db)
runRepository := test.NewRunRepository(db, test.NewCache("test"))
runRepository := test.NewRunRepository(db)
transactionRepository := testsuite.NewRepository(db, testRepository)
transactionRunRepository := testsuite.NewRunRepository(db, runRepository)

Expand Down
2 changes: 1 addition & 1 deletion server/testdb/test_run_event_test.go
Expand Up @@ -17,7 +17,7 @@ func TestRunEvents(t *testing.T) {
defer rawDB.Close()

testRepo := test.NewRepository(rawDB)
testRunRepo := test.NewRunRepository(rawDB, test.NewCache("test"))
testRunRepo := test.NewRunRepository(rawDB)

test1 := createTestWithName(t, testRepo, "test 1")

Expand Down
6 changes: 3 additions & 3 deletions server/testsuite/testsuite_repository_test.go
Expand Up @@ -63,7 +63,7 @@ func createRun(runRepository test.RunRepository, t test.Test) test.Run {

func setupTestSuiteFixture(t *testing.T, db *sql.DB) testSuiteFixture {
testsDB := test.NewRepository(db)
runDB := test.NewRunRepository(db, test.NewCache("test"))
runDB := test.NewRunRepository(db)

fixture := testSuiteFixture{}

Expand Down Expand Up @@ -142,7 +142,7 @@ func TestDeleteTestsRelatedToTestSuite(t *testing.T) {
defer db.Close()

testRepository := test.NewRepository(db)
runRepository := test.NewRunRepository(db, test.NewCache("test"))
runRepository := test.NewRunRepository(db)
testSuiteRepo := testsuite.NewRepository(db, testRepository)
testSuiteRunRepo := testsuite.NewRunRepository(db, runRepository)

Expand Down Expand Up @@ -208,7 +208,7 @@ func TestTestSuites(t *testing.T) {
},
Prepare: func(t *testing.T, op rmtests.Operation, manager resourcemanager.Manager) {
transactionRepo := manager.Handler().(*testsuite.Repository)
runRepository := test.NewRunRepository(transactionRepo.DB(), test.NewCache("test"))
runRepository := test.NewRunRepository(transactionRepo.DB())
runRepo := testsuite.NewRunRepository(transactionRepo.DB(), runRepository)

switch op {
Expand Down
2 changes: 1 addition & 1 deletion server/testsuite/testsuite_run_repository_test.go
Expand Up @@ -37,7 +37,7 @@ func getRepos() (*testsuite.Repository, *testsuite.RunRepository, test.Repositor
db := testmock.CreateMigratedDatabase()

testRepo := test.NewRepository(db)
testRunRepo := test.NewRunRepository(db, test.NewCache("test"))
testRunRepo := test.NewRunRepository(db)

transactionRepo := testsuite.NewRepository(db, testRepo)
runRepo := testsuite.NewRunRepository(db, testRunRepo)
Expand Down
19 changes: 16 additions & 3 deletions server/traces/span_entitiess.go
Expand Up @@ -112,6 +112,15 @@ type encodedSpan struct {
Children []encodedSpan
}

const nilSpanID = "0000000000000000"

func (es encodedSpan) isValidID() bool {
if es.ID == nilSpanID || es.ID == "" {
return false
}
return true
}

func (s Span) IsZero() bool {
return !s.ID.IsValid()
}
Expand Down Expand Up @@ -150,9 +159,13 @@ func (s *Span) UnmarshalJSON(data []byte) error {
}

func (s *Span) decodeSpan(aux encodedSpan) error {
sid, err := trace.SpanIDFromHex(aux.ID)
if err != nil {
return fmt.Errorf("unmarshal span: %w", err)
sid := trace.SpanID{}
if aux.isValidID() {
var err error
sid, err = trace.SpanIDFromHex(aux.ID)
if err != nil {
return fmt.Errorf("unmarshal span: %w", err)
}
}

children, err := decodeChildren(s, aux.Children, getCache())
Expand Down

0 comments on commit e4dd4a0

Please sign in to comment.