-
Notifications
You must be signed in to change notification settings - Fork 62
/
agent.psrpc.go
181 lines (145 loc) · 7.4 KB
/
agent.psrpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
// Code generated by protoc-gen-psrpc v0.5.1, DO NOT EDIT.
// source: rpc/agent.proto
package rpc
import (
"context"
"github.com/livekit/psrpc"
"github.com/livekit/psrpc/pkg/client"
"github.com/livekit/psrpc/pkg/info"
"github.com/livekit/psrpc/pkg/rand"
"github.com/livekit/psrpc/pkg/server"
"github.com/livekit/psrpc/version"
)
import google_protobuf "google.golang.org/protobuf/types/known/emptypb"
import livekit1 "github.com/livekit/protocol/livekit"
var _ = version.PsrpcVersion_0_5
// ==============================
// AgentInternal Client Interface
// ==============================
type AgentInternalClient interface {
CheckEnabled(ctx context.Context, req *CheckEnabledRequest, opts ...psrpc.RequestOption) (<-chan *psrpc.Response[*CheckEnabledResponse], error)
JobRequest(ctx context.Context, namespace string, jobType string, req *livekit1.Job, opts ...psrpc.RequestOption) (*google_protobuf.Empty, error)
SubscribeWorkerRegistered(ctx context.Context, handlerNamespace string) (psrpc.Subscription[*google_protobuf.Empty], error)
}
// ==================================
// AgentInternal ServerImpl Interface
// ==================================
type AgentInternalServerImpl interface {
CheckEnabled(context.Context, *CheckEnabledRequest) (*CheckEnabledResponse, error)
JobRequest(context.Context, *livekit1.Job) (*google_protobuf.Empty, error)
JobRequestAffinity(context.Context, *livekit1.Job) float32
}
// ==============================
// AgentInternal Server Interface
// ==============================
type AgentInternalServer interface {
RegisterJobRequestTopic(namespace string, jobType string) error
DeregisterJobRequestTopic(namespace string, jobType string)
PublishWorkerRegistered(ctx context.Context, handlerNamespace string, msg *google_protobuf.Empty) error
// Close and wait for pending RPCs to complete
Shutdown()
// Close immediately, without waiting for pending RPCs
Kill()
}
// ====================
// AgentInternal Client
// ====================
type agentInternalClient struct {
client *client.RPCClient
}
// NewAgentInternalClient creates a psrpc client that implements the AgentInternalClient interface.
func NewAgentInternalClient(bus psrpc.MessageBus, opts ...psrpc.ClientOption) (AgentInternalClient, error) {
sd := &info.ServiceDefinition{
Name: "AgentInternal",
ID: rand.NewClientID(),
}
sd.RegisterMethod("CheckEnabled", false, true, false, false)
sd.RegisterMethod("JobRequest", true, false, true, false)
sd.RegisterMethod("WorkerRegistered", false, true, false, false)
rpcClient, err := client.NewRPCClient(sd, bus, opts...)
if err != nil {
return nil, err
}
return &agentInternalClient{
client: rpcClient,
}, nil
}
func (c *agentInternalClient) CheckEnabled(ctx context.Context, req *CheckEnabledRequest, opts ...psrpc.RequestOption) (<-chan *psrpc.Response[*CheckEnabledResponse], error) {
return client.RequestMulti[*CheckEnabledResponse](ctx, c.client, "CheckEnabled", nil, req, opts...)
}
func (c *agentInternalClient) JobRequest(ctx context.Context, namespace string, jobType string, req *livekit1.Job, opts ...psrpc.RequestOption) (*google_protobuf.Empty, error) {
return client.RequestSingle[*google_protobuf.Empty](ctx, c.client, "JobRequest", []string{namespace, jobType}, req, opts...)
}
func (c *agentInternalClient) SubscribeWorkerRegistered(ctx context.Context, handlerNamespace string) (psrpc.Subscription[*google_protobuf.Empty], error) {
return client.Join[*google_protobuf.Empty](ctx, c.client, "WorkerRegistered", []string{handlerNamespace})
}
// ====================
// AgentInternal Server
// ====================
type agentInternalServer struct {
svc AgentInternalServerImpl
rpc *server.RPCServer
}
// NewAgentInternalServer builds a RPCServer that will route requests
// to the corresponding method in the provided svc implementation.
func NewAgentInternalServer(svc AgentInternalServerImpl, bus psrpc.MessageBus, opts ...psrpc.ServerOption) (AgentInternalServer, error) {
sd := &info.ServiceDefinition{
Name: "AgentInternal",
ID: rand.NewServerID(),
}
s := server.NewRPCServer(sd, bus, opts...)
sd.RegisterMethod("CheckEnabled", false, true, false, false)
var err error
err = server.RegisterHandler(s, "CheckEnabled", nil, svc.CheckEnabled, nil)
if err != nil {
s.Close(false)
return nil, err
}
sd.RegisterMethod("JobRequest", true, false, true, false)
sd.RegisterMethod("WorkerRegistered", false, true, false, false)
return &agentInternalServer{
svc: svc,
rpc: s,
}, nil
}
func (s *agentInternalServer) RegisterJobRequestTopic(namespace string, jobType string) error {
return server.RegisterHandler(s.rpc, "JobRequest", []string{namespace, jobType}, s.svc.JobRequest, s.svc.JobRequestAffinity)
}
func (s *agentInternalServer) DeregisterJobRequestTopic(namespace string, jobType string) {
s.rpc.DeregisterHandler("JobRequest", []string{namespace, jobType})
}
func (s *agentInternalServer) PublishWorkerRegistered(ctx context.Context, handlerNamespace string, msg *google_protobuf.Empty) error {
return s.rpc.Publish(ctx, "WorkerRegistered", []string{handlerNamespace}, msg)
}
func (s *agentInternalServer) Shutdown() {
s.rpc.Close(false)
}
func (s *agentInternalServer) Kill() {
s.rpc.Close(true)
}
var psrpcFileDescriptor0 = []byte{
// 365 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x92, 0xc1, 0x8a, 0xdb, 0x30,
0x10, 0x86, 0x91, 0x53, 0x8a, 0xa3, 0x26, 0xd4, 0x51, 0x9a, 0xe2, 0xba, 0x94, 0x26, 0x86, 0x42,
0x4e, 0x72, 0x69, 0x5f, 0x20, 0x4d, 0xc9, 0xa1, 0x81, 0x5e, 0x02, 0xa5, 0xd0, 0x8b, 0xb1, 0x94,
0xa9, 0xed, 0x8d, 0x2d, 0x69, 0x25, 0x79, 0x21, 0x8f, 0x90, 0xeb, 0x9e, 0xf7, 0x29, 0xf2, 0x84,
0x8b, 0x63, 0xc7, 0x64, 0x21, 0xbb, 0x47, 0xfd, 0xf3, 0xcf, 0xcc, 0x37, 0x3f, 0xc2, 0x6f, 0xb5,
0xe2, 0x51, 0x92, 0x82, 0xb0, 0x54, 0x69, 0x69, 0x25, 0xe9, 0x69, 0xc5, 0x83, 0x8f, 0xa9, 0x94,
0x69, 0x01, 0xd1, 0x49, 0x62, 0xd5, 0xff, 0x08, 0x4a, 0x65, 0xf7, 0x8d, 0x23, 0x18, 0x4a, 0x65,
0x73, 0x29, 0x4c, 0xfb, 0x1c, 0x17, 0xf9, 0x1d, 0xec, 0x72, 0x1b, 0x5f, 0x4c, 0x09, 0x27, 0x78,
0xfc, 0x33, 0x03, 0xbe, 0x5b, 0x89, 0x84, 0x15, 0xb0, 0xdd, 0xc0, 0x6d, 0x05, 0xc6, 0x86, 0xf7,
0x08, 0xbf, 0x7b, 0xaa, 0x1b, 0x25, 0x85, 0x01, 0xf2, 0x05, 0x0f, 0xb4, 0x94, 0x65, 0x0c, 0x8d,
0xee, 0xa3, 0x29, 0x9a, 0xbb, 0x4b, 0xc7, 0x47, 0x9b, 0x37, 0xb5, 0xde, 0xda, 0x49, 0x84, 0x47,
0xaa, 0x62, 0x45, 0x6e, 0x32, 0xd0, 0x9d, 0xd7, 0xe9, 0xbc, 0x5e, 0x57, 0x3c, 0x37, 0x84, 0x18,
0x8b, 0xa4, 0x04, 0xa3, 0x12, 0x0e, 0xc6, 0xef, 0x4d, 0x7b, 0xf3, 0xfe, 0xc9, 0x79, 0xa1, 0x7e,
0x7b, 0x70, 0xf0, 0xf0, 0x47, 0xcd, 0xfe, 0x4b, 0x58, 0xd0, 0x22, 0x29, 0xc8, 0x6f, 0x3c, 0xb8,
0xa4, 0x24, 0x3e, 0xd5, 0x8a, 0xd3, 0x2b, 0x07, 0x05, 0x1f, 0xae, 0x54, 0x9a, 0x93, 0x42, 0xf7,
0x78, 0x40, 0xaf, 0x16, 0xce, 0x1c, 0x91, 0x3f, 0x18, 0xaf, 0x25, 0x6b, 0x5b, 0xc8, 0x80, 0xb6,
0x81, 0xd1, 0xb5, 0x64, 0xc1, 0x7b, 0xda, 0x44, 0x4d, 0xcf, 0x51, 0xd3, 0x55, 0x1d, 0x75, 0x38,
0x3b, 0x1e, 0xd0, 0x27, 0x0f, 0x05, 0x13, 0xd2, 0xef, 0x50, 0x89, 0x7b, 0x23, 0x59, 0x6c, 0xf7,
0x0a, 0x16, 0xe8, 0x2b, 0x22, 0x80, 0xbd, 0xbf, 0x52, 0xef, 0x40, 0x6f, 0x20, 0xcd, 0x8d, 0x05,
0x0d, 0x5b, 0xf2, 0xcc, 0xb8, 0x97, 0xd7, 0xb8, 0xc8, 0x43, 0xc1, 0x98, 0x8c, 0xb2, 0x44, 0x6c,
0x0b, 0xd0, 0x71, 0xb7, 0xb0, 0xa6, 0x5f, 0xce, 0xfe, 0x7d, 0x4e, 0x73, 0x9b, 0x55, 0x8c, 0x72,
0x59, 0x46, 0x2d, 0x7b, 0xf3, 0x33, 0xb8, 0x2c, 0x22, 0xad, 0x38, 0x7b, 0x7d, 0x7a, 0x7d, 0x7f,
0x0c, 0x00, 0x00, 0xff, 0xff, 0x80, 0x9c, 0xda, 0xb3, 0x4d, 0x02, 0x00, 0x00,
}