/
bq.go
119 lines (105 loc) · 2.42 KB
/
bq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package bq
import (
"context"
"fmt"
"strings"
"cloud.google.com/go/bigquery"
"github.com/k1LoW/tbls/dict"
"github.com/k1LoW/tbls/schema"
"github.com/pkg/errors"
)
// Bigquery holds what is needed to analyze one BigQuery dataset: a context,
// a BigQuery client and the ID of the dataset to inspect.
type Bigquery struct {
// ctx is the context handed to the BigQuery API calls made by Analyze.
ctx context.Context
// client is the BigQuery client used to look up the dataset and its tables.
client *bigquery.Client
// datasetID is the ID of the dataset that Analyze reads.
datasetID string
}
// New returns a Bigquery that uses the given context, client and dataset ID.
// The values are stored as-is; no validation is performed and the returned
// error is always nil.
func New(ctx context.Context, client *bigquery.Client, datasetID string) (*Bigquery, error) {
	b := new(Bigquery)
	b.ctx = ctx
	b.client = client
	b.datasetID = datasetID
	return b, nil
}
// Analyze loads the metadata of the configured BigQuery dataset into s.
//
// It sets s.Driver from Info, copies the dataset description into s.Desc,
// appends one "key:value" label per dataset label to s.Labels, and replaces
// s.Tables with one entry per table found in the dataset. Each table carries
// its description, type, view query, labels and the flattened column list
// produced by listColumns.
//
// Every error is returned wrapped with errors.WithStack. When an error is
// returned, s may already have been partially updated (driver, description
// and labels are written before the tables are listed).
func (b *Bigquery) Analyze(s *schema.Schema) error {
	d, err := b.Info()
	if err != nil {
		return errors.WithStack(err)
	}
	s.Driver = d

	ds := b.client.Dataset(b.datasetID)
	dsMeta, err := ds.Metadata(b.ctx)
	if err != nil {
		return errors.WithStack(err)
	}
	s.Desc = dsMeta.Description
	s.Labels = append(s.Labels, labelsFromMap(dsMeta.Labels)...)

	// tables
	// Start from a non-nil slice so that a dataset without tables still
	// yields an empty (not nil) s.Tables, as before.
	tables := []*schema.Table{}
	it := ds.Tables(b.ctx)
	for {
		t, err := it.Next()
		if err != nil {
			// NOTE(review): end of iteration is detected by message text.
			// Comparing against the iterator package's sentinel error
			// (presumably iterator.Done from google.golang.org/api/iterator)
			// would be more robust, but that package is not imported here.
			if err.Error() == "no more items in iterator" {
				break
			}
			return errors.WithStack(err)
		}
		tblMeta, err := t.Metadata(b.ctx)
		if err != nil {
			return errors.WithStack(err)
		}
		tables = append(tables, &schema.Table{
			Name:    tableNameFromFullID(tblMeta.FullID, b.datasetID),
			Comment: tblMeta.Description,
			Type:    string(tblMeta.Type),
			Def:     tblMeta.ViewQuery,
			Columns: listColumns(tblMeta.Schema, ""),
			Labels:  labels(tblMeta.Labels),
		})
	}
	s.Tables = tables
	return nil
}

// labels is a thin alias for labelsFromMap kept so table and dataset label
// conversion share one implementation.
func labels(m map[string]string) schema.Labels {
	return labelsFromMap(m)
}

// labelsFromMap converts a BigQuery label map into non-virtual schema labels
// named "key:value". The result is never nil. Its order follows Go map
// iteration order and is therefore unspecified.
func labelsFromMap(m map[string]string) schema.Labels {
	out := schema.Labels{}
	for k, v := range m {
		out = append(out, &schema.Label{Name: fmt.Sprintf("%s:%s", k, v), Virtual: false})
	}
	return out
}

// tableNameFromFullID returns the part of fullID that follows the last
// occurrence of "<datasetID>.". fullID is expected to look like
// "project:dataset.table" (TODO confirm against the BigQuery client docs).
//
// Anchoring on the last occurrence keeps the result correct when the same
// "<datasetID>." text also appears earlier in the ID, e.g. inside a
// domain-scoped project ID. If the separator is absent the result is "",
// matching the previous behavior.
func tableNameFromFullID(fullID, datasetID string) string {
	sep := datasetID + "."
	i := strings.LastIndex(fullID, sep)
	if i < 0 {
		return ""
	}
	return fullID[i+len(sep):]
}
// listColumns flattens a BigQuery schema into a list of columns.
//
// Each field becomes one column named prefix+field name, carrying the field's
// description as its comment, its type as a string, and Nullable set to the
// inverse of Required. A field that has a nested schema is followed directly
// by its nested fields, whose names are prefixed with "<parent name>.".
// The returned slice is never nil.
func listColumns(s bigquery.Schema, prefix string) []*schema.Column {
	cols := make([]*schema.Column, 0, len(s))
	for _, field := range s {
		fullName := prefix + field.Name
		cols = append(cols, &schema.Column{
			Name:     fullName,
			Comment:  field.Description,
			Nullable: !field.Required,
			Type:     string(field.Type),
			// TODO: field.Repeated
		})
		if len(field.Schema) == 0 {
			continue
		}
		// Recurse into nested fields, extending the dotted name prefix.
		cols = append(cols, listColumns(field.Schema, fullName+".")...)
	}
	return cols
}
// Info returns the driver description for BigQuery: the name "bigquery", an
// empty database version, and a dictionary that maps the term "Comment" to
// "Description". The returned error is always nil.
func (b *Bigquery) Info() (*schema.Driver, error) {
	terms := dict.New()
	terms.Merge(map[string]string{"Comment": "Description"})

	meta := &schema.DriverMeta{Dict: &terms}
	return &schema.Driver{
		Name:            "bigquery",
		DatabaseVersion: "",
		Meta:            meta,
	}, nil
}