-
Notifications
You must be signed in to change notification settings - Fork 2
/
database.go
211 lines (167 loc) · 6.1 KB
/
database.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
/*
Copyright 2018 The go-eam Authors
This file is part of the go-eam library.
database
封装数据库相关操作
wanglei.ok@foxmail.com
1.0
版本时间:2018年4月13日18:32:12
*/
package main
import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
"strconv"
)
// db is the package-level database handle shared by the helpers in this
// file. It is nil until OpenDatabase succeeds.
var db *sql.DB
const (
// Connection pool settings applied by OpenDatabase.
POOL_MAXOPENCONNS = 10 // maximum number of open connections
POOL_MAXIDLECONNS = 2 // maximum number of idle connections
)
// OpenDatabase opens a MySQL connection pool for dsn, applies the pool
// limits, verifies the connection with a ping and, on success, stores the
// pool in the package-level db handle.
//
// dsn is passed unchanged to sql.Open with the "mysql" driver.
//
// It returns the error from sql.Open or Ping. On failure the package-level
// db is left untouched and the pool that was just opened is closed again.
func OpenDatabase(dsn string) error {
	pool, err := sql.Open("mysql", dsn)
	if err != nil {
		return err
	}

	// Connection pool limits.
	pool.SetMaxOpenConns(POOL_MAXOPENCONNS)
	pool.SetMaxIdleConns(POOL_MAXIDLECONNS)

	// Ping verifies that a connection can actually be established.
	if err = pool.Ping(); err != nil {
		// Release the pool so a failed open does not leak it. The Close
		// error is dropped on purpose: the Ping error is the one the caller
		// needs to see.
		_ = pool.Close()
		return err
	}

	db = pool
	return nil
}
// CloseDatabase closes the package-level database handle.
//
// It does nothing when db is nil (OpenDatabase was never called or failed),
// so it can be deferred unconditionally without risking a nil pointer panic.
func CloseDatabase() {
	if db == nil {
		return
	}
	// Best-effort close: this function has no error result, so the error
	// from Close is intentionally discarded.
	_ = db.Close()
}
// MyTx is a custom transaction type wrapping *sql.Tx; the transaction-scoped
// helpers in this file are defined as methods on it.
type MyTx struct {
// Tx is the underlying database/sql transaction.
Tx *sql.Tx
}
// TxBegin starts a transaction on the package-level db and returns it
// wrapped in a MyTx.
//
// On failure it returns a nil *MyTx together with the error from db.Begin,
// rather than a wrapper around a nil *sql.Tx.
func TxBegin() (*MyTx, error) {
	tx, err := db.Begin()
	if err != nil {
		return nil, err
	}
	return &MyTx{Tx: tx}, nil
}
// Commit commits the wrapped transaction and returns the result of
// sql.Tx.Commit.
func (x *MyTx)Commit() error {
return x.Tx.Commit()
}
// Rollback rolls back the wrapped transaction and returns the result of
// sql.Tx.Rollback.
func (x *MyTx)Rollback() error {
return x.Tx.Rollback()
}
// InsertTx inserts one transaction record into ec_ethdata as part of this
// database transaction.
//
// The fields of tx are bound, in order, to block_number, time_stamp,
// tx_hash, nonce, block_hash, tx_index, from_addr, to_addr, contract_addr
// and amount.
//
// It returns the error from preparing or executing the statement, or nil
// when the row was inserted.
func (x *MyTx) InsertTx(tx *TxJson) error {
	const insertSQL = "INSERT ec_ethdata SET block_number=?,time_stamp=?,tx_hash=?, nonce=?, block_hash=?, tx_index=?, from_addr=?, to_addr=?, contract_addr=?, amount=?"

	insert, err := x.Tx.Prepare(insertSQL)
	if err != nil {
		return err
	}
	defer insert.Close()

	_, err = insert.Exec(
		tx.BlockNumber,
		tx.TimeStamp,
		tx.Hash,
		tx.Nonce,
		tx.BlockHash,
		tx.TransactionIndex,
		tx.From,
		tx.To,
		tx.ContractAddress,
		tx.Value,
	)
	return err
}
// UpdateLastBlock sets last_block to block on the ec_address_log rows whose
// address equals addr, as part of this database transaction.
//
// It returns the number of rows affected. Errors are not reported to the
// caller: when preparing or executing the statement fails the result is 0.
func (x *MyTx) UpdateLastBlock(addr string, block int) (affect int64) {
	update, err := x.Tx.Prepare("update ec_address_log set last_block = ? where address = ?")
	if err != nil {
		return 0
	}
	defer update.Close()

	result, err := update.Exec(strconv.Itoa(block), addr)
	if err != nil {
		return 0
	}

	// This method has no error result, so the RowsAffected error is dropped
	// and the reported count is returned as-is.
	affect, _ = result.RowsAffected()
	return affect
}
// EthAddressInfo pairs an address with its last_block value, as read from
// ec_address_log by GetEthAddress.
type EthAddressInfo struct {
// Address is the value of the address column.
Address string
// LastBlock is the value of the last_block column.
LastBlock int
}
// GetEthAddress returns the address and last_block of every ec_address_log
// row with type 'eth' whose state is not '1'.
//
// The returned slice is never nil. On error it holds the rows read before
// the failure, together with the error from Query, Scan or row iteration.
func GetEthAddress() ([]EthAddressInfo, error) {
	eais := make([]EthAddressInfo, 0)

	rows, err := db.Query("select address, last_block from ec_address_log where type = 'eth' and state <> '1'")
	if err != nil {
		return eais, err
	}
	// Always release the result set, including on an early return from a
	// Scan error, so its connection goes back to the pool.
	defer rows.Close()

	for rows.Next() {
		var info EthAddressInfo
		if err := rows.Scan(&info.Address, &info.LastBlock); err != nil {
			return eais, err
		}
		eais = append(eais, info)
	}

	// Next returns false both at the end of the data and on failure; Err
	// distinguishes the two so a truncated result is not reported as success.
	if err := rows.Err(); err != nil {
		return eais, err
	}
	return eais, nil
}
// UpdateAddressLogByCrowdOrder refreshes the monitored address list in
// ec_address_log from the order data in ec_crowd_order.
//
// All statements run in one transaction: the first failing statement rolls
// the transaction back and its error is returned; otherwise the result of
// Commit is returned.
func UpdateAddressLogByCrowdOrder() error {
	// "Order addresses" below means the distinct pay_url values of
	// ec_crowd_order rows with order_type='eth', is_delete=0 and
	// pay_status IN (0,2).
	statements := []string{
		// 1. Eth addresses not among the order addresses stop being monitored (state '1').
		"UPDATE ec_address_log SET state = '1' WHERE type='eth' AND address NOT IN ( SELECT DISTINCT pay_url AS address FROM `ec_crowd_order` WHERE order_type='eth' AND is_delete=0 AND pay_status IN (0,2) );",
		// 2. Eth addresses among the order addresses are monitored (state '0').
		"UPDATE ec_address_log SET state = '0' WHERE type='eth' AND address IN ( SELECT DISTINCT pay_url AS address FROM `ec_crowd_order` WHERE order_type='eth' AND is_delete=0 AND pay_status IN (0,2) );",
		// 3. Order addresses missing from ec_address_log are inserted with state '0' and last_block 0.
		"INSERT INTO ec_address_log (address,type,state,last_block) SELECT DISTINCT pay_url AS address, 'eth' AS type, '0' AS state, 0 AS last_block FROM `ec_crowd_order` WHERE order_type='eth' AND is_delete=0 AND pay_status IN (0,2) AND NOT EXISTS (SELECT address FROM ec_address_log WHERE address = ec_crowd_order.pay_url);",
	}

	tx, err := db.Begin()
	if err != nil {
		return err
	}

	for i := range statements {
		if _, execErr := tx.Exec(statements[i]); execErr != nil {
			// The statement error is the one reported; the rollback result is not inspected.
			tx.Rollback()
			return execErr
		}
	}
	return tx.Commit()
}
// UpdateAddressLogBySetting refreshes the monitored address list in
// ec_address_log from the 'eth_acceptAddress' entries of ec_setting.
// Addresses are compared case-insensitively via LOWER().
//
// All statements run in one transaction: the first failing statement rolls
// the transaction back and its error is returned; otherwise the result of
// Commit is returned.
func UpdateAddressLogBySetting() error {
	// "Configured addresses" below means the set_value of ec_setting rows
	// with set_key = 'eth_acceptAddress'.
	statements := []string{
		// 1. Eth addresses not among the configured addresses stop being monitored (state '1').
		"UPDATE ec_address_log SET state = '1' WHERE type='eth' AND LOWER(address) NOT IN ( SELECT DISTINCT LOWER(set_value) FROM ec_setting WHERE set_key = 'eth_acceptAddress' );",
		// 2. Eth addresses among the configured addresses are monitored (state '0').
		"UPDATE ec_address_log SET state = '0' WHERE type='eth' AND LOWER(address) IN ( SELECT DISTINCT LOWER(set_value) FROM ec_setting WHERE set_key = 'eth_acceptAddress' );",
		// 3. A configured address missing from ec_address_log is inserted with state '0' and last_block 0 (LIMIT 1).
		"INSERT INTO ec_address_log (address,type,state,last_block) SELECT DISTINCT set_value AS address, 'eth' AS type, '0' AS state, 0 AS last_block FROM ec_setting WHERE set_key = 'eth_acceptAddress' AND NOT EXISTS (SELECT LOWER(address) FROM ec_address_log WHERE LOWER(address) = LOWER(ec_setting.set_value)) LIMIT 1;",
	}

	tx, err := db.Begin()
	if err != nil {
		return err
	}

	for i := range statements {
		if _, execErr := tx.Exec(statements[i]); execErr != nil {
			// The statement error is the one reported; the rollback result is not inspected.
			tx.Rollback()
			return execErr
		}
	}
	return tx.Commit()
}