/
discoverer_client_service.go
124 lines (104 loc) · 3.76 KB
/
discoverer_client_service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package discovery
import (
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"os"

	"github.com/julienschmidt/httprouter"
	"github.com/polarstreams/polar/internal/conf"
	. "github.com/polarstreams/polar/internal/types"
	"github.com/polarstreams/polar/internal/utils"
	"github.com/rs/zerolog/log"
	"golang.org/x/net/http2"
	"golang.org/x/net/http2/h2c"
)
// noGenerationsStatusMessage is the response body written by the status
// endpoint, together with a 503 status, when the generation map is empty.
const noGenerationsStatusMessage = "Broker is unavailable to handle producer/consumer requests"
// topologyClientMessage is the JSON payload written by getTopologyHandler.
//
// It is built in one of two forms: newResponseTopology fills BrokerNames with
// the explicit host names, while newResponseTopologyUsingOrdinals fills
// BaseName and ServiceName instead. Both forms set Length and the three ports.
type topologyClientMessage struct {
BaseName string `json:"baseName,omitempty"` // When defined, base name to build the broker names, e.g. "polar-"
ServiceName string `json:"serviceName,omitempty"` // The name of the service to build the broker names: "<baseName><ordinal>.<service>"
Length int `json:"length"` // The ring size
// Explicit broker host names; left out of the JSON when empty (omitempty).
BrokerNames []string `json:"names,omitempty"`
// The port fields below are copied from the discoverer's config when the
// message is built.
ProducerPort int `json:"producerPort"`
ProducerBinaryPort int `json:"producerBinaryPort"`
ConsumerPort int `json:"consumerPort"`
}
// startClientDiscoveryServer starts the HTTP server that clients use to
// discover the cluster topology and stores it in d.clientDiscoveryServer.
//
// Two GET routes are registered:
//   - conf.StatusUrl replies 503 with a "Retry-After: 1" header while the
//     generation map is empty, and a short plain-text status line otherwise.
//   - conf.ClientDiscoveryUrl is served by getTopologyHandler.
//
// The handler is wrapped with h2c.NewHandler, so cleartext HTTP/2 is accepted
// in addition to HTTP/1.1.
//
// The listening socket is bound synchronously, before the serving goroutine is
// started, so that a bind failure (for example, the port already being in use)
// is returned to the caller instead of only being logged in the background
// while this function reports success.
func (d *discoverer) startClientDiscoveryServer() error {
	port := d.config.ClientDiscoveryPort()
	address := utils.GetServiceAddress(port, d.LocalInfo(), d.config)
	router := httprouter.New()

	router.GET(conf.StatusUrl, func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
		generations := d.generations.Load().(genMap)
		if len(generations) == 0 {
			// Tell the caller to retry shortly instead of treating this as a hard failure.
			w.Header().Set("Retry-After", "1")
			w.WriteHeader(http.StatusServiceUnavailable)
			fmt.Fprint(w, noGenerationsStatusMessage)
			return
		}
		fmt.Fprintf(w, "Client discovery server listening on %d\n", port)
	})
	router.GET(conf.ClientDiscoveryUrl, utils.ToHandle(d.getTopologyHandler))

	h2s := &http2.Server{}
	server := &http.Server{
		Addr:    address,
		Handler: h2c.NewHandler(router, h2s),
	}
	if err := http2.ConfigureServer(server, h2s); err != nil {
		return err
	}

	// Mirror http.Server.ListenAndServe, which falls back to ":http" when no
	// address is configured.
	listenAddress := address
	if listenAddress == "" {
		listenAddress = ":http"
	}
	listener, err := net.Listen("tcp", listenAddress)
	if err != nil {
		return fmt.Errorf("binding client discovery server to %s: %w", address, err)
	}

	d.clientDiscoveryServer = server

	// Serve runs until the server is shut down or closed; it takes ownership
	// of the listener and closes it on return.
	go func() {
		if err := server.Serve(listener); err != nil {
			if err == http.ErrServerClosed {
				log.Info().Msgf("Client discovery server stopped")
			} else {
				log.Err(err).Msgf("Client discovery stopped serving")
			}
		}
	}()

	log.Info().Msgf("Start listening to clients for discovery on %s", address)
	return nil
}
// getTopologyHandler writes the current topology to the client as JSON.
//
// The explicit list of broker host names is sent when the topology has at most
// three brokers or when the envBrokerNames environment variable is non-empty;
// otherwise the base-name/ordinal form is sent.
//
// The message contains only strings and integers, so an Encode failure here is
// in practice a failed write to the client (for example, a dropped
// connection). Such a failure is returned as an error rather than raised as a
// panic, leaving it to the adapter that invokes this handler.
// NOTE(review): confirm utils.ToHandle logs/handles returned errors as expected.
func (d *discoverer) getTopologyHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) error {
	w.Header().Set("Content-Type", "application/json")
	t := d.Topology()

	var result *topologyClientMessage
	if names := os.Getenv(envBrokerNames); len(t.Brokers) <= 3 || names != "" {
		result = d.newResponseTopology(t)
	} else {
		result = d.newResponseTopologyUsingOrdinals(t)
	}

	if err := json.NewEncoder(w).Encode(result); err != nil {
		return fmt.Errorf("serializing topology response: %w", err)
	}
	return nil
}
// newResponseTopology builds the discovery message that lists every broker
// host name explicitly, in the same order as t.Brokers. Length is the number
// of brokers and the ports are read from the discoverer's config.
func (d *discoverer) newResponseTopology(t *TopologyInfo) *topologyClientMessage {
	total := len(t.Brokers)
	hostNames := make([]string, total)
	for idx := range t.Brokers {
		hostNames[idx] = t.Brokers[idx].HostName
	}

	msg := &topologyClientMessage{
		Length:      total,
		BrokerNames: hostNames,
	}
	msg.ProducerPort = d.config.ProducerPort()
	msg.ProducerBinaryPort = d.config.ProducerBinaryPort()
	msg.ConsumerPort = d.config.ConsumerPort()
	return msg
}
// newResponseTopologyUsingOrdinals builds the discovery message that carries
// the base host name and service name instead of an explicit list of broker
// names. When both the service name and the pod namespace are non-empty, the
// service name is sent as "<service>.<namespace>". Length is the number of
// brokers in t and the ports are read from the discoverer's config.
func (d *discoverer) newResponseTopologyUsingOrdinals(t *TopologyInfo) *topologyClientMessage {
	qualifiedService := d.config.ServiceName()
	if qualifiedService != "" {
		// Only qualify the service with the namespace when both are known.
		if namespace := d.config.PodNamespace(); namespace != "" {
			qualifiedService = qualifiedService + "." + namespace
		}
	}

	msg := new(topologyClientMessage)
	msg.BaseName = d.config.BaseHostName()
	msg.ServiceName = qualifiedService
	msg.Length = len(t.Brokers)
	msg.ProducerPort = d.config.ProducerPort()
	msg.ProducerBinaryPort = d.config.ProducerBinaryPort()
	msg.ConsumerPort = d.config.ConsumerPort()
	return msg
}