-
Notifications
You must be signed in to change notification settings - Fork 11
/
trigger.go
122 lines (98 loc) · 2.49 KB
/
trigger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package trigger
import (
"context"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/project-flogo/core/data/coerce"
"github.com/project-flogo/core/data/metadata"
"github.com/project-flogo/core/support/log"
"github.com/project-flogo/core/trigger"
)
var triggerMd = trigger.NewMetadata(&Settings{}, &HandlerSettings{}, &Output{})
func init() {
_ = trigger.Register(&Trigger{}, &Factory{})
}
type Trigger struct {
client pulsar.Client
handlers []*Handler
}
type Handler struct {
handler trigger.Handler
consumer pulsar.Consumer
}
type Factory struct {
}
var logger log.Logger
func (*Factory) New(config *trigger.Config) (trigger.Trigger, error) {
s := &Settings{}
err := metadata.MapToStruct(config.Settings, s, true)
if err != nil {
return nil, err
}
pulsarConn, err := coerce.ToConnection(s.Connection)
if err != nil {
return nil, err
}
return &Trigger{client: pulsarConn.GetConnection().(pulsar.Client)}, nil
}
func (f *Factory) Metadata() *trigger.Metadata {
return triggerMd
}
// Metadata implements trigger.Trigger.Metadata
func (t *Trigger) Metadata() *trigger.Metadata {
return triggerMd
}
func (t *Trigger) Initialize(ctx trigger.InitContext) error {
logger = ctx.Logger()
// Init handlers
for _, handler := range ctx.GetHandlers() {
s := &HandlerSettings{}
err := metadata.MapToStruct(handler.Settings(), s, true)
if err != nil {
return err
}
consumer, err := t.client.Subscribe(pulsar.ConsumerOptions{
Topic: s.Topic,
SubscriptionName: s.Subscription,
})
if err != nil {
return err
}
t.handlers = append(t.handlers, &Handler{handler: handler, consumer: consumer})
}
return nil
}
// Start implements util.Managed.Start
func (t *Trigger) Start() error {
for _, handler := range t.handlers {
go consume(handler)
}
return nil
}
// Stop implements util.Managed.Stop
func (t *Trigger) Stop() error {
for _, handler := range t.handlers {
handler.consumer.Close()
}
return nil
}
func consume(handler *Handler) {
for {
msg, err := handler.consumer.Receive(context.Background())
if err != nil {
logger.Debugf("Error while recieveing message")
return
}
out := &Output{}
out.Message = string(msg.Payload())
logger.Debugf("Message recieved [%v]", out.Message)
// Do something with the message
_, err = handler.handler.Handle(context.Background(), out)
if err == nil {
// Message processed successfully
handler.consumer.Ack(msg)
} else {
// Failed to process messages
handler.consumer.Nack(msg)
}
}
}