-
Notifications
You must be signed in to change notification settings - Fork 0
/
common.go
144 lines (123 loc) · 3.26 KB
/
common.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package mqhub
import (
"context"
"reflect"
)
// MessageSinkFunc is func form of MessageSink
type MessageSinkFunc func(Message) Future
// ConsumeMessage implements MessageSink
func (f MessageSinkFunc) ConsumeMessage(msg Message) Future {
future := f(msg)
if future == nil {
future = &ImmediateFuture{}
}
return future
}
// MessageSinkAs converts a func with arbitrary parameter to MessageSink
func MessageSinkAs(handler interface{}) MessageSink {
v := reflect.ValueOf(handler)
if v.Kind() != reflect.Func {
panic("handler must be a func")
}
t := v.Type()
switch t.NumIn() {
case 0:
return MessageSinkFunc(func(_ Message) Future {
v.Call(nil)
return &ImmediateFuture{}
})
case 1:
paramType := t.In(0)
return MessageSinkFunc(func(msg Message) Future {
val := reflect.New(paramType)
err := msg.As(val.Interface())
if err == nil {
v.Call([]reflect.Value{val.Elem()})
}
return &ImmediateFuture{Error: err}
})
default:
panic("no more than 1 parameter is allowed")
}
}
// DataPoint implements Endpoint for a data point
type DataPoint struct {
Name string
Retain bool
Sink MessageSink
}
// NewDataPoint creates a new datapoint
func NewDataPoint(name string) *DataPoint {
return &DataPoint{Name: name}
}
// NewRetainDataPoint creates a new retain datapoint
func NewRetainDataPoint(name string) *DataPoint {
return &DataPoint{Name: name, Retain: true}
}
// NewDataPointFromEndpointRef creates a datapoint using an EndpointRef
func NewDataPointFromEndpointRef(ref EndpointRef) *DataPoint {
return &DataPoint{Sink: ref}
}
// ID implements Endpoint
func (p *DataPoint) ID() string {
return p.Name
}
// SinkMessage implements MessageSource
func (p *DataPoint) SinkMessage(sink MessageSink) {
p.Sink = sink
}
// Update updates the state
func (p *DataPoint) Update(state interface{}) Future {
sink := p.Sink
if sink == nil {
return &ImmediateFuture{Error: ErrNoMessageSink}
}
msg, ok := state.(Message)
if !ok {
msg = MakeMsg(state, p.Retain)
}
return p.Sink.ConsumeMessage(msg)
}
// Reactor implements Endpoint for a reactor to an update
type Reactor struct {
Name string
Handler MessageSink
}
// ReactorFunc creates a Reactor from MessageSinkFunc
func ReactorFunc(name string, handler MessageSinkFunc) *Reactor {
return &Reactor{Name: name, Handler: handler}
}
// ReactorAs accepts a func with arbitrary parameter
func ReactorAs(name string, handler interface{}) *Reactor {
return &Reactor{Name: name, Handler: MessageSinkAs(handler)}
}
// ID implements Endpoint
func (a *Reactor) ID() string {
return a.Name
}
// ConsumeMessage implements MessageSink
func (a *Reactor) ConsumeMessage(msg Message) Future {
return a.Handler.ConsumeMessage(msg)
}
// Do sets the message handler
func (a *Reactor) Do(sink MessageSink) *Reactor {
a.Handler = sink
return a
}
// DoFunc is same as Do but accepts a func
func (a *Reactor) DoFunc(handler MessageSinkFunc) *Reactor {
return a.Do(handler)
}
// ContextRunner defines a runner accepts a context
// the runner should be started using go runner.Run(ctx)
type ContextRunner interface {
Run(context.Context)
}
// ImmediateFuture implements a future with immediate result
type ImmediateFuture struct {
Error error
}
// Wait implements Future
func (f *ImmediateFuture) Wait() error {
return f.Error
}