-
Notifications
You must be signed in to change notification settings - Fork 117
/
column_null_count.go
97 lines (81 loc) · 2.23 KB
/
column_null_count.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package queries
import (
"context"
"fmt"
"io"
"reflect"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime"
"github.com/rilldata/rill/runtime/drivers"
)
// ColumnNullCount is a query that counts the rows of a table in which a given
// column IS NULL. The inputs are the exported fields below; Resolve writes the
// count to Result.
type ColumnNullCount struct {
	// Connector is passed to rt.OLAP to select the OLAP connection to query.
	Connector string
	// Database, DatabaseSchema and TableName are passed together to the
	// dialect's EscapeTable to build the FROM clause.
	Database       string
	DatabaseSchema string
	// TableName is additionally used as the resource name reported by Deps.
	TableName string
	// ColumnName is the column tested with IS NULL (passed through safeName).
	ColumnName string
	// Result holds the count set by Resolve or restored by UnmarshalResult.
	Result float64
}
// Compile-time assertion that *ColumnNullCount satisfies runtime.Query.
var _ runtime.Query = (*ColumnNullCount)(nil)
// Key returns the string identifying this query (presumably used by the
// runtime as a cache key; confirm against runtime.Query).
//
// Every field that influences the SQL built by Resolve is included: connector,
// database, schema, table and column. Keying on table and column alone would
// let two queries against same-named tables in different connectors, databases
// or schemas share a key and therefore each other's results.
func (q *ColumnNullCount) Key() string {
	return fmt.Sprintf(
		"ColumnNullCount:%s:%s:%s:%s:%s",
		q.Connector, q.Database, q.DatabaseSchema, q.TableName, q.ColumnName,
	)
}
// Deps returns the resource names this query depends on. Two entries are
// returned, both named q.TableName: one of kind source and one of kind model,
// in that order.
func (q *ColumnNullCount) Deps() []*runtimev1.ResourceName {
	source := &runtimev1.ResourceName{Kind: runtime.ResourceKindSource, Name: q.TableName}
	model := &runtimev1.ResourceName{Kind: runtime.ResourceKindModel, Name: q.TableName}
	return []*runtimev1.ResourceName{source, model}
}
// MarshalResult wraps q.Result in a runtime.QueryResult. Bytes is set to the
// in-memory size of the result's type as reported by reflect.
func (q *ColumnNullCount) MarshalResult() *runtime.QueryResult {
	size := reflect.TypeOf(q.Result).Size()

	out := &runtime.QueryResult{}
	out.Value = q.Result
	out.Bytes = int64(size)
	return out
}
// UnmarshalResult restores q.Result from v. v must hold a float64; any other
// dynamic type (including nil) yields an error and leaves q.Result untouched.
func (q *ColumnNullCount) UnmarshalResult(v any) error {
	switch count := v.(type) {
	case float64:
		q.Result = count
		return nil
	default:
		return fmt.Errorf("ColumnNullCount: mismatched unmarshal input")
	}
}
// Resolve executes the null-count query and stores the outcome in q.Result.
//
// It acquires an OLAP handle for q.Connector through rt.OLAP (released on
// return), rejects any dialect other than DuckDB and ClickHouse, and runs a
// "SELECT count(*) ... WHERE <column> IS NULL" statement with the supplied
// priority and the package's default execution timeout. Errors from acquiring
// the handle, executing the statement, scanning, or iterating the rows are
// returned unchanged.
func (q *ColumnNullCount) Resolve(ctx context.Context, rt *runtime.Runtime, instanceID string, priority int) error {
	olap, release, err := rt.OLAP(ctx, instanceID, q.Connector)
	if err != nil {
		return err
	}
	defer release()

	// Only DuckDB and ClickHouse are accepted.
	dialect := olap.Dialect()
	supported := dialect == drivers.DialectDuckDB || dialect == drivers.DialectClickHouse
	if !supported {
		return fmt.Errorf("not available for dialect '%s'", dialect)
	}

	table := dialect.EscapeTable(q.Database, q.DatabaseSchema, q.TableName)
	column := safeName(q.ColumnName)
	stmt := &drivers.Statement{
		Query:            fmt.Sprintf("SELECT count(*) AS count FROM %s WHERE %s IS NULL", table, column),
		Priority:         priority,
		ExecutionTimeout: defaultExecutionTimeout,
	}

	rows, err := olap.Execute(ctx, stmt)
	if err != nil {
		return err
	}
	defer rows.Close()

	// If the result set is empty, nulls keeps its zero value.
	var nulls float64
	for rows.Next() {
		if err := rows.Scan(&nulls); err != nil {
			return err
		}
	}
	if err := rows.Err(); err != nil {
		return err
	}

	q.Result = nulls
	return nil
}
// Export is not implemented for this query: it ignores all of its arguments
// and always returns ErrExportNotSupported.
func (q *ColumnNullCount) Export(ctx context.Context, rt *runtime.Runtime, instanceID string, w io.Writer, opts *runtime.ExportOptions) error {
	return ErrExportNotSupported
}