/
registry.go
115 lines (96 loc) · 2.7 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package databroker
import (
"context"
"fmt"
"io"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/registry"
"github.com/pomerium/pomerium/internal/registry/inmemory"
"github.com/pomerium/pomerium/internal/telemetry/trace"
registrypb "github.com/pomerium/pomerium/pkg/grpc/registry"
"github.com/pomerium/pomerium/pkg/storage"
)
// registryWatchServer wraps a registry watch stream together with a context.
// The embedded stream supplies the stream methods, while the Context method
// defined on this type returns the stored ctx field.
type registryWatchServer struct {
	// Registry_WatchServer is the wrapped stream; its methods are promoted.
	registrypb.Registry_WatchServer

	// ctx is the context returned by Context.
	ctx context.Context
}
// Context returns the context stored on the wrapper. Because it is declared
// on the outer type, it takes precedence over the Context method promoted
// from the embedded stream.
func (s registryWatchServer) Context() context.Context {
	return s.ctx
}
// Report starts a trace span named "databroker.grpc.Report" and forwards the
// request to the Report method of the registry returned by getRegistry.
//
// An error from getRegistry is returned unchanged with a nil response;
// otherwise the registry's own response and error are returned.
func (srv *Server) Report(ctx context.Context, req *registrypb.RegisterRequest) (*registrypb.RegisterResponse, error) {
	ctx, span := trace.StartSpan(ctx, "databroker.grpc.Report")
	defer span.End()

	reg, err := srv.getRegistry()
	if err != nil {
		return nil, err
	}

	return reg.Report(ctx, req)
}
// List starts a trace span named "databroker.grpc.List" and forwards the
// request to the List method of the registry returned by getRegistry.
//
// An error from getRegistry is returned unchanged with a nil list; otherwise
// the registry's own result and error are returned.
func (srv *Server) List(ctx context.Context, req *registrypb.ListRequest) (*registrypb.ServiceList, error) {
	ctx, span := trace.StartSpan(ctx, "databroker.grpc.List")
	defer span.End()

	reg, err := srv.getRegistry()
	if err != nil {
		return nil, err
	}

	return reg.List(ctx, req)
}
// Watch starts a trace span named "databroker.grpc.Watch" on the stream's
// context and forwards the request to the Watch method of the registry
// returned by getRegistry.
//
// The stream handed to the registry is wrapped in a registryWatchServer so
// that its Context method returns the context produced by trace.StartSpan
// instead of the stream's original one. An error from getRegistry is
// returned unchanged.
func (srv *Server) Watch(req *registrypb.ListRequest, stream registrypb.Registry_WatchServer) error {
	ctx, span := trace.StartSpan(stream.Context(), "databroker.grpc.Watch")
	defer span.End()

	reg, err := srv.getRegistry()
	if err != nil {
		return err
	}

	wrapped := registryWatchServer{
		Registry_WatchServer: stream,
		ctx:                  ctx,
	}
	return reg.Watch(req, wrapped)
}
// getRegistry returns the server's registry, creating it on first use.
//
// The backend is obtained from getBackend first; if that fails, its error is
// returned and the registry is not touched. The cached registry is then read
// under the read lock. Only when none exists is the write lock taken, the
// field re-checked, and newRegistryLocked called, so callers racing on the
// first use do not each build their own registry.
//
// A failed construction is not cached: srv.registry stays nil and the next
// call tries again.
//
// NOTE(review): getBackend is called before srv.mu is acquired here;
// presumably it takes the lock itself — confirm before reordering.
func (srv *Server) getRegistry() (registry.Interface, error) {
	backend, err := srv.getBackend()
	if err != nil {
		return nil, err
	}

	// Fast path: a registry has already been created.
	srv.mu.RLock()
	r := srv.registry
	srv.mu.RUnlock()
	if r != nil {
		return r, nil
	}

	// Slow path: take the write lock. The deferred unlock guarantees the
	// mutex is released even if newRegistryLocked panics.
	srv.mu.Lock()
	defer srv.mu.Unlock()

	// Re-check: another goroutine may have created the registry between the
	// read unlock above and acquiring the write lock.
	if srv.registry != nil {
		return srv.registry, nil
	}

	r, err = srv.newRegistryLocked(backend)
	if err != nil {
		return nil, err
	}
	srv.registry = r
	return r, nil
}
// newRegistryLocked constructs a registry for the given storage backend.
// getRegistry calls it while holding the write lock on srv.mu.
//
// Selection order:
//  1. If the backend has a RegistryServer() method, the result combines that
//     server with the backend itself acting as the io.Closer.
//  2. Otherwise, if the configured storage type is the in-memory one, a new
//     in-memory registry is created with the configured registry TTL.
//  3. Any other storage type yields an "unsupported registry type" error.
//
// Log messages and the constructed in-memory registry use a fresh
// context.Background().
func (srv *Server) newRegistryLocked(backend storage.Backend) (registry.Interface, error) {
	ctx := context.Background()

	// registryProvider is satisfied by backends that expose their own
	// registry server.
	type registryProvider interface {
		RegistryServer() registrypb.RegistryServer
	}

	if provider, ok := backend.(registryProvider); ok {
		log.Info(ctx).Msg("using registry via storage")
		return struct {
			io.Closer
			registrypb.RegistryServer
		}{
			Closer:         backend,
			RegistryServer: provider.RegistryServer(),
		}, nil
	}

	if srv.cfg.storageType == config.StorageInMemoryName {
		log.Info(ctx).Msg("using in-memory registry")
		return inmemory.New(ctx, srv.cfg.registryTTL), nil
	}

	return nil, fmt.Errorf("unsupported registry type: %s", srv.cfg.storageType)
}