-
Notifications
You must be signed in to change notification settings - Fork 117
/
table_columns.go
94 lines (80 loc) · 2.24 KB
/
table_columns.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package queries
import (
"context"
"fmt"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime"
"github.com/rilldata/rill/runtime/drivers"
)
// TableColumns is a query that lists the columns (name and type) of a table.
type TableColumns struct {
// TableName is the table (or view) whose columns are looked up by Resolve.
TableName string
// Result holds the column profiles; it is populated by Resolve or UnmarshalResult.
Result []*runtimev1.ProfileColumn
}
// Compile-time check that *TableColumns satisfies the runtime.Query interface.
var _ runtime.Query = &TableColumns{}
// Key returns a string identifying this query: the fixed prefix
// "TableColumns:" followed by the table name.
func (q *TableColumns) Key() string {
	const prefix = "TableColumns:"
	return prefix + q.TableName
}
// Deps returns the query's dependencies, which is a single-element slice
// containing the table name.
func (q *TableColumns) Deps() []string {
	deps := make([]string, 0, 1)
	deps = append(deps, q.TableName)
	return deps
}
// MarshalResult exposes the current Result as an untyped value. The slice is
// returned as-is (not copied).
func (q *TableColumns) MarshalResult() any {
	var out any = q.Result
	return out
}
// UnmarshalResult stores v in Result. v must be a []*runtimev1.ProfileColumn
// (the type produced by MarshalResult); any other value, including nil,
// leaves Result untouched and yields an error.
func (q *TableColumns) UnmarshalResult(v any) error {
	if cols, isCols := v.([]*runtimev1.ProfileColumn); isCols {
		q.Result = cols
		return nil
	}
	return fmt.Errorf("TableColumns: mismatched unmarshal input")
}
// Resolve looks up the column names and types of q.TableName and stores them
// in q.Result.
//
// It obtains the instance's OLAP driver from rt and returns an error unless
// the driver's dialect is DuckDB. All statements run on a single connection
// (via WithConnection) at the given priority. Any error from the driver,
// from scanning a row, or from iterating the result set is returned and
// q.Result is left unchanged.
func (q *TableColumns) Resolve(ctx context.Context, rt *runtime.Runtime, instanceID string, priority int) error {
	olap, err := rt.OLAP(ctx, instanceID)
	if err != nil {
		return err
	}

	if olap.Dialect() != drivers.DialectDuckDB {
		return fmt.Errorf("not available for dialect '%s'", olap.Dialect())
	}

	return olap.WithConnection(ctx, priority, func(ctx context.Context, ensuredCtx context.Context) error {
		// Views return duplicate column names, so we materialize a one-row
		// temporary table and inspect its columns instead.
		//
		// NOTE(review): q.TableName is interpolated into the SQL without
		// escaping embedded double quotes — confirm callers validate it.
		temporaryTableName := tempName("profile_columns_")
		if err := olap.Exec(ctx, &drivers.Statement{
			Query:    fmt.Sprintf(`CREATE TEMPORARY TABLE "%s" AS (SELECT * FROM "%s" LIMIT 1)`, temporaryTableName, q.TableName),
			Priority: priority,
		}); err != nil {
			return err
		}
		defer func() {
			// NOTE: Using ensuredCtx rather than ctx — presumably so the
			// cleanup still runs when ctx has been cancelled (confirm against
			// WithConnection). The drop is best-effort, so its error is
			// deliberately ignored.
			_ = olap.Exec(ensuredCtx, &drivers.Statement{
				Query:    `DROP TABLE "` + temporaryTableName + `"`,
				Priority: priority,
			})
		}()

		rows, err := olap.Execute(ctx, &drivers.Statement{
			Query: fmt.Sprintf(`
SELECT column_name AS name, data_type AS type
FROM information_schema.columns
WHERE table_catalog = 'temp' AND table_name = '%s'`, temporaryTableName),
			Priority: priority,
		})
		if err != nil {
			return err
		}
		// Deferred after the DROP above, so the rows are closed before the
		// temporary table is dropped.
		defer rows.Close()

		var pcs []*runtimev1.ProfileColumn
		for rows.Next() {
			pc := runtimev1.ProfileColumn{}
			if err := rows.StructScan(&pc); err != nil {
				return err
			}
			pcs = append(pcs, &pc)
		}
		// Next returns false both at end-of-data and on failure; surface the
		// failure instead of silently returning a truncated column list.
		if err := rows.Err(); err != nil {
			return err
		}

		q.Result = pcs
		return nil
	})
}