forked from vmware-archive/atc
/
retryable_conn.go
66 lines (51 loc) · 1.34 KB
/
retryable_conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package db
import (
"net"
"reflect"
"github.com/jackc/pgx"
)
type RetryableConn struct {
Connector Connector
Conn DelegateConn // *pgx.Conn
}
//go:generate counterfeiter . DelegateConn
type DelegateConn interface {
Query(sql string, args ...interface{}) (*pgx.Rows, error)
QueryRow(sql string, args ...interface{}) *pgx.Row
Exec(sql string, arguments ...interface{}) (commandTag pgx.CommandTag, err error)
}
//go:generate counterfeiter . Connector
type Connector interface {
Connect() (DelegateConn, error)
}
type PgxConnector struct {
PgxConfig pgx.ConnConfig
}
func (c PgxConnector) Connect() (DelegateConn, error) {
return pgx.Connect(c.PgxConfig)
}
func (c *RetryableConn) Exec(sql string, arguments ...interface{}) (pgx.CommandTag, error) {
return c.Conn.Exec(sql, arguments...)
}
func (c *RetryableConn) QueryRow(sql string, args ...interface{}) *pgx.Row {
rows, queryErr := c.Conn.Query(sql, args...)
if queryErr != nil {
var connError *net.OpError
for queryErr != nil && reflect.TypeOf(queryErr) == reflect.TypeOf(connError) {
err := c.reconnect()
if err != nil {
continue
}
rows, queryErr = c.Conn.Query(sql, args...)
}
}
return (*pgx.Row)(rows)
}
func (c *RetryableConn) reconnect() error {
deleteConn, err := c.Connector.Connect()
if err != nil {
return err
}
c.Conn = deleteConn
return nil
}