/
sampling.go
230 lines (197 loc) · 6.53 KB
/
sampling.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
/*
* Copyright 2018 Xiaomi, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package database
import (
"fmt"
"io"
"strconv"
"strings"
"time"
"github.com/XiaoMi/soar/common"
"github.com/ziutek/mymysql/mysql"
)
/*--------------------
* The following choice of minrows is based on the paper
* "Random sampling for histogram construction: how much is enough?"
* by Surajit Chaudhuri, Rajeev Motwani and Vivek Narasayya, in
* Proceedings of ACM SIGMOD International Conference on Management
* of Data, 1998, Pages 436-447. Their Corollary 1 to Theorem 5
* says that for table size n, histogram size k, maximum relative
* error in bin size f, and error probability gamma, the minimum
* random sample size is
* r = 4 * k * ln(2*n/gamma) / f^2
* Taking f = 0.5, gamma = 0.01, n = 10^6 rows, we obtain
* r = 305.82 * k
* Note that because of the log function, the dependence on n is
* quite weak; even at n = 10^12, a 300*k sample gives <= 0.66
* bin size error with probability 0.99. So there's no real need to
* scale for n, which is a good thing because we don't necessarily
* know it at this point.
*--------------------
*/
// SamplingData pulls a random sample of rows for each of the given tables
// from the remote connector and inserts them into db.
//
// The number of rows requested per table is
// 300 * common.Config.SamplingStatisticTarget. Views and tables that report
// no rows are skipped. A sampling failure on one table is logged and does not
// abort the remaining tables; only connection errors and ShowTableStatus
// errors are returned to the caller.
func (db *Connector) SamplingData(remote Connector, tables ...string) error {
	// Number of rows we want to pump out of every table.
	wantRowsCount := 300 * common.Config.SamplingStatisticTarget

	// Number of value tuples per generated INSERT statement. A larger value
	// keeps more data buffered in memory but makes the inserts faster.
	maxValCount := 200

	// Create the connection objects for both sides.
	conn := remote.NewConnection()
	localConn := db.NewConnection()

	// Connect. Close is deferred before the error check on purpose so the
	// connection object is released on the failure path as well.
	err := conn.Connect()
	defer conn.Close()
	if err != nil {
		return err
	}

	err = localConn.Connect()
	defer localConn.Close()
	if err != nil {
		return err
	}

	for _, table := range tables {
		// Views hold no data of their own. Skip this one but keep going with
		// the remaining tables instead of returning from the whole call.
		if remote.IsView(table) {
			continue
		}

		tableStatus, err := remote.ShowTableStatus(table)
		if err != nil {
			return err
		}

		if len(tableStatus.Rows) == 0 {
			common.Log.Info("SamplingData, Table %s with no data, stop sampling", table)
			continue
		}

		tableRows := tableStatus.Rows[0].Rows
		if tableRows == 0 {
			common.Log.Info("SamplingData, Table %s with no data, stop sampling", table)
			continue
		}

		// Fraction of the table needed to obtain roughly wantRowsCount rows.
		factor := float64(wantRowsCount) / float64(tableRows)
		common.Log.Debug("SamplingData, tableRows: %d, wantRowsCount: %d, factor: %f", tableRows, wantRowsCount, factor)

		// Best effort: a failure on one table is logged, the loop continues.
		err = startSampling(conn, localConn, db.Database, table, factor, wantRowsCount, maxValCount)
		if err != nil {
			common.Log.Error("(db *Connector) SamplingData Error : %v", err)
		}
	}
	return nil
}
// startSampling pumps sampled rows of one table out of conn and inserts them
// through localConn. Because of the data volume, reading and inserting are
// interleaved: rows are buffered and flushed in batches of maxValCount.
//
// conn is the connection rows are read from; localConn is the connection the
// rows are inserted through (and whose information_schema is consulted for
// column types). database and table name the insert target; table is also the
// table selected from. factor is the fraction of rows to select (no filter is
// applied when it is >= 1), wants is the LIMIT of the select.
//
// It returns an error when the select cannot be started or a row cannot be
// scanned. Insert failures are only logged by doSampling.
// TODO add ref link
func startSampling(conn, localConn mysql.Conn, database, table string, factor float64, wants, maxValCount int) error {
	// Fetch the data type of every column of the table, for later use when
	// deciding how to render empty values. The lookup may come back empty or
	// fail (for example when information_schema has no entry for the table
	// yet); in that case dataTypes is simply not used.
	// ORDER BY ORDINAL_POSITION is required because dataTypes is indexed by
	// column position further down.
	var dataTypes []string
	q := fmt.Sprintf("select DATA_TYPE from information_schema.COLUMNS where TABLE_SCHEMA='%s' and TABLE_NAME = '%s' order by ORDINAL_POSITION",
		localConn.Escape(database), localConn.Escape(table))
	common.Log.Debug("Sampling data execute: %s", q)
	rs, _, err := localConn.Query(q)
	if err != nil {
		common.Log.Debug("Sampling data got data type Err: %v", err)
	} else {
		for _, r := range rs {
			dataTypes = append(dataTypes, r.Str(0))
		}
	}

	// Build the where condition. FormatFloat with precision -1 keeps every
	// significant digit, so a very small factor is not rounded down to zero.
	where := ""
	if factor < 1 {
		where = "where RAND()<=" + strconv.FormatFloat(factor, 'f', -1, 64)
	}

	query := fmt.Sprintf("select * from `%s` %s limit %d;", table, where, wants)
	res, err := conn.Start(query)
	if err != nil {
		return err
	}

	// GetRow allocates a new chunk of memory for every received row, so one
	// row buffer is created up front and reused with ScanRow.
	row := res.MakeRow()

	// Collect all column names, backtick-quoted so that names which are
	// reserved words or contain special characters stay valid in the INSERT.
	fields := res.Fields()
	columns := make([]string, len(fields))
	for i, field := range fields {
		columns[i] = "`" + strings.Replace(field.Name, "`", "``", -1) + "`"
	}
	colDef := strings.Join(columns, ",")

	// Type information is only trusted when it lines up with the result set.
	typesKnown := len(dataTypes) == len(fields)

	// Start filling in data.
	rowCount := 0
	var valList []string
	for {
		err := res.ScanRow(row)
		if err == io.EOF {
			// Scan finished: flush whatever is still buffered.
			if len(valList) > 0 {
				doSampling(localConn, database, table, colDef, strings.Join(valList, ","))
			}
			break
		}
		if err != nil {
			return err
		}

		values := make([]string, len(columns))
		for i := range row {
			// TODO exporting coordinate (spatial) types is not supported
			isNull := false
			text := ""
			switch data := row[i].(type) {
			case nil:
				isNull = true
			case []byte:
				// Keep the bytes exactly as received; converting through an
				// integer would alter values such as "007".
				text = string(data)
			case time.Time:
				text = mysql.TimeString(data)
			case time.Duration:
				text = mysql.DurationString(data)
			default:
				text = fmt.Sprint(data)
			}

			// For data types other than the char/text families an empty dump
			// is treated as NULL, because '' would be rejected for them.
			// Both substrings must be absent for the type to count as
			// non-textual.
			if !isNull && text == "" && typesKnown &&
				!strings.Contains(dataTypes[i], "char") &&
				!strings.Contains(dataTypes[i], "text") {
				isNull = true
			}

			if isNull {
				values[i] = "null"
			} else {
				// Escape quotes and backslashes so the value cannot break
				// out of its string literal.
				values[i] = "'" + localConn.Escape(text) + "'"
			}
		}

		valList = append(valList, "("+strings.Join(values, ",")+")")
		rowCount++

		// Flush a full batch. Comparing lengths (instead of a modulo on the
		// row counter) also stays safe when maxValCount is zero or negative.
		if len(valList) >= maxValCount {
			doSampling(localConn, database, table, colDef, strings.Join(valList, ","))
			valList = valList[:0]
		}
	}

	common.Log.Debug("%d rows sampling out", rowCount)
	return nil
}
// doSampling turns the pumped data into a multi-row INSERT statement for
// dbName.table and executes it on conn. colDef is the ready-made column list
// and values the ready-made, comma-separated value tuples. A failed insert is
// logged; nothing is reported back to the caller.
func doSampling(conn mysql.Conn, dbName, table, colDef, values string) {
	target := fmt.Sprintf("`%s`.`%s`", dbName, table)
	stmt := "Insert into " + target + "(" + colDef + ") values" + values + ";"

	if _, _, queryErr := conn.Query(stmt); queryErr != nil {
		common.Log.Error("doSampling Error from %s.%s: %v", dbName, table, queryErr)
	}
}