Skip to content

Commit

Permalink
refactor reader to have split schema
Browse files Browse the repository at this point in the history
  • Loading branch information
mh-park committed Jul 11, 2017
1 parent db202fc commit 545c3c3
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 25 deletions.
19 changes: 10 additions & 9 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ import (

const (
serviceName = "serviceName"
indexPrefix = "jaeger-"
spanIndexPrefix = "jaeger-span-"
serviceIndexPrefix = "jaeger-service-"
operationsAggregation = "distinct_operations"
servicesAggregation = "distinct_services"
traceIDAggregation = "traceIDs"
Expand Down Expand Up @@ -110,7 +111,7 @@ func (s *SpanReader) readTrace(traceID string, traceQuery *spanstore.TraceQueryP

traceQuery.StartTimeMax = traceQuery.StartTimeMax.Add(time.Hour)
traceQuery.StartTimeMin = traceQuery.StartTimeMin.Add(-time.Hour)
indices := s.findIndices(traceQuery)
indices := s.findIndices(spanIndexPrefix, traceQuery)
esSpansRaw, err := s.executeQuery(query, indices...)
if err != nil {
return nil, errors.Wrap(err, "Query execution failed")
Expand Down Expand Up @@ -165,7 +166,7 @@ func (s *SpanReader) unmarshallJSONSpan(esSpanRaw *elastic.SearchHit) (*jModel.S
}

// Returns the array of indices that we need to query, based on query params
func (s *SpanReader) findIndices(traceQuery *spanstore.TraceQueryParameters) []string {
func (s *SpanReader) findIndices(typeOfIndex string, traceQuery *spanstore.TraceQueryParameters) []string {
today := time.Now()
threeDaysAgo := today.AddDate(0, 0, -3) // TODO: make this configurable

Expand All @@ -177,7 +178,7 @@ func (s *SpanReader) findIndices(traceQuery *spanstore.TraceQueryParameters) []s
var indices []string
current := traceQuery.StartTimeMax
for current.After(traceQuery.StartTimeMin) && current.After(threeDaysAgo) {
index := IndexWithDate(current)
index := indexWithDate(typeOfIndex, current)
exists, _ := s.client.IndexExists(index).Do(s.ctx) // Don't care about error, if it's an error, exists will be false anyway
if exists {
indices = append(indices, index)
Expand All @@ -188,15 +189,15 @@ func (s *SpanReader) findIndices(traceQuery *spanstore.TraceQueryParameters) []s
}

// IndexWithDate returns the index name formatted to date.
func IndexWithDate(date time.Time) string {
return indexPrefix + date.Format("2006-01-02")
func indexWithDate(typeOfIndex string, date time.Time) string {
return typeOfIndex + date.Format("2006-01-02")
}

// GetServices returns all services traced by Jaeger, ordered by frequency
func (s *SpanReader) GetServices() ([]string, error) {
serviceAggregation := s.getServicesAggregation()

jaegerIndices := s.findIndices(&spanstore.TraceQueryParameters{})
jaegerIndices := s.findIndices(serviceIndexPrefix, &spanstore.TraceQueryParameters{})

searchService := s.client.Search(jaegerIndices...).
Type(serviceType).
Expand Down Expand Up @@ -226,7 +227,7 @@ func (s *SpanReader) getServicesAggregation() elastic.Query {
func (s *SpanReader) GetOperations(service string) ([]string, error) {
serviceQuery := elastic.NewTermQuery(serviceName, service)
serviceFilter := s.getOperationsAggregation()
jaegerIndices := s.findIndices(&spanstore.TraceQueryParameters{})
jaegerIndices := s.findIndices(serviceIndexPrefix, &spanstore.TraceQueryParameters{})

searchService := s.client.Search(jaegerIndices...).
Type(serviceType).
Expand Down Expand Up @@ -359,7 +360,7 @@ func (s *SpanReader) findTraceIDs(traceQuery *spanstore.TraceQueryParameters) ([
aggregation := s.buildTraceIDAggregation(traceQuery.NumTraces)
boolQuery := s.buildFindTraceIDsQuery(traceQuery)

jaegerIndices := s.findIndices(traceQuery)
jaegerIndices := s.findIndices(spanIndexPrefix, traceQuery)

searchService := s.client.Search(jaegerIndices...).
Type(spanType).
Expand Down
40 changes: 24 additions & 16 deletions plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,16 +280,16 @@ func TestSpanReader_findIndicesEmptyQuery(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
mockExistsService(r)

actual := r.reader.findIndices(&spanstore.TraceQueryParameters{})
actual := r.reader.findIndices(spanIndexPrefix, &spanstore.TraceQueryParameters{})

today := time.Now()
yesterday := today.AddDate(0, 0, -1)
twoDaysAgo := today.AddDate(0, 0, -2)

expected := []string{
IndexWithDate(today),
IndexWithDate(yesterday),
IndexWithDate(twoDaysAgo),
indexWithDate(spanIndexPrefix, today),
indexWithDate(spanIndexPrefix, yesterday),
indexWithDate(spanIndexPrefix, twoDaysAgo),
}

assert.EqualValues(t, expected, actual)
Expand All @@ -301,10 +301,13 @@ func TestSpanReader_findIndicesNoIndices(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
mockExistsService(r)

actual := r.reader.findIndices(&spanstore.TraceQueryParameters{
StartTimeMin: time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC),
StartTimeMax: time.Date(2017, time.April, 21, 4, 21, 19, 95, time.UTC),
})
actual := r.reader.findIndices(
spanIndexPrefix,
&spanstore.TraceQueryParameters{
StartTimeMin: time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC),
StartTimeMax: time.Date(2017, time.April, 21, 4, 21, 19, 95, time.UTC),
},
)

var expected []string

Expand All @@ -318,17 +321,20 @@ func TestSpanReader_findIndicesOnlyRecent(t *testing.T) {

today := time.Now()

actual := r.reader.findIndices(&spanstore.TraceQueryParameters{
StartTimeMin: today.AddDate(0, 0, -7),
StartTimeMax: today.AddDate(0, 0, -1),
})
actual := r.reader.findIndices(
spanIndexPrefix,
&spanstore.TraceQueryParameters{
StartTimeMin: today.AddDate(0, 0, -7),
StartTimeMax: today.AddDate(0, 0, -1),
},
)

yesterday := today.AddDate(0, 0, -1)
twoDaysAgo := today.AddDate(0, 0, -2)

expected := []string{
IndexWithDate(yesterday),
IndexWithDate(twoDaysAgo),
indexWithDate(spanIndexPrefix, yesterday),
indexWithDate(spanIndexPrefix, twoDaysAgo),
}

assert.EqualValues(t, expected, actual)
Expand All @@ -337,8 +343,10 @@ func TestSpanReader_findIndicesOnlyRecent(t *testing.T) {

func TestSpanReader_indexWithDate(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
actual := IndexWithDate(time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC))
assert.Equal(t, "jaeger-1995-04-21", actual)
actual := indexWithDate(spanIndexPrefix, time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC))
assert.Equal(t, "jaeger-span-1995-04-21", actual)
actual = indexWithDate(serviceIndexPrefix, time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC))
assert.Equal(t, "jaeger-service-1995-04-21", actual)
})
}

Expand Down

0 comments on commit 545c3c3

Please sign in to comment.