-
Notifications
You must be signed in to change notification settings - Fork 9
/
dao_mysql.go
148 lines (128 loc) · 3.52 KB
/
dao_mysql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package tgo
import (
"fmt"
"sync"
"time"
_ "github.com/go-sql-driver/mysql"
)
// DaoMysql is a data-access object bound to a single MySQL table.
type DaoMysql struct {
	// TableName is the table that Insert and Select operate on.
	TableName string
}
// NewDaoMysql returns a pointer to a zero-valued DaoMysql; the caller is
// expected to set TableName before using it.
func NewDaoMysql() *DaoMysql {
	return new(DaoMysql)
}
// Condition describes one comparison made of a field name, an operator and
// the value to compare against.
type Condition struct {
	// Field is the name of the field the condition applies to.
	Field string
	// Oper is the operator placed between Field and Value.
	Oper string
	// Value is the operand; it may hold a value of any type.
	Value interface{}
}
// Sort describes an ordering on a single field.
type Sort struct {
	// Field is the name of the field to order by.
	Field string
	// Asc selects ascending order when true.
	Asc bool
}
// Shared connection pool for read connections and the mutex used while it is
// (re)created in initMysqlPool.
var (
	MysqlReadPool    *MysqlConnectionPool
	mysqlReadPoolMux sync.Mutex
)

