-
Notifications
You must be signed in to change notification settings - Fork 111
/
column_desc_stats.go
116 lines (99 loc) · 2.8 KB
/
column_desc_stats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package queries
import (
"context"
"database/sql"
"fmt"
"io"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime"
"github.com/rilldata/rill/runtime/drivers"
)
// ColumnDescriptiveStatistics is a query that computes summary statistics
// (min, max, mean, standard deviation and the 25/50/75% quantiles) for a
// single column of a table.
type ColumnDescriptiveStatistics struct {
// TableName is the table the statistics are selected from.
TableName string
// ColumnName is the column the aggregates are computed over.
ColumnName string
// Result holds the statistics; it is assigned by Resolve (only when the
// query returns a non-NULL minimum) or by UnmarshalResult.
Result *runtimev1.NumericStatistics
}
// Compile-time check that *ColumnDescriptiveStatistics satisfies runtime.Query.
var _ runtime.Query = (*ColumnDescriptiveStatistics)(nil)
// Key returns a string identifying this query: the query type name followed
// by the table name and the column name, separated by colons.
func (q *ColumnDescriptiveStatistics) Key() string {
	const keyFormat = "ColumnDescriptiveStatistics:%s:%s"
	table, column := q.TableName, q.ColumnName
	return fmt.Sprintf(keyFormat, table, column)
}
// Deps returns the names this query depends on, which is just the table
// being queried.
func (q *ColumnDescriptiveStatistics) Deps() []string {
	deps := make([]string, 0, 1)
	deps = append(deps, q.TableName)
	return deps
}
// MarshalResult wraps the stored result in a runtime.QueryResult, pairing it
// with the size reported by sizeProtoMessage.
func (q *ColumnDescriptiveStatistics) MarshalResult() *runtime.QueryResult {
	stats := q.Result

	out := new(runtime.QueryResult)
	out.Value = stats
	out.Bytes = sizeProtoMessage(stats)
	return out
}
// UnmarshalResult stores v as the query result. v must be a
// *runtimev1.NumericStatistics; any other dynamic type is rejected with an
// error and q.Result is left untouched.
func (q *ColumnDescriptiveStatistics) UnmarshalResult(v any) error {
	switch stats := v.(type) {
	case *runtimev1.NumericStatistics:
		q.Result = stats
		return nil
	default:
		return fmt.Errorf("ColumnDescriptiveStatistics: mismatched unmarshal input")
	}
}
// Resolve runs the descriptive statistics query against the instance's OLAP
// connection and stores the outcome in q.Result.
//
// Only the DuckDB dialect is accepted; any other dialect produces an error.
// Every aggregate is cast to DOUBLE in SQL. If no row is returned, or the
// returned minimum is NULL, q.Result is left as it was and nil is returned.
func (q *ColumnDescriptiveStatistics) Resolve(ctx context.Context, rt *runtime.Runtime, instanceID string, priority int) error {
	olap, release, err := rt.OLAP(ctx, instanceID)
	if err != nil {
		return err
	}
	defer release()

	if olap.Dialect() != drivers.DialectDuckDB {
		return fmt.Errorf("not available for dialect '%s'", olap.Dialect())
	}

	// Seven placeholders take the column name (one per aggregate); the last
	// one takes the table name.
	const statsSQL = "SELECT " +
		"min(%s)::DOUBLE as min, " +
		"approx_quantile(%s, 0.25)::DOUBLE as q25, " +
		"approx_quantile(%s, 0.5)::DOUBLE as q50, " +
		"approx_quantile(%s, 0.75)::DOUBLE as q75, " +
		"max(%s)::DOUBLE as max, " +
		"avg(%s)::DOUBLE as mean, " +
		"stddev_pop(%s)::DOUBLE as sd " +
		"FROM %s"

	column := safeName(q.ColumnName)
	table := safeName(q.TableName)
	stmt := &drivers.Statement{
		Query:            fmt.Sprintf(statsSQL, column, column, column, column, column, column, column, table),
		Priority:         priority,
		ExecutionTimeout: defaultExecutionTimeout,
	}

	rows, err := olap.Execute(ctx, stmt)
	if err != nil {
		return err
	}
	defer rows.Close()

	// Nullable targets: the aggregates may come back as NULL.
	var lowest, q25, q50, q75, highest, mean, sd sql.NullFloat64
	if rows.Next() {
		if err := rows.Scan(&lowest, &q25, &q50, &q75, &highest, &mean, &sd); err != nil {
			return err
		}
	}
	if err := rows.Err(); err != nil {
		return err
	}

	// The minimum's validity decides whether a result is published at all.
	if !lowest.Valid {
		return nil
	}

	q.Result = &runtimev1.NumericStatistics{
		Min:  lowest.Float64,
		Max:  highest.Float64,
		Q25:  q25.Float64,
		Q50:  q50.Float64,
		Q75:  q75.Float64,
		Mean: mean.Float64,
		Sd:   sd.Float64,
	}
	return nil
}
// Export is not implemented for this query: it always returns
// ErrExportNotSupported and does not use any of its arguments.
func (q *ColumnDescriptiveStatistics) Export(ctx context.Context, rt *runtime.Runtime, instanceID string, w io.Writer, opts *runtime.ExportOptions) error {
return ErrExportNotSupported
}