Skip to content

Commit

Permalink
graph: Align time-series before grouping them.
Browse files Browse the repository at this point in the history
Previously, an error was returned if the inputs where not perfectly aligned.
  • Loading branch information
tokkee committed Aug 1, 2016
1 parent e2085bf commit 4ebe90f
Show file tree
Hide file tree
Showing 2 changed files with 244 additions and 12 deletions.
63 changes: 51 additions & 12 deletions graph/graph.go
Expand Up @@ -112,24 +112,63 @@ func (p *pl) addTimeseries(c *client.Client, metric Metric, verbose bool) error
return nil
}

// sum is an aggregation function that adds ts2 to ts1.
func sum(ts1, ts2 *sysdb.Timeseries) error {
if !ts1.Start.Equal(ts1.Start) || !ts1.End.Equal(ts2.End) {
return fmt.Errorf("Timeseries cover different ranges: [%s, %s] != [%s, %s]",
ts1.Start, ts1.End, ts2.Start, ts2.End)
}
// align aligns two timeseries such that start and end times and the step
// sizes match.
func align(ts1, ts2 *sysdb.Timeseries) error {
if len(ts1.Data) != len(ts2.Data) {
return fmt.Errorf("Incompatible time-series: %v != %v", ts1.Data, ts2.Data)
return fmt.Errorf("mismatching data sources: %v != %v", ts1.Data, ts2.Data)
}

start := time.Time(ts1.Start)
if t := time.Time(ts2.Start); t.After(start) {
start = t
}
end := time.Time(ts1.End)
if t := time.Time(ts2.End); t.Before(end) {
end = t
}
if end.Before(start) {
return fmt.Errorf("non-overlapping ranges: [%v, %v] <-> [%v, %v]",
ts1.Start, ts1.End, ts2.Start, ts2.End)
}

range1 := time.Time(ts1.End).Sub(time.Time(ts1.Start))
range2 := time.Time(ts2.End).Sub(time.Time(ts2.Start))
for name := range ts1.Data {
if len(ts1.Data[name]) != len(ts2.Data[name]) {
return fmt.Errorf("Time-series %q is not aligned", name)
l1, l2 := len(ts1.Data[name]), len(ts2.Data[name])
if l1 <= 1 || l2 <= 1 {
if l1 == l2 && range1 == range2 {
continue
}
return fmt.Errorf("invalid value count for %q: %d != %d", name, l1, l2)
}

step1, step2 := range1/time.Duration(l1-1), range2/time.Duration(l2-1)
if step1 != step2 || step1 <= 0 {
return fmt.Errorf("mismatching steps sizes for %q: %v != %v", name, step1, step2)
}

for _, ts := range []*sysdb.Timeseries{ts1, ts2} {
a := start.Sub(time.Time(ts.Start)) / step1
b := end.Sub(time.Time(ts.Start)) / step1
ts.Data[name] = ts.Data[name][a : b+1]
}
}

ts1.Start, ts2.Start = sysdb.Time(start), sysdb.Time(start)
ts1.End, ts2.End = sysdb.Time(end), sysdb.Time(end)
return nil
}

// sum is an aggregation function that adds ts2 to ts1. The timeseries will be
// aligned.
func sum(ts1, ts2 *sysdb.Timeseries) error {
if err := align(ts1, ts2); err != nil {
return fmt.Errorf("Incompatible time-series: %v", err)
}

for name := range ts1.Data {
for i := range ts1.Data[name] {
if !ts1.Data[name][i].Timestamp.Equal(ts2.Data[name][i].Timestamp) {
return fmt.Errorf("Time-series %q is not aligned", name)
}
ts1.Data[name][i].Value += ts2.Data[name][i].Value
}
}
Expand Down
193 changes: 193 additions & 0 deletions graph/graph_test.go
@@ -0,0 +1,193 @@
//
// Copyright (C) 2016 Sebastian 'tokkee' Harl <sh@tokkee.org>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
// TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
// OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

package graph

import (
"reflect"
"testing"
"time"

"github.com/sysdb/go/sysdb"
)

func TestAlign(t *testing.T) {
for _, test := range []struct {
ts1, ts2 *sysdb.Timeseries
want *sysdb.Timeseries
}{
{
ts1: &sysdb.Timeseries{
Start: ts(4, 5, 0),
End: ts(4, 10, 0),
Data: map[string][]sysdb.DataPoint{
"value": []sysdb.DataPoint{
{ts(4, 5, 0), 0.0},
{ts(4, 6, 0), 0.0},
{ts(4, 7, 0), 1.0},
{ts(4, 8, 0), 1.0},
{ts(4, 9, 0), 0.0},
{ts(4, 10, 0), 0.0},
},
},
},
ts2: &sysdb.Timeseries{
Start: ts(4, 7, 0),
End: ts(4, 8, 0),
Data: map[string][]sysdb.DataPoint{
"value": []sysdb.DataPoint{
{ts(4, 7, 0), 1.0},
{ts(4, 8, 0), 1.0},
},
},
},
want: &sysdb.Timeseries{
Start: ts(4, 7, 0),
End: ts(4, 8, 0),
Data: map[string][]sysdb.DataPoint{
"value": []sysdb.DataPoint{
{ts(4, 7, 0), 1.0},
{ts(4, 8, 0), 1.0},
},
},
},
},
{
ts1: &sysdb.Timeseries{
Start: ts(4, 7, 0),
End: ts(4, 8, 0),
Data: map[string][]sysdb.DataPoint{
"value": []sysdb.DataPoint{
{ts(4, 7, 0), 1.0},
{ts(4, 8, 0), 1.0},
},
},
},
ts2: &sysdb.Timeseries{
Start: ts(4, 7, 0),
End: ts(4, 8, 0),
Data: map[string][]sysdb.DataPoint{
"value": []sysdb.DataPoint{
{ts(4, 7, 0), 1.0},
{ts(4, 8, 0), 1.0},
},
},
},
want: &sysdb.Timeseries{
Start: ts(4, 7, 0),
End: ts(4, 8, 0),
Data: map[string][]sysdb.DataPoint{
"value": []sysdb.DataPoint{
{ts(4, 7, 0), 1.0},
{ts(4, 8, 0), 1.0},
},
},
},
},
{
ts1: &sysdb.Timeseries{
Start: ts(4, 5, 0),
End: ts(4, 10, 0),
Data: map[string][]sysdb.DataPoint{
"value": []sysdb.DataPoint{
{ts(4, 5, 0), 0.0},
{ts(4, 6, 0), 0.0},
{ts(4, 7, 0), 0.0},
{ts(4, 8, 0), 1.0},
{ts(4, 9, 0), 1.0},
{ts(4, 10, 0), 1.0},
},
},
},
ts2: &sysdb.Timeseries{
Start: ts(4, 8, 0),
End: ts(4, 12, 0),
Data: map[string][]sysdb.DataPoint{
"value": []sysdb.DataPoint{
{ts(4, 8, 0), 1.0},
{ts(4, 9, 0), 1.0},
{ts(4, 10, 0), 1.0},
{ts(4, 11, 0), 0.0},
{ts(4, 12, 0), 0.0},
},
},
},
want: &sysdb.Timeseries{
Start: ts(4, 8, 0),
End: ts(4, 10, 0),
Data: map[string][]sysdb.DataPoint{
"value": []sysdb.DataPoint{
{ts(4, 8, 0), 1.0},
{ts(4, 9, 0), 1.0},
{ts(4, 10, 0), 1.0},
},
},
},
},
{
ts1: &sysdb.Timeseries{
Start: ts(4, 7, 0),
End: ts(4, 7, 0),
Data: map[string][]sysdb.DataPoint{
"value": []sysdb.DataPoint{
{ts(4, 7, 0), 1.0},
},
},
},
ts2: &sysdb.Timeseries{
Start: ts(4, 7, 0),
End: ts(4, 7, 0),
Data: map[string][]sysdb.DataPoint{
"value": []sysdb.DataPoint{
{ts(4, 7, 0), 1.0},
},
},
},
want: &sysdb.Timeseries{
Start: ts(4, 7, 0),
End: ts(4, 7, 0),
Data: map[string][]sysdb.DataPoint{
"value": []sysdb.DataPoint{
{ts(4, 7, 0), 1.0},
},
},
},
},
} {
if err := align(test.ts1, test.ts2); err != nil {
t.Errorf("align(%v, %v) = %v; want <nil>", test.ts1, test.ts2, err)
}

if !reflect.DeepEqual(test.ts1, test.want) || !reflect.DeepEqual(test.ts2, test.want) {
t.Errorf("align() unexpected result %v, %v; want %v", test.ts1, test.ts2, test.want)
}
}
}

func ts(hour, min, sec int) sysdb.Time {
return sysdb.Time(time.Date(2016, 1, 1, hour, min, sec, 0, time.UTC))
}

// vim: set tw=78 sw=4 sw=4 noexpandtab :

0 comments on commit 4ebe90f

Please sign in to comment.