create_mv_command.go
package command

import (
	"fmt"
	"strings"
	"sync"

	"github.com/squareup/pranadb/cluster"
	"github.com/squareup/pranadb/command/parser"
	"github.com/squareup/pranadb/common"
	"github.com/squareup/pranadb/errors"
	"github.com/squareup/pranadb/interruptor"
	"github.com/squareup/pranadb/meta"
	"github.com/squareup/pranadb/parplan"
	"github.com/squareup/pranadb/push"
)
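
// CreateMVCommand handles creation of a materialized view. Creating an MV is
// a multi-phase DDL operation: the MV is created and connected on every node
// (phase 0), filled from its feeding sources and MVs (phase 1), then
// registered so it becomes visible to clients (phase 2).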
type CreateMVCommand struct {
lock sync.Mutex
e *Executor
pl *parplan.Planner
schema *common.Schema
createMVSQL string
tableSequences []uint64
mv *push.MaterializedView
ast *parser.CreateMaterializedView
toDeleteBatch *cluster.ToDeleteBatch
interruptor interruptor.Interruptor
leadersMap map[uint64]uint64
}

func (c *CreateMVCommand) CommandType() DDLCommandType {
	return DDLCommandTypeCreateMV
}

func (c *CreateMVCommand) SchemaName() string {
	return c.schema.Name
}

func (c *CreateMVCommand) SQL() string {
	return c.createMVSQL
}

func (c *CreateMVCommand) TableSequences() []uint64 {
	return c.tableSequences
}

func (c *CreateMVCommand) Cancel() {
	c.interruptor.Interrupt()
}
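
// NewOriginatingCreateMVCommand creates the command on the node where the DDL
// statement originated. The current shard leaders map is captured here so
// that every node performs the fill against the same view of shard
// leadership.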
func NewOriginatingCreateMVCommand(e *Executor, pl *parplan.Planner, schema *common.Schema, sql string,
tableSequences []uint64, ast *parser.CreateMaterializedView) (*CreateMVCommand, error) {
leadersMap, err := e.cluster.GetLeadersMap()
if err != nil {
return nil, err
}
return &CreateMVCommand{
e: e,
schema: schema,
pl: pl,
ast: ast,
createMVSQL: sql,
tableSequences: tableSequences,
leadersMap: leadersMap,
}, nil
}
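
// NewCreateMVCommand creates the command on a non-originating node from the
// broadcast SQL, deserializing the originating node's leaders map from
// extraData.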
func NewCreateMVCommand(e *Executor, schemaName string, createMVSQL string, tableSequences []uint64, extraData []byte) *CreateMVCommand {
schema := e.metaController.GetOrCreateSchema(schemaName)
pl := parplan.NewPlanner(schema)
return &CreateMVCommand{
e: e,
schema: schema,
pl: pl,
createMVSQL: createMVSQL,
tableSequences: tableSequences,
leadersMap: deserializeLeadersMap(extraData),
}
}
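
// OnPhase dispatches to the handler for the given phase. Each phase is
// executed on all nodes before the next phase begins.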
func (c *CreateMVCommand) OnPhase(phase int32) error {
switch phase {
case 0:
return c.onPhase0()
case 1:
return c.onPhase1()
case 2:
return c.onPhase2()
default:
panic("invalid phase")
}
}
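
// NumPhases returns the number of phases this command runs in - phase 0
// connects, phase 1 fills, phase 2 registers.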
func (c *CreateMVCommand) NumPhases() int {
return 3
}
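
// Before runs once, before the phases, on the node where the command
// originated. It builds and validates the MV and verifies that no MV or
// source with the same name already exists, either in memory or in storage.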
func (c *CreateMVCommand) Before() error {
c.lock.Lock()
defer c.lock.Unlock()
// Mainly validation
mv, err := c.createMVFromAST(c.ast)
if err != nil {
return errors.WithStack(err)
}
c.mv = mv
if err := c.e.metaController.ExistsMvOrSource(c.schema, mv.Info.Name); err != nil {
return err
}
rows, err := c.e.pullEngine.ExecuteQuery("sys",
fmt.Sprintf("select id from tables where schema_name='%s' and name='%s' and kind='%s'", c.mv.Info.SchemaName, c.mv.Info.Name, meta.TableKindMaterializedView))
if err != nil {
return errors.WithStack(err)
}
if rows.RowCount() != 0 {
return errors.Errorf("materialized view with name %s.%s already exists in storage", c.mv.Info.SchemaName, c.mv.Info.Name)
}
if mv.Info.OriginInfo.InitialState != "" {
err := validateInitState(mv.Info.OriginInfo.InitialState, mv.Info.TableInfo, c.e.metaController)
if err != nil {
return err
}
}
	return nil
}
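
// onPhase0 runs on every node. It creates the MV, writes to_delete markers so
// a failed creation is cleaned up on restart, loads any initial state, and
// connects the MV's aggregations as remote consumers so they can receive rows
// forwarded during the fill.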
func (c *CreateMVCommand) onPhase0() error {
c.lock.Lock()
defer c.lock.Unlock()
	// If this is phase 0 on the originating node, mv will already be set -
	// this means we do not have to parse the AST twice
if c.mv == nil {
mv, err := c.createMV()
if err != nil {
return errors.WithStack(err)
}
c.mv = mv
}
// We store rows in the to_delete table - if MV creation fails (e.g. node crashes) then on restart the MV state will
// be cleaned up - we have to add a prefix for each shard as the shard id comes first in the key
var err error
c.toDeleteBatch, err = storeToDeleteBatch(c.mv.Info.ID, c.e.cluster)
if err != nil {
return err
}
// We can now load initial state from initial state table (if any)
var initTable *common.TableInfo
if c.mv.Info.OriginInfo.InitialState != "" {
var err error
initTable, err = getInitialiseFromTable(c.mv.Info.SchemaName, c.mv.Info.OriginInfo.InitialState, c.e.metaController)
if err != nil {
return err
}
shardIDs := c.e.cluster.GetLocalShardIDs()
if err := c.e.pushEngine.LoadInitialStateForTable(shardIDs, initTable.ID, c.mv.Info.ID, &c.interruptor); err != nil {
return err
}
}
// We must first connect any aggregations in the MV as remote consumers as they might have rows forwarded to them
// during the MV fill process. This must be done on all nodes before we start the fill
	// We do not join the MV up to its feeding sources or MVs at this point
return c.mv.Connect(false, true)
}
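
// onPhase1 performs the fill. Each node fills the MV for the shards it leads,
// according to the leaders map captured when the command originated.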
func (c *CreateMVCommand) onPhase1() error {
c.lock.Lock()
defer c.lock.Unlock()
var localLeaderShards []uint64
for shardID, nodeID := range c.leadersMap {
if nodeID == uint64(c.e.cluster.GetNodeID()) {
localLeaderShards = append(localLeaderShards, shardID)
}
}
if err := c.e.cluster.RegisterStartFill(c.leadersMap, &c.interruptor); err != nil {
return err
}
	// Fill the MV from its feeding sources and MVs
if err := c.mv.Fill(localLeaderShards, &c.interruptor); err != nil {
return err
}
c.e.cluster.RegisterEndFill()
return nil
}
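
// onPhase2 runs once the fill has completed on all nodes. It waits for the
// schedulers to process any forwarded rows, registers the MV so clients can
// use it, and deletes the to_delete rows written in phase 0.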
func (c *CreateMVCommand) onPhase2() error {
c.lock.Lock()
defer c.lock.Unlock()
// The fill can cause rows to be forwarded - to make sure they're all processed we must wait for all schedulers
// on all nodes - this must be done after fill has completed on all nodes
if err := c.e.pushEngine.WaitForSchedulers(); err != nil {
return err
}
// The MV is now created and filled on all nodes but it isn't currently registered so it can't be used by clients
// We register it now
if err := c.e.pushEngine.RegisterMV(c.mv); err != nil {
return errors.WithStack(err)
}
if err := c.e.metaController.RegisterMaterializedView(c.mv.Info, c.mv.InternalTables); err != nil {
return err
}
// Maybe inject an error after fill and after row in tables table is persisted but before to_delete rows removed
if err := c.e.FailureInjector().GetFailpoint("create_mv_2").CheckFail(); err != nil {
return err
}
// Now delete rows from the to_delete table
return c.e.cluster.RemoveToDeleteBatch(c.toDeleteBatch)
}
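
// AfterPhase runs after the given phase has completed on all nodes. After the
// fill (phase 1) the MV is persisted to the tables table so it survives
// restarts.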
func (c *CreateMVCommand) AfterPhase(phase int32) error {
c.lock.Lock()
defer c.lock.Unlock()
if phase == 1 {
// Maybe inject an error after fill but before row in tables table is persisted
if err := c.e.FailureInjector().GetFailpoint("create_mv_1").CheckFail(); err != nil {
return err
}
// We add the MV to the tables table once the fill phase is complete
// We only do this on the originating node
// We need to do this *before* the MV is available to clients otherwise a node failure and restart could cause
// the MV to disappear after it's been used
return c.e.metaController.PersistMaterializedView(c.mv.Info, c.mv.InternalTables)
}
return nil
}
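
// Cleanup rolls back a partially created MV after a failure. Errors are
// ignored - cleanup is best effort.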
func (c *CreateMVCommand) Cleanup() {
c.lock.Lock()
defer c.lock.Unlock()
if c.mv == nil {
return
}
	if err := c.mv.Disconnect(); err != nil {
		// Ignore - cleanup is best effort
	}
	if err := c.e.pushEngine.RemoveMV(c.mv.Info.ID); err != nil {
		// Ignore - cleanup is best effort
	}
c.e.cluster.RegisterEndFill()
}
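
// createMVFromAST builds the materialized view from a parsed AST, allocating
// its table ID from the preallocated sequence range and picking up any
// initial state table from the origin information.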
func (c *CreateMVCommand) createMVFromAST(ast *parser.CreateMaterializedView) (*push.MaterializedView, error) {
mvName := strings.ToLower(ast.Name.String())
querySQL := ast.Query.String()
seqGenerator := common.NewPreallocSeqGen(c.tableSequences)
tableID := seqGenerator.GenerateSequence()
var initTable string
if ast.OriginInformation != nil {
for _, info := range ast.OriginInformation {
if info.InitialState != "" {
initTable = info.InitialState
break
}
}
}
return push.CreateMaterializedView(c.e.pushEngine, c.pl, c.schema, mvName, querySQL, initTable, tableID, seqGenerator)
}
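
// createMV parses the original SQL and builds the MV from the resulting AST.
// It is used on nodes where a parsed AST is not already available.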
func (c *CreateMVCommand) createMV() (*push.MaterializedView, error) {
ast, err := parser.Parse(c.createMVSQL)
if err != nil {
return nil, errors.WithStack(err)
}
if ast.Create == nil || ast.Create.MaterializedView == nil {
		return nil, errors.Errorf("not a create materialized view: %s", c.createMVSQL)
}
return c.createMVFromAST(ast.Create.MaterializedView)
}
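
// GetExtraData returns the serialized leaders map; it is broadcast with the
// command and deserialized in NewCreateMVCommand.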
func (c *CreateMVCommand) GetExtraData() []byte {
return serializeLeadersMap(c.leadersMap)
}
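
// serializeLeadersMap encodes the leaders map as a little-endian uint32 entry
// count followed by a pair of little-endian uint64s (shard ID, node ID) for
// each entry.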
func serializeLeadersMap(m map[uint64]uint64) []byte {
var buff []byte
buff = common.AppendUint32ToBufferLE(buff, uint32(len(m)))
for shardID, nodeID := range m {
buff = common.AppendUint64ToBufferLE(buff, shardID)
buff = common.AppendUint64ToBufferLE(buff, nodeID)
}
return buff
}
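
// deserializeLeadersMap reconstructs a leaders map encoded by
// serializeLeadersMap.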
func deserializeLeadersMap(buff []byte) map[uint64]uint64 {
l, offset := common.ReadUint32FromBufferLE(buff, 0)
m := make(map[uint64]uint64, l)
for i := 0; i < int(l); i++ {
var shardID, nodeID uint64
shardID, offset = common.ReadUint64FromBufferLE(buff, offset)
nodeID, offset = common.ReadUint64FromBufferLE(buff, offset)
m[shardID] = nodeID
}
return m
}