forked from araddon/qlbridge
/
planner_mutate.go
102 lines (87 loc) · 2.23 KB
/
planner_mutate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package plan
import (
"fmt"
u "github.com/araddon/gou"
"github.com/araddon/qlbridge/schema"
)
// Blank assignment that references the gou package at package scope, so the
// "u" import is always used by at least one declaration in this file.
var _ = u.EMPTY
// WalkInto is the planner callback for an Into plan node. It writes a debug
// trace of the node's statement and always returns ErrNotImplemented.
func (m *PlannerDefault) WalkInto(p *Into) error {
	stmt := p.Stmt
	u.Debugf("VisitInto %+v", stmt)
	return ErrNotImplemented
}
// upsertSource opens a connection for table through ctx.Schema and returns a
// schema.ConnUpsert to be used as the source of an insert/update/upsert plan.
//
// Resolution order:
//  1. If the connection implements schema.ConnMutation, a mutator is created
//     from it and returned.
//  2. If creating the mutator fails, the failure is logged (not returned) and
//     the connection itself is tried as a schema.ConnUpsert.
//  3. If the connection is not a schema.ConnUpsert either, an error is
//     returned. The connection opened here is closed first, because no caller
//     ever receives it and it would otherwise be leaked.
//
// An error from OpenConn is logged and returned unchanged.
func upsertSource(ctx *Context, table string) (schema.ConnUpsert, error) {
	conn, err := ctx.Schema.OpenConn(table)
	if err != nil {
		u.Warnf("%p no schema for %q err=%v", ctx.Schema, table, err)
		return nil, err
	}

	if mutatorSource, hasMutator := conn.(schema.ConnMutation); hasMutator {
		mutator, err := mutatorSource.CreateMutator(ctx)
		if err == nil {
			return mutator, nil
		}
		// Best effort: a failed mutator is only logged so that the plain
		// upsert interface below still gets a chance.
		u.Warnf("%p could not create mutator for %q err=%v", ctx.Schema, table, err)
	}

	upsertDs, isUpsert := conn.(schema.ConnUpsert)
	if !isUpsert {
		// Release the connection we opened; guard against a nil conn so the
		// error below is still produced instead of a nil-interface panic.
		if conn != nil {
			if cerr := conn.Close(); cerr != nil {
				u.Warnf("%p could not close conn for %q err=%v", ctx.Schema, table, cerr)
			}
		}
		return nil, fmt.Errorf("%T does not implement required schema.Upsert for upserts", conn)
	}
	return upsertDs, nil
}
// WalkInsert plans an Insert node: it logs the statement at debug level,
// resolves the target table's source via upsertSource, and stores it in
// p.Source. If resolution fails the error is returned as-is and p.Source is
// left untouched.
func (m *PlannerDefault) WalkInsert(p *Insert) error {
	u.Debugf("VisitInsert %s", p.Stmt)

	source, err := upsertSource(m.Ctx, p.Stmt.Table)
	if err == nil {
		p.Source = source
	}
	return err
}
// WalkUpdate plans an Update node: it logs the statement at debug level,
// resolves the target table's source via upsertSource, and stores it in
// p.Source. If resolution fails the error is returned as-is and p.Source is
// left untouched.
func (m *PlannerDefault) WalkUpdate(p *Update) error {
	u.Debugf("VisitUpdate %+v", p.Stmt)

	source, err := upsertSource(m.Ctx, p.Stmt.Table)
	if err == nil {
		p.Source = source
	}
	return err
}
// WalkUpsert plans an Upsert node: it logs the statement at debug level,
// resolves the target table's source via upsertSource, and stores it in
// p.Source. If resolution fails the error is returned as-is and p.Source is
// left untouched.
func (m *PlannerDefault) WalkUpsert(p *Upsert) error {
	u.Debugf("VisitUpsert %+v", p.Stmt)

	source, err := upsertSource(m.Ctx, p.Stmt.Table)
	if err == nil {
		p.Source = source
	}
	return err
}
// WalkDelete plans a Delete node by opening a connection for the statement's
// table and storing a deletion-capable source in p.Source.
//
// Resolution order:
//  1. If the connection implements schema.ConnMutation, a mutator is created
//     from it and used as the source.
//  2. If creating the mutator fails, the failure is logged (not returned) and
//     the connection itself is tried as a schema.ConnDeletion.
//  3. If the connection is not a schema.ConnDeletion either, an error is
//     returned and p.Source is left untouched. The connection opened here is
//     closed first, because nothing else holds it and it would otherwise be
//     leaked.
//
// An error from OpenConn is logged and returned unchanged.
func (m *PlannerDefault) WalkDelete(p *Delete) error {
	u.Debugf("VisitDelete %+v", p.Stmt)

	conn, err := m.Ctx.Schema.OpenConn(p.Stmt.Table)
	if err != nil {
		u.Warnf("%p no schema for %q err=%v", m.Ctx.Schema, p.Stmt.Table, err)
		return err
	}

	if mutatorSource, hasMutator := conn.(schema.ConnMutation); hasMutator {
		mutator, err := mutatorSource.CreateMutator(m.Ctx)
		if err == nil {
			p.Source = mutator
			return nil
		}
		// Best effort: a failed mutator is only logged so that the plain
		// deletion interface below still gets a chance.
		u.Warnf("%p could not create mutator for %q err=%v", m.Ctx.Schema, p.Stmt.Table, err)
	}

	deleteDs, isDelete := conn.(schema.ConnDeletion)
	if !isDelete {
		// Release the connection we opened; guard against a nil conn so the
		// error below is still produced instead of a nil-interface panic.
		if conn != nil {
			if cerr := conn.Close(); cerr != nil {
				u.Warnf("%p could not close conn for %q err=%v", m.Ctx.Schema, p.Stmt.Table, cerr)
			}
		}
		return fmt.Errorf("%T does not implement required schema.Deletion for deletions", conn)
	}
	p.Source = deleteDs
	return nil
}