-
Notifications
You must be signed in to change notification settings - Fork 100
/
column_cardinality.go
100 lines (85 loc) · 2.46 KB
/
column_cardinality.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package queries
import (
"context"
"errors"
"fmt"
"io"
"reflect"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime"
"github.com/rilldata/rill/runtime/drivers"
)
// ColumnCardinality is a query that computes a distinct-value count for a
// single column of a table (approx_count_distinct on DuckDB, uniq on ClickHouse).
type ColumnCardinality struct {
// Connector is the connector name passed to the runtime when acquiring the OLAP handle.
Connector string
// Database is the database part of the table reference handed to the dialect's EscapeTable.
Database string
// DatabaseSchema is the schema part of the table reference handed to the dialect's EscapeTable.
DatabaseSchema string
// TableName is the table to query; it is also the resource name reported by Deps.
TableName string
// ColumnName is the column whose distinct values are counted.
ColumnName string
// Result holds the count scanned by Resolve, or the value restored by UnmarshalResult.
Result float64
}
// Compile-time check that *ColumnCardinality satisfies runtime.Query.
var _ runtime.Query = &ColumnCardinality{}
// Key returns a string identifying this query by every field that selects the
// data Resolve reads: connector, database, schema, table and column.
//
// Including the connector, database and schema keeps two queries against
// same-named tables in different locations from sharing a key.
// NOTE(review): presumably the runtime uses this as a result-cache key — confirm
// that nothing depends on the previous two-part format.
func (q *ColumnCardinality) Key() string {
	return fmt.Sprintf(
		"ColumnCardinality:%s:%s:%s:%s:%s",
		q.Connector,
		q.Database,
		q.DatabaseSchema,
		q.TableName,
		q.ColumnName,
	)
}
// Deps reports the resources this query depends on. TableName is listed twice,
// once as a source and once as a model, in that order.
func (q *ColumnCardinality) Deps() []*runtimev1.ResourceName {
	asSource := &runtimev1.ResourceName{Kind: runtime.ResourceKindSource, Name: q.TableName}
	asModel := &runtimev1.ResourceName{Kind: runtime.ResourceKindModel, Name: q.TableName}
	return []*runtimev1.ResourceName{asSource, asModel}
}
// MarshalResult wraps Result in a runtime.QueryResult. Bytes is set to the
// size of Result's type as reported by reflect.
func (q *ColumnCardinality) MarshalResult() *runtime.QueryResult {
	resultSize := int64(reflect.TypeOf(q.Result).Size())
	return &runtime.QueryResult{Value: q.Result, Bytes: resultSize}
}
// UnmarshalResult restores Result from v, which must hold a float64.
// Any other dynamic type (including nil) yields an error and leaves Result
// untouched.
func (q *ColumnCardinality) UnmarshalResult(v any) error {
	switch cardinality := v.(type) {
	case float64:
		q.Result = cardinality
		return nil
	default:
		return fmt.Errorf("ColumnCardinality: mismatched unmarshal input")
	}
}
// Resolve runs the distinct-count query against the instance's OLAP connector
// and stores the single scanned value in q.Result.
//
// The aggregate depends on the dialect: approx_count_distinct for DuckDB and
// uniq for ClickHouse. Any other dialect is rejected before a statement is
// built. An error is returned when the OLAP handle cannot be acquired, when
// execution or scanning fails, or when the statement yields no row.
func (q *ColumnCardinality) Resolve(ctx context.Context, rt *runtime.Runtime, instanceID string, priority int) error {
	olap, release, err := rt.OLAP(ctx, instanceID, q.Connector)
	if err != nil {
		return err
	}
	defer release()

	dialect := olap.Dialect()

	// Select the dialect-specific statement template; both take the column
	// expression first and the escaped table reference second.
	var sqlTemplate string
	switch dialect {
	case drivers.DialectDuckDB:
		sqlTemplate = "SELECT approx_count_distinct(%s) AS count FROM %s"
	case drivers.DialectClickHouse:
		sqlTemplate = "SELECT uniq(%s) AS count FROM %s"
	default:
		return fmt.Errorf("not available for dialect '%s'", dialect)
	}

	// safeName is defined elsewhere in the package — presumably it quotes the
	// identifier for safe embedding in SQL; confirm against its definition.
	stmt := &drivers.Statement{
		Query:            fmt.Sprintf(sqlTemplate, safeName(q.ColumnName), dialect.EscapeTable(q.Database, q.DatabaseSchema, q.TableName)),
		Priority:         priority,
		ExecutionTimeout: defaultExecutionTimeout,
	}

	rows, err := olap.Execute(ctx, stmt)
	if err != nil {
		return err
	}
	defer rows.Close()

	// No row: surface the iteration error if there is one, otherwise report
	// the empty result explicitly.
	if !rows.Next() {
		if iterErr := rows.Err(); iterErr != nil {
			return iterErr
		}
		return errors.New("no rows returned")
	}

	var cardinality float64
	if scanErr := rows.Scan(&cardinality); scanErr != nil {
		return scanErr
	}
	q.Result = cardinality
	return nil
}
// Export is not implemented for this query: it ignores all arguments, writes
// nothing to w, and always returns ErrExportNotSupported.
func (q *ColumnCardinality) Export(ctx context.Context, rt *runtime.Runtime, instanceID string, w io.Writer, opts *runtime.ExportOptions) error {
return ErrExportNotSupported
}