-
Notifications
You must be signed in to change notification settings - Fork 20
/
dispatchService.go
110 lines (96 loc) · 3.26 KB
/
dispatchService.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package service
import (
"Envoy-Pilot/cmd/server/constant"
"Envoy-Pilot/cmd/server/mapper"
"Envoy-Pilot/cmd/server/model"
"Envoy-Pilot/cmd/server/storage"
"Envoy-Pilot/cmd/server/util"
"context"
"log"
"github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/google/uuid"
)
var singletonDispatchService *DispatchService
type DispatchService struct {
xdsConfigDao storage.XdsConfigDao
subscriberDao *storage.SubscriberDao
clusterMapper mapper.ClusterMapper
listenerMapper mapper.ListenerMapper
v2HelperService *V2HelperService
}
func GetDispatchService() *DispatchService {
if singletonDispatchService == nil {
singletonDispatchService = &DispatchService{
xdsConfigDao: storage.GetXdsConfigDao(),
subscriberDao: storage.GetSubscriberDao(),
clusterMapper: mapper.ClusterMapper{},
v2HelperService: &V2HelperService{},
}
}
return singletonDispatchService
}
// XDSStreamServer common data type for xDS stream
type XDSStreamServer interface {
Send(*v2.DiscoveryResponse) error
Recv() (*v2.DiscoveryRequest, error)
Context() context.Context
}
func (c *DispatchService) buildDiscoveryResponseFor(subscriber *model.EnvoySubscriber) (*v2.DiscoveryResponse, error) {
mapper := mapper.GetMapperFor(subscriber.SubscribedTo)
configJson, version := c.xdsConfigDao.GetConfigJson(subscriber)
clusterObj, err := mapper.GetResources(configJson)
if err != nil {
log.Printf("Unable to build discovery response for %s\n", subscriber.BuildInstanceKey2())
log.Println(err)
return nil, err
}
responseUUID := uuid.New().String()
response := &v2.DiscoveryResponse{
VersionInfo: version,
Resources: clusterObj,
TypeUrl: c.v2HelperService.GetTypeUrlFor(subscriber.SubscribedTo),
Nonce: responseUUID,
}
return response, nil
}
func (c *DispatchService) dispatchData(ctx context.Context, stream XDSStreamServer,
dispatchChannel chan model.ConfigMeta) {
for updateInfo := range dispatchChannel {
select {
case <-ctx.Done():
return
default:
}
subscriber := ctx.Value(constant.ENVOY_SUBSCRIBER_KEY).(*model.EnvoySubscriber)
// var currentSubscriber *model.EnvoySubscriber
if subscriber.IsADS() {
subscriber = subscriber.GetAdsSubscriber(updateInfo.Topic)
util.CheckNil(subscriber)
}
response, err := c.buildDiscoveryResponseFor(subscriber)
if err != nil {
log.Panicf("Unable to dispatch for %s\n", subscriber.BuildInstanceKey2())
continue
}
// TODO add log level
// log.Printf("%+v\n", response)
// log.Printf("Sending config to %s \n %+v \n", subscriber.BuildInstanceKey2(), response)
c.subscriberDao.SaveNonce(subscriber, response.Nonce)
err = stream.Send(response)
if err != nil {
log.Println("error sending to client")
log.Println(err)
c.subscriberDao.RemoveNonce(subscriber, response.Nonce)
} else {
log.Printf("Successfully Sent config to %s \n", subscriber.BuildInstanceKey2())
}
}
}
// HandleACK check if the response is an ACK
// if not ignore
// if yes update the last updated version
func (c *DispatchService) HandleACK(subscriber *model.EnvoySubscriber, req *v2.DiscoveryRequest) {
log.Printf("Received ACK %s from %s", req.ResponseNonce, subscriber.BuildInstanceKey2())
c.subscriberDao.RemoveNonce(subscriber, req.ResponseNonce)
subscriber.LastUpdatedVersion = req.VersionInfo
}