forked from araddon/qlbridge
-
Notifications
You must be signed in to change notification settings - Fork 0
/
exec.go
111 lines (100 loc) · 3.57 KB
/
exec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// Package exec contains execution tasks to run each of the separate tasks
// (Source, Project, Where, Having, etc) of a SQL DAG of tasks. It does so
// by defining interfaces, base tasks, and a single-machine, channel-oriented
// DAG runner (Executor). The Executor interface allows other downstream
// packages to implement a multi-node, message-passing-oriented version
// while re-using most Tasks.
package exec
import (
"fmt"
"github.com/araddon/qlbridge/plan"
"github.com/araddon/qlbridge/schema"
)
// Standard errors exposed by this package as package-level values.
// They are listed alphabetically; the message text of each is part of the
// observable behavior and must not be altered.
var (
	// ErrInternalError reports an internal QLBridge error.
	ErrInternalError = fmt.Errorf("QLBridge: Internal Error")

	// ErrNoSchemaSelected reports that no schema has been selected.
	ErrNoSchemaSelected = fmt.Errorf("No Schema Selected")

	// ErrNotImplemented reports functionality that is not implemented.
	ErrNotImplemented = fmt.Errorf("QLBridge: Not implemented")

	// ErrNotSupported reports functionality that is not supported.
	ErrNotSupported = fmt.Errorf("QLBridge: Not supported")

	// ErrShuttingDown reports that a shutdown signal was received.
	ErrShuttingDown = fmt.Errorf("Received Shutdown Signal")

	// ErrUnknownCommand reports a command that was not recognized.
	ErrUnknownCommand = fmt.Errorf("QLBridge: Unknown Command")
)
// Task channel types.

// SigChan is a channel of bool values; TaskRunner exposes one via SigChan().
type SigChan chan bool

// ErrChan is a channel of error values; TaskRunner exposes one via ErrChan().
type ErrChan chan error

// MessageChan is a channel of schema.Message values; it is the type used
// for the message input and output of a TaskRunner.
type MessageChan chan schema.Message

// MessageHandler handles or forwards a message for a Task.
//
// NOTE(review): the meaning of the returned bool is not defined in this
// file — presumably it signals whether processing should continue; confirm
// against the implementations.
type MessageHandler func(ctx *plan.Context, msg schema.Message) bool
// JobMaker is a job factory: given a plan context it returns an Executor
// or an error.
type JobMaker func(ctx *plan.Context) (Executor, error)

// JobRunner is the main runtime interface for running a SQL job of tasks.
type JobRunner interface {
	Setup() error
	Run() error
	Close() error
}

// Task is a node in the execution DAG. Exec tasks are inherently DAGs of
// tasks that implement Run(), Close(), etc. so that they are executable.
type Task interface {
	Run() error
	Close() error
	CloseFinal() error

	// Children returns the child sub-tasks of this task.
	Children() []Task

	// Add adds a child to this DAG.
	Add(Task) error
}

// TaskRunner is an interface for a single task in the DAG of tasks
// necessary to execute a job.
//   - it may have children tasks
//   - it may be parallel, distributed, etc
type TaskRunner interface {
	Task

	Setup(depth int) error

	// Accessors and setters for the inbound and outbound message channels.
	MessageIn() MessageChan
	MessageOut() MessageChan
	MessageInSet(MessageChan)
	MessageOutSet(MessageChan)

	// Error and signal channels of this task.
	ErrChan() ErrChan
	SigChan() SigChan

	Quit()
}

// TaskPrinter is implemented by tasks that can print their DAG; depth is
// supplied by the caller.
type TaskPrinter interface {
	PrintDag(depth int)
}

// Executor defines the standard Walk() pattern to create an executable task
// DAG from a plan DAG.
//
// An implementation of WalkPlan() will be able to execute/run a Statement:
//   - inproc: ie, in process. The qlbridge/exec package implements a
//     non-distributed query-planner.
//   - distributed: ie, run this job across multiple servers. dataux/planner
//     implements a distributed query-planner that distributes/runs tasks
//     across multiple nodes.
type Executor interface {
	NewTask(p plan.Task) Task
	WalkPlan(p plan.Task) (Task, error)

	// Statement-level plans.
	WalkSelect(p *plan.Select) (Task, error)
	WalkInsert(p *plan.Insert) (Task, error)
	WalkUpsert(p *plan.Upsert) (Task, error)
	WalkUpdate(p *plan.Update) (Task, error)
	WalkDelete(p *plan.Delete) (Task, error)
	WalkCommand(p *plan.Command) (Task, error)
	WalkPreparedStatement(p *plan.PreparedStatement) (Task, error)

	// Child tasks.
	WalkSource(p *plan.Source) (Task, error)
	WalkJoin(p *plan.JoinMerge) (Task, error)
	WalkJoinKey(p *plan.JoinKey) (Task, error)
	WalkWhere(p *plan.Where) (Task, error)
	WalkHaving(p *plan.Having) (Task, error)
	WalkGroupBy(p *plan.GroupBy) (Task, error)
	WalkOrder(p *plan.Order) (Task, error)
	WalkProjection(p *plan.Projection) (Task, error)
}

// ExecutorSource allows select planning to be passed down to a source.
// Sources can often do their own execution-plan for sub-select statements,
// ie mysql can do its own (select, projection); mongo and es can as well.
type ExecutorSource interface {
	// WalkExecSource turns the given source plan into a Task.
	WalkExecSource(p *plan.Source) (Task, error)
}