Skip to content

Commit

Permalink
clean up code
Browse files Browse the repository at this point in the history
  • Loading branch information
mh-park committed Jun 20, 2017
1 parent 3904adc commit aa4c8b0
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 257 deletions.
50 changes: 31 additions & 19 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,25 @@ package spanstore

import (
"context"
"errors"
"time"

"github.com/olivere/elastic"
"go.uber.org/zap"

"github.com/pkg/errors"
"github.com/uber/jaeger/model"
"github.com/uber/jaeger/pkg/es"
"github.com/uber/jaeger/storage/spanstore"
)

const (
serviceName = "serviceName"
indexPrefix = "jaeger-"
operationsAggregation = "distinct_operations"
servicesAggregation = "distinct_services"
defaultDocCount = 3000
)

// SpanReader can query for and load traces from ElasticSearch
type SpanReader struct {
ctx context.Context
Expand All @@ -59,18 +67,18 @@ func (s *SpanReader) GetTrace(traceID model.TraceID) (*model.Trace, error) {
// Returns the array of indices that we need to query, based on query params
func (s *SpanReader) findIndices(traceQuery spanstore.TraceQueryParameters) []string {
today := time.Now()
threeDaysAgo := today.AddDate(0, 0, -3)
threeDaysAgo := today.AddDate(0, 0, -3) // TODO: make this configurable

if traceQuery.StartTimeMax.IsZero() || traceQuery.StartTimeMin.IsZero() {
traceQuery.StartTimeMax = time.Now()
traceQuery.StartTimeMin = time.Now().AddDate(0, 0, -3)
traceQuery.StartTimeMax = today
traceQuery.StartTimeMin = threeDaysAgo
}

var indices []string
current := traceQuery.StartTimeMax
for current.After(traceQuery.StartTimeMin) && current.After(threeDaysAgo) {
index := "jaeger-" + current.Format("2006-01-02")
exists, _ := s.client.IndexExists(index).Do(s.ctx)
index := s.indexWithDate(current)
exists, _ := s.client.IndexExists(index).Do(s.ctx) // Don't care about error, if it's an error, exists will be false anyway
if exists {
indices = append(indices, index)
}
Expand All @@ -79,6 +87,10 @@ func (s *SpanReader) findIndices(traceQuery spanstore.TraceQueryParameters) []st
return indices
}

func (s *SpanReader) indexWithDate(date time.Time) string {
return indexPrefix + date.Format("2006-01-02")
}

// GetServices returns all services traced by Jaeger, ordered by frequency
func (s *SpanReader) GetServices() ([]string, error) {
serviceAggregation := s.getServicesAggregation()
Expand All @@ -88,44 +100,43 @@ func (s *SpanReader) GetServices() ([]string, error) {
searchService := s.client.Search(jaegerIndices...).
Type(serviceType).
Size(0). // set to 0 because we don't want actual documents.
Aggregation("distinct_services", serviceAggregation)
Aggregation(servicesAggregation, serviceAggregation)

searchResult, err := searchService.Do(s.ctx)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "Search service failed")
}

bucket, found := searchResult.Aggregations.Terms("distinct_services")
bucket, found := searchResult.Aggregations.Terms(servicesAggregation)
if !found {
err = errors.New("Could not find aggregation of services")
return nil, err
return nil, errors.New("Could not find aggregation of services")
}
serviceNamesBucket := bucket.Buckets
return s.bucketToStringArray(serviceNamesBucket)
}

func (s *SpanReader) getServicesAggregation() elastic.Query {
return elastic.NewTermsAggregation().
Field("serviceName").
Size(3000) // Must set to some large number. ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838
Field(serviceName).
Size(defaultDocCount) // Must set to some large number. ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838
}

// GetOperations returns all operations for a specific service traced by Jaeger
func (s *SpanReader) GetOperations(service string) ([]string, error) {
serviceQuery := elastic.NewTermQuery("serviceName", service)
serviceQuery := elastic.NewTermQuery(serviceName, service)
serviceFilter := elastic.NewFilterAggregation().Filter(serviceQuery)
jaegerIndices := s.findIndices(spanstore.TraceQueryParameters{})

searchService := s.client.Search(jaegerIndices...).
Type(serviceType).
Size(0).
Aggregation("distinct_operations", serviceFilter)
Aggregation(operationsAggregation, serviceFilter)

searchResult, err := searchService.Do(s.ctx)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "Search service failed")
}
bucket, found := searchResult.Aggregations.Terms("distinct_operations")
bucket, found := searchResult.Aggregations.Terms(operationsAggregation)
if !found {
err = errors.New("Could not find aggregation of operations")
return nil, err
Expand All @@ -137,16 +148,17 @@ func (s *SpanReader) GetOperations(service string) ([]string, error) {
func (s *SpanReader) bucketToStringArray(buckets []*elastic.AggregationBucketKeyItem) ([]string, error) {
strings := make([]string, len(buckets))
for i, keyitem := range buckets {
s, ok := keyitem.Key.(string)
str, ok := keyitem.Key.(string)
if !ok {
return nil, errors.New("Non-string key found in aggregation")
}
strings[i] = s
strings[i] = str
}
return strings, nil
}

// FindTraces retrieves traces that match the traceQuery
func (s *SpanReader) FindTraces(traceQuery *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
// TODO
return nil, nil
}
Loading

0 comments on commit aa4c8b0

Please sign in to comment.