-
Notifications
You must be signed in to change notification settings - Fork 117
/
column_topk.go
114 lines (96 loc) · 2.46 KB
/
column_topk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package queries
import (
"context"
"fmt"
"io"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/pbutil"
)
type ColumnTopK struct {
TableName string
ColumnName string
Agg string
K int
Result *runtimev1.TopK
}
var _ runtime.Query = &ColumnTopK{}
func (q *ColumnTopK) Key() string {
return fmt.Sprintf("ColumnTopK:%s:%s:%s:%d", q.TableName, q.ColumnName, q.Agg, q.K)
}
func (q *ColumnTopK) Deps() []*runtimev1.ResourceName {
return []*runtimev1.ResourceName{
{Kind: runtime.ResourceKindSource, Name: q.TableName},
{Kind: runtime.ResourceKindModel, Name: q.TableName},
}
}
func (q *ColumnTopK) MarshalResult() *runtime.QueryResult {
return &runtime.QueryResult{
Value: q.Result,
Bytes: sizeProtoMessage(q.Result),
}
}
func (q *ColumnTopK) UnmarshalResult(v any) error {
res, ok := v.(*runtimev1.TopK)
if !ok {
return fmt.Errorf("topk: mismatched unmarshal input")
}
q.Result = res
return nil
}
func (q *ColumnTopK) Resolve(ctx context.Context, rt *runtime.Runtime, instanceID string, priority int) error {
// Get OLAP connection
olap, release, err := rt.OLAP(ctx, instanceID)
if err != nil {
return err
}
defer release()
// Check dialect
if olap.Dialect() != drivers.DialectDuckDB {
return fmt.Errorf("not available for dialect '%s'", olap.Dialect())
}
// Build SQL
qry := fmt.Sprintf("SELECT %s AS value, %s AS count FROM %s GROUP BY %s ORDER BY count DESC, value ASC LIMIT %d",
safeName(q.ColumnName),
q.Agg,
safeName(q.TableName),
safeName(q.ColumnName),
q.K,
)
// Run query
rows, err := olap.Execute(ctx, &drivers.Statement{
Query: qry,
Priority: priority,
ExecutionTimeout: defaultExecutionTimeout,
})
if err != nil {
return err
}
defer rows.Close()
// Scan result
res := &runtimev1.TopK{}
for rows.Next() {
entry := &runtimev1.TopK_Entry{}
var val interface{}
err = rows.Scan(&val, &entry.Count)
if err != nil {
return err
}
entry.Value, err = pbutil.ToValue(val, safeFieldType(rows.Schema, 0))
if err != nil {
return err
}
res.Entries = append(res.Entries, entry)
}
err = rows.Err()
if err != nil {
return err
}
// Save result
q.Result = res
return nil
}
func (q *ColumnTopK) Export(ctx context.Context, rt *runtime.Runtime, instanceID string, w io.Writer, opts *runtime.ExportOptions) error {
return ErrExportNotSupported
}