-
Notifications
You must be signed in to change notification settings - Fork 567
/
sql.go
101 lines (92 loc) · 2.62 KB
/
sql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package sdata
import (
"fmt"
"strings"
"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/pachsql"
)
// rowLimit limits the number of rows per batch in an INSERT statement.
// SQLTupleWriter.WriteTuple flushes its buffer once it holds rowLimit rows, so
// this is also the maximum number of tuples buffered in memory at a time.
// NOTE(review): every row contributes one bind parameter per column, so very
// wide tables may exceed a driver's bind-parameter limit (e.g. PostgreSQL's
// 65535) at this batch size — confirm for the supported drivers.
const rowLimit = 1000
// SQLTupleWriter writes tuples to a SQL database.
//
// Tuples are buffered in memory and written in batches of at most rowLimit
// rows, each batch as a single multi-row INSERT prepared and executed on tx.
// Rows still in the buffer are written only when Flush is called, so callers
// should call Flush after the last WriteTuple.
type SQLTupleWriter struct {
	tx              *pachsql.Tx        // transaction on which each batch is prepared and executed
	tableInfo       *pachsql.TableInfo // target table; its Driver picks the INSERT form and placeholder syntax
	insertStatement string             // INSERT prefix; placeholder groups for each batch are appended to it
	buf             []Tuple            // copies of tuples written since the last successful Flush
}
// WriteTuple buffers a copy of t for a later batched INSERT.
//
// If the buffer already holds rowLimit tuples, it is flushed first. If that
// flush fails, its error is returned and t is not buffered. Previously the
// flush error was silently dropped, hiding failed inserts from the caller.
func (m *SQLTupleWriter) WriteTuple(t Tuple) error {
	if len(m.buf) >= rowLimit {
		if err := m.Flush(); err != nil {
			return err
		}
	}
	// Clone so later mutation of t by the caller cannot change buffered data.
	m.buf = append(m.buf, CloneTuple(t))
	return nil
}
// Flush writes every buffered tuple to the database as one batched INSERT and
// empties the buffer. It is a no-op when nothing is buffered.
//
// If preparing or executing the statement fails, the buffer is left untouched
// and the error is returned. The prepared statement is always closed before
// Flush returns. A Close failure is reported only when no earlier error
// occurred.
func (m *SQLTupleWriter) Flush() (retErr error) {
	if len(m.buf) == 0 {
		return nil
	}
	stmt, err := m.GeneratePreparedStatement()
	if err != nil {
		return errors.EnsureStack(err)
	}
	// Release the statement as soon as this batch is done rather than leaving
	// one open statement per batch until the transaction ends.
	defer func() {
		if cerr := stmt.Close(); cerr != nil && retErr == nil {
			retErr = errors.EnsureStack(cerr)
		}
	}()

	// Flatten the buffered rows into one argument list, row by row, which is
	// the order in which the statement's placeholders are numbered.
	total := 0
	for _, row := range m.buf {
		total += len(row)
	}
	values := make(Tuple, 0, total)
	for _, row := range m.buf {
		values = append(values, row...)
	}

	if _, err := stmt.Exec(values...); err != nil {
		return errors.EnsureStack(err)
	}
	// Keep the backing array for the next batch.
	m.buf = m.buf[:0]
	return nil
}
// GeneratePreparedStatement generates a prepared statement based on the amount
// of data in the buffer. This can be used to execute a batched INSERT.
//
// The SQL is the writer's INSERT prefix followed by one parenthesized group of
// placeholders per buffered tuple, schematically "(p0, p1), (p2, p3)". The
// concrete placeholder text is whatever pachsql.Placeholder returns for the
// table's driver. Placeholder indices start at 0 and increase consecutively
// across all rows, matching a row-by-row flattening of the buffer. The
// statement is prepared on the writer's transaction.
//
// It returns (nil, nil) when the buffer is empty. The caller owns the returned
// statement and should Close it when done.
func (m *SQLTupleWriter) GeneratePreparedStatement() (*pachsql.Stmt, error) {
	if len(m.buf) == 0 {
		return nil, nil
	}
	groups := make([]string, 0, len(m.buf)) // one "(...)" group per buffered row
	next := 0                                // index of the next placeholder in the flattened argument list
	for _, row := range m.buf {
		// A running counter, rather than row*len(row)+col, keeps the numbering
		// aligned with the flattened arguments even if row lengths differ.
		placeholderRow := make([]string, 0, len(row))
		for range row {
			placeholderRow = append(placeholderRow, pachsql.Placeholder(m.tableInfo.Driver, next))
			next++
		}
		groups = append(groups, fmt.Sprintf("(%s)", strings.Join(placeholderRow, ", ")))
	}
	sqlStr := m.insertStatement + strings.Join(groups, ", ")
	stmt, err := m.tx.Preparex(sqlStr)
	if err != nil {
		return nil, errors.EnsureStack(err)
	}
	return stmt, nil
}
// NewSQLTupleWriter returns a SQLTupleWriter that inserts into the table
// described by tableInfo using tx.
//
// The INSERT prefix is computed once here, and each batch later appends its
// placeholder groups to it. For the "snowflake" driver the prefix has the form
// "INSERT INTO schema.table (cols) SELECT exprs FROM VALUES ", which lets
// VARIANT columns be wrapped in to_variant(). Every other driver gets a plain
// "INSERT INTO schema.table (cols) VALUES " prefix.
func NewSQLTupleWriter(tx *pachsql.Tx, tableInfo *pachsql.TableInfo) *SQLTupleWriter {
	var prefix string
	switch tableInfo.Driver {
	case "snowflake":
		names := make([]string, 0, len(tableInfo.Columns))
		exprs := make([]string, 0, len(tableInfo.Columns))
		for idx, col := range tableInfo.Columns {
			// The VALUES columns are referenced by Snowflake's implicit names
			// COLUMN1, COLUMN2, ... (1-based).
			expr := fmt.Sprintf(`COLUMN%d`, idx+1)
			if col.DataType == "VARIANT" {
				expr = fmt.Sprintf(`to_variant(COLUMN%d)`, idx+1)
			}
			names = append(names, col.Name)
			exprs = append(exprs, expr)
		}
		prefix = fmt.Sprintf(`INSERT INTO %s.%s (%s) SELECT %s FROM VALUES `,
			tableInfo.Schema, tableInfo.Name, strings.Join(names, ","), strings.Join(exprs, ","))
	default:
		prefix = fmt.Sprintf("INSERT INTO %s.%s (%s) VALUES ",
			tableInfo.Schema, tableInfo.Name, strings.Join(tableInfo.ColumnNames(), ", "))
	}
	return &SQLTupleWriter{
		tx:              tx,
		tableInfo:       tableInfo,
		insertStatement: prefix,
		buf:             []Tuple{},
	}
}