-
Notifications
You must be signed in to change notification settings - Fork 100
/
timeseries_interval.go
98 lines (85 loc) · 2.62 KB
/
timeseries_interval.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package queries
import (
"context"
"fmt"
"io"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime"
)
// RollupInterval is a query that picks a time grain for a column based on the
// interval reported by a ColumnTimeRange query over the same column.
// It implements runtime.Query.
type RollupInterval struct {
// Connector is forwarded to the underlying ColumnTimeRange query.
Connector string
// TableName is the table holding the column; it is also used as the name of
// the source/model dependencies and as part of the query key.
TableName string
// ColumnName is the column whose time range is inspected; it is also part of
// the query key.
ColumnName string
// Result holds the response; it is populated by Resolve or UnmarshalResult.
Result *runtimev1.ColumnRollupIntervalResponse
}
// Compile-time check that *RollupInterval satisfies runtime.Query.
var _ runtime.Query = (*RollupInterval)(nil)
// Key returns the identifier string for this query, composed of the fixed
// "RollupInterval" prefix followed by the table and column names.
//
// NOTE(review): Connector is not part of the key — confirm that table names
// are unique across connectors wherever this key is used for lookups.
func (q *RollupInterval) Key() string {
	table, column := q.TableName, q.ColumnName
	return fmt.Sprintf("RollupInterval:%s:%s", table, column)
}
// Deps returns the resources this query depends on: a source and a model,
// both named after TableName (presumably because the table may be backed by
// either kind of resource — verify against the runtime).
func (q *RollupInterval) Deps() []*runtimev1.ResourceName {
	source := &runtimev1.ResourceName{Kind: runtime.ResourceKindSource, Name: q.TableName}
	model := &runtimev1.ResourceName{Kind: runtime.ResourceKindModel, Name: q.TableName}
	return []*runtimev1.ResourceName{source, model}
}
// MarshalResult wraps the current Result in a runtime.QueryResult, with the
// Bytes field computed by sizeProtoMessage on that same result.
func (q *RollupInterval) MarshalResult() *runtime.QueryResult {
	res := q.Result
	out := &runtime.QueryResult{
		Value: res,
		Bytes: sizeProtoMessage(res),
	}
	return out
}
// UnmarshalResult stores v as the query's Result.
// It returns an error, leaving Result untouched, when v is not a
// *runtimev1.ColumnRollupIntervalResponse.
func (q *RollupInterval) UnmarshalResult(v any) error {
	switch res := v.(type) {
	case *runtimev1.ColumnRollupIntervalResponse:
		q.Result = res
		return nil
	default:
		return fmt.Errorf("RollupInterval: mismatched unmarshal input")
	}
}
// Resolve runs a ColumnTimeRange query for the same connector, table and
// column, then chooses a time grain from the interval that query reports and
// stores it in Result together with the reported Min and Max.
//
// If the time range query fails, its error is returned unchanged. If it
// reports no interval, Result is set to an empty response.
//
// Grain selection (first match wins):
//   - zero days and at most one minute of micros: millisecond
//   - zero days and at most one hour of micros:   second
//   - zero days and at most one day of micros:    minute
//   - at most 7 days:                             hour
//   - at most 365*20 days:                        day
//   - at most 365*500 days:                       month
//   - anything longer:                            year
func (q *RollupInterval) Resolve(ctx context.Context, rt *runtime.Runtime, instanceID string, priority int) error {
	// Interval lengths expressed in microseconds.
	const (
		microsMinute = 60 * 1000 * 1000
		microsHour   = 60 * microsMinute
		microsDay    = 24 * microsHour
	)

	timeRange := &ColumnTimeRange{
		Connector:  q.Connector,
		TableName:  q.TableName,
		ColumnName: q.ColumnName,
	}
	if err := rt.Query(ctx, instanceID, timeRange, priority); err != nil {
		return err
	}

	span := timeRange.Result.Interval
	if span == nil {
		// No interval reported: return an empty response rather than a grain.
		q.Result = &runtimev1.ColumnRollupIntervalResponse{}
		return nil
	}

	var grain runtimev1.TimeGrain
	switch {
	case span.Days == 0 && span.Micros <= microsMinute:
		grain = runtimev1.TimeGrain_TIME_GRAIN_MILLISECOND
	case span.Days == 0 && span.Micros <= microsHour:
		// The previous case already handled micros up to one minute.
		grain = runtimev1.TimeGrain_TIME_GRAIN_SECOND
	case span.Days == 0 && span.Micros <= microsDay:
		grain = runtimev1.TimeGrain_TIME_GRAIN_MINUTE
	case span.Days <= 7:
		grain = runtimev1.TimeGrain_TIME_GRAIN_HOUR
	case span.Days <= 365*20:
		grain = runtimev1.TimeGrain_TIME_GRAIN_DAY
	case span.Days <= 365*500:
		grain = runtimev1.TimeGrain_TIME_GRAIN_MONTH
	default:
		grain = runtimev1.TimeGrain_TIME_GRAIN_YEAR
	}

	q.Result = &runtimev1.ColumnRollupIntervalResponse{
		Interval: grain,
		Start:    timeRange.Result.Min,
		End:      timeRange.Result.Max,
	}
	return nil
}
// Export is not implemented for this query: it ignores all of its arguments
// and always returns ErrExportNotSupported.
func (q *RollupInterval) Export(ctx context.Context, rt *runtime.Runtime, instanceID string, w io.Writer, opts *runtime.ExportOptions) error {
return ErrExportNotSupported
}