This repository has been archived by the owner on Jan 4, 2022. It is now read-only.
/
database.go
206 lines (189 loc) · 4.69 KB
/
database.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package reiner
import (
"database/sql"
"strings"
)
// connection 重現了一個資料庫的連線。
type connection struct {
db *sql.DB
tx *sql.Tx
lastCheck int
isHealth bool
dataSourceName string
}
// DB 是一個擁有許多連線的資料庫來源。
type DB struct {
slaves []*connection
master *connection
hasSlave bool
lastSlaveIndex int
}
// openDatabase 會開啟一個新的資料庫連線。
func openDatabase(dataSourceName string) (*sql.DB, error) {
db, err := sql.Open("mysql", dataSourceName)
if err != nil {
return db, err
}
if err = db.Ping(); err != nil {
return db, err
}
return db, nil
}
// newDatabase 會建立一個新的資料庫,當有主從來源時會替這個資料庫建立多個連線。
// 如果僅有單個主要來源的話則會建立一個最主要的連線。
func newDatabase(master string, slaves []string) (*DB, error) {
d := &DB{}
// 如果沒有主從來源就建立一個最主要的連線。
if len(slaves) == 0 {
db, err := openDatabase(master)
if err != nil {
return d, err
}
d.master = &connection{
db: db,
dataSourceName: master,
}
return d, nil
}
d.hasSlave = true
// 連線到 Slave 資料庫。
for _, v := range slaves {
db, err := openDatabase(v)
if err != nil {
return d, err
}
d.slaves = append(d.slaves, &connection{
db: db,
dataSourceName: v,
})
}
return d, nil
}
// roundRobin 會輪詢資料庫連線來避免不斷地呼叫同個資料庫連線。
// 簡單來說就是個簡易型的負載平衡器。
func (d *DB) roundRobin(pool []*connection, currentIndex int) (index int) {
length := len(pool) - 1
index = currentIndex + 1
if index > length {
index = 0
}
return
}
// getSlave 會取得一個可用的 Slave 資料庫連線。
func (d *DB) getSlave() (db *sql.DB) {
index := d.roundRobin(d.slaves, d.lastSlaveIndex)
db = d.slaves[index].db
// Set the last index.
d.lastSlaveIndex = index
return
}
// getDB 會基於 SQL 查詢指令來取得一個適用的資料庫連線,這會被用在讀/寫區分的資料庫上。
func (d *DB) getDB(query ...string) (db *sql.DB) {
if len(query) == 0 || !d.hasSlave {
db = d.master.db
return
}
action := strings.Split(query[0], " ")[0]
switch action {
case "SELECT":
db = d.getSlave()
default:
db = d.master.db
}
return
}
// Begin 會基於目前的資料庫連線來開始一段新的交易過程。
func (d *DB) begin() (*sql.Tx, error) {
return d.master.db.Begin()
}
// Rollback 會回溯交易時所發生的事情。
func (d *DB) rollback() error {
if d.master.tx == nil {
return ErrUnbegunTransaction
}
err := d.master.tx.Rollback()
if err != nil {
return err
}
d.master.tx = nil
return nil
}
// Commit 會結束一個交易過程並保存其變更為永久資料。
func (d *DB) commit() error {
if d.master.tx == nil {
return ErrUnbegunTransaction
}
err := d.master.tx.Commit()
if err != nil {
return err
}
d.master.tx = nil
return nil
}
// Ping 會以 ping 來檢查所有的資料庫連線(包括 Slave 連線)。
func (d *DB) ping() error {
var err error
err = d.master.db.Ping()
if err != nil {
return err
}
for _, v := range d.slaves {
err = v.db.Ping()
if err != nil {
return err
}
}
return nil
}
// Disconnect 會斷開所有連線(包括 Slave 連線)。
func (d *DB) disconnect() error {
var err error
err = d.master.db.Close()
if err != nil {
return err
}
for _, v := range d.slaves {
err = v.db.Close()
if err != nil {
return err
}
}
return nil
}
// Connect 會重新連接所有資料庫連線(包括 Slave 連線)。
func (d *DB) connect() error {
db, err := sql.Open("mysql", d.master.dataSourceName)
if err != nil {
return err
}
d.master.db = db
for k, v := range d.slaves {
db, err := sql.Open("mysql", v.dataSourceName)
if err != nil {
return err
}
d.slaves[k].db = db
}
return nil
}
// Prepare 會準備 SQL 查詢指令。
func (d *DB) prepare(query string) (*sql.Stmt, error) {
if d.master.tx != nil {
return d.master.tx.Prepare(query)
}
return d.getDB(query).Prepare(query)
}
// Exec 會執行 SQL 查詢指令並且回傳一個原生結果表示影響的行列數和插入的編號。
func (d *DB) exec(query string, args ...interface{}) (sql.Result, error) {
if d.master.tx != nil {
return d.master.tx.Exec(query, args...)
}
return d.getDB(query).Exec(query, args...)
}
// Query 會執行 SQL 查詢指令並且回傳一個原生的行列結果供後續掃描列出。
func (d *DB) query(query string, args ...interface{}) (*sql.Rows, error) {
if d.master.tx != nil {
return d.master.tx.Query(query, args...)
}
return d.getDB(query).Query(query, args...)
}