/
sync.go
282 lines (262 loc) · 8.67 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
package engine
import (
"context"
"fmt"
"strings"
"github.com/seerx/gpa/engine/sql/metas/rflt"
"github.com/seerx/gpa/engine/sql/metas/schema"
"github.com/seerx/gpa/engine/sql/types"
"github.com/seerx/logo/log"
)
// Sync brings the database schema in line with the given bean structs.
//
// The list of existing tables is read once from the database. Each bean is
// then parsed into a table definition and handled as follows:
//
//   - if no table with the same (schema-qualified, case-insensitive) name
//     exists, the table is created;
//   - otherwise the table's columns and indexes are loaded from the database
//     and compared with the struct definition via checkColumns and
//     checkIndexes.
//
// Columns that exist in the database but have no matching struct field are
// only reported with a warning by Sync.
//
// Sync stops at, and returns, the first error encountered.
func (e *Engine) Sync(beans ...interface{}) error {
	dbTables, err := e.dialect.GetTables(e.provider.Executor(), context.Background())
	if err != nil {
		e.logger.Error(err, "get tables from database")
		return err
	}

	for _, bean := range beans {
		table, err := rflt.Parse(bean, e.propsParser)
		if err != nil {
			e.logger.Error(err, "parse table bean")
			return err
		}

		tbName := table.Name
		tbNameWithSchema := e.dialect.TableNameWithSchema(tbName)

		// Look the table up among the tables reported by the database.
		var dbTable *schema.Table
		for _, tb := range dbTables {
			if strings.EqualFold(e.dialect.TableNameWithSchema(tb.Name), tbNameWithSchema) {
				dbTable = tb
				break
			}
		}

		if dbTable == nil {
			// The table does not exist yet: create it.
			if err := e.CreateTable(table); err != nil {
				e.logger.Errorf(err, "create table [%s]", tbName)
				return err
			}
			// Nothing to compare for a freshly created table; move on to the
			// next bean. Using break here would abandon all remaining beans.
			continue
		}

		// Load the current table structure from the database.
		if err := e.loadTableInfo(dbTable); err != nil {
			e.logger.Errorf(err, "load table [%s] info from database", tbName)
			return err
		}

		// Reconcile column changes.
		if err := e.checkColumns(tbNameWithSchema, dbTable, table); err != nil {
			return err
		}

		// Reconcile index changes.
		if err := e.checkIndexes(tbNameWithSchema, dbTable, table); err != nil {
			return err
		}

		// Report columns that still exist in the database although the
		// corresponding field has been removed from the struct.
		// NOTE(review): this uses the package-level log rather than e.logger,
		// unlike the rest of the method.
		for _, colName := range dbTable.ColumnNames {
			if table.GetColumn(colName) == nil {
				log.Warnf("Table %s has column %s but struct has not related field", tbNameWithSchema, colName)
			}
		}
	}
	return nil
}
// checkIndexes reconciles the indexes of the database table oriTable with the
// indexes declared on the struct-derived table.
//
// A declared index is considered present when some database index compares
// equal to it via Index.Equal. Database indexes that match no declared index
// are dropped first; declared indexes without a database counterpart are
// created afterwards. The first failing statement is logged and its error
// returned.
func (e *Engine) checkIndexes(tbNameWithSchema string, oriTable, table *schema.Table) error {
	// Names of database indexes that are matched by a declared index.
	matched := make(map[string]bool)
	// Declared indexes that have no equivalent in the database.
	pending := make(map[string]*schema.Index)

	// Walk the indexes declared on the struct.
	for declName, declIndex := range table.Indexes {
		var hit *schema.Index
		// Search the database table for an equivalent index.
		for dbName, dbIndex := range oriTable.Indexes {
			if !declIndex.Equal(dbIndex) {
				continue
			}
			hit = dbIndex
			matched[dbName] = true
			break
		}
		if hit != nil {
			continue
		}
		// No equivalent found: schedule the index for creation.
		pending[declName] = declIndex
	}

	// Walk the database indexes and drop those the struct no longer declares.
	for dbName, dbIndex := range oriTable.Indexes {
		if _, seen := matched[dbName]; seen {
			continue
		}
		stmt := e.dialect.SQLDropIndex(tbNameWithSchema, dbIndex)
		_, err := e.provider.Executor().Exec(stmt)
		if err != nil {
			e.logger.Error(err, stmt)
			return err
		}
	}

	// Create the indexes that were missing from the database.
	for _, declIndex := range pending {
		stmt := e.dialect.SQLCreateIndex(tbNameWithSchema, declIndex)
		_, err := e.provider.Executor().Exec(stmt)
		if err != nil {
			e.logger.Error(err, stmt)
			return err
		}
	}
	return nil
}
// addColumn adds col to the table named tbNameWithSchema by executing the
// dialect's "add column" statement. A failure is logged together with the
// statement and returned to the caller.
func (e *Engine) addColumn(tbNameWithSchema string, col *schema.Column) error {
	stmt := e.dialect.SQLAddColumn(tbNameWithSchema, col)
	if _, err := e.provider.Executor().Exec(stmt); err != nil {
		e.logger.Errorf(err, "exec sql: %s", stmt)
		return err
	}
	return nil
}
// checkColumns compares every column declared on the struct-derived table
// with the matching column of the database table oriTable (matched by field
// name, case-insensitively) and applies the changes it knows how to make:
//
//   - a column missing from the database is added;
//   - varchar -> text is applied when the dialect reports support for it;
//   - a varchar column is lengthened when the dialect reports support for it
//     (lengths are never reduced);
//   - every other type difference, as well as differing defaults or
//     nullability, is only logged.
//
// It returns the first error produced by an executed statement.
func (e *Engine) checkColumns(tbNameWithSchema string, oriTable, table *schema.Table) error {
	for _, col := range table.Columns {
		var dbCol *schema.Column
		for _, col2 := range oriTable.Columns {
			if strings.EqualFold(col.FieldName(), col2.FieldName()) {
				dbCol = col2
				break
			}
		}

		// The column does not exist in the table: add it.
		if dbCol == nil {
			if err := e.addColumn(tbNameWithSchema, col); err != nil {
				return err
			}
			continue
		}

		// Data type check.
		var err error
		expectedType := e.dialect.SQLType(col)
		curType := e.dialect.SQLType(dbCol)
		if expectedType != curType {
			// The data types differ.
			if expectedType == types.Text &&
				strings.HasPrefix(curType, types.Varchar) {
				// Change from varchar to text,
				// currently only supported by some dialects.
				if e.dialect.URI().SupportColumnVarchar2Text() {
					e.logger.Infof("Table %s column %s change type from %s to %s\n",
						tbNameWithSchema, col.FieldName(), curType, expectedType)
					_, err = e.provider.Executor().Exec(e.dialect.SQLModifyColumn(tbNameWithSchema, col))
				} else {
					e.logger.Warnf("Table %s column %s db type is %s, struct type is %s\n",
						tbNameWithSchema, col.FieldName(), curType, expectedType)
				}
			} else if strings.HasPrefix(curType, types.Varchar) && strings.HasPrefix(expectedType, types.Varchar) {
				// Varchar lengths differ; where the database supports it,
				// only growing the length is allowed.
				if e.dialect.URI().SupportColumnVarcharIncLength() &&
					dbCol.Length < col.Length {
					e.logger.Infof("table %s column %s change type from varchar(%d) to varchar(%d)\n",
						tbNameWithSchema, col.FieldName(), dbCol.Length, col.Length)
					_, err = e.provider.Executor().Exec(e.dialect.SQLModifyColumn(tbNameWithSchema, col))
				} else {
					e.logger.Warnf("table %s column %s db type is varchar(%d), struct type is varchar(%d)\n",
						tbNameWithSchema, col.FieldName(), dbCol.Length, col.Length)
				}
			} else {
				// Unsupported type conversion. A database type that is merely
				// the expected type plus a "(...)" suffix is tolerated. The
				// index is in range: curType has expectedType as a prefix and
				// differs from it, so it is strictly longer.
				if !(strings.HasPrefix(curType, expectedType) && curType[len(expectedType)] == '(') {
					e.logger.Error(fmt.Errorf("table %s column %s db type is %s, struct type is %s",
						tbNameWithSchema, col.FieldName(), curType, expectedType))
				}
			}
		} else if expectedType == types.Varchar {
			// Both sides are plain varchar; compare the lengths.
			switch {
			case dbCol.Length == col.Length:
				// Identical definition: nothing to change and nothing to
				// warn about.
			case e.dialect.URI().SupportColumnVarcharIncLength() && dbCol.Length < col.Length:
				e.logger.Infof("table %s column %s change type from varchar(%d) to varchar(%d)\n",
					tbNameWithSchema, col.FieldName(), dbCol.Length, col.Length)
				_, err = e.provider.Executor().Exec(e.dialect.SQLModifyColumn(tbNameWithSchema, col))
			default:
				e.logger.Warnf("table %s column %s db type is varchar(%d), struct type is varchar(%d)\n",
					tbNameWithSchema, col.FieldName(), dbCol.Length, col.Length)
			}
		}
		// Handle a failed modify statement right away, before reporting
		// secondary differences.
		if err != nil {
			e.logger.Errorf(err, "check table %s columns", tbNameWithSchema)
			return err
		}

		// The default value changed.
		if col.Default != dbCol.Default {
			switch {
			case col.IsAutoIncrement: // For autoincrement column, don't check default
			case (col.Type.Name == types.Bool || col.Type.Name == types.Boolean) &&
				((strings.EqualFold(col.Default, "true") && dbCol.Default == "1") ||
					(strings.EqualFold(col.Default, "false") && dbCol.Default == "0")):
				// Boolean defaults stored as 1/0 are equivalent to true/false.
			default:
				e.logger.Warnf("Table %s Column %s db default is %s, struct default is %s",
					tbNameWithSchema, col.FieldName(), dbCol.Default, col.Default)
			}
		}
		if col.Nullable != dbCol.Nullable {
			e.logger.Warnf("Table %s Column %s db nullable is %v, struct nullable is %v",
				tbNameWithSchema, col.FieldName(), dbCol.Nullable, col.Nullable)
		}
	}
	return nil
}
// loadTableInfo fills table with the column and index definitions read from
// the database through the dialect.
//
// Columns are added in the order reported by the dialect. table.Indexes is
// replaced by the indexes read from the database, and every column that takes
// part in an index gets that index recorded in its Indexes map. An index that
// refers to a column not present in the table is reported as an error.
func (e *Engine) loadTableInfo(table *schema.Table) error {
	colSeq, cols, err := e.dialect.GetColumns(e.provider.Executor(), context.Background(), table.Name)
	if err != nil {
		e.logger.Errorf(err, "get table %s columns", table.Name)
		return err
	}
	for _, name := range colSeq {
		table.AddColumn(cols[name])
	}

	indexes, err := e.dialect.GetIndexes(e.provider.Executor(), context.Background(), table.Name)
	if err != nil {
		e.logger.Errorf(err, "get table %s indexes", table.Name)
		return err
	}
	table.Indexes = indexes

	for _, index := range indexes {
		for _, name := range index.Cols {
			// An index column may carry a sort direction, e.g. `"name" DESC`.
			parts := strings.Split(strings.TrimSpace(name), " ")

			// seq is scoped to this column so that a column without an
			// explicit direction does not inherit the previous column's value
			// in the error message below. 0 = ASC (or unspecified), 1 = DESC.
			var seq int
			if len(parts) > 1 {
				if parts[1] == "DESC" {
					seq = 1
				} else if parts[1] == "ASC" {
					seq = 0
				}
			}

			// Strip the double quotes some databases put around identifiers.
			colName := strings.Trim(parts[0], `"`)
			col := table.GetColumn(colName)
			if col == nil {
				err := fmt.Errorf("unknown col %s seq %d, in index %v of table %v, columns %v", name, seq, index.Name, table.Name, table.ColumnNames)
				e.logger.Errorf(err, "table %s column %s not found", table.Name, colName)
				return err
			}
			col.Indexes[index.Name] = index.Type
		}
	}
	return nil
}