Skip to content

Commit

Permalink
merge result in read adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
mtanda committed May 16, 2018
1 parent 3166d8b commit 6cc48cc
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 18 deletions.
11 changes: 8 additions & 3 deletions archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,8 @@ func (archiver *Archiver) loadState() (*ArchiverState, error) {
return &state, nil
}

func (archiver *Archiver) query(q *prompb.Query) ([]*prompb.TimeSeries, error) {
result := []*prompb.TimeSeries{}
func (archiver *Archiver) query(q *prompb.Query) (resultMap, error) {
result := make(resultMap)

matchers, err := fromLabelMatchers(q.Matchers)
if err != nil {
Expand All @@ -480,8 +480,13 @@ func (archiver *Archiver) query(q *prompb.Query) ([]*prompb.TimeSeries, error) {
s := ss.At()

labels := s.Labels()
sort.Slice(labels, func(i, j int) bool {
return labels[i].Name < labels[j].Name
})
id := ""
for _, label := range labels {
ts.Labels = append(ts.Labels, &prompb.Label{Name: label.Name, Value: label.Value})
id = id + label.Name + label.Value
}

lastTimestamp := int64(0)
Expand All @@ -498,7 +503,7 @@ func (archiver *Archiver) query(q *prompb.Query) ([]*prompb.TimeSeries, error) {
ts.Samples = append(ts.Samples, &prompb.Sample{Value: math.Float64frombits(prom_value.StaleNaN), Timestamp: lastTimestamp + (60 * 1000)})
}

result = append(result, ts)
result[id] = ts
}

return result, nil
Expand Down
43 changes: 31 additions & 12 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,27 @@ type config struct {
storagePath string
}

type resultMap map[string]*prompb.TimeSeries

func (x resultMap) append(y resultMap) {
for id, yts := range y {
if xts, ok := x[id]; ok {
xts.Samples = append(xts.Samples, yts.Samples...)
} else {
x[id] = yts
}
}
}
func (x resultMap) slice() []*prompb.TimeSeries {
s := []*prompb.TimeSeries{}
for _, v := range x {
s = append(s, v)
}
return s
}

func runQuery(indexer *Indexer, archiver *Archiver, q *prompb.Query, logger log.Logger) []*prompb.TimeSeries {
result := []*prompb.TimeSeries{}
result := make(resultMap)

namespace := ""
jobIndex := -1
Expand All @@ -52,7 +71,7 @@ func runQuery(indexer *Indexer, archiver *Archiver, q *prompb.Query, logger log.
}
if namespace == "" {
level.Debug(logger).Log("msg", "namespace is required")
return result
return result.slice()
}

startTime := time.Unix(int64(q.StartTimestampMs/1000), int64(q.StartTimestampMs%1000*1000))
Expand Down Expand Up @@ -81,7 +100,7 @@ func runQuery(indexer *Indexer, archiver *Archiver, q *prompb.Query, logger log.
region, queries, err = getQueryWithIndex(&baq, indexer)
if err != nil {
level.Error(logger).Log("err", err)
return result
return result.slice()
}
}
if q.StartTimestampMs < q.EndTimestampMs {
Expand All @@ -93,9 +112,9 @@ func runQuery(indexer *Indexer, archiver *Archiver, q *prompb.Query, logger log.
archivedResult, err := archiver.query(&aq)
if err != nil {
level.Error(logger).Log("err", err)
return result
return result.slice()
}
result = append(result, archivedResult...)
result.append(archivedResult)
q.StartTimestampMs = aq.EndTimestampMs
level.Info(logger).Log("msg", fmt.Sprintf("Get %d time series from archive.", len(result)))
}
Expand All @@ -110,39 +129,39 @@ func runQuery(indexer *Indexer, archiver *Archiver, q *prompb.Query, logger log.
queries = append(queries, extraQueries...)
if err != nil {
level.Error(logger).Log("err", err)
return result
return result.slice()
}
} else {
level.Info(logger).Log("msg", "querying for CloudWatch with index", "query", fmt.Sprintf("%+v", q))
region, extraQueries, err = getQueryWithIndex(q, indexer)
queries = append(queries, extraQueries...)
if err != nil {
level.Error(logger).Log("err", err)
return result
return result.slice()
}
}
}

if len(queries) > 300 {
level.Warn(logger).Log("msg", "Too many concurrent queries")
return result
return result.slice()
}

cfg := &aws.Config{Region: aws.String(region)}
sess, err := session.NewSession(cfg)
if err != nil {
level.Error(logger).Log("err", err)
return result
return result.slice()
}
svc := cloudwatch.New(sess, cfg)

for _, query := range queries {
cwResult, err := queryCloudWatch(svc, region, query, q)
if err != nil {
level.Error(logger).Log("err", err)
return result
return result.slice()
}
result = append(result, cwResult...)
result.append(cwResult)
}
if originalJobLabel != "" {
for _, ts := range result {
Expand All @@ -152,7 +171,7 @@ func runQuery(indexer *Indexer, archiver *Archiver, q *prompb.Query, logger log.

level.Info(logger).Log("msg", fmt.Sprintf("Returned %d time series.", len(result)))

return result
return result.slice()
}

func GetDefaultRegion() (string, error) {
Expand Down
13 changes: 10 additions & 3 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ func getQueryWithIndex(q *prompb.Query, indexer *Indexer) (string, []*cloudwatch
return region, queries, nil
}

func queryCloudWatch(svc *cloudwatch.CloudWatch, region string, query *cloudwatch.GetMetricStatisticsInput, q *prompb.Query) ([]*prompb.TimeSeries, error) {
result := []*prompb.TimeSeries{}
func queryCloudWatch(svc *cloudwatch.CloudWatch, region string, query *cloudwatch.GetMetricStatisticsInput, q *prompb.Query) (resultMap, error) {
result := make(resultMap)

if query.Namespace == nil || query.MetricName == nil {
return result, fmt.Errorf("missing parameter")
Expand Down Expand Up @@ -274,7 +274,14 @@ func queryCloudWatch(svc *cloudwatch.CloudWatch, region string, query *cloudwatc
}

for _, ts := range tsm {
result = append(result, ts)
id := ""
sort.Slice(ts.Labels, func(i, j int) bool {
return ts.Labels[i].Name < ts.Labels[j].Name
})
for _, label := range ts.Labels {
id = id + label.Name + label.Value
}
result[id] = ts
}

return result, nil
Expand Down

0 comments on commit 6cc48cc

Please sign in to comment.