/
conn_validate.go
121 lines (111 loc) · 3.36 KB
/
conn_validate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// 2022/1/14 Bin Liu <bin.liu@enmotech.com>
package pq
import (
"context"
"database/sql/driver"
"errors"
"strings"
)
// ValidateConnectFunc is the signature of a check performed against a
// connection. The ValidateConnectTargetSessionAttrs* functions in this file
// have this signature: they return nil when the connection matches the
// requested role and a non-nil error otherwise.
type ValidateConnectFunc func(conn *conn) error
// SQL statements issued by the validators in this file.
const (
	// showTransactionReadOnly reads the transaction_read_only setting;
	// the validators compare the result against "on" / "off".
	showTransactionReadOnly = "show transaction_read_only"
	// pgIsInRecovery calls pg_is_in_recovery(); the validators compare the
	// boolean result against the expected recovery state.
	pgIsInRecovery = "select pg_is_in_recovery()"
)
// validateConnectTargetSessionAttrsTransaction runs showTransactionReadOnly on
// cn and reports whether the returned value equals expectedStatus, compared
// case-insensitively.
//
// It returns (false, err) when the query fails, when the first row cannot be
// read, or when the returned column is not a string (for example NULL).
func validateConnectTargetSessionAttrsTransaction(cn *conn, expectedStatus string) (bool, error) {
	cn.log(
		context.Background(), LogLevelDebug, "Check server is transaction_read_only ?", map[string]interface{}{
			"sql": showTransactionReadOnly,
			paramHost: cn.config.Host, paramPort: cn.config.Port, paramTargetSessionAttrs: cn.config.targetSessionAttrs,
		},
	)
	inReRows, err := cn.query(showTransactionReadOnly, nil)
	if err != nil {
		// The query error must be checked before touching inReRows: the
		// result set may be nil here, and closing it would dereference nil.
		cn.log(context.Background(), LogLevelDebug, "err:"+err.Error(), map[string]interface{}{})
		return false, err
	}
	defer inReRows.Close()

	// Next replaces the slot with the decoded column value, so no
	// pre-populated destination is needed.
	lastCols := make([]driver.Value, 1)
	if err = inReRows.Next(lastCols); err != nil {
		cn.log(context.Background(), LogLevelDebug, "err:"+err.Error(), map[string]interface{}{})
		return false, err
	}
	// Checked assertion: an unexpected or NULL value becomes an error
	// instead of a panic.
	readOnly, ok := lastCols[0].(string)
	if !ok {
		return false, errors.New("unexpected result type for transaction_read_only")
	}
	cn.log(
		context.Background(), LogLevelDebug, "Check server is readOnly ?", map[string]interface{}{
			"readOnly": readOnly,
			paramHost: cn.config.Host, paramPort: cn.config.Port,
		},
	)
	return strings.EqualFold(readOnly, expectedStatus), nil
}
// ValidateConnectTargetSessionAttrsReadWrite checks that the server behind cn
// reports transaction_read_only as "off", e.g.
//
//	omm=# show transaction_read_only;
//	 transaction_read_only
//	-----------------------
//	 off
//	(1 row)
//
// It returns the underlying error if the check itself fails, an error stating
// that the connection is not read write if the value differs, and nil otherwise.
func ValidateConnectTargetSessionAttrsReadWrite(cn *conn) error {
	matched, err := validateConnectTargetSessionAttrsTransaction(cn, "off")
	switch {
	case err != nil:
		return err
	case matched:
		return nil
	default:
		return errors.New("connection is not read write")
	}
}
// ValidateConnectTargetSessionAttrsReadOnly checks that the server behind cn
// reports transaction_read_only as "on", e.g.
//
//	omm=# show transaction_read_only;
//	 transaction_read_only
//	-----------------------
//	 on
//	(1 row)
//
// It returns the underlying error if the check itself fails, an error stating
// that the connection is not read only if the value differs, and nil otherwise.
func ValidateConnectTargetSessionAttrsReadOnly(cn *conn) error {
	matched, err := validateConnectTargetSessionAttrsTransaction(cn, "on")
	switch {
	case err != nil:
		return err
	case matched:
		return nil
	default:
		return errors.New("connection is not read only")
	}
}
// validateConnectTargetSessionAttrsRecovery runs pgIsInRecovery on cn and
// reports whether the returned boolean equals expectedIsRecovery.
//
// It returns (false, err) when the query fails, when the first row cannot be
// read, or when the returned column is not a bool (for example NULL).
func validateConnectTargetSessionAttrsRecovery(cn *conn, expectedIsRecovery bool) (bool, error) {
	cn.log(context.Background(), LogLevelDebug, "check pg_is_in_recovery", map[string]interface{}{"sql": pgIsInRecovery,
		"host": cn.config.Host, "port": cn.config.Port, "target_session_attrs": cn.config.targetSessionAttrs})
	inReRows, err := cn.query(pgIsInRecovery, nil)
	if err != nil {
		// The query error must be checked before touching inReRows: the
		// result set may be nil here, and closing it would dereference nil.
		cn.log(context.Background(), LogLevelDebug, "err:"+err.Error(), map[string]interface{}{})
		return false, err
	}
	defer inReRows.Close()

	// Next replaces the slot with the decoded column value, so no
	// pre-populated destination is needed.
	lastCols := make([]driver.Value, 1)
	if err = inReRows.Next(lastCols); err != nil {
		cn.log(context.Background(), LogLevelDebug, "err:"+err.Error(), map[string]interface{}{})
		return false, err
	}
	// Checked assertion: an unexpected or NULL value becomes an error
	// instead of a panic.
	pgIsRecovery, ok := lastCols[0].(bool)
	if !ok {
		return false, errors.New("unexpected result type for pg_is_in_recovery")
	}
	cn.log(context.Background(), LogLevelDebug, "check pg_is_in_recovery ?", map[string]interface{}{"pg_is_in_recovery": pgIsRecovery,
		"host": cn.config.Host, "port": cn.config.Port})
	return expectedIsRecovery == pgIsRecovery, nil
}
// ValidateConnectTargetSessionAttrsPrimary checks that the server behind cn
// reports pg_is_in_recovery() as false.
//
// It returns the underlying error if the check itself fails, an error stating
// that the connection is not a primary instance if the server is in recovery,
// and nil otherwise.
func ValidateConnectTargetSessionAttrsPrimary(cn *conn) error {
	matched, err := validateConnectTargetSessionAttrsRecovery(cn, false)
	switch {
	case err != nil:
		return err
	case matched:
		return nil
	default:
		return errors.New("connection is not primary instance")
	}
}
// ValidateConnectTargetSessionAttrsStandby checks that the server behind cn
// reports pg_is_in_recovery() as true.
//
// It returns the underlying error if the check itself fails, an error stating
// that the connection is not a standby instance if the server is not in
// recovery, and nil otherwise.
func ValidateConnectTargetSessionAttrsStandby(cn *conn) error {
	matched, err := validateConnectTargetSessionAttrsRecovery(cn, true)
	switch {
	case err != nil:
		return err
	case matched:
		return nil
	default:
		return errors.New("connection is not standby instance")
	}
}