This repository has been archived by the owner on Nov 16, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 58
/
manager.go
154 lines (132 loc) · 4.85 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
///////////////////////////////////////////////////////////////////////
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
///////////////////////////////////////////////////////////////////////
package subscriptions
import (
"context"
"fmt"
"sync"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/vmware/dispatch/pkg/client"
"github.com/vmware/dispatch/pkg/event-manager/helpers"
"github.com/vmware/dispatch/pkg/event-manager/subscriptions/entities"
"github.com/vmware/dispatch/pkg/events"
"github.com/vmware/dispatch/pkg/function-manager/gen/models"
"github.com/vmware/dispatch/pkg/trace"
)
// Manager is the contract of the subscription manager: it is started with an
// initial set of subscriptions and afterwards asked to create, update or
// delete individual ones.
type Manager interface {
	// Run starts the manager with the given initial subscriptions.
	Run(subscriptions []*entities.Subscription) error
	// Create activates the given subscription.
	Create(ctx context.Context, sub *entities.Subscription) error
	// Update applies a changed definition of the given subscription.
	Update(ctx context.Context, sub *entities.Subscription) error
	// Delete deactivates the given subscription.
	Delete(ctx context.Context, sub *entities.Subscription) error
}
// defaultManager is the Manager implementation provided by this file. It keeps
// the currently active transport subscriptions in memory, keyed by
// subscription ID.
type defaultManager struct {
	// queue is the transport on which event topics are subscribed.
	queue events.Transport
	// fnClient is used to run a function when an event is received.
	fnClient client.FunctionsClient

	// The embedded lock is held by Create, Delete and Shutdown while they
	// read or modify activeSubs.
	sync.RWMutex
	// activeSubs maps a subscription ID to its live transport subscription.
	activeSubs map[string]events.Subscription
}
// NewManager returns a subscription manager that subscribes to events through
// mq and runs functions through fnClient. It starts with no active
// subscriptions. The returned error is always nil.
func NewManager(mq events.Transport, fnClient client.FunctionsClient) (Manager, error) {
	defer trace.Trace("")()

	mgr := &defaultManager{
		activeSubs: make(map[string]events.Subscription),
		fnClient:   fnClient,
		queue:      mq,
	}
	return mgr, nil
}
// Run activates every subscription in subscriptions by calling Create with a
// background context. Activation is best-effort: a failing Create does not
// stop the loop and is not reported to the caller, so Run always returns nil.
func (m *defaultManager) Run(subscriptions []*entities.Subscription) error {
	defer trace.Trace("")()

	log.Debugf("event consumer initializing")
	for i := range subscriptions {
		current := subscriptions[i]
		log.Debugf("Processing sub %s", current.Name)
		// Create logs its own failure; the result is intentionally dropped
		// so that one bad subscription does not prevent the others.
		_ = m.Create(context.Background(), current)
	}
	return nil
}
// Create subscribes on the transport to the topic "<SourceType>.<EventType>"
// of sub, using the handler built for sub, and records the resulting
// subscription under sub.ID.
//
// If a subscription is already recorded under the same ID it is unsubscribed
// and removed first. When the transport refuses the new subscription, the
// wrapped error is logged and returned and nothing is recorded for sub.ID.
func (m *defaultManager) Create(ctx context.Context, sub *entities.Subscription) error {
	defer trace.Tracef("event %s, function %s", sub.EventType, sub.Function)()

	m.Lock()
	defer m.Unlock()

	// Replace semantics: tear down whatever is currently registered for this ID.
	if previous, found := m.activeSubs[sub.ID]; found {
		log.Debugf("types.Subscription for %s/%s already existed, unsubscribing", sub.EventType, sub.Function)
		previous.Unsubscribe()
		delete(m.activeSubs, sub.ID)
	}

	topic := fmt.Sprintf("%s.%s", sub.SourceType, sub.EventType)
	active, subscribeErr := m.queue.Subscribe(ctx, topic, m.handler(sub))
	if subscribeErr != nil {
		wrapped := errors.Wrapf(subscribeErr, "unable to create a subscription for event %s and function %s", sub.EventType, sub.Function)
		log.Error(wrapped)
		return wrapped
	}

	m.activeSubs[sub.ID] = active
	return nil
}
// Update re-creates the subscription: it delegates to Create, which replaces
// any subscription already recorded under the same ID.
func (m *defaultManager) Update(ctx context.Context, sub *entities.Subscription) error {
	err := m.Create(ctx, sub)
	return err
}
// Delete unsubscribes the subscription recorded under sub.ID, if there is one,
// and removes it from the pool of active subscriptions. An unknown ID is not
// an error; Delete always returns nil. The ctx argument is not used.
func (m *defaultManager) Delete(ctx context.Context, sub *entities.Subscription) error {
	defer trace.Tracef("event %s", sub.EventType)()

	m.Lock()
	defer m.Unlock()

	active, found := m.activeSubs[sub.ID]
	if found {
		active.Unsubscribe()
		delete(m.activeSubs, sub.ID)
	}

	log.Debugf("Deleting subscription topic=%s id=%s revision=%d", sub.EventType, sub.Name, sub.Revision)
	return nil
}
// Shutdown unsubscribes every active subscription and empties the pool of
// active subscriptions.
//
// Each entry is removed as soon as it has been unsubscribed, so a repeated
// Shutdown, or a Create/Delete issued afterwards for the same ID, does not call
// Unsubscribe a second time on a subscription that is already closed.
func (m *defaultManager) Shutdown() {
	defer trace.Trace("")()
	log.Infof("Event controller shutdown")

	m.Lock()
	defer m.Unlock()

	for id, sub := range m.activeSubs {
		sub.Unsubscribe()
		// Deleting the entry currently being visited is permitted while
		// ranging over a map.
		delete(m.activeSubs, id)
	}
}
// handler builds the callback that the transport invokes for every event
// delivered to the given subscription. The callback opens a tracing span
// tagged with the subscription name and the event ID, then runs sub.Function
// with the event through runFunction.
func (m *defaultManager) handler(sub *entities.Subscription) func(context.Context, *events.CloudEvent) {
	defer trace.Tracef("function name:%s", sub.Function)()

	return func(ctx context.Context, event *events.CloudEvent) {
		// The function returned by Tracef must be deferred, as at every other
		// call site in this file, so that it runs when the handler exits
		// instead of immediately.
		defer trace.Tracef("HandlerClosure(). function name:%s, event:%s", sub.Name, event.EventID)()

		// The context derived from the span is discarded for now; see the TODO below.
		sp, _ := opentracing.StartSpanFromContext(
			ctx,
			"EventManager.EventHandler",
			opentracing.Tag{Key: "subscriptionName", Value: sub.Name},
			opentracing.Tag{Key: "eventID", Value: event.EventID},
		)
		defer sp.Finish()

		// TODO: Pass tracing context once Function Manager is tracing-aware
		m.runFunction(sub.Function, event, sub.Secrets)
	}
}
// runFunction asks the function manager to run fnName as a non-blocking run.
// The event payload (event.Data) is sent as the run input, and a copy of the
// event with its Data cleared is attached as the run's event. The call uses a
// background context. Failures are logged as warnings and not returned.
//
// The secrets parameter is accepted but not used by this function.
func (m *defaultManager) runFunction(fnName string, event *events.CloudEvent, secrets []string) {
	defer trace.Tracef("function:%s", fnName)()

	// Work on a shallow copy so that the caller's event keeps its payload.
	stripped := *event
	stripped.Data = ""

	var run client.FunctionRun
	run.Blocking = false
	run.FunctionName = fnName
	run.Input = event.Data
	run.Event = (*models.CloudEvent)(helpers.CloudEventToSwagger(&stripped))

	result, runErr := m.fnClient.RunFunction(context.Background(), &run)
	if runErr != nil {
		log.Warnf("Unable to run function %s, error from function manager: %+v", fnName, runErr)
		return
	}

	log.Debugf("Function %s returned %+v", result.FunctionName, result.Output)
}