-
Notifications
You must be signed in to change notification settings - Fork 1
/
types.go
119 lines (103 loc) · 2.58 KB
/
types.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package totem
import (
"context"
"sync"
"golang.org/x/exp/slices"
grpc "google.golang.org/grpc"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/descriptorpb"
)
// ServerStream is the union of this package's Stream interface and
// grpc.ServerStream: a value must satisfy both to be used as a ServerStream.
type ServerStream interface {
Stream
grpc.ServerStream
}
// ClientStream is the union of this package's Stream interface and
// grpc.ClientStream: a value must satisfy both to be used as a ClientStream.
type ClientStream interface {
Stream
grpc.ClientStream
}
// Stream is the method set shared by ServerStream and ClientStream: it
// exchanges *RPC messages and exposes a context.
type Stream interface {
// Send is expected to transmit a single *RPC message on the stream and
// report any failure as an error.
Send(*RPC) error
// Recv is expected to return the next *RPC message from the stream, or an
// error if none could be received.
Recv() (*RPC, error)
// Context returns the context.Context associated with the stream.
Context() context.Context
}
// ServiceHandler bundles a protobuf service descriptor with the invokers and
// optional QOS settings for that service's methods.
type ServiceHandler struct {
// controllerContext is the context whose Done channel is returned by Done.
controllerContext context.Context
// Descriptor is the descriptor of the service this handler represents.
Descriptor *descriptorpb.ServiceDescriptorProto
// MethodInvokers maps a method name to its MethodInvoker.
// NewDefaultServiceHandler keys it by the descriptor's method names.
MethodInvokers map[string]MethodInvoker
// MethodQOS maps a method name to its QOS options.
// NewDefaultServiceHandler only adds entries for methods that carry the
// E_Qos extension, so a lookup may legitimately miss.
MethodQOS map[string]*QOS
// IsLocal is set by NewDefaultServiceHandler from the invoker's IsLocal().
IsLocal bool
}
// Done returns the Done channel of the handler's controller context.
// ServiceHandlerList.Append waits on this channel to know when the handler
// should be dropped from the list.
func (s *ServiceHandler) Done() <-chan struct{} {
	ctx := s.controllerContext
	return ctx.Done()
}
// NewDefaultServiceHandler builds a ServiceHandler for descriptor in which
// every method of the service is served by the same invoker.
//
// ctx becomes the handler's controller context (the one whose Done channel
// is returned by Done), and IsLocal is taken from invoker.IsLocal(). For each
// method that carries the E_Qos option, the QOS value is recorded in
// MethodQOS under the method's name; methods without it get no entry.
//
// It panics if a method uses ReplicationStrategy_Broadcast while declaring an
// output type other than .google.protobuf.Empty.
func NewDefaultServiceHandler(
	ctx context.Context,
	descriptor *descriptorpb.ServiceDescriptorProto,
	invoker MethodInvoker,
) *ServiceHandler {
	// Fully-qualified type name as it appears in a method descriptor.
	const emptyType = ".google.protobuf.Empty"

	handler := &ServiceHandler{
		controllerContext: ctx,
		Descriptor:        descriptor,
		MethodInvokers:    map[string]MethodInvoker{},
		MethodQOS:         map[string]*QOS{},
		IsLocal:           invoker.IsLocal(),
	}

	for _, m := range descriptor.Method {
		name := m.GetName()
		handler.MethodInvokers[name] = invoker

		opts := m.GetOptions()
		if !proto.HasExtension(opts, E_Qos) {
			// No QOS declared for this method; nothing more to record.
			continue
		}

		qos := proto.GetExtension(opts, E_Qos).(*QOS)
		isBroadcast := qos.ReplicationStrategy == ReplicationStrategy_Broadcast
		if isBroadcast && m.GetOutputType() != emptyType {
			// todo: temporary restriction
			panic("methods with Broadcast ReplicationStrategy must have a response type of google.protobuf.Empty")
		}
		handler.MethodQOS[name] = qos
	}

	return handler
}
// ServiceHandlerList is a mutex-guarded list of ServiceHandlers. Append
// requires every entry to have the same service descriptor. The zero value
// is an empty list.
type ServiceHandlerList struct {
// mu guards data; the methods of ServiceHandlerList take it before
// reading or modifying the slice.
mu sync.RWMutex
// data holds the handlers in the order they were appended.
data []*ServiceHandler
}
// Append adds sh to the end of the list and starts a goroutine that removes
// it again once sh.Done() is closed.
//
// It panics if sh's service descriptor is not proto.Equal to the descriptor
// of every handler already in the list.
func (s *ServiceHandlerList) Append(sh *ServiceHandler) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for i := range s.data {
		if proto.Equal(s.data[i].Descriptor, sh.Descriptor) {
			continue
		}
		panic("entries in ServiceHandlerLists must have the same service descriptors")
	}
	s.data = append(s.data, sh)

	// remove drops sh from the list if it is still present.
	remove := func() {
		s.mu.Lock()
		defer s.mu.Unlock()

		idx := slices.Index(s.data, sh)
		if idx == -1 {
			return
		}
		s.data = slices.Delete(s.data, idx, idx+1)
	}

	// The goroutine lives until the handler's controller context is done.
	go func() {
		<-sh.Done()
		remove()
	}()
}
// Range calls fn for each handler in list order, stopping at the first call
// that returns false. It returns false if fn stopped the iteration and true
// otherwise (including when the list is empty).
//
// The iteration runs over a copy of the list taken under the read lock, and
// fn is invoked after the lock has been released, so handlers appended or
// removed during the iteration are not reflected in it.
func (s *ServiceHandlerList) Range(fn func(sh *ServiceHandler) bool) bool {
	snapshot := func() []*ServiceHandler {
		s.mu.RLock()
		defer s.mu.RUnlock()
		return slices.Clone(s.data)
	}()

	for i := 0; i < len(snapshot); i++ {
		if keepGoing := fn(snapshot[i]); !keepGoing {
			return false
		}
	}
	return true
}
// Len reports how many handlers are currently in the list, read under the
// read lock.
func (s *ServiceHandlerList) Len() int {
	s.mu.RLock()
	count := len(s.data)
	s.mu.RUnlock()
	return count
}
// First returns the handler at the front of the list, or nil when the list
// is empty. The list is read under the read lock.
func (s *ServiceHandlerList) First() *ServiceHandler {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if len(s.data) > 0 {
		return s.data[0]
	}
	return nil
}