-
Notifications
You must be signed in to change notification settings - Fork 567
/
sdata.go
173 lines (159 loc) · 4.52 KB
/
sdata.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package sdata
import (
"database/sql"
"io"
"reflect"
"time"
"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/pachsql"
)
// Tuple is an alias for []interface{}.
// It is used for passing around rows of data, one element per column.
// The elements of a Tuple will always be pointers (for example *string or
// *sql.NullString), so that the Tuple can be passed directly to
// sql.Rows.Scan as its destination arguments.
type Tuple = []interface{}
// TupleWriter is the type of Writers for structured data.
// Rows are handed to the writer one at a time; MaterializeSQL and Copy both
// call Flush once after the last WriteTuple.
type TupleWriter interface {
// WriteTuple writes a single row.
WriteTuple(row Tuple) error
// Flush is called when no more rows will be written.
// NOTE(review): presumably this pushes buffered rows to the underlying
// destination; confirm against the concrete implementations.
Flush() error
}
// TupleReader is a stream of Tuples.
// Copy treats an error matching io.EOF from Next as the end of the stream.
type TupleReader interface {
// Next attempts to read one Tuple into x.
// If the next data is the wrong shape for x then an error is returned.
Next(x Tuple) error
}
// MaterializationResult is returned by MaterializeSQL.
type MaterializationResult struct {
// ColumnNames holds the column names reported by the *sql.Rows that was read.
ColumnNames []string
// RowCount is the number of rows that were scanned and written.
RowCount uint64
}
// MaterializeSQL drains rows into tw, one tuple per result row, and then
// flushes tw. A single scratch Tuple, shaped from the column types reported
// by rows, is reused as the scan destination for every row.
//
// On success it returns the column names and the number of rows written.
// The first error from the rows or from tw aborts the operation and is
// returned with a nil result.
func MaterializeSQL(tw TupleWriter, rows *sql.Rows) (*MaterializationResult, error) {
	names, err := rows.Columns()
	if err != nil {
		return nil, errors.EnsureStack(err)
	}
	columnTypes, err := rows.ColumnTypes()
	if err != nil {
		return nil, errors.EnsureStack(err)
	}
	scratch, err := NewTupleFromColumnTypes(columnTypes)
	if err != nil {
		return nil, errors.EnsureStack(err)
	}

	result := &MaterializationResult{ColumnNames: names}
	for rows.Next() {
		err = rows.Scan(scratch...)
		if err == nil {
			err = tw.WriteTuple(scratch)
		}
		if err != nil {
			return nil, errors.EnsureStack(err)
		}
		result.RowCount++
	}
	// rows.Next returning false can mean either exhaustion or failure.
	if err = rows.Err(); err != nil {
		return nil, errors.EnsureStack(err)
	}
	if err = tw.Flush(); err != nil {
		return nil, errors.EnsureStack(err)
	}
	return result, nil
}
// NewTupleFromColumnTypes returns a Tuple with one scan destination per entry
// of cTypes, chosen from the column's database type name. A column whose
// nullability the driver cannot report is treated as nullable.
// An error is returned if any column has an unrecognized type.
func NewTupleFromColumnTypes(cTypes []*sql.ColumnType) (Tuple, error) {
	tuple := make(Tuple, 0, len(cTypes))
	for _, ct := range cTypes {
		typeName := ct.DatabaseTypeName()
		nullable, known := ct.Nullable()
		elem, err := makeTupleElement(typeName, nullable || !known)
		if err != nil {
			return nil, err
		}
		tuple = append(tuple, elem)
	}
	return tuple, nil
}
// Copy reads tuples from r and writes each one to w until r reports io.EOF,
// then flushes w. row is the scratch Tuple every read is decoded into, so it
// must have the shape of the data produced by r.
//
// It returns the number of tuples written. The count is returned alongside a
// non-nil error as well, so callers can tell how far the copy got. An error
// from the final Flush is reported like any other write error.
func Copy(w TupleWriter, r TupleReader, row Tuple) (n int, _ error) {
	for {
		err := r.Next(row)
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return n, errors.EnsureStack(err)
		}
		if err := w.WriteTuple(row); err != nil {
			return n, errors.EnsureStack(err)
		}
		n++
	}
	// A failed flush means written tuples may never have reached the
	// destination, so it must be surfaced rather than dropped.
	if err := w.Flush(); err != nil {
		return n, errors.EnsureStack(err)
	}
	return n, nil
}
// NewTupleFromTableInfo returns a Tuple with one scan destination per column
// of info, chosen from each column's data type and nullability.
// An error is returned if any column has an unrecognized type.
func NewTupleFromTableInfo(info *pachsql.TableInfo) (Tuple, error) {
	columns := info.Columns
	out := make(Tuple, 0, len(columns))
	for _, col := range columns {
		elem, err := makeTupleElement(col.DataType, col.IsNullable)
		if err != nil {
			return nil, err
		}
		out = append(out, elem)
	}
	return out, nil
}
// makeTupleElement returns a pointer suitable as a scan destination for a
// column of the given database type. Nullable columns get the matching
// sql.Null* wrapper and non-nullable columns get the plain Go type.
// VARIANT columns always get a *interface{}.
// An unknown dbType yields an error.
func makeTupleElement(dbType string, nullable bool) (interface{}, error) {
	switch dbType {
	case "BOOL", "BOOLEAN":
		if !nullable {
			return new(bool), nil
		}
		return new(sql.NullBool), nil

	case
		// Numeric types are carried as strings to avoid losing precision.
		// FIXED is returned by Snowflake's Go driver, while NUMBER is in
		// INFORMATION_SCHEMA; DECIMAL is used by MySQL.
		"SMALLINT", "INT2", "INTEGER", "INT", "INT4", "BIGINT", "INT8",
		"UNSIGNED SMALLINT", "UNSIGNED INT2", "UNSIGNED INTEGER", "UNSIGNED INT",
		"UNSIGNED INT4", "UNSIGNED BIGINT", "UNSIGNED INT8",
		"FLOAT", "FLOAT4", "FLOAT8", "REAL", "DOUBLE PRECISION",
		"NUMERIC", "DECIMAL", "NUMBER", "FIXED",
		// Text types.
		"VARCHAR", "TEXT", "CHARACTER VARYING":
		if !nullable {
			return new(string), nil
		}
		return new(sql.NullString), nil

	// TIMESTAMP means different things in different databases:
	//   - postgres doesn't store time zone related info
	//   - mysql stores time zone
	case "DATE", "TIME", "TIMESTAMP", "TIMESTAMP_LTZ", "TIMESTAMP_NTZ", "TIMESTAMP_TZ", "TIMESTAMPTZ", "TIMESTAMP WITH TIME ZONE", "TIMESTAMP WITHOUT TIME ZONE":
		if !nullable {
			return new(time.Time), nil
		}
		return new(sql.NullTime), nil

	case "VARIANT":
		var v interface{}
		return &v, nil
	}
	return nil, errors.Errorf("unrecognized type: %v", dbType)
}
// CloneTuple returns a copy of t built with reflection: for every element a
// new value of the pointed-to type is allocated and the pointed-to value is
// assigned into it, so the clone's pointers never alias the pointers in t.
// Every element of t must be a non-nil pointer.
func CloneTuple(t Tuple) Tuple {
	cloned := make(Tuple, 0, len(t))
	for _, elem := range t {
		src := reflect.ValueOf(elem).Elem()
		dst := reflect.New(src.Type())
		dst.Elem().Set(src)
		cloned = append(cloned, dst.Interface())
	}
	return cloned
}