/
eventmesh.go
156 lines (134 loc) · 5.63 KB
/
eventmesh.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package eventmesh
import (
"context"
"github.com/kelseyhightower/envconfig"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/application"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/cloudevents/builder"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/cloudevents/eventtype"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/env"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/handler"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/handler/health"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/informers"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/legacy"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/metrics"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/oauth"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/options"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/receiver"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/sender/eventmesh"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/signals"
"github.com/kyma-project/kyma/components/event-publisher-proxy/pkg/subscribed"
"github.com/kyma-project/kyma/components/eventing-controller/logger"
"github.com/kyma-project/kyma/components/eventing-controller/pkg/backend/cleaner"
"go.uber.org/zap"
"golang.org/x/xerrors"
"k8s.io/client-go/dynamic"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp" // TODO: remove as this is only used in a development setup
"sigs.k8s.io/controller-runtime/pkg/client/config"
)
const (
backend = "beb"
commanderName = backend + "-commander"
)
// Commander implements the Commander interface.
type Commander struct {
cancel context.CancelFunc
envCfg *env.EventMeshConfig
logger *logger.Logger
metricsCollector *metrics.Collector
opts *options.Options
}
// NewCommander creates the Commander for publisher to EventMesh.
func NewCommander(opts *options.Options, metricsCollector *metrics.Collector, logger *logger.Logger) *Commander {
return &Commander{
metricsCollector: metricsCollector,
logger: logger,
envCfg: new(env.EventMeshConfig),
opts: opts,
}
}
// Init implements the Commander interface and initializes the publisher to BEB.
func (c *Commander) Init() error {
if err := envconfig.Process("", c.envCfg); err != nil {
return xerrors.Errorf("failed to read configuration for %s : %v", commanderName, err)
}
return nil
}
// Start implements the Commander interface and starts the publisher.
func (c *Commander) Start() error {
c.namedLogger().Infow("Starting Event Publisher", "configuration", c.envCfg.String(), "startup arguments", c.opts)
// configure message receiver
messageReceiver := receiver.NewHTTPMessageReceiver(c.envCfg.Port)
// assure uniqueness
var ctx context.Context
ctx, c.cancel = context.WithCancel(signals.NewContext())
// configure auth client
client := oauth.NewClient(ctx, c.envCfg)
defer client.CloseIdleConnections()
// configure message sender
messageSender := eventmesh.NewSender(c.envCfg.EventMeshPublishURL, client, c.logger)
// cluster config
k8sConfig := config.GetConfigOrDie()
// setup application lister
var applicationLister *application.Lister
if c.envCfg.ApplicationCRDEnabled {
dynamicClient := dynamic.NewForConfigOrDie(k8sConfig)
applicationLister = application.NewLister(ctx, dynamicClient)
c.namedLogger().Info("Application CR lister is enabled!")
} else {
c.namedLogger().Info("Application CR lister is disabled!")
}
// configure legacyTransformer
legacyTransformer := legacy.NewTransformer(
c.envCfg.EventMeshNamespace,
c.envCfg.EventTypePrefix,
applicationLister,
)
// Configure Subscription Lister
subDynamicSharedInfFactory := subscribed.GenerateSubscriptionInfFactory(k8sConfig)
subLister := subDynamicSharedInfFactory.ForResource(subscribed.GVR).Lister()
subscribedProcessor := &subscribed.Processor{
SubscriptionLister: &subLister,
Prefix: c.envCfg.EventTypePrefix,
Namespace: c.envCfg.EventMeshNamespace,
Logger: c.logger,
}
// Sync informer cache or die
c.namedLogger().Info("Waiting for informers caches to sync")
informers.WaitForCacheSyncOrDie(ctx, subDynamicSharedInfFactory, c.logger)
c.namedLogger().Info("Informers were successfully synced")
// configure event type cleaner
eventTypeCleanerV1 := eventtype.NewCleaner(c.envCfg.EventTypePrefix, applicationLister, c.logger)
// configure event type cleaner for subscription CRD v1alpha2
eventTypeCleaner := cleaner.NewEventMeshCleaner(c.logger)
// configure cloud event builder for subscription CRD v1alpha2
ceBuilder := builder.NewEventMeshBuilder(c.envCfg.EventTypePrefix, c.envCfg.EventMeshNamespace, eventTypeCleaner,
applicationLister, c.logger)
// start handler which blocks until it receives a shutdown signal
if err := handler.New(
messageReceiver,
messageSender,
health.NewChecker(),
c.envCfg.RequestTimeout,
legacyTransformer,
c.opts,
subscribedProcessor,
c.logger,
c.metricsCollector,
eventTypeCleanerV1,
ceBuilder,
c.envCfg.EventTypePrefix,
env.EventMeshBackend,
).Start(ctx); err != nil {
return xerrors.Errorf("failed to start handler for %s : %v", commanderName, err)
}
c.namedLogger().Info("Event Publisher was shut down")
return nil
}
// Stop implements the Commander interface and stops the publisher.
func (c *Commander) Stop() error {
c.cancel()
return nil
}
func (c *Commander) namedLogger() *zap.SugaredLogger {
return c.logger.WithContext().Named(commanderName).With("backend", backend)
}