-
Notifications
You must be signed in to change notification settings - Fork 25
/
main.go
119 lines (101 loc) · 3.05 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/*
Copyright 2020 VMware, Inc.
SPDX-License-Identifier: Apache-2.0
*/
package main
import (
"context"
"errors"
"fmt"
"time"
ce "github.com/cloudevents/sdk-go/v2"
"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"knative.dev/pkg/logging"
"knative.dev/pkg/signals"
)
// Names of the CloudEvent extension attributes that main requires to be
// present and non-empty on every event it counts.
const (
	// ceVSphereEventClass is the extension attribute key looked up first.
	ceVSphereEventClass = "eventclass"
	// ceVSphereAPIKey is the extension attribute key looked up second.
	ceVSphereAPIKey = "vsphereapiversion"
)
// envConfig holds the settings that main loads from the process environment
// via envconfig.Process (called with an empty prefix). Both fields are tagged
// as required.
type envConfig struct {
// ExpectedEventType is compared against the type of each received
// CloudEvent; events with any other type are logged and ignored.
// Tagged to be read from EVENT_TYPE.
ExpectedEventType string `envconfig:"EVENT_TYPE" required:"true"`
// ExpectedEventCount is the number of matching events main waits for
// before cancelling its context; it is also used as the capacity of the
// internal events channel. Tagged to be read from EVENT_COUNT.
ExpectedEventCount int `envconfig:"EVENT_COUNT" required:"true"`
}
// main starts a CloudEvents HTTP receiver and waits until the configured
// number of events of the configured type has been received.
//
// Configuration is loaded from the environment into envConfig. Two goroutines
// run in an errgroup:
//
//   - the receiver, which forwards events of the expected type to a buffered
//     channel and logs/ignores all others;
//   - the counter, which verifies that each forwarded event carries non-empty
//     "eventclass" and "vsphereapiversion" extensions and cancels the context
//     once the expected count is reached.
//
// The process exits fatally if the client cannot be created, if the expected
// count is negative, or if the errgroup finishes with any error other than
// context.Canceled.
func main() {
	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		panic("unable to read environment config: " + err.Error())
	}

	ctx := signals.NewContext()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	logger := logging.FromContext(ctx)

	numExpectedEvents := env.ExpectedEventCount
	// A negative size makes the channel allocation below panic at run time
	// with an opaque message, so reject it here with a clear one instead.
	// Zero is accepted unchanged: the counter increments before comparing, so
	// it never reaches zero and the process keeps receiving until the context
	// is cancelled from outside.
	if numExpectedEvents < 0 {
		logger.Fatalw(
			"invalid expected event count: must not be negative",
			zap.Int("expected", numExpectedEvents),
		)
	}

	client, err := ce.NewClientHTTP()
	if err != nil {
		logger.Fatalw("could not create cloudevents client", zap.Error(err))
	}

	eg, egCtx := errgroup.WithContext(ctx)
	// Buffered to the expected count so the receiver never has to drop an
	// event that is still needed to reach the target.
	events := make(chan ce.Event, numExpectedEvents)

	// cloudevents http receiver
	eg.Go(func() error {
		logger.Info("starting cloudevents listener")

		// receive events, putting them into the channel only if they meet the type we are expecting
		return client.StartReceiver(egCtx, func(event ce.Event) {
			logger.Infow("received cloud event on listener", zap.String("event", event.String()))

			if event.Type() != env.ExpectedEventType {
				logger.Warnw(
					"ignoring event: unexpected event type received",
					zap.String("received", event.Type()),
					zap.String("expected", env.ExpectedEventType),
				)
				return
			}

			// Non-blocking send: if the buffer is full the event is dropped
			// rather than stalling the handler indefinitely.
			select {
			case events <- event:
			default:
				logger.Warn("could not send on events channel")
				// artificial throttle to not spam logs in case of hot loop
				time.Sleep(time.Second)
			}
		})
	})

	// counter: count is owned exclusively by this goroutine, so it needs no
	// additional synchronization.
	eg.Go(func() error {
		// Checked in this order; the first missing one is reported.
		requiredExtensions := []string{ceVSphereEventClass, ceVSphereAPIKey}
		count := 0

		// Process events one by one, keeping count. Exit when count is reached, and cancel the start receiver
		for {
			select {
			case <-egCtx.Done():
				return egCtx.Err()

			case event := <-events:
				logger.Infow("received event on events channel", zap.String("message", event.String()))

				// assert required CE extension attributes are always present
				extensions := event.Extensions()
				for _, key := range requiredExtensions {
					if value := extensions[key]; value == nil || value == "" {
						return fmt.Errorf("cloudevent extension %q not set", key)
					}
				}

				count++
				if count == numExpectedEvents {
					logger.Infow(
						"cancelling context: received expected number of events",
						zap.Int("expected", numExpectedEvents),
						zap.Int("received", count),
					)
					// Cancelling the parent context also cancels egCtx,
					// which the receiver was started with.
					cancel()
					return nil
				}
			}
		}
	})

	// context.Canceled is the normal shutdown path (target reached or the
	// parent context was cancelled), so only other errors are fatal.
	if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
		logger.Fatalw("Could not successfully receive expected events", zap.Error(err))
	}

	logger.Info("shutdown complete")
}