// Shared connection pool for write connections and the mutex used while it is
// (re)created in initMysqlPool.
var (
	MysqlWritePool    *MysqlConnectionPool
	mysqlWritePoolMux sync.Mutex
)
// init loads the MySQL pool configuration and starts one background monitor
// goroutine for the read pool and one for the write pool.
//
// The pool globals are passed by value as they are at this point; monitorPool
// creates the pool through initMysqlPool when the value it holds is nil or
// closed.
func init() {
	// How often each monitor re-evaluates its pool.
	const monitorInterval = time.Second * 60

	config := NewConfigDb()
	configPool := config.Mysql.GetPool()

	// TODO: improve the dynamic control of the pool size.
	//
	// Each monitor gets its own ticker. A tick sent on a Ticker channel is
	// received by exactly one goroutine, so two monitors sharing one ticker
	// would take ticks away from each other and wake up at irregular
	// intervals. The tickers are never stopped because monitorPool loops
	// forever.
	go monitorPool(configPool, time.NewTicker(monitorInterval), true, MysqlReadPool)
	go monitorPool(configPool, time.NewTicker(monitorInterval), false, MysqlWritePool)
}
// monitorPool periodically grows or shrinks the capacity of a connection pool
// based on how many waits the pool recorded since the previous cycle. It never
// returns and is meant to run in its own goroutine.
//
// Parameters:
//   - configPool: supplies the thresholds PoolWaitCount, PoolExCap, PoolMinCap
//     and PoolMaxCap.
//   - poolTicker: paces the loop; one cycle runs per received tick (the first
//     cycle runs immediately).
//   - isRead: forwarded to initMysqlPool when the pool has to be (re)created.
//   - mysqlPool: the pool to watch; replaced through initMysqlPool whenever it
//     is nil or closed.
func monitorPool(configPool *ConfigDbPool, poolTicker *time.Ticker, isRead bool, mysqlPool *MysqlConnectionPool) {
	// Cumulative wait count observed at the end of the previous cycle.
	var oldWaitCount int64

	for {
		if mysqlPool == nil || mysqlPool.IsClosed() {
			mysqlPool = initMysqlPool(isRead)
		}

		// Read the cumulative counter exactly once per cycle. Reading it a
		// second time to update oldWaitCount would silently drop any waits
		// recorded between the two reads.
		totalWaitCount := mysqlPool.WaitCount()
		waitCount := totalWaitCount - oldWaitCount
		oldWaitCount = totalWaitCount

		poolCaps := int(mysqlPool.Capacity())

		var caps int
		resize := true
		switch {
		case waitCount >= configPool.PoolWaitCount && poolCaps != configPool.PoolMaxCap:
			// Waits during this cycle reached the configured threshold: grow.
			caps = poolCaps + configPool.PoolExCap
		case waitCount == 0 && poolCaps != configPool.PoolMinCap:
			// Nobody waited during this cycle: shrink the idle pool.
			caps = poolCaps - configPool.PoolExCap
		default:
			resize = false
		}

		if resize {
			// Keep the new capacity within [PoolMinCap, PoolMaxCap].
			if caps < configPool.PoolMinCap {
				caps = configPool.PoolMinCap
			}
			if caps > configPool.PoolMaxCap {
				caps = configPool.PoolMaxCap
			}
			mysqlPool.SetCapacity(caps)
		}

		<-poolTicker.C
	}
}
// initMysqlPool returns the shared read pool when isRead is true and the
// shared write pool otherwise, creating the pool first if it is nil or closed.
//
// The pool's mutex is taken before the nil/closed check. Checking first and
// locking afterwards lets two concurrent callers both see a missing pool and
// each build one, with the second overwriting (and orphaning) the first, and
// it reads the shared variable without synchronization.
func initMysqlPool(isRead bool) *MysqlConnectionPool {
	config := NewConfigDb()
	configPool := config.Mysql.GetPool()

	if isRead {
		mysqlReadPoolMux.Lock()
		defer mysqlReadPoolMux.Unlock()

		if MysqlReadPool == nil || MysqlReadPool.IsClosed() {
			// NOTE(review): PoolIdleTimeout is scaled by time.Millisecond, so it
			// is presumably configured in milliseconds; confirm against the config.
			MysqlReadPool = NewMysqlConnectionPool(CreateMysqlConnectionRead, configPool.PoolMinCap,
				configPool.PoolMaxCap, configPool.PoolIdleTimeout*time.Millisecond)
		}
		return MysqlReadPool
	}

	mysqlWritePoolMux.Lock()
	defer mysqlWritePoolMux.Unlock()

	if MysqlWritePool == nil || MysqlWritePool.IsClosed() {
		MysqlWritePool = NewMysqlConnectionPool(CreateMysqlConnectionWrite, configPool.PoolMinCap,
			configPool.PoolMaxCap, configPool.PoolIdleTimeout*time.Millisecond)
	}
	return MysqlWritePool
}
// initMysqlPoolConnection obtains the read or write pool through
// initMysqlPool and asks it for a connection, returning whatever the pool's
// Get returns.
func initMysqlPoolConnection(isRead bool) (MysqlConnection, error) {
	pool := initMysqlPool(isRead)
	return pool.Get(isRead)
}
// GetReadOrm returns a connection taken from the read pool, or the error
// reported while obtaining it.
func (d *DaoMysql) GetReadOrm() (MysqlConnection, error) {
	const isRead = true
	return d.getOrm(isRead)
}
// GetWriteOrm returns a connection taken from the write pool, or the error
// reported while obtaining it.
func (d *DaoMysql) GetWriteOrm() (MysqlConnection, error) {
	const isRead = false
	return d.getOrm(isRead)
}
// getOrm fetches a connection from the read pool when isRead is true and from
// the write pool otherwise. It delegates to initMysqlPoolConnection and does
// not use any state of d.
func (d *DaoMysql) getOrm(isRead bool) (MysqlConnection, error) {
	conn, err := initMysqlPoolConnection(isRead)
	return conn, err
}
// Insert creates a row for model in d.TableName using a write connection.
//
// It returns the error from obtaining the connection, or the error reported
// by the create call; the latter is also written to the error log. The
// connection is handed back with Put when Insert returns.
func (d *DaoMysql) Insert(model interface{}) error {
	conn, connErr := d.GetWriteOrm()
	if connErr != nil {
		return connErr
	}
	defer conn.Put()

	if err := conn.Table(d.TableName).Create(model).Error; err != nil {
		// Record the failure before handing it back to the caller.
		UtilLogError(fmt.Sprintf("insert data error:%s", err.Error()))
		return err
	}
	return nil
}
// Select runs a query on d.TableName filtered by condition and stores the
// result in data, using a read connection that is handed back with Put when
// Select returns.
//
// condition is passed to Where unchanged. field is optional: when at least
// one slice is given, the first one is passed to Select to choose the
// columns, and any further slices are ignored.
//
// It returns the error from obtaining the connection, or the error reported
// by the query.
func (d *DaoMysql) Select(condition string, data interface{}, field ...[]string) error {
	conn, connErr := d.GetReadOrm()
	if connErr != nil {
		return connErr
	}
	defer conn.Put()

	if len(field) > 0 {
		// Restrict the query to the requested columns.
		return conn.Table(d.TableName).Where(condition).Select(field[0]).Find(data).Error
	}
	return conn.Table(d.TableName).Where(condition).Find(data).Error
}