-
Notifications
You must be signed in to change notification settings - Fork 0
/
adsImpl.go
95 lines (83 loc) · 3.19 KB
/
adsImpl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package server
import (
"Envoy-Pilot/cmd/server/constant"
"Envoy-Pilot/cmd/server/metrics"
"Envoy-Pilot/cmd/server/model"
"Envoy-Pilot/cmd/server/util"
"context"
"errors"
"log"
"google.golang.org/grpc/peer"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
)
//IncrementalAggregatedResources - Not implemented
func (s *Server) IncrementalAggregatedResources(_ discovery.AggregatedDiscoveryService_IncrementalAggregatedResourcesServer) error {
panic("Not implemented")
}
// StreamAggregatedResources - ADS server impl
func (s *Server) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
clientPeer, ok := peer.FromContext(stream.Context())
clientIP := "unknown"
if ok {
clientIP = clientPeer.Addr.String()
}
log.Printf("[%s] -------------- Starting a %s stream from %s ------------------\n", constant.SUBSCRIBE_ADS, constant.SUBSCRIBE_ADS, clientIP)
serverCtx, cancel := context.WithCancel(context.Background())
dispatchChannel := make(chan model.ConfigMeta)
i := 0
var subscriber *model.EnvoySubscriber
for {
req, err := stream.Recv()
if err != nil {
log.Printf("[%s] Disconnecting client %s\n", constant.SUBSCRIBE_ADS, subscriber.BuildInstanceKey2())
log.Println(err)
cancel()
registerService.DeleteSubscriber(subscriber)
metrics.DecActiveConnections(subscriber)
metrics.DecActiveSubscribers(subscriber)
return err
}
if i == 0 {
if !IsValidSubscriber(req) {
log.Printf("[%s] Error: Invalid cluster or node id %+v\n", constant.SUBSCRIBE_ADS, req)
cancel()
return errors.New("Invalid cluster or node id")
}
subscriber = &model.EnvoySubscriber{
Cluster: req.Node.Cluster,
Node: req.Node.Id,
SubscribedTo: constant.SUBSCRIBE_ADS,
LastUpdatedVersion: util.TrimVersion(req.VersionInfo),
AdsList: make(map[string]*model.EnvoySubscriber),
IpAddress: clientIP,
}
serverCtx = context.WithValue(serverCtx, envoySubscriberKey, subscriber)
registerService.RegisterEnvoy(serverCtx, stream, subscriber, dispatchChannel)
metrics.IncActiveConnections(subscriber)
i++
}
topic := v2Helper.GetTopicFor(req.TypeUrl)
var currentSubscriber *model.EnvoySubscriber
if subscriber.AdsList[topic] == nil {
currentSubscriber = &model.EnvoySubscriber{
Cluster: req.Node.Cluster,
Node: req.Node.Id,
SubscribedTo: topic,
LastUpdatedVersion: util.TrimVersion(req.VersionInfo),
IpAddress: clientIP,
}
subscriber.AdsList[topic] = currentSubscriber
registerService.RegisterEnvoyADS(serverCtx, stream, currentSubscriber, dispatchChannel)
metrics.IncActiveSubscribers(subscriber, currentSubscriber.SubscribedTo)
} else {
currentSubscriber = subscriber.AdsList[topic]
}
log.Printf("[%s] Received Request from %s\n %s\n", constant.SUBSCRIBE_ADS, currentSubscriber.BuildInstanceKey2(), util.ToJson(req))
if subscriberDao.IsACK(currentSubscriber, req.ResponseNonce) {
dispatchService.HandleACK(currentSubscriber, req)
continue
} else {
log.Printf("[%s] Response nonce not recognized %s", constant.SUBSCRIBE_ADS, req.ResponseNonce)
}
}
}