-
Notifications
You must be signed in to change notification settings - Fork 173
/
msg.go
119 lines (105 loc) · 3.34 KB
/
msg.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package mysqlbatch
import (
"database/sql"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
_ "github.com/pingcap/tidb/types/parser_driver"
log "github.com/sirupsen/logrus"
"github.com/moiot/gravity/pkg/core"
"github.com/moiot/gravity/pkg/metrics"
"github.com/moiot/gravity/pkg/mysql"
"github.com/moiot/gravity/pkg/schema_store"
"github.com/moiot/gravity/pkg/utils"
)
// NewMsg creates a DML (Insert) job message from one scanned row.
// It converts the raw sql.NullString column pointers to interface{}
// values based on the column type (time columns are parsed into time values
// by mysql.SQLDataPtrs2Val).
//
// rowPtrs/columnTypes describe the scanned row, sourceTableDef identifies the
// origin table, callbackFunc fires after the msg is committed downstream,
// positions is attached as InputContext for checkpointing, and scanTime marks
// the start of the msg's Phase.
func NewMsg(
	rowPtrs []interface{},
	columnTypes []*sql.ColumnType,
	sourceTableDef *schema_store.Table,
	callbackFunc core.MsgCallbackFunc,
	positions []TablePosition,
	scanTime time.Time) *core.Msg {

	columnDataMap := mysql.SQLDataPtrs2Val(rowPtrs, columnTypes)

	msg := core.Msg{
		Host:      "",
		Database:  sourceTableDef.Schema,
		Table:     sourceTableDef.Name,
		Timestamp: time.Now(),
	}

	dmlMsg := &core.DMLMsg{}
	dmlMsg.Operation = core.Insert
	dmlMsg.Data = columnDataMap

	// Primary keys drive worker routing; on failure we only warn, because a
	// degenerate (constant) route is still correct, just not parallel.
	pkColumns := sourceTableDef.PrimaryKeyColumns
	pkDataMap, err := mysql.GenPrimaryKeys(pkColumns, columnDataMap)
	if err != nil {
		log.Warnf("failed to generate primary keys, worker route will always be the same, err: %v", err)
	}
	dmlMsg.Pks = pkDataMap

	msg.DmlMsg = dmlMsg
	msg.Type = core.MsgDML
	msg.InputStreamKey = utils.NewStringPtr(utils.TableIdentity(sourceTableDef.Schema, sourceTableDef.Name))
	msg.Done = make(chan struct{})
	msg.AfterCommitCallback = callbackFunc
	msg.InputContext = positions
	msg.Phase = core.Phase{
		Start: scanTime,
	}

	metrics.InputCounter.WithLabelValues(core.PipelineName, msg.Database, msg.Table, string(msg.Type), string(dmlMsg.Operation)).Add(1)
	return &msg
}
// NewCreateTableMsg builds a DDL message carrying a CREATE TABLE statement
// for the given table, parsing the statement into an AST with the supplied
// parser. A parse failure is fatal: the statement comes from the source
// schema and must be well-formed.
func NewCreateTableMsg(parser *parser.Parser, table *schema_store.Table, createTblStmt string) *core.Msg {
	stmt, err := parser.ParseOneStmt(createTblStmt, "", "")
	if err != nil {
		log.Fatal(errors.Trace(err))
	}

	msg := core.Msg{
		Host:           "",
		Database:       table.Schema,
		Table:          table.Name,
		Timestamp:      time.Now(),
		Type:           core.MsgDDL,
		InputStreamKey: utils.NewStringPtr(utils.TableIdentity(table.Schema, table.Name)),
		Done:           make(chan struct{}),
		Phase:          core.Phase{Start: time.Now()},
		DdlMsg: &core.DDLMsg{
			Statement: createTblStmt,
			AST:       stmt.(ast.DDLNode),
		},
	}

	metrics.InputCounter.WithLabelValues(core.PipelineName, msg.Database, msg.Table, string(msg.Type), "create-table").Add(1)
	return &msg
}
// NewBarrierMsg builds a control-type message used as a barrier on the
// input stream of the given table.
func NewBarrierMsg(tableDef *schema_store.Table) *core.Msg {
	var msg core.Msg
	msg.Phase = core.Phase{Start: time.Now()}
	msg.Type = core.MsgCtl
	msg.InputStreamKey = utils.NewStringPtr(utils.TableIdentity(tableDef.Schema, tableDef.Name))
	msg.Done = make(chan struct{})
	return &msg
}
// NewCloseInputStreamMsg builds a message that signals the input stream of
// the given table should be closed.
func NewCloseInputStreamMsg(tableDef *schema_store.Table) *core.Msg {
	msg := core.Msg{
		Phase: core.Phase{
			Start: time.Now(),
		},
		Type:           core.MsgCloseInputStream,
		InputStreamKey: utils.NewStringPtr(utils.TableIdentity(tableDef.Schema, tableDef.Name)),
		Done:           make(chan struct{}),
	}
	// Bug fix: msg.Database/msg.Table are never set in this constructor, so
	// the previous code emitted the counter with empty database/table labels.
	// Use the table definition directly, matching the other constructors.
	metrics.InputCounter.WithLabelValues(core.PipelineName, tableDef.Schema, tableDef.Name, string(msg.Type), "").Add(1)
	return &msg
}