/
information_schema.go
167 lines (144 loc) · 4.22 KB
/
information_schema.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package clickhouse
import (
"context"
"errors"
"github.com/jmoiron/sqlx"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/drivers"
)
type informationSchema struct {
c *connection
}
func (c *connection) InformationSchema() drivers.InformationSchema {
return informationSchema{c: c}
}
func (i informationSchema) All(ctx context.Context) ([]*drivers.Table, error) {
conn, release, err := i.c.acquireMetaConn(ctx)
if err != nil {
return nil, err
}
defer func() { _ = release() }()
// Clickhouse does not have a concept of schemas. Both table_catalog and table_schema refer to the database where table is located.
// Given the usual way of querying table in clickhouse is `SELECT * FROM table_name` or `SELECT * FROM database.table_name`.
// We map clickhouse database to `database schema` and table_name to `table name`.
q := `
SELECT
T.table_schema AS SCHEMA,
T.table_schema = currentDatabase() AS is_default_schema,
T.table_name AS NAME,
T.table_type AS TABLE_TYPE,
C.column_name AS COLUMNS,
C.data_type AS COLUMN_TYPE,
C.ordinal_position as ORDINAL_POSITION
FROM information_schema.tables T
JOIN information_schema.columns C ON T.table_schema = C.table_schema AND T.table_name = C.table_name
WHERE lower(T.table_schema) NOT IN ('information_schema', 'system')
ORDER BY SCHEMA, NAME, TABLE_TYPE, ORDINAL_POSITION
`
rows, err := conn.QueryxContext(ctx, q)
if err != nil {
return nil, err
}
defer rows.Close()
tables, err := i.scanTables(rows)
if err != nil {
return nil, err
}
return tables, nil
}
func (i informationSchema) Lookup(ctx context.Context, db, schema, name string) (*drivers.Table, error) {
conn, release, err := i.c.acquireMetaConn(ctx)
if err != nil {
return nil, err
}
defer func() { _ = release() }()
var q string
var args []any
q = `
SELECT
T.table_schema AS SCHEMA,
T.table_schema = currentDatabase() AS is_default_schema,
T.table_name AS NAME,
T.table_type AS TABLE_TYPE,
C.column_name AS COLUMNS,
C.data_type AS COLUMN_TYPE,
C.ordinal_position as ORDINAL_POSITION
FROM information_schema.tables T
JOIN information_schema.columns C ON T.table_schema = C.table_schema AND T.table_name = C.table_name
WHERE T.table_schema = coalesce(?, currentDatabase()) AND T.table_name = ?
ORDER BY SCHEMA, NAME, TABLE_TYPE, ORDINAL_POSITION
`
if schema == "" {
args = append(args, nil, name)
} else {
args = append(args, schema, name)
}
rows, err := conn.QueryxContext(ctx, q, args...)
if err != nil {
return nil, err
}
defer rows.Close()
tables, err := i.scanTables(rows)
if err != nil {
return nil, err
}
if len(tables) == 0 {
return nil, drivers.ErrNotFound
}
return tables[0], nil
}
func (i informationSchema) scanTables(rows *sqlx.Rows) ([]*drivers.Table, error) {
var res []*drivers.Table
for rows.Next() {
var databaseSchema string
var isDefaultSchema bool
var name string
var tableType string
var columnName string
var columnType string
var oridinalPosition int
err := rows.Scan(&databaseSchema, &isDefaultSchema, &name, &tableType, &columnName, &columnType, &oridinalPosition)
if err != nil {
return nil, err
}
// set t to res[len(res)-1] if it's the same table, else set t to a new table and append it
var t *drivers.Table
if len(res) > 0 {
t = res[len(res)-1]
if !(t.DatabaseSchema == databaseSchema && t.Name == name) {
t = nil
}
}
if t == nil {
t = &drivers.Table{
DatabaseSchema: databaseSchema,
IsDefaultDatabaseSchema: isDefaultSchema,
Name: name,
View: tableType == "VIEW",
Schema: &runtimev1.StructType{},
}
res = append(res, t)
}
// parse column type
colType, err := databaseTypeToPB(columnType, false)
if err != nil {
if !errors.Is(err, errUnsupportedType) {
return nil, err
}
if t.UnsupportedCols == nil {
t.UnsupportedCols = make(map[string]string)
}
t.UnsupportedCols[columnName] = columnType
continue
}
// append column
t.Schema.Fields = append(t.Schema.Fields, &runtimev1.StructType_Field{
Name: columnName,
Type: colType,
})
}
if err := rows.Err(); err != nil {
return nil, err
}
return res, nil
}