Skip to content

Commit

Permalink
Propagate context on dependencies endpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Yoav Eyal <yoave23@gmail.com>
  • Loading branch information
yoave23 committed Aug 31, 2020
1 parent af985ae commit 614701d
Show file tree
Hide file tree
Showing 23 changed files with 56 additions and 46 deletions.
Expand Up @@ -82,11 +82,11 @@ func (r *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
}

// GetDependencies implements dependencystore.Reader
func (r *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
func (r *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
searchBody := getSearchBody(endTs, lookback)

indices := dailyIndices(r.indexPrefix, endTs, lookback)
response, err := r.client.Search(context.Background(), searchBody, defaultDocCount, indices...)
response, err := r.client.Search(ctx, searchBody, defaultDocCount, indices...)
if err != nil {
return nil, err
}
Expand Down
Expand Up @@ -86,7 +86,7 @@ func TestGetDependencies(t *testing.T) {
},
}
store := NewDependencyStore(client, zap.NewNop(), "foo")
dependencies, err := store.GetDependencies(tsNow, time.Hour)
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.NoError(t, err)
assert.Equal(t, timeDependencies, dbmodel.TimeDependencies{
Timestamp: tsNow,
Expand All @@ -108,7 +108,7 @@ func TestGetDependencies_err_unmarshall(t *testing.T) {
},
}
store := NewDependencyStore(client, zap.NewNop(), "foo")
dependencies, err := store.GetDependencies(tsNow, time.Hour)
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.Contains(t, err.Error(), "invalid character")
assert.Nil(t, dependencies)
}
Expand All @@ -120,7 +120,7 @@ func TestGetDependencies_err_client(t *testing.T) {
}
store := NewDependencyStore(client, zap.NewNop(), "foo")
tsNow := time.Now()
dependencies, err := store.GetDependencies(tsNow, time.Hour)
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.Error(t, err)
assert.Nil(t, dependencies)
assert.Contains(t, err.Error(), searchErr.Error())
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/grpc_handler.go
Expand Up @@ -165,7 +165,7 @@ func (g *GRPCHandler) GetOperations(
func (g *GRPCHandler) GetDependencies(ctx context.Context, r *api_v2.GetDependenciesRequest) (*api_v2.GetDependenciesResponse, error) {
startTime := r.StartTime
endTime := r.EndTime
dependencies, err := g.queryService.GetDependencies(startTime, endTime.Sub(startTime))
dependencies, err := g.queryService.GetDependencies(ctx, startTime, endTime.Sub(startTime))
if err != nil {
g.logger.Error("failed to fetch dependencies", zap.Error(err))
return nil, status.Errorf(codes.Internal, "failed to fetch dependencies: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/http_handler.go
Expand Up @@ -285,7 +285,7 @@ func (aH *APIHandler) dependencies(w http.ResponseWriter, r *http.Request) {
}
endTs := time.Unix(0, 0).Add(time.Duration(endTsMillis) * time.Millisecond)

dependencies, err := aH.queryService.GetDependencies(endTs, lookback)
dependencies, err := aH.queryService.GetDependencies(r.Context(), endTs, lookback)
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/querysvc/query_service.go
Expand Up @@ -121,8 +121,8 @@ func (qs QueryService) Adjust(trace *model.Trace) (*model.Trace, error) {
}

// GetDependencies implements dependencystore.Reader.GetDependencies
func (qs QueryService) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
return qs.dependencyReader.GetDependencies(endTs, lookback)
func (qs QueryService) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
return qs.dependencyReader.GetDependencies(ctx, endTs, lookback)
}

// InitArchiveStorage tries to initialize archive storage reader/writer if storage factory supports them.
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/query_service_test.go
Expand Up @@ -259,7 +259,7 @@ func TestGetDependencies(t *testing.T) {
endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier)
depsMock.On("GetDependencies", endTs, defaultDependencyLookbackDuration).Return(expectedDependencies, nil).Times(1)

actualDependencies, err := qs.GetDependencies(time.Unix(0, 1476374248550*millisToNanosMultiplier), defaultDependencyLookbackDuration)
actualDependencies, err := qs.GetDependencies(context.Background(), time.Unix(0, 1476374248550*millisToNanosMultiplier), defaultDependencyLookbackDuration)
assert.NoError(t, err)
assert.Equal(t, expectedDependencies, actualDependencies)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/ui/placeholder/gen_assets.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion plugin/storage/badger/dependencystore/storage.go
Expand Up @@ -35,7 +35,7 @@ func NewDependencyStore(store spanstore.Reader) *DependencyStore {
}

// GetDependencies returns all interservice dependencies, implements DependencyReader
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
deps := map[string]*model.DependencyLink{}

params := &spanstore.TraceQueryParameters{
Expand Down
5 changes: 3 additions & 2 deletions plugin/storage/badger/dependencystore/storage_test.go
Expand Up @@ -15,6 +15,7 @@
package dependencystore_test

import (
"context"
"fmt"
"io"
"testing"
Expand Down Expand Up @@ -66,7 +67,7 @@ func runFactoryTest(tb testing.TB, test func(tb testing.TB, sw spanstore.Writer,
func TestDependencyReader(t *testing.T) {
runFactoryTest(t, func(tb testing.TB, sw spanstore.Writer, dr dependencystore.Reader) {
tid := time.Now()
links, err := dr.GetDependencies(tid, time.Hour)
links, err := dr.GetDependencies(context.Background(), tid, time.Hour)
assert.NoError(t, err)
assert.Empty(t, links)

Expand Down Expand Up @@ -94,7 +95,7 @@ func TestDependencyReader(t *testing.T) {
assert.NoError(t, err)
}
}
links, err = dr.GetDependencies(time.Now(), time.Hour)
links, err = dr.GetDependencies(context.Background(), time.Now(), time.Hour)
assert.NoError(t, err)
assert.NotEmpty(t, links)
assert.Equal(t, spans-1, len(links)) // First span does not create a dependency
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/cassandra/dependencystore/storage.go
Expand Up @@ -16,6 +16,7 @@
package dependencystore

import (
"context"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -106,7 +107,7 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
}

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
startTs := endTs.Add(-1 * lookback)
var query cassandra.Query
switch s.version {
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/cassandra/dependencystore/storage_test.go
Expand Up @@ -16,6 +16,7 @@
package dependencystore

import (
"context"
"errors"
"strings"
"testing"
Expand Down Expand Up @@ -225,7 +226,7 @@ func TestDependencyStoreGetDependencies(t *testing.T) {

s.session.On("Query", mock.AnythingOfType("string"), matchEverything()).Return(query)

deps, err := s.storage.GetDependencies(time.Now(), 48*time.Hour)
deps, err := s.storage.GetDependencies(context.Background(), time.Now(), 48*time.Hour)

if testCase.expectedError == "" {
assert.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/es/dependencystore/storage.go
Expand Up @@ -81,13 +81,13 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe
}

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
indices := getIndices(s.indexPrefix, endTs, lookback)
searchResult, err := s.client.Search(indices...).
Size(10000). // the default elasticsearch allowed limit
Query(buildTSQuery(endTs, lookback)).
IgnoreUnavailable(true).
Do(s.ctx)
Do(ctx)
if err != nil {
return nil, fmt.Errorf("failed to search for dependencies: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/es/dependencystore/storage_test.go
Expand Up @@ -16,6 +16,7 @@
package dependencystore

import (
"context"
"encoding/json"
"errors"
"strings"
Expand Down Expand Up @@ -171,7 +172,7 @@ func TestGetDependencies(t *testing.T) {
searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService)
searchService.On("Do", mock.Anything).Return(testCase.searchResult, testCase.searchError)

actual, err := r.storage.GetDependencies(fixedTime, 24*time.Hour)
actual, err := r.storage.GetDependencies(context.Background(), fixedTime, 24*time.Hour)
if testCase.expectedError != "" {
assert.EqualError(t, err, testCase.expectedError)
assert.Nil(t, actual)
Expand Down
14 changes: 7 additions & 7 deletions plugin/storage/es/mappings/gen_assets.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions plugin/storage/grpc/shared/grpc_client.go
Expand Up @@ -206,8 +206,8 @@ func (c *grpcClient) WriteSpan(span *model.Span) error {
}

// GetDependencies returns all interservice dependencies
func (c *grpcClient) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
resp, err := c.depsReaderClient.GetDependencies(context.Background(), &storage_v1.GetDependenciesRequest{
func (c *grpcClient) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
resp, err := c.depsReaderClient.GetDependencies(ctx, &storage_v1.GetDependenciesRequest{
EndTime: endTs,
StartTime: endTs.Add(-lookback),
})
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/grpc/shared/grpc_client_test.go
Expand Up @@ -297,7 +297,7 @@ func TestGRPCClientGetDependencies(t *testing.T) {
EndTime: end,
}).Return(&storage_v1.GetDependenciesResponse{Dependencies: deps}, nil)

s, err := r.client.GetDependencies(end, lookback)
s, err := r.client.GetDependencies(context.Background(), end, lookback)
assert.NoError(t, err)
assert.Equal(t, deps, s)
})
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/grpc/shared/grpc_server.go
Expand Up @@ -32,7 +32,7 @@ type grpcServer struct {

// GetDependencies returns all interservice dependencies
func (s *grpcServer) GetDependencies(ctx context.Context, r *storage_v1.GetDependenciesRequest) (*storage_v1.GetDependenciesResponse, error) {
deps, err := s.Impl.DependencyReader().GetDependencies(r.EndTime, r.EndTime.Sub(r.StartTime))
deps, err := s.Impl.DependencyReader().GetDependencies(ctx, r.EndTime, r.EndTime.Sub(r.StartTime))
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/integration/cassandra_test.go
Expand Up @@ -16,6 +16,7 @@
package integration

import (
"context"
"errors"
"os"
"testing"
Expand Down Expand Up @@ -134,7 +135,7 @@ func (s *StorageIntegration) testCassandraGetDependencies(t *testing.T) {
}
require.NoError(t, s.DependencyWriter.WriteDependencies(time.Now(), expected))
s.refresh(t)
actual, err := s.DependencyReader.GetDependencies(time.Now(), 5*time.Minute)
actual, err := s.DependencyReader.GetDependencies(context.Background(), time.Now(), 5*time.Minute)
assert.NoError(t, err)
assert.EqualValues(t, expected, actual)
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/integration.go
Expand Up @@ -364,7 +364,7 @@ func (s *StorageIntegration) testGetDependencies(t *testing.T) {
}
require.NoError(t, s.DependencyWriter.WriteDependencies(time.Now(), expected))
s.refresh(t)
actual, err := s.DependencyReader.GetDependencies(time.Now(), 5*time.Minute)
actual, err := s.DependencyReader.GetDependencies(context.Background(), time.Now(), 5*time.Minute)
assert.NoError(t, err)
assert.EqualValues(t, expected, actual)
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/memory/memory.go
Expand Up @@ -58,7 +58,7 @@ func WithConfiguration(configuration config.Configuration) *Store {
}

// GetDependencies returns dependencies between services
func (m *Store) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
func (m *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
// deduper used below can modify the spans, so we take an exclusive lock
m.Lock()
defer m.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions plugin/storage/memory/memory_test.go
Expand Up @@ -139,7 +139,7 @@ func withMemoryStore(f func(store *Store)) {

func TestStoreGetEmptyDependencies(t *testing.T) {
withMemoryStore(func(store *Store) {
links, err := store.GetDependencies(time.Now(), time.Hour)
links, err := store.GetDependencies(context.Background(), time.Now(), time.Hour)
assert.NoError(t, err)
assert.Empty(t, links)
})
Expand All @@ -151,11 +151,11 @@ func TestStoreGetDependencies(t *testing.T) {
assert.NoError(t, store.WriteSpan(childSpan1))
assert.NoError(t, store.WriteSpan(childSpan2))
assert.NoError(t, store.WriteSpan(childSpan2_1))
links, err := store.GetDependencies(time.Now(), time.Hour)
links, err := store.GetDependencies(context.Background(), time.Now(), time.Hour)
assert.NoError(t, err)
assert.Empty(t, links)

links, err = store.GetDependencies(time.Unix(0, 0).Add(time.Hour), time.Hour)
links, err = store.GetDependencies(context.Background(), time.Unix(0, 0).Add(time.Hour), time.Hour)
assert.NoError(t, err)
assert.Equal(t, []model.DependencyLink{{
Parent: "serviceName",
Expand Down
3 changes: 2 additions & 1 deletion storage/dependencystore/interface.go
Expand Up @@ -16,6 +16,7 @@
package dependencystore

import (
"context"
"time"

"github.com/jaegertracing/jaeger/model"
Expand All @@ -28,5 +29,5 @@ type Writer interface {

// Reader can load service dependencies from storage.
type Reader interface {
GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error)
GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error)
}

0 comments on commit 614701d

Please sign in to comment